1use std::collections::BTreeMap;
14use std::fmt;
15use std::num::NonZeroIsize;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17
18use differential_dataflow::consolidation::{consolidate, consolidate_updates};
19use differential_dataflow::logging::{BatchEvent, DropEvent};
20use itertools::Itertools;
21use mz_compute_types::dyncfgs::{
22 CONSOLIDATING_VEC_GROWTH_DAMPENER, CORRECTION_V2_CHAIN_PROPORTIONALITY,
23 CORRECTION_V2_CHUNK_SIZE, ENABLE_CORRECTION_V2,
24};
25use mz_dyncfg::ConfigSet;
26use mz_ore::iter::IteratorExt;
27use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
28use mz_repr::{Diff, Timestamp};
29use timely::PartialOrder;
30use timely::progress::Antichain;
31use tokio::sync::mpsc;
32
33use crate::logging::compute::{
34 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
35 ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
36 Logger as ComputeLogger,
37};
38use crate::sink::correction_v2::{CorrectionV2, Data};
39
/// A buffer of updates awaiting persistence, dispatching between the two
/// available implementations. The variant is selected at construction time
/// based on the `ENABLE_CORRECTION_V2` dyncfg.
pub(super) enum Correction<D: Data> {
    /// The original implementation, backed by a `BTreeMap` of consolidating
    /// vectors (see `CorrectionV1`).
    V1(CorrectionV1<D>),
    /// The newer implementation (see `CorrectionV2`).
    V2(CorrectionV2<D>),
}
50
51impl<D: Data> Correction<D> {
52 pub fn new(
54 metrics: SinkMetrics,
55 worker_metrics: SinkWorkerMetrics,
56 logging: Option<ChannelLogging>,
57 config: &ConfigSet,
58 ) -> Self {
59 if ENABLE_CORRECTION_V2.get(config) {
60 let prop = CORRECTION_V2_CHAIN_PROPORTIONALITY.get(config);
61 let chunk_size = CORRECTION_V2_CHUNK_SIZE.get(config);
62 Self::V2(CorrectionV2::new(
63 metrics,
64 worker_metrics,
65 logging,
66 prop,
67 chunk_size,
68 ))
69 } else {
70 let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
71 Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
72 }
73 }
74
75 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
77 match self {
78 Self::V1(c) => c.insert(updates),
79 Self::V2(c) => c.insert(updates),
80 }
81 }
82
83 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
85 match self {
86 Self::V1(c) => c.insert_negated(updates),
87 Self::V2(c) => c.insert_negated(updates),
88 }
89 }
90
91 pub fn updates_before(
93 &mut self,
94 upper: &Antichain<Timestamp>,
95 ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + Send + '_> {
96 match self {
97 Self::V1(c) => Box::new(c.updates_before(upper)),
98 Self::V2(c) => Box::new(c.updates_before(upper)),
99 }
100 }
101
102 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
108 match self {
109 Self::V1(c) => c.advance_since(since),
110 Self::V2(c) => c.advance_since(since),
111 }
112 }
113
114 pub fn consolidate_at_since(&mut self) {
116 match self {
117 Self::V1(c) => c.consolidate_at_since(),
118 Self::V2(c) => c.consolidate_at_since(),
119 }
120 }
121}
122
/// A buffer of updates awaiting persistence, bucketed by timestamp.
pub(super) struct CorrectionV1<D> {
    /// Updates indexed by their timestamp, each bucket kept in a
    /// `ConsolidatingVec`.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier by which all contained update timestamps have been advanced.
    since: Antichain<Timestamp>,

