1use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17use std::rc::Rc;
18
19use differential_dataflow::consolidation::{consolidate, consolidate_updates};
20use differential_dataflow::logging::{BatchEvent, DropEvent};
21use itertools::Itertools;
22use mz_compute_types::dyncfgs::{CONSOLIDATING_VEC_GROWTH_DAMPENER, ENABLE_CORRECTION_V2};
23use mz_dyncfg::ConfigSet;
24use mz_ore::iter::IteratorExt;
25use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
26use mz_repr::{Diff, Timestamp};
27use timely::PartialOrder;
28use timely::progress::Antichain;
29
30use crate::logging::compute::{
31 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
32 ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
33 Logger as ComputeLogger,
34};
35use crate::sink::correction_v2::{CorrectionV2, Data};
36
/// A buffer of sink corrections, with one of two interchangeable implementations.
///
/// Which variant is constructed is decided by the `ENABLE_CORRECTION_V2` dyncfg
/// (see `Correction::new`).
pub(super) enum Correction<D: Data> {
    /// Correction buffer backed by a `BTreeMap` of `ConsolidatingVec`s, keyed by time.
    V1(CorrectionV1<D>),
    /// Correction buffer from the `correction_v2` module.
    /// NOTE(review): internals not visible in this file; see that module for details.
    V2(CorrectionV2<D>),
}
47
48impl<D: Data> Correction<D> {
49 pub fn new(
51 metrics: SinkMetrics,
52 worker_metrics: SinkWorkerMetrics,
53 logging: Option<Logging>,
54 config: &ConfigSet,
55 ) -> Self {
56 if ENABLE_CORRECTION_V2.get(config) {
57 Self::V2(CorrectionV2::new(metrics, worker_metrics, logging))
58 } else {
59 let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
60 Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
61 }
62 }
63
64 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
66 match self {
67 Self::V1(c) => c.insert(updates),
68 Self::V2(c) => c.insert(updates),
69 }
70 }
71
72 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
74 match self {
75 Self::V1(c) => c.insert_negated(updates),
76 Self::V2(c) => c.insert_negated(updates),
77 }
78 }
79
80 pub fn updates_before(
82 &mut self,
83 upper: &Antichain<Timestamp>,
84 ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
85 match self {
86 Self::V1(c) => Box::new(c.updates_before(upper)),
87 Self::V2(c) => Box::new(c.updates_before(upper)),
88 }
89 }
90
91 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
97 match self {
98 Self::V1(c) => c.advance_since(since),
99 Self::V2(c) => c.advance_since(since),
100 }
101 }
102
103 pub fn consolidate_at_since(&mut self) {
105 match self {
106 Self::V1(c) => c.consolidate_at_since(),
107 Self::V2(c) => c.consolidate_at_since(),
108 }
109 }
110}
111
/// A buffer of updates, bucketed by time, that keeps sink metrics informed about its
/// memory footprint.
pub(super) struct CorrectionV1<D> {
    /// Contained updates, keyed by their timestamp.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier by which all contained update times have been advanced.
    since: Antichain<Timestamp>,

