// mz_compute/sink/correction.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! The `Correction` data structure used by `persist_sink::write_batches` to stash updates before
//! they are written into batches.
12
13use std::collections::BTreeMap;
14use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
15
16use differential_dataflow::consolidation::{consolidate, consolidate_updates};
17use itertools::Itertools;
18use mz_compute_types::dyncfgs::{CONSOLIDATING_VEC_GROWTH_DAMPENER, ENABLE_CORRECTION_V2};
19use mz_dyncfg::ConfigSet;
20use mz_ore::iter::IteratorExt;
21use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
22use mz_repr::{Diff, Timestamp};
23use timely::PartialOrder;
24use timely::progress::Antichain;
25
26use crate::sink::correction_v2::{CorrectionV2, Data};
27
28/// A data structure suitable for storing updates in a self-correcting persist sink.
29///
30/// Selects one of two correction buffer implementations. `V1` is the original simple
31/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
32/// spill-to-disk but is less battle-tested so for now we want to keep the option of reverting to
33/// `V1` in a pinch. The plan is to remove `V1` eventually.
pub(super) enum Correction<D: Data> {
    /// The original implementation, holding all updates in memory.
    V1(CorrectionV1<D>),
    /// The newer implementation, with support for spilling updates to disk.
    V2(CorrectionV2<D>),
}
38
39impl<D: Data> Correction<D> {
40    /// Construct a new `Correction` instance.
41    pub fn new(
42        metrics: SinkMetrics,
43        worker_metrics: SinkWorkerMetrics,
44        config: &ConfigSet,
45    ) -> Self {
46        if ENABLE_CORRECTION_V2.get(config) {
47            Self::V2(CorrectionV2::new(metrics, worker_metrics))
48        } else {
49            let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
50            Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
51        }
52    }
53
54    /// Insert a batch of updates.
55    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
56        match self {
57            Self::V1(c) => c.insert(updates),
58            Self::V2(c) => c.insert(updates),
59        }
60    }
61
62    /// Insert a batch of updates, after negating their diffs.
63    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
64        match self {
65            Self::V1(c) => c.insert_negated(updates),
66            Self::V2(c) => c.insert_negated(updates),
67        }
68    }
69
70    /// Consolidate and return updates before the given `upper`.
71    pub fn updates_before(
72        &mut self,
73        upper: &Antichain<Timestamp>,
74    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
75        match self {
76            Self::V1(c) => Box::new(c.updates_before(upper)),
77            Self::V2(c) => Box::new(c.updates_before(upper)),
78        }
79    }
80
81    /// Advance the since frontier.
82    ///
83    /// # Panics
84    ///
85    /// Panics if the given `since` is less than the current since frontier.
86    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
87        match self {
88            Self::V1(c) => c.advance_since(since),
89            Self::V2(c) => c.advance_since(since),
90        }
91    }
92
93    /// Consolidate all updates at the current `since`.
94    pub fn consolidate_at_since(&mut self) {
95        match self {
96            Self::V1(c) => c.consolidate_at_since(),
97            Self::V2(c) => c.consolidate_at_since(),
98        }
99    }
100}
101
/// A collection holding `persist_sink` updates.
///
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
///  * It stores updates by time, to enable efficient separation between updates that should
///    be written to a batch and updates whose time has not yet arrived.
///  * It eschews an interface for directly removing previously inserted updates. Instead, updates
///    are removed by inserting them again, with negated diffs. Stored updates are continuously
///    consolidated to give them opportunity to cancel each other out.
///  * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct CorrectionV1<D> {
    /// Stashed updates by time.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier to which all update times are advanced.
    ///
    /// Also used to advance the times of newly inserted updates.
    since: Antichain<Timestamp>,

    /// Total length and capacity of vectors in `updates`.
    ///
    /// Tracked to maintain metrics; must be kept in sync with every mutation of `updates`.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
    growth_dampener: usize,
}
130
131impl<D> CorrectionV1<D> {
132    /// Construct a new `CorrectionV1` instance.
133    pub fn new(
134        metrics: SinkMetrics,
135        worker_metrics: SinkWorkerMetrics,
136        growth_dampener: usize,
137    ) -> Self {
138        Self {
139            updates: Default::default(),
140            since: Antichain::from_elem(Timestamp::MIN),
141            total_size: Default::default(),
142            metrics,
143            worker_metrics,
144            growth_dampener,
145        }
146    }
147
148    /// Update persist sink metrics to the given new length and capacity.
149    fn update_metrics(&mut self, new_size: LengthAndCapacity) {
150        let old_size = self.total_size;
151        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
152        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
153        self.metrics
154            .report_correction_update_deltas(len_delta, cap_delta);
155        self.worker_metrics
156            .report_correction_update_totals(new_size.length, new_size.capacity);
157
158        self.total_size = new_size;
159    }
160}
161
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates.
    ///
    /// Times are first advanced to the current `since` frontier. If `since` is empty, the
    /// updates are logically discarded (the vector itself is left untouched).
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            return;
        };

        // Advance each update to the since frontier, as required by `insert_inner`.
        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    ///
    /// Like `insert`, but flips the sign of every diff first, so callers can retract
    /// previously inserted updates. If `since` is empty, the batch is cleared and dropped.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        // Advance to the since frontier and negate, in a single pass.
        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// The given `updates` must all have been advanced by `self.since`. The vector is drained.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        // Consolidate first so updates that cancel out never enter the stash, then sort by
        // time so that equal-time updates form contiguous runs below.
        consolidate_updates(updates);
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        // `new_size` tracks the length/capacity deltas caused by the mutations below, so
        // metrics can be updated once at the end.
        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // All updates sharing the current `time`, stripped down to (data, diff).
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    // `FromIterator` leaves the dampener at its default; apply ours.
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // Subtract the old size and add the new one; `extend` may both grow the
                    // vector and shrink it through internal consolidation.
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate and return updates within the given bounds.
    ///
    /// # Panics
    ///
    /// Panics if `lower` is not less than or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        // An empty `lower` means an empty range; `Excluded(MAX)` produces exactly that.
        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidate up front so the remaining count is exact and can be used as the
        // iterator's reported size.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Consolidate and return updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate the updates at the times in the given range.
    ///
    /// Returns the number of updates remaining in the range afterwards.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        // Consolidating each vector may change both its length and its capacity, so track
        // the deltas while counting the surviving updates.
        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance all contained updates by the given frontier.
    ///
    /// If the given frontier is empty, all remaining updates are discarded.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            // Empty frontier: drop everything and zero out the metrics.
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        // Pop times from the front of the map until we reach one at or beyond the frontier;
        // everything popped gets folded into the entry at `target_ts`.
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // We have advanced all updates that can advance.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // The vector is moved wholesale, so its size contribution is unchanged
                    // and `new_size` needs no adjustment here.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // Both vectors' old contributions are replaced by the merged vector's.
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        // Build a range covering exactly the since timestamp; if it cannot be stepped
        // forward it is the maximum time, so the range is unbounded above.
        let start = Bound::Included(*since_ts);
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
354
impl<D> Drop for CorrectionV1<D> {
    fn drop(&mut self) {
        // Report a zero size so this instance's contribution is retracted from the
        // correction metrics when it goes away.
        self.update_metrics(Default::default());
    }
}
360
/// Helper type for convenient tracking of length and capacity together.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    /// The tracked (total) length.
    pub length: usize,
    /// The tracked (total) capacity.
    pub capacity: usize,
}
367
368impl AddAssign<Self> for LengthAndCapacity {
369    fn add_assign(&mut self, size: Self) {
370        self.length += size.length;
371        self.capacity += size.capacity;
372    }
373}
374
375impl AddAssign<(usize, usize)> for LengthAndCapacity {
376    fn add_assign(&mut self, (len, cap): (usize, usize)) {
377        self.length += len;
378        self.capacity += cap;
379    }
380}
381
382impl SubAssign<(usize, usize)> for LengthAndCapacity {
383    fn sub_assign(&mut self, (len, cap): (usize, usize)) {
384        self.length -= len;
385        self.capacity -= cap;
386    }
387}
388
/// A vector that consolidates its contents.
///
/// The vector is filled with updates until it reaches capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the vector's capacity, at which point the capacity is doubled.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    /// The stashed `(data, diff)` pairs.
    data: Vec<(D, Diff)>,
    /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
    /// might start smaller than this.
    min_capacity: usize,
    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
    ///
    /// If consolidation didn't free enough space (at least a linear amount), the capacity is
    /// increased by this factor. Setting this to 0 results in doubling whenever the list is at
    /// least half full. Larger numbers result in more conservative approaches that use more CPU,
    /// but less memory.
    growth_dampener: usize,
}
407
408impl<D: Ord> ConsolidatingVec<D> {
409    /// Creates a new instance from the necessary configuration arguments.
410    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
411        ConsolidatingVec {
412            data: Vec::new(),
413            min_capacity,
414            growth_dampener,
415        }
416    }
417
418    /// Return the length of the vector.
419    pub fn len(&self) -> usize {
420        self.data.len()
421    }
422
423    /// Return the capacity of the vector.
424    pub fn capacity(&self) -> usize {
425        self.data.capacity()
426    }
427
428    /// Pushes `item` into the vector.
429    ///
430    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
431    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
432    ///
433    /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
434    /// but amortizes to O(log n).
435    pub fn push(&mut self, item: (D, Diff)) {
436        let capacity = self.data.capacity();
437        if self.data.len() == capacity {
438            // The vector is full. First, consolidate to try to recover some space.
439            self.consolidate();
440
441            // We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
442            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
443            let length = self.data.len();
444            let dampener = self.growth_dampener;
445            if capacity < length + length / (dampener + 1) {
446                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
447                // determining the target capacity, and then reserving an amount that achieves this
448                // while working around the existing length.
449                let new_cap = capacity + capacity / (dampener + 1);
450                self.data.reserve_exact(new_cap - length);
451            }
452        }
453
454        self.data.push(item);
455    }
456
457    /// Consolidate the contents.
458    pub fn consolidate(&mut self) {
459        consolidate(&mut self.data);
460
461        // We may have the opportunity to reclaim allocated memory.
462        // Given that `push` will at most double the capacity when the vector is more than half full, and
463        // we want to avoid entering into a resizing cycle, we choose to only shrink if the
464        // vector's length is less than one fourth of its capacity.
465        if self.data.len() < self.data.capacity() / 4 {
466            self.data.shrink_to(self.min_capacity);
467        }
468    }
469
470    /// Return an iterator over the borrowed items.
471    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
472        self.data.iter()
473    }
474
475    /// Returns mutable access to the underlying items.
476    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
477        self.data.iter_mut()
478    }
479}
480
481impl<D> IntoIterator for ConsolidatingVec<D> {
482    type Item = (D, Diff);
483    type IntoIter = std::vec::IntoIter<(D, Diff)>;
484
485    fn into_iter(self) -> Self::IntoIter {
486        self.data.into_iter()
487    }
488}
489
490impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
491    fn from_iter<I>(iter: I) -> Self
492    where
493        I: IntoIterator<Item = (D, Diff)>,
494    {
495        Self {
496            data: Vec::from_iter(iter),
497            min_capacity: 0,
498            growth_dampener: 0,
499        }
500    }
501}
502
503impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
504    fn extend<I>(&mut self, iter: I)
505    where
506        I: IntoIterator<Item = (D, Diff)>,
507    {
508        for item in iter {
509            self.push(item);
510        }
511    }
512}