// mz_compute/sink/correction.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! The `Correction` data structure used by `persist_sink::write_batches` to stash updates before
//! they are written into batches.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt;
use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
use std::rc::Rc;

use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use differential_dataflow::logging::{BatchEvent, DropEvent};
use itertools::Itertools;
use mz_compute_types::dyncfgs::{
    CONSOLIDATING_VEC_GROWTH_DAMPENER, CORRECTION_V2_CHAIN_PROPORTIONALITY,
    CORRECTION_V2_CHUNK_SIZE, ENABLE_CORRECTION_V2,
};
use mz_dyncfg::ConfigSet;
use mz_ore::iter::IteratorExt;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::logging::compute::{
    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
    ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
    Logger as ComputeLogger,
};
use crate::sink::correction_v2::{CorrectionV2, Data};
/// A data structure suitable for storing updates in a self-correcting persist sink.
///
/// Selects one of two correction buffer implementations. `V1` is the original simple
/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
/// spill-to-disk but is less battle-tested so for now we want to keep the option of reverting to
/// `V1` in a pinch. The plan is to remove `V1` eventually.
pub(super) enum Correction<D: Data> {
    /// The original implementation, holding all updates in memory.
    V1(CorrectionV1<D>),
    /// The newer implementation with spill-to-disk support.
    V2(CorrectionV2<D>),
}
50
51impl<D: Data> Correction<D> {
52    /// Construct a new `Correction` instance.
53    pub fn new(
54        metrics: SinkMetrics,
55        worker_metrics: SinkWorkerMetrics,
56        logging: Option<Logging>,
57        config: &ConfigSet,
58    ) -> Self {
59        if ENABLE_CORRECTION_V2.get(config) {
60            let prop = CORRECTION_V2_CHAIN_PROPORTIONALITY.get(config);
61            let chunk_size = CORRECTION_V2_CHUNK_SIZE.get(config);
62            Self::V2(CorrectionV2::new(
63                metrics,
64                worker_metrics,
65                logging,
66                prop,
67                chunk_size,
68            ))
69        } else {
70            let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
71            Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
72        }
73    }
74
75    /// Insert a batch of updates.
76    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
77        match self {
78            Self::V1(c) => c.insert(updates),
79            Self::V2(c) => c.insert(updates),
80        }
81    }
82
83    /// Insert a batch of updates, after negating their diffs.
84    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
85        match self {
86            Self::V1(c) => c.insert_negated(updates),
87            Self::V2(c) => c.insert_negated(updates),
88        }
89    }
90
91    /// Consolidate and return updates before the given `upper`.
92    pub fn updates_before(
93        &mut self,
94        upper: &Antichain<Timestamp>,
95    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
96        match self {
97            Self::V1(c) => Box::new(c.updates_before(upper)),
98            Self::V2(c) => Box::new(c.updates_before(upper)),
99        }
100    }
101
102    /// Advance the since frontier.
103    ///
104    /// # Panics
105    ///
106    /// Panics if the given `since` is less than the current since frontier.
107    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
108        match self {
109            Self::V1(c) => c.advance_since(since),
110            Self::V2(c) => c.advance_since(since),
111        }
112    }
113
114    /// Consolidate all updates at the current `since`.
115    pub fn consolidate_at_since(&mut self) {
116        match self {
117            Self::V1(c) => c.consolidate_at_since(),
118            Self::V2(c) => c.consolidate_at_since(),
119        }
120    }
121}
122
/// A collection holding `persist_sink` updates.
///
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
///  * It stores updates by time, to enable efficient separation between updates that should
///    be written to a batch and updates whose time has not yet arrived.
///  * It eschews an interface for directly removing previously inserted updates. Instead, updates
///    are removed by inserting them again, with negated diffs. Stored updates are continuously
///    consolidated to give them opportunity to cancel each other out.
///  * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct CorrectionV1<D> {
    /// Stashed updates by time.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier to which all update times are advanced.
    since: Antichain<Timestamp>,

    /// Total length and capacity of vectors in `updates`.
    ///
    /// Tracked to maintain metrics. Kept in sync with `updates` by routing every
    /// mutation through `update_metrics`.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
    growth_dampener: usize,
}
151
152impl<D> CorrectionV1<D> {
153    /// Construct a new `CorrectionV1` instance.
154    pub fn new(
155        metrics: SinkMetrics,
156        worker_metrics: SinkWorkerMetrics,
157        growth_dampener: usize,
158    ) -> Self {
159        Self {
160            updates: Default::default(),
161            since: Antichain::from_elem(Timestamp::MIN),
162            total_size: Default::default(),
163            metrics,
164            worker_metrics,
165            growth_dampener,
166        }
167    }
168
169    /// Update persist sink metrics to the given new length and capacity.
170    fn update_metrics(&mut self, new_size: LengthAndCapacity) {
171        let old_size = self.total_size;
172        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
173        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
174        self.metrics
175            .report_correction_update_deltas(len_delta, cap_delta);
176        self.worker_metrics
177            .report_correction_update_totals(new_size.length, new_size.capacity);
178
179        self.total_size = new_size;
180    }
181}
182
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates.
    ///
    /// Update times are first advanced to the current since frontier, so they can
    /// consolidate with previously stored updates. Drains `updates`.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    ///
    /// Used to retract previously written updates. Drains `updates`.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// The given `updates` must all have been advanced by `self.since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        consolidate_updates(updates);
        // Re-sort by time only, so updates sharing a time form contiguous runs that can
        // be moved into the per-time vectors below.
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // The run of updates at the current `time`; the time component is dropped as
            // it becomes the `BTreeMap` key.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // `extend` can change both length and capacity (via consolidation in
                    // `push`), so bracket it with a subtract/re-add of the vector's size.
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate and return updates within the given bounds.
    ///
    /// # Panics
    ///
    /// Panics if `lower` is not less than or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            // An empty `lower` means an empty range: exclude everything.
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidating first both shrinks the data and yields the exact update count
        // needed to make the returned iterator an `ExactSizeIterator`.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Consolidate and return updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate the updates at the times in the given range.
    ///
    /// Returns the number of updates remaining in the range afterwards.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            // Consolidation can change both length and capacity, so bracket it with a
            // subtract/re-add of the vector's size.
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance all contained updates by the given frontier.
    ///
    /// If the given frontier is empty, all remaining updates are discarded.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        // Pop times in ascending order, moving each pre-frontier one to `target_ts`.
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // We have advanced all updates that can advance.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // The vector is moved as-is, so the total size is unchanged.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        // Build a range covering exactly `since_ts`. If stepping forward overflows,
        // `since_ts` is the maximum time, so an unbounded end is equivalent.
        let start = Bound::Included(*since_ts);
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
376
impl<D> Drop for CorrectionV1<D> {
    fn drop(&mut self) {
        // Report a zero size so the sink metrics stop accounting for this buffer's
        // contents once it is gone.
        self.update_metrics(Default::default());
    }
}
382
/// Helper type for convenient tracking of length and capacity together.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    /// Total number of stored updates.
    pub length: usize,
    /// Total allocated capacity, in number of updates.
    pub capacity: usize,
}
389
390impl AddAssign<Self> for LengthAndCapacity {
391    fn add_assign(&mut self, size: Self) {
392        self.length += size.length;
393        self.capacity += size.capacity;
394    }
395}
396
397impl AddAssign<(usize, usize)> for LengthAndCapacity {
398    fn add_assign(&mut self, (len, cap): (usize, usize)) {
399        self.length += len;
400        self.capacity += cap;
401    }
402}
403
404impl SubAssign<(usize, usize)> for LengthAndCapacity {
405    fn sub_assign(&mut self, (len, cap): (usize, usize)) {
406        self.length -= len;
407        self.capacity -= cap;
408    }
409}
410
/// A vector that consolidates its contents.
///
/// The vector is filled with updates until it reaches capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovers less
/// than a `growth_dampener`-dependent fraction of the vector's capacity, at which point the
/// capacity is increased (doubled when `growth_dampener` is 0).
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    /// The contained updates, as (data, diff) pairs.
    data: Vec<(D, Diff)>,
    /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
    /// might start smaller than this.
    min_capacity: usize,
    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
    ///
    /// If consolidation didn't free enough space (at least a linear amount), the capacity is
    /// increased. Setting this to 0 results in doubling whenever the list is at least half full.
    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
    growth_dampener: usize,
}
429
430impl<D: Ord> ConsolidatingVec<D> {
431    /// Creates a new instance from the necessary configuration arguments.
432    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
433        ConsolidatingVec {
434            data: Vec::new(),
435            min_capacity,
436            growth_dampener,
437        }
438    }
439
440    /// Return the length of the vector.
441    pub fn len(&self) -> usize {
442        self.data.len()
443    }
444
445    /// Return the capacity of the vector.
446    pub fn capacity(&self) -> usize {
447        self.data.capacity()
448    }
449
450    /// Pushes `item` into the vector.
451    ///
452    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
453    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
454    ///
455    /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
456    /// but amortizes to O(log n).
457    pub fn push(&mut self, item: (D, Diff)) {
458        let capacity = self.data.capacity();
459        if self.data.len() == capacity {
460            // The vector is full. First, consolidate to try to recover some space.
461            self.consolidate();
462
463            // We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
464            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
465            let length = self.data.len();
466            let dampener = self.growth_dampener;
467            if capacity < length + length / (dampener + 1) {
468                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
469                // determining the target capacity, and then reserving an amount that achieves this
470                // while working around the existing length.
471                let new_cap = capacity + capacity / (dampener + 1);
472                self.data.reserve_exact(new_cap - length);
473            }
474        }
475
476        self.data.push(item);
477    }
478
479    /// Consolidate the contents.
480    pub fn consolidate(&mut self) {
481        consolidate(&mut self.data);
482
483        // We may have the opportunity to reclaim allocated memory.
484        // Given that `push` will at most double the capacity when the vector is more than half full, and
485        // we want to avoid entering into a resizing cycle, we choose to only shrink if the
486        // vector's length is less than one fourth of its capacity.
487        if self.data.len() < self.data.capacity() / 4 {
488            self.data.shrink_to(self.min_capacity);
489        }
490    }
491
492    /// Return an iterator over the borrowed items.
493    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
494        self.data.iter()
495    }
496
497    /// Returns mutable access to the underlying items.
498    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
499        self.data.iter_mut()
500    }
501}
502
impl<D> IntoIterator for ConsolidatingVec<D> {
    type Item = (D, Diff);
    type IntoIter = std::vec::IntoIter<(D, Diff)>;