    /// Combined length and capacity of all vectors in `updates`, maintained
    /// incrementally so metrics can be reported without rescanning the map.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Growth dampener applied to newly created `ConsolidatingVec`s; see the field of
    /// the same name on that type.
    growth_dampener: usize,
}
140
141impl<D> CorrectionV1<D> {
142 pub fn new(
144 metrics: SinkMetrics,
145 worker_metrics: SinkWorkerMetrics,
146 growth_dampener: usize,
147 ) -> Self {
148 Self {
149 updates: Default::default(),
150 since: Antichain::from_elem(Timestamp::MIN),
151 total_size: Default::default(),
152 metrics,
153 worker_metrics,
154 growth_dampener,
155 }
156 }
157
158 fn update_metrics(&mut self, new_size: LengthAndCapacity) {
160 let old_size = self.total_size;
161 let len_delta = UpdateDelta::new(new_size.length, old_size.length);
162 let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
163 self.metrics
164 .report_correction_update_deltas(len_delta, cap_delta);
165 self.worker_metrics
166 .report_correction_update_totals(new_size.length, new_size.capacity);
167
168 self.total_size = new_size;
169 }
170}
171
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates.
    ///
    /// Update times are advanced to the current `since` before insertion. If `since`
    /// is the empty frontier, the updates can never be read back, so they are dropped.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // Empty `since` frontier: these updates are unreadable, drop them.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates with their diffs negated.
    ///
    /// Otherwise behaves like [`CorrectionV1::insert`].
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // Empty `since` frontier: these updates are unreadable, drop them.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert updates whose times have already been advanced by `since`.
    ///
    /// Drains `updates`, bucketing them by time into `self.updates`, while keeping
    /// the incremental `total_size` bookkeeping (and thus the reported metrics)
    /// up to date.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        consolidate_updates(updates);
        // Group updates by time so each bucket can be extracted in a single pass.
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // All updates at `time`, stripped of their (now constant) timestamps.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    // `FromIterator` leaves `growth_dampener` at 0, so set it here.
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // Re-measure the vector around `extend`, which may consolidate
                    // and/or reallocate it.
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Return an iterator over updates at times within `[lower, upper)`.
    ///
    /// The affected times are consolidated first, so the iterator yields consolidated
    /// data and can report an exact size.
    ///
    /// # Panics
    ///
    /// Panics if `lower` is not less than or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            // An empty `lower` frontier selects nothing: start beyond the max time.
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            // An empty `upper` frontier means "all remaining times".
            None => Bound::Unbounded,
        };

        // Consolidation returns the exact number of remaining updates in the range,
        // which backs the `exact_size` wrapper below.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Return an iterator over updates at times before `upper`.
    ///
    /// Equivalent to `updates_within` with an unconstrained lower bound.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate all per-time vectors within the given time range.
    ///
    /// Returns the number of updates remaining in the range afterwards, and updates
    /// the reported metrics to account for any size changes.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            // Re-measure around `consolidate`, which may drop updates and/or shrink
            // the allocation.
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the `since` frontier, advancing contained update times with it.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is not at least the current `since`.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance all updates at times not beyond `frontier` up to `frontier`.
    ///
    /// An empty `frontier` discards all updates, since no time is beyond it.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        // `pop_first` yields times in ascending order, so we can stop at the first
        // entry whose time is already beyond `frontier`.
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // Went one entry too far; put it back and stop.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // The vector merely moves to a new key; total size is unchanged.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // `data`'s standalone allocation goes away as it is merged into
                    // `vec`, which is re-measured around the `extend`.
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate the updates sitting at exactly the `since` time, if any.
    ///
    /// Does nothing if `since` is the empty frontier.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        let start = Bound::Included(*since_ts);
        // The range covers only `since_ts`; if that time has no successor, use an
        // unbounded end instead.
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
365
366impl<D> Drop for CorrectionV1<D> {
367 fn drop(&mut self) {
368 self.update_metrics(Default::default());
369 }
370}
371
/// Helper type for tracking the combined length and capacity of one or more vectors,
/// used by the correction buffer's size accounting.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    /// Total number of elements.
    pub length: usize,
    /// Total allocated capacity, in elements.
    pub capacity: usize,
}
378
379impl AddAssign<Self> for LengthAndCapacity {
380 fn add_assign(&mut self, size: Self) {
381 self.length += size.length;
382 self.capacity += size.capacity;
383 }
384}
385
386impl AddAssign<(usize, usize)> for LengthAndCapacity {
387 fn add_assign(&mut self, (len, cap): (usize, usize)) {
388 self.length += len;
389 self.capacity += cap;
390 }
391}
392
393impl SubAssign<(usize, usize)> for LengthAndCapacity {
394 fn sub_assign(&mut self, (len, cap): (usize, usize)) {
395 self.length -= len;
396 self.capacity -= cap;
397 }
398}
399
/// A vector of `(data, diff)` updates that consolidates its contents when it runs out
/// of capacity, merging equal data values by summing their diffs.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    data: Vec<(D, Diff)>,
    /// Lower bound for the capacity when shrinking in `consolidate`.
    min_capacity: usize,
    /// Dampener on capacity growth: when growing, the capacity increases by
    /// `capacity / (growth_dampener + 1)`, so larger values mean smaller increments
    /// (see `push`).
    growth_dampener: usize,
}
418
impl<D: Ord> ConsolidatingVec<D> {
    /// Create a new empty `ConsolidatingVec` with the given minimum capacity and
    /// growth dampener.
    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
        ConsolidatingVec {
            data: Vec::new(),
            min_capacity,
            growth_dampener,
        }
    }

    /// Return the number of contained (possibly not yet consolidated) updates.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Return the capacity of the backing allocation.
    pub fn capacity(&self) -> usize {
        self.data.capacity()
    }

    /// Push an update, consolidating first if the vector is at capacity.
    ///
    /// When full, we first consolidate in the hope of freeing enough space. Only if
    /// the remaining slack is less than `length / (growth_dampener + 1)` do we grow
    /// the allocation, by `capacity / (growth_dampener + 1)`.
    pub fn push(&mut self, item: (D, Diff)) {
        // Note: `capacity` is read _before_ `consolidate`, which may shrink the
        // allocation; the growth decision below is relative to the old capacity.
        let capacity = self.data.capacity();
        if self.data.len() == capacity {
            self.consolidate();

            let length = self.data.len();
            let dampener = self.growth_dampener;
            if capacity < length + length / (dampener + 1) {
                // `reserve_exact` takes additional capacity relative to the current
                // length, so this targets a total capacity of `new_cap`.
                let new_cap = capacity + capacity / (dampener + 1);
                self.data.reserve_exact(new_cap - length);
            }
        }

        self.data.push(item);
    }

    /// Consolidate the contents: merge updates with equal data, dropping zero diffs.
    ///
    /// If consolidation leaves the vector less than a quarter full, the allocation is
    /// shrunk (to no less than `min_capacity`, and never below the current length).
    pub fn consolidate(&mut self) {
        consolidate(&mut self.data);

        if self.data.len() < self.data.capacity() / 4 {
            self.data.shrink_to(self.min_capacity);
        }
    }

    /// Return an iterator over borrowed `(data, diff)` pairs.
    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
        self.data.iter()
    }

    /// Return an iterator over mutable `(data, diff)` pairs.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
        self.data.iter_mut()
    }
}
491
impl<D> IntoIterator for ConsolidatingVec<D> {
    type Item = (D, Diff);
    type IntoIter = std::vec::IntoIter<(D, Diff)>;