    /// Total length and capacity across all vectors in `updates`, tracked
    /// incrementally so size metrics can be reported as deltas.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Value passed on to each `ConsolidatingVec` to dampen its capacity
    /// growth (see `ConsolidatingVec::growth_dampener`).
    growth_dampener: usize,
}
151
152impl<D> CorrectionV1<D> {
153 pub fn new(
155 metrics: SinkMetrics,
156 worker_metrics: SinkWorkerMetrics,
157 growth_dampener: usize,
158 ) -> Self {
159 Self {
160 updates: Default::default(),
161 since: Antichain::from_elem(Timestamp::MIN),
162 total_size: Default::default(),
163 metrics,
164 worker_metrics,
165 growth_dampener,
166 }
167 }
168
169 fn update_metrics(&mut self, new_size: LengthAndCapacity) {
171 let old_size = self.total_size;
172 let len_delta = UpdateDelta::new(new_size.length, old_size.length);
173 let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
174 self.metrics
175 .report_correction_update_deltas(len_delta, cap_delta);
176 self.worker_metrics
177 .report_correction_update_totals(new_size.length, new_size.capacity);
178
179 self.total_size = new_size;
180 }
181}
182
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates, draining the given vector.
    ///
    /// Update times are first advanced to the since frontier, so every bucket
    /// key in `self.updates` is at or beyond `self.since`.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // The since frontier is empty: no times remain, drop everything.
            updates.clear();
            return;
        };

        // Advance times by the since frontier.
        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates with negated diffs, draining the given vector.
    ///
    /// Like `insert`, but flips the sign of every diff before insertion.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // The since frontier is empty: no times remain, drop everything.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert the given updates into `self.updates`, bucketed by time.
    ///
    /// Expects all update times to have been advanced by `self.since` already
    /// (checked via `debug_assert!`).
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        // Consolidating first reduces the volume inserted; sorting by time
        // makes each time's updates a contiguous run we can peel off below.
        consolidate_updates(updates);
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // Take the contiguous run of updates at this time.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // Account for the size change caused by the merge.
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Return an iterator over updates at times within `[lower, upper)`,
    /// consolidating that range first so the iterator reports an exact size.
    ///
    /// Panics if `lower` is not less than or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        // Translate the frontiers into timestamp range bounds. An empty
        // frontier contains no times, which `Excluded(MAX)` as a start
        // (resp. `Unbounded` as an end) expresses.
        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidate first, so `update_count` matches exactly what the range
        // iteration below will yield.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Return an iterator over all updates at times before `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + Send + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate all buckets within the given time range, returning the
    /// number of updates remaining in the range afterwards.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            // Consolidation can change both length and capacity; account for
            // the difference.
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the since frontier, advancing the times of contained updates
    /// as necessary.
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance the times of all updates not yet beyond `frontier` up to that
    /// frontier, merging their buckets into the bucket at the frontier time.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            // An empty frontier has no time to advance to; drop everything.
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // This bucket (and, by `BTreeMap` ordering, all remaining
                // ones) is already beyond the frontier; put it back and stop.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // Moving the vector unchanged doesn't affect total size.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // `data`'s allocation is released by the merge and `vec`'s
                    // length/capacity may change, so account for both.
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate the bucket at the current since time, if any.
    ///
    /// All updates at times before the since frontier have been advanced to
    /// exactly the since time, so this is where consolidation opportunities
    /// concentrate.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        // The range covers exactly the since time: up to (but excluding) its
        // successor, or unbounded if the since time has no successor.
        let start = Bound::Included(*since_ts);
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
376
377impl<D> Drop for CorrectionV1<D> {
378 fn drop(&mut self) {
379 self.update_metrics(Default::default());
380 }
381}
382
/// The length and capacity of one or more update containers, used to track
/// correction buffer sizes incrementally.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    pub length: usize,
    pub capacity: usize,
}
389
390impl AddAssign<Self> for LengthAndCapacity {
391 fn add_assign(&mut self, size: Self) {
392 self.length += size.length;
393 self.capacity += size.capacity;
394 }
395}
396
397impl AddAssign<(usize, usize)> for LengthAndCapacity {
398 fn add_assign(&mut self, (len, cap): (usize, usize)) {
399 self.length += len;
400 self.capacity += cap;
401 }
402}
403
404impl SubAssign<(usize, usize)> for LengthAndCapacity {
405 fn sub_assign(&mut self, (len, cap): (usize, usize)) {
406 self.length -= len;
407 self.capacity -= cap;
408 }
409}
410
/// A vector of `(data, diff)` updates that consolidates its contents on
/// demand, merging updates with equal data by summing their diffs.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    data: Vec<(D, Diff)>,
    /// Lower bound for the capacity the backing vector is shrunk to.
    min_capacity: usize,
    /// Dampener for the growth rate: with value `d`, the capacity grows by a
    /// factor of `1 + 1/(d + 1)` instead of doubling (see `push`). Larger
    /// values trade more frequent consolidation for lower memory usage.
    growth_dampener: usize,
}
429
430impl<D: Ord> ConsolidatingVec<D> {
431 pub fn len(&self) -> usize {
433 self.data.len()
434 }
435
436 pub fn capacity(&self) -> usize {
438 self.data.capacity()
439 }
440
441 pub fn push(&mut self, item: (D, Diff)) {
449 let capacity = self.data.capacity();
450 if self.data.len() == capacity {
451 self.consolidate();
453
454 let length = self.data.len();
457 let dampener = self.growth_dampener;
458 if capacity < length + length / (dampener + 1) {
459 let new_cap = capacity + capacity / (dampener + 1);
463 self.data.reserve_exact(new_cap - length);
464 }
465 }
466
467 self.data.push(item);
468 }
469
470 pub fn consolidate(&mut self) {
472 consolidate(&mut self.data);
473
474 if self.data.len() < self.data.capacity() / 4 {
479 self.data.shrink_to(self.min_capacity);
480 }
481 }
482
483 pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
485 self.data.iter()
486 }
487}
488
impl<D> IntoIterator for ConsolidatingVec<D> {
    type Item = (D, Diff);
    type IntoIter = std::vec::IntoIter<(D, Diff)>;

