use std::collections::BTreeMap;
use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};

use differential_dataflow::consolidation::{consolidate, consolidate_updates};
use itertools::Itertools;
use mz_compute_types::dyncfgs::{CONSOLIDATING_VEC_GROWTH_DAMPENER, ENABLE_CORRECTION_V2};
use mz_dyncfg::ConfigSet;
use mz_ore::iter::IteratorExt;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::sink::correction_v2::{CorrectionV2, Data};

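/// A buffer of updates on which the persist sink's correction logic operates.
///
/// Wraps one of the two available implementations, selected through the `ENABLE_CORRECTION_V2`
/// dyncfg when the buffer is created.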
pub(super) enum Correction<D: Data> {
    V1(CorrectionV1<D>),
    V2(CorrectionV2<D>),
}

impl<D: Data> Correction<D> {
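    /// Construct a new `Correction` instance, choosing the implementation according to the
    /// given configuration.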
    pub fn new(
        metrics: SinkMetrics,
        worker_metrics: SinkWorkerMetrics,
        config: &ConfigSet,
    ) -> Self {
        if ENABLE_CORRECTION_V2.get(config) {
            Self::V2(CorrectionV2::new(metrics, worker_metrics))
        } else {
            let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
            Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
        }
    }

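    /// Insert a batch of updates.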
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        match self {
            Self::V1(c) => c.insert(updates),
            Self::V2(c) => c.insert(updates),
        }
    }

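    /// Insert a batch of updates, after negating their diffs.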
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        match self {
            Self::V1(c) => c.insert_negated(updates),
            Self::V2(c) => c.insert_negated(updates),
        }
    }

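    /// Return an iterator over all updates at times before the given `upper`.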
    pub fn updates_before(
        &mut self,
        upper: &Antichain<Timestamp>,
    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
        match self {
            Self::V1(c) => Box::new(c.updates_before(upper)),
            Self::V2(c) => Box::new(c.updates_before(upper)),
        }
    }

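    /// Advance the since frontier.
    ///
    /// The given `since` must not be less than the current since frontier.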
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        match self {
            Self::V1(c) => c.advance_since(since),
            Self::V2(c) => c.advance_since(since),
        }
    }

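    /// Consolidate the updates at the current since time.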
    pub fn consolidate_at_since(&mut self) {
        match self {
            Self::V1(c) => c.consolidate_at_since(),
            Self::V2(c) => c.consolidate_at_since(),
        }
    }
}

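/// A correction buffer that stores updates in a `BTreeMap`, keyed by time, with the updates at
/// each time held in a [`ConsolidatingVec`].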
pub(super) struct CorrectionV1<D> {
    /// The contained updates, grouped by time.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// The frontier by which all update times are advanced.
    since: Antichain<Timestamp>,
    /// The total length and capacity of all vectors in `updates`, for metrics reporting.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Configuration for `ConsolidatingVec` growth dampening.
    growth_dampener: usize,
}

impl<D> CorrectionV1<D> {
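    /// Construct a new `CorrectionV1` instance.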
    pub fn new(
        metrics: SinkMetrics,
        worker_metrics: SinkWorkerMetrics,
        growth_dampener: usize,
    ) -> Self {
        Self {
            updates: Default::default(),
            since: Antichain::from_elem(Timestamp::MIN),
            total_size: Default::default(),
            metrics,
            worker_metrics,
            growth_dampener,
        }
    }

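    /// Report updated size metrics, given the new total size of the contained updates.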
    fn update_metrics(&mut self, new_size: LengthAndCapacity) {
        let old_size = self.total_size;
        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
        self.metrics
            .report_correction_update_deltas(len_delta, cap_delta);
        self.worker_metrics
            .report_correction_update_totals(new_size.length, new_size.capacity);

        self.total_size = new_size;
    }
}

impl<D: Data> CorrectionV1<D> {
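    /// Insert a batch of updates, advancing their times by the current since frontier first.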
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // The since frontier is empty, so the updates will never be reported and can be
            // discarded.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

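    /// Insert a batch of updates, after negating their diffs and advancing their times by the
    /// current since frontier.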
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // The since frontier is empty, so the updates will never be reported and can be
            // discarded.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

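    /// Insert a batch of updates, which must already be advanced by the current since frontier.
    ///
    /// The updates are drained from the given vector.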
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        consolidate_updates(updates);
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // Pull off all updates that share the current time.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

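    /// Return consolidated updates at times greater or equal to `lower` and less than `upper`.
    ///
    /// Panics if `lower` is not less than or equal to `upper`.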
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            // An empty `lower` frontier admits no times, so use an empty range.
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidate the range first, so we know the exact number of updates to report.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

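    /// Return consolidated updates at times before the given `upper`.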
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

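    /// Consolidate the updates at times in the given range.
    ///
    /// Returns the number of updates remaining in the range afterwards.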
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

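    /// Advance the since frontier, advancing the contained updates as necessary.
    ///
    /// Panics if the given `since` is less than the current since frontier.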
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

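    /// Advance the times of contained updates by the given frontier, merging updates at earlier
    /// times into the single time at the frontier. Advancing by the empty frontier discards all
    /// updates.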
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // All remaining times are already beyond the frontier; put the entry back.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

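    /// Consolidate the updates at the current since time.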
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        let start = Bound::Included(*since_ts);
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}

impl<D> Drop for CorrectionV1<D> {
    fn drop(&mut self) {
        // Ensure the sink metrics don't continue to account for the dropped buffer's size.
        self.update_metrics(Default::default());
    }
}

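/// Helper type for tracking the length and capacity of the update vectors, for metrics
/// reporting.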
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    pub length: usize,
    pub capacity: usize,
}

impl AddAssign<Self> for LengthAndCapacity {
    fn add_assign(&mut self, size: Self) {
        self.length += size.length;
        self.capacity += size.capacity;
    }
}

impl AddAssign<(usize, usize)> for LengthAndCapacity {
    fn add_assign(&mut self, (len, cap): (usize, usize)) {
        self.length += len;
        self.capacity += cap;
    }
}

impl SubAssign<(usize, usize)> for LengthAndCapacity {
    fn sub_assign(&mut self, (len, cap): (usize, usize)) {
        self.length -= len;
        self.capacity -= cap;
    }
}

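/// A vector that consolidates its contents.
///
/// Equal data values are merged by summing their diffs, and entries whose diffs sum to zero are
/// dropped. Consolidation happens on demand and when the vector would otherwise need to grow.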
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    data: Vec<(D, Diff)>,
    /// A lower bound for the vector's capacity, below which `consolidate` will not shrink it.
    min_capacity: usize,
    /// Dampener for capacity growth: the vector grows by a factor of
    /// `1 + 1 / (growth_dampener + 1)` when it needs more space (doubling when the dampener
    /// is zero).
    growth_dampener: usize,
}

impl<D: Ord> ConsolidatingVec<D> {
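    /// Create a new `ConsolidatingVec` with the given minimum capacity and growth dampener.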
    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
        ConsolidatingVec {
            data: Vec::new(),
            min_capacity,
            growth_dampener,
        }
    }

    /// Return the number of contained items.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Return the capacity of the backing vector.
    pub fn capacity(&self) -> usize {
        self.data.capacity()
    }

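    /// Push `item` into the vector.
    ///
    /// If the vector is at capacity, it is consolidated first. If that does not free enough
    /// space, additional capacity is reserved, with the growth factor reduced according to
    /// `growth_dampener`.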
    pub fn push(&mut self, item: (D, Diff)) {
        let capacity = self.data.capacity();
        if self.data.len() == capacity {
            // The vector is full: try to recover space by consolidating before growing.
            self.consolidate();

            // If the vector is still sufficiently full after consolidation, reserve additional
            // capacity, growing it by a factor of `1 + 1 / (dampener + 1)`.
            let length = self.data.len();
            let dampener = self.growth_dampener;
            if capacity < length + length / (dampener + 1) {
                let new_cap = capacity + capacity / (dampener + 1);
                self.data.reserve_exact(new_cap - length);
            }
        }

        self.data.push(item);
    }

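    /// Consolidate the contents, shrinking the backing allocation if possible.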
    pub fn consolidate(&mut self) {
        consolidate(&mut self.data);

        // Release excess memory if the contents have become sufficiently small, but never
        // shrink below `min_capacity`.
        if self.data.len() < self.data.capacity() / 4 {
            self.data.shrink_to(self.min_capacity);
        }
    }

    /// Return an iterator over the contained items.
    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
        self.data.iter()
    }

    /// Return an iterator over mutable references to the contained items.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
        self.data.iter_mut()
    }
}

impl<D> IntoIterator for ConsolidatingVec<D> {
    type Item = (D, Diff);
    type IntoIter = std::vec::IntoIter<(D, Diff)>;

    fn into_iter(self) -> Self::IntoIter {
        self.data.into_iter()
    }
}

impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (D, Diff)>,
    {
        Self {
            data: Vec::from_iter(iter),
            min_capacity: 0,
            growth_dampener: 0,
        }
    }
}

impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = (D, Diff)>,
    {
        for item in iter {
            self.push(item);
        }
    }
}
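
// A minimal test sketch exercising `ConsolidatingVec`'s consolidation behavior, assuming only
// that `Diff` can be constructed from an `i64`: pushes with equal data are merged by summing
// their diffs, and entries whose diffs sum to zero are dropped.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn consolidating_vec_merges_equal_data() {
        let mut vec = ConsolidatingVec::new(0, 0);
        vec.push(("apple", Diff::from(1i64)));
        vec.push(("apple", Diff::from(2i64)));
        vec.push(("banana", Diff::from(1i64)));
        vec.push(("banana", Diff::from(-1i64)));

        vec.consolidate();

        // "apple" entries are merged; the "banana" entries cancel out and are dropped.
        let contents: Vec<_> = vec.into_iter().collect();
        assert_eq!(contents, vec![("apple", Diff::from(3i64))]);
    }
}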