    /// Consume the vector, yielding its `(data, diff)` pairs as-is, without a final
    /// consolidation pass.
    fn into_iter(self) -> Self::IntoIter {
        self.data.into_iter()
    }
}
500
501impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
502 fn from_iter<I>(iter: I) -> Self
503 where
504 I: IntoIterator<Item = (D, Diff)>,
505 {
506 Self {
507 data: Vec::from_iter(iter),
508 min_capacity: 0,
509 growth_dampener: 0,
510 }
511 }
512}
513
514impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
515 fn extend<I>(&mut self, iter: I)
516 where
517 I: IntoIterator<Item = (D, Diff)>,
518 {
519 for item in iter {
520 self.push(item);
521 }
522 }
523}
524
/// Accumulated size, capacity, and allocation counts, for memory-usage reporting.
/// NOTE(review): units (bytes vs. elements) are not visible in this file — confirm at
/// the use sites.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    /// Total size.
    pub size: usize,
    /// Total capacity.
    pub capacity: usize,
    /// Number of allocations.
    pub allocations: usize,
}
532
533impl AddAssign<Self> for SizeMetrics {
534 fn add_assign(&mut self, other: Self) {
535 self.size += other.size;
536 self.capacity += other.capacity;
537 self.allocations += other.allocations;
538 }
539}
540
/// A shared handle to correction logging state.
///
/// Cloning is cheap: all clones share the same inner state via `Rc<RefCell<_>>`.
#[derive(Clone, Debug)]
pub(super) struct Logging(Rc<RefCell<LoggingInner>>);
548
549impl Logging {
550 pub fn new(
551 compute_logger: ComputeLogger,
552 differential_logger: differential_dataflow::logging::Logger,
553 operator_id: usize,
554 address: Vec<usize>,
555 ) -> Self {
556 let inner = LoggingInner::new(compute_logger, differential_logger, operator_id, address);
557 Self(Rc::new(RefCell::new(inner)))
558 }
559
560 pub fn chain_created(&self, updates: usize) {
562 self.0.borrow_mut().chain_created(updates);
563 }
564
565 pub fn chain_dropped(&self, updates: usize) {
567 self.0.borrow_mut().chain_dropped(updates);
568 }
569
570 pub fn report_size_diff(&self, diff: isize) {
572 self.0.borrow_mut().report_size_diff(diff);
573 }
574
575 pub fn report_capacity_diff(&self, diff: isize) {
577 self.0.borrow_mut().report_capacity_diff(diff);
578 }
579
580 pub fn report_allocations_diff(&self, diff: isize) {
582 self.0.borrow_mut().report_allocations_diff(diff);
583 }
584}
585
/// State shared behind the `Logging` handle.
struct LoggingInner {
    /// Logger for compute events (heap size/capacity/allocation deltas).
    compute_logger: ComputeLogger,
    /// Logger for differential events (batch creation/drop).
    differential_logger: differential_dataflow::logging::Logger,
    /// ID of the operator the logged events are attributed to.
    operator_id: usize,
}
596
// Manual `Debug` impl: only `operator_id` is rendered; the logger fields are omitted
// (presumably because they don't implement `Debug` — NOTE(review): confirm).
impl fmt::Debug for LoggingInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LoggingInner")
            .field("operator_id", &self.operator_id)
            .finish_non_exhaustive()
    }
}
604
605impl LoggingInner {
606 fn new(
607 compute_logger: ComputeLogger,
608 differential_logger: differential_dataflow::logging::Logger,
609 operator_id: usize,
610 address: Vec<usize>,
611 ) -> Self {
612 compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
613 ArrangementHeapSizeOperator {
614 operator_id,
615 address,
616 },
617 ));
618
619 Self {
620 compute_logger,
621 differential_logger,
622 operator_id,
623 }
624 }
625
626 fn chain_created(&self, updates: usize) {
627 self.differential_logger.log(BatchEvent {
628 operator: self.operator_id,
629 length: updates,
630 });
631 }
632
633 fn chain_dropped(&self, updates: usize) {
634 self.differential_logger.log(DropEvent {
635 operator: self.operator_id,
636 length: updates,
637 });
638 }
639
640 fn report_size_diff(&self, diff: isize) {
641 if diff != 0 {
642 self.compute_logger
643 .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
644 operator_id: self.operator_id,
645 delta_size: diff,
646 }));
647 }
648 }
649
650 fn report_capacity_diff(&self, diff: isize) {
651 if diff != 0 {
652 self.compute_logger
653 .log(&ComputeEvent::ArrangementHeapCapacity(
654 ArrangementHeapCapacity {
655 operator_id: self.operator_id,
656 delta_capacity: diff,
657 },
658 ));
659 }
660 }
661
662 fn report_allocations_diff(&self, diff: isize) {
663 if diff != 0 {
664 self.compute_logger
665 .log(&ComputeEvent::ArrangementHeapAllocations(
666 ArrangementHeapAllocations {
667 operator_id: self.operator_id,
668 delta_allocations: diff,
669 },
670 ));
671 }
672 }
673}
674
impl Drop for LoggingInner {
    // On drop, log that the operator is going away, matching the
    // `ArrangementHeapSizeOperator` event emitted in `LoggingInner::new`.
    fn drop(&mut self) {
        self.compute_logger
            .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
                ArrangementHeapSizeOperatorDrop {
                    operator_id: self.operator_id,
                },
            ));
    }
}