    /// Consume the vector, yielding its updates without consolidating first.
    fn into_iter(self) -> Self::IntoIter {
        self.data.into_iter()
    }
}
497
498impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
499 fn from_iter<I>(iter: I) -> Self
500 where
501 I: IntoIterator<Item = (D, Diff)>,
502 {
503 Self {
504 data: Vec::from_iter(iter),
505 min_capacity: 0,
506 growth_dampener: 0,
507 }
508 }
509}
510
511impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
512 fn extend<I>(&mut self, iter: I)
513 where
514 I: IntoIterator<Item = (D, Diff)>,
515 {
516 for item in iter {
517 self.push(item);
518 }
519 }
520}
521
/// Heap size, capacity, and allocation counts, as reported through the
/// correction logging machinery.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    pub size: usize,
    pub capacity: usize,
    pub allocations: usize,
}
529
530impl AddAssign<Self> for SizeMetrics {
531 fn add_assign(&mut self, other: Self) {
532 self.size += other.size;
533 self.capacity += other.capacity;
534 self.allocations += other.allocations;
535 }
536}
537
/// An event forwarded from a `ChannelLogging` handle to a `CorrectionLogger`.
#[derive(Debug)]
pub(super) enum LoggingEvent {
    /// A chain holding the given number of updates was created.
    ChainCreated(usize),
    /// A chain holding the given number of updates was dropped.
    ChainDropped(usize),
    /// The tracked heap size changed by the given non-zero amount.
    SizeDiff(NonZeroIsize),
    /// The tracked heap capacity changed by the given non-zero amount.
    CapacityDiff(NonZeroIsize),
    /// The tracked allocation count changed by the given non-zero amount.
    AllocationsDiff(NonZeroIsize),
}
547
/// A cloneable handle that forwards `LoggingEvent`s through an unbounded
/// channel to a `CorrectionLogger`.
#[derive(Clone, Debug)]
pub(super) struct ChannelLogging(mpsc::UnboundedSender<LoggingEvent>);
555
556impl ChannelLogging {
557 pub fn new(tx: mpsc::UnboundedSender<LoggingEvent>) -> Self {
558 Self(tx)
559 }
560
561 pub fn chain_created(&self, updates: usize) {
562 let _ = self.0.send(LoggingEvent::ChainCreated(updates));
563 }
564
565 pub fn chain_dropped(&self, updates: usize) {
566 let _ = self.0.send(LoggingEvent::ChainDropped(updates));
567 }
568
569 pub fn report_size_diff(&self, diff: isize) {
570 if let Some(diff) = NonZeroIsize::new(diff) {
571 let _ = self.0.send(LoggingEvent::SizeDiff(diff));
572 }
573 }
574
575 pub fn report_capacity_diff(&self, diff: isize) {
576 if let Some(diff) = NonZeroIsize::new(diff) {
577 let _ = self.0.send(LoggingEvent::CapacityDiff(diff));
578 }
579 }
580
581 pub fn report_allocations_diff(&self, diff: isize) {
582 if let Some(diff) = NonZeroIsize::new(diff) {
583 let _ = self.0.send(LoggingEvent::AllocationsDiff(diff));
584 }
585 }
586}
587
/// Receives `LoggingEvent`s from `ChannelLogging` handles and translates them
/// into compute and differential log events, while tracking net totals so all
/// reported state can be retracted on drop.
pub(super) struct CorrectionLogger {
    compute_logger: ComputeLogger,
    differential_logger: differential_dataflow::logging::Logger,
    /// ID of the operator on whose behalf events are logged.
    operator_id: usize,
    /// Receiver for events sent through `ChannelLogging` handles.
    rx: mpsc::UnboundedReceiver<LoggingEvent>,
    /// Net number of batches logged so far (created minus dropped).
    net_batches: isize,
    /// Net number of records logged so far.
    net_records: isize,
    /// Net heap size logged so far.
    net_size: isize,
    /// Net heap capacity logged so far.
    net_capacity: isize,
    /// Net heap allocation count logged so far.
    net_allocations: isize,
}
612
613impl fmt::Debug for CorrectionLogger {
614 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
615 f.debug_struct("CorrectionLogger")
616 .field("operator_id", &self.operator_id)
617 .finish_non_exhaustive()
618 }
619}
620
impl CorrectionLogger {
    /// Construct a `CorrectionLogger` for the operator with the given ID and
    /// address.
    ///
    /// Immediately logs the operator's existence so that subsequent heap size
    /// events can be attributed to it.
    pub fn new(
        compute_logger: ComputeLogger,
        differential_logger: differential_dataflow::logging::Logger,
        operator_id: usize,
        address: Vec<usize>,
        rx: mpsc::UnboundedReceiver<LoggingEvent>,
    ) -> Self {
        compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
            ArrangementHeapSizeOperator {
                operator_id,
                address,
            },
        ));

        Self {
            compute_logger,
            differential_logger,
            operator_id,
            rx,
            net_batches: 0,
            net_records: 0,
            net_size: 0,
            net_capacity: 0,
            net_allocations: 0,
        }
    }

    /// Drain all currently queued events and forward them to the compute and
    /// differential loggers, updating the net totals along the way.
    pub fn apply_events(&mut self) {
        use LoggingEvent::*;

        while let Ok(event) = self.rx.try_recv() {
            match event {
                ChainCreated(length) => {
                    // A chain maps to a differential "batch" of `length`
                    // records.
                    self.net_batches += 1;
                    self.net_records += isize::try_from(length).expect("must fit");
                    self.differential_logger.log(BatchEvent {
                        operator: self.operator_id,
                        length,
                    });
                }
                ChainDropped(length) => {
                    self.net_batches -= 1;
                    self.net_records -= isize::try_from(length).expect("must fit");
                    self.differential_logger.log(DropEvent {
                        operator: self.operator_id,
                        length,
                    });
                }
                SizeDiff(delta_size) => {
                    self.net_size += delta_size.get();
                    self.compute_logger.log(&ComputeEvent::ArrangementHeapSize(
                        ArrangementHeapSize {
                            operator_id: self.operator_id,
                            delta_size: delta_size.get(),
                        },
                    ));
                }
                CapacityDiff(delta_capacity) => {
                    self.net_capacity += delta_capacity.get();
                    self.compute_logger
                        .log(&ComputeEvent::ArrangementHeapCapacity(
                            ArrangementHeapCapacity {
                                operator_id: self.operator_id,
                                delta_capacity: delta_capacity.get(),
                            },
                        ));
                }
                AllocationsDiff(delta_allocations) => {
                    self.net_allocations += delta_allocations.get();
                    self.compute_logger
                        .log(&ComputeEvent::ArrangementHeapAllocations(
                            ArrangementHeapAllocations {
                                operator_id: self.operator_id,
                                delta_allocations: delta_allocations.get(),
                            },
                        ));
                }
            }
        }
    }
}
704
impl Drop for CorrectionLogger {
    /// Retract all logged state, so downstream log consumers see net-zero
    /// totals for this operator once it is gone.
    fn drop(&mut self) {
        // Drain any events still queued in the channel, so the net totals
        // below are accurate.
        self.apply_events();

        // Retract the outstanding batches. The first drop event carries the
        // entire outstanding record count, the remaining ones a length of
        // zero, so that the record totals sum to zero as well. A negative
        // record count cannot be expressed in a `usize` length and is clamped
        // to zero.
        for i in 0..self.net_batches {
            let length = if i == 0 {
                usize::try_from(self.net_records).unwrap_or(0)
            } else {
                0
            };
            self.differential_logger.log(DropEvent {
                operator: self.operator_id,
                length,
            });
        }

        // Retract any outstanding size, capacity, and allocation deltas.
        if self.net_size != 0 {
            self.compute_logger
                .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
                    operator_id: self.operator_id,
                    delta_size: -self.net_size,
                }));
        }
        if self.net_capacity != 0 {
            self.compute_logger
                .log(&ComputeEvent::ArrangementHeapCapacity(
                    ArrangementHeapCapacity {
                        operator_id: self.operator_id,
                        delta_capacity: -self.net_capacity,
                    },
                ));
        }
        if self.net_allocations != 0 {
            self.compute_logger
                .log(&ComputeEvent::ArrangementHeapAllocations(
                    ArrangementHeapAllocations {
                        operator_id: self.operator_id,
                        delta_allocations: -self.net_allocations,
                    },
                ));
        }

        self.compute_logger
            .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
                ArrangementHeapSizeOperatorDrop {
                    operator_id: self.operator_id,
                },
            ));
    }
}