    /// Consume the vector, yielding its (data, diff) pairs without further consolidation.
    fn into_iter(self) -> Self::IntoIter {
        self.data.into_iter()
    }
}
511
512impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
513    fn from_iter<I>(iter: I) -> Self
514    where
515        I: IntoIterator<Item = (D, Diff)>,
516    {
517        Self {
518            data: Vec::from_iter(iter),
519            min_capacity: 0,
520            growth_dampener: 0,
521        }
522    }
523}
524
525impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
526    fn extend<I>(&mut self, iter: I)
527    where
528        I: IntoIterator<Item = (D, Diff)>,
529    {
530        for item in iter {
531            self.push(item);
532        }
533    }
534}
535
/// Helper type for convenient tracking of various size metrics together.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    /// Total heap size, in bytes.
    pub size: usize,
    /// Total heap capacity, in bytes.
    pub capacity: usize,
    /// Total number of heap allocations.
    pub allocations: usize,
}
543
544impl AddAssign<Self> for SizeMetrics {
545    fn add_assign(&mut self, other: Self) {
546        self.size += other.size;
547        self.capacity += other.capacity;
548        self.allocations += other.allocations;
549    }
550}
551
/// Helper for correction buffer logging.
///
/// Clones share a single `LoggingInner` through the `Rc`.
///
// TODO: Correction buffer logging currently reuses the arrangement batch and size logging. This
// isn't strictly correct as a correction buffer is not an arrangement. Consider refactoring this
// to be about "operator sizes" instead.
#[derive(Clone, Debug)]
pub(super) struct Logging(Rc<RefCell<LoggingInner>>);
559
560impl Logging {
561    pub fn new(
562        compute_logger: ComputeLogger,
563        differential_logger: differential_dataflow::logging::Logger,
564        operator_id: usize,
565        address: Vec<usize>,
566    ) -> Self {
567        let inner = LoggingInner::new(compute_logger, differential_logger, operator_id, address);
568        Self(Rc::new(RefCell::new(inner)))
569    }
570
571    /// A new chain was created.
572    pub fn chain_created(&self, updates: usize) {
573        self.0.borrow_mut().chain_created(updates);
574    }
575
576    /// A chain was dropped.
577    pub fn chain_dropped(&self, updates: usize) {
578        self.0.borrow_mut().chain_dropped(updates);
579    }
580
581    /// Report a heap size diff.
582    pub fn report_size_diff(&self, diff: isize) {
583        self.0.borrow_mut().report_size_diff(diff);
584    }
585
586    /// Report a heap capacity diff.
587    pub fn report_capacity_diff(&self, diff: isize) {
588        self.0.borrow_mut().report_capacity_diff(diff);
589    }
590
591    /// Report a heap allocations diff.
592    pub fn report_allocations_diff(&self, diff: isize) {
593        self.0.borrow_mut().report_allocations_diff(diff);
594    }
595}
596
/// Inner state for correction buffer logging.
///
/// This is separate from `Logging` and shared via an `Rc`, to ensure we only emit
/// `ArrangementHeapSizeOperator{Drop}` events once, even though we pass the logger to both the Ok
/// and the Err correction buffer.
struct LoggingInner {
    /// Logger for compute events (heap size/capacity/allocations and operator lifecycle).
    compute_logger: ComputeLogger,
    /// Logger for differential batch/drop events (chain creation/drop).
    differential_logger: differential_dataflow::logging::Logger,
    /// Id of the operator this state logs for; included in every emitted event.
    operator_id: usize,
}
607
608impl fmt::Debug for LoggingInner {
609    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
610        f.debug_struct("LoggingInner")
611            .field("operator_id", &self.operator_id)
612            .finish_non_exhaustive()
613    }
614}
615
616impl LoggingInner {
617    fn new(
618        compute_logger: ComputeLogger,
619        differential_logger: differential_dataflow::logging::Logger,
620        operator_id: usize,
621        address: Vec<usize>,
622    ) -> Self {
623        compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
624            ArrangementHeapSizeOperator {
625                operator_id,
626                address,
627            },
628        ));
629
630        Self {
631            compute_logger,
632            differential_logger,
633            operator_id,
634        }
635    }
636
637    fn chain_created(&self, updates: usize) {
638        self.differential_logger.log(BatchEvent {
639            operator: self.operator_id,
640            length: updates,
641        });
642    }
643
644    fn chain_dropped(&self, updates: usize) {
645        self.differential_logger.log(DropEvent {
646            operator: self.operator_id,
647            length: updates,
648        });
649    }
650
651    fn report_size_diff(&self, diff: isize) {
652        if diff != 0 {
653            self.compute_logger
654                .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
655                    operator_id: self.operator_id,
656                    delta_size: diff,
657                }));
658        }
659    }
660
661    fn report_capacity_diff(&self, diff: isize) {
662        if diff != 0 {
663            self.compute_logger
664                .log(&ComputeEvent::ArrangementHeapCapacity(
665                    ArrangementHeapCapacity {
666                        operator_id: self.operator_id,
667                        delta_capacity: diff,
668                    },
669                ));
670        }
671    }
672
673    fn report_allocations_diff(&self, diff: isize) {
674        if diff != 0 {
675            self.compute_logger
676                .log(&ComputeEvent::ArrangementHeapAllocations(
677                    ArrangementHeapAllocations {
678                        operator_id: self.operator_id,
679                        delta_allocations: diff,
680                    },
681                ));
682        }
683    }
684}
685
impl Drop for LoggingInner {
    fn drop(&mut self) {
        // Emit the drop event matching the `ArrangementHeapSizeOperator` event logged in
        // `LoggingInner::new`, so event consumers see the operator's logging lifecycle end.
        self.compute_logger
            .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
                ArrangementHeapSizeOperatorDrop {
                    operator_id: self.operator_id,
                },
            ));
    }
}
695}