1use std::collections::BTreeMap;
14use std::fmt;
15use std::num::NonZeroIsize;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17
18use differential_dataflow::consolidation::{consolidate, consolidate_updates};
19use differential_dataflow::logging::{BatchEvent, DropEvent};
20use itertools::Itertools;
21use mz_compute_types::dyncfgs::{
22 CONSOLIDATING_VEC_GROWTH_DAMPENER, CORRECTION_V2_CHAIN_PROPORTIONALITY,
23 CORRECTION_V2_CHUNK_SIZE, ENABLE_CORRECTION_V2,
24};
25use mz_dyncfg::ConfigSet;
26use mz_ore::iter::IteratorExt;
27use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
28use mz_repr::{Diff, Timestamp};
29use timely::PartialOrder;
30use timely::progress::Antichain;
31use tokio::sync::mpsc;
32
33use crate::logging::compute::{
34 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
35 ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
36 Logger as ComputeLogger,
37};
38use crate::sink::correction_v2::{CorrectionV2, Data};
39
40pub enum Correction<D: Data> {
47 V1(CorrectionV1<D>),
49 V2(CorrectionV2<D>),
51}
52
53impl<D: Data> Correction<D> {
54 pub fn new(
56 metrics: SinkMetrics,
57 worker_metrics: SinkWorkerMetrics,
58 logging: Option<ChannelLogging>,
59 config: &ConfigSet,
60 ) -> Self {
61 if ENABLE_CORRECTION_V2.get(config) {
62 let prop = CORRECTION_V2_CHAIN_PROPORTIONALITY.get(config);
63 let chunk_size = CORRECTION_V2_CHUNK_SIZE.get(config);
64 Self::V2(CorrectionV2::new(
65 metrics,
66 worker_metrics,
67 logging,
68 prop,
69 chunk_size,
70 ))
71 } else {
72 let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
73 Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
74 }
75 }
76
77 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
79 match self {
80 Self::V1(c) => c.insert(updates),
81 Self::V2(c) => c.insert(updates),
82 }
83 }
84
85 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
87 match self {
88 Self::V1(c) => c.insert_negated(updates),
89 Self::V2(c) => c.insert_negated(updates),
90 }
91 }
92
93 pub fn updates_before(
95 &mut self,
96 upper: &Antichain<Timestamp>,
97 ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + Send + '_> {
98 match self {
99 Self::V1(c) => Box::new(c.updates_before(upper)),
100 Self::V2(c) => Box::new(c.updates_before(upper)),
101 }
102 }
103
104 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
110 match self {
111 Self::V1(c) => c.advance_since(since),
112 Self::V2(c) => c.advance_since(since),
113 }
114 }
115
116 pub fn consolidate_at_since(&mut self) {
118 match self {
119 Self::V1(c) => c.consolidate_at_since(),
120 Self::V2(c) => c.consolidate_at_since(),
121 }
122 }
123}
124
125pub struct CorrectionV1<D> {
137 updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
139 since: Antichain<Timestamp>,
141
142 total_size: LengthAndCapacity,
146 metrics: SinkMetrics,
148 worker_metrics: SinkWorkerMetrics,
150 growth_dampener: usize,
152}
153
154impl<D> CorrectionV1<D> {
155 pub fn new(
157 metrics: SinkMetrics,
158 worker_metrics: SinkWorkerMetrics,
159 growth_dampener: usize,
160 ) -> Self {
161 Self {
162 updates: Default::default(),
163 since: Antichain::from_elem(Timestamp::MIN),
164 total_size: Default::default(),
165 metrics,
166 worker_metrics,
167 growth_dampener,
168 }
169 }
170
171 fn update_metrics(&mut self, new_size: LengthAndCapacity) {
173 let old_size = self.total_size;
174 let len_delta = UpdateDelta::new(new_size.length, old_size.length);
175 let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
176 self.metrics
177 .report_correction_update_deltas(len_delta, cap_delta);
178 self.worker_metrics
179 .report_correction_update_totals(new_size.length, new_size.capacity);
180
181 self.total_size = new_size;
182 }
183}
184
185impl<D: Data> CorrectionV1<D> {
186 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
188 let Some(since_ts) = self.since.as_option() else {
189 updates.clear();
191 return;
192 };
193
194 for (_, time, _) in &mut *updates {
195 *time = std::cmp::max(*time, *since_ts);
196 }
197 self.insert_inner(updates);
198 }
199
200 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
202 let Some(since_ts) = self.since.as_option() else {
203 updates.clear();
205 return;
206 };
207
208 for (_, time, diff) in &mut *updates {
209 *time = std::cmp::max(*time, *since_ts);
210 *diff = -*diff;
211 }
212 self.insert_inner(updates);
213 }
214
215 fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
219 consolidate_updates(updates);
220 updates.sort_unstable_by_key(|(_, time, _)| *time);
221
222 let mut new_size = self.total_size;
223 let mut updates = updates.drain(..).peekable();
224 while let Some(&(_, time, _)) = updates.peek() {
225 debug_assert!(
226 self.since.less_equal(&time),
227 "update not advanced by `since`"
228 );
229
230 let data = updates
231 .peeking_take_while(|(_, t, _)| *t == time)
232 .map(|(d, _, r)| (d, r));
233
234 use std::collections::btree_map::Entry;
235 match self.updates.entry(time) {
236 Entry::Vacant(entry) => {
237 let mut vec: ConsolidatingVec<_> = data.collect();
238 vec.growth_dampener = self.growth_dampener;
239 new_size += (vec.len(), vec.capacity());
240 entry.insert(vec);
241 }
242 Entry::Occupied(mut entry) => {
243 let vec = entry.get_mut();
244 new_size -= (vec.len(), vec.capacity());
245 vec.extend(data);
246 new_size += (vec.len(), vec.capacity());
247 }
248 }
249 }
250
251 self.update_metrics(new_size);
252 }
253
254 pub fn updates_within<'a>(
260 &'a mut self,
261 lower: &Antichain<Timestamp>,
262 upper: &Antichain<Timestamp>,
263 ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
264 assert!(PartialOrder::less_equal(lower, upper));
265
266 let start = match lower.as_option() {
267 Some(ts) => Bound::Included(*ts),
268 None => Bound::Excluded(Timestamp::MAX),
269 };
270 let end = match upper.as_option() {
271 Some(ts) => Bound::Excluded(*ts),
272 None => Bound::Unbounded,
273 };
274
275 let update_count = self.consolidate((start, end));
276
277 let range = self.updates.range((start, end));
278 range
279 .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
280 .exact_size(update_count)
281 }
282
283 pub fn updates_before<'a>(
285 &'a mut self,
286 upper: &Antichain<Timestamp>,
287 ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + Send + use<'a, D> {
288 let lower = Antichain::from_elem(Timestamp::MIN);
289 self.updates_within(&lower, upper)
290 }
291
292 fn consolidate<R>(&mut self, range: R) -> usize
296 where
297 R: RangeBounds<Timestamp>,
298 {
299 let mut new_size = self.total_size;
300
301 let updates = self.updates.range_mut(range);
302 let count = updates.fold(0, |acc, (_, data)| {
303 new_size -= (data.len(), data.capacity());
304 data.consolidate();
305 new_size += (data.len(), data.capacity());
306 acc + data.len()
307 });
308
309 self.update_metrics(new_size);
310 count
311 }
312
313 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
319 assert!(PartialOrder::less_equal(&self.since, &since));
320
321 if since != self.since {
322 self.advance_by(&since);
323 self.since = since;
324 }
325 }
326
327 pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
331 let Some(target_ts) = frontier.as_option() else {
332 self.updates.clear();
333 self.update_metrics(Default::default());
334 return;
335 };
336
337 let mut new_size = self.total_size;
338 while let Some((ts, data)) = self.updates.pop_first() {
339 if frontier.less_equal(&ts) {
340 self.updates.insert(ts, data);
342 break;
343 }
344
345 use std::collections::btree_map::Entry;
346 match self.updates.entry(*target_ts) {
347 Entry::Vacant(entry) => {
348 entry.insert(data);
349 }
350 Entry::Occupied(mut entry) => {
351 let vec = entry.get_mut();
352 new_size -= (data.len(), data.capacity());
353 new_size -= (vec.len(), vec.capacity());
354 vec.extend(data);
355 new_size += (vec.len(), vec.capacity());
356 }
357 }
358 }
359
360 self.update_metrics(new_size);
361 }
362
363 pub fn consolidate_at_since(&mut self) {
365 let Some(since_ts) = self.since.as_option() else {
366 return;
367 };
368
369 let start = Bound::Included(*since_ts);
370 let end = match since_ts.try_step_forward() {
371 Some(ts) => Bound::Excluded(ts),
372 None => Bound::Unbounded,
373 };
374
375 self.consolidate((start, end));
376 }
377}
378
379impl<D> Drop for CorrectionV1<D> {
380 fn drop(&mut self) {
381 self.update_metrics(Default::default());
382 }
383}
384
385#[derive(Clone, Copy, Debug, Default)]
387pub(super) struct LengthAndCapacity {
388 pub length: usize,
389 pub capacity: usize,
390}
391
392impl AddAssign<Self> for LengthAndCapacity {
393 fn add_assign(&mut self, size: Self) {
394 self.length += size.length;
395 self.capacity += size.capacity;
396 }
397}
398
399impl AddAssign<(usize, usize)> for LengthAndCapacity {
400 fn add_assign(&mut self, (len, cap): (usize, usize)) {
401 self.length += len;
402 self.capacity += cap;
403 }
404}
405
406impl SubAssign<(usize, usize)> for LengthAndCapacity {
407 fn sub_assign(&mut self, (len, cap): (usize, usize)) {
408 self.length -= len;
409 self.capacity -= cap;
410 }
411}
412
413#[derive(Debug)]
419pub(crate) struct ConsolidatingVec<D> {
420 data: Vec<(D, Diff)>,
421 min_capacity: usize,
424 growth_dampener: usize,
430}
431
432impl<D: Ord> ConsolidatingVec<D> {
433 pub fn len(&self) -> usize {
435 self.data.len()
436 }
437
438 pub fn capacity(&self) -> usize {
440 self.data.capacity()
441 }
442
443 pub fn push(&mut self, item: (D, Diff)) {
451 let capacity = self.data.capacity();
452 if self.data.len() == capacity {
453 self.consolidate();
455
456 let length = self.data.len();
459 let dampener = self.growth_dampener;
460 if capacity < length + length / (dampener + 1) {
461 let new_cap = capacity + capacity / (dampener + 1);
465 self.data.reserve_exact(new_cap - length);
466 }
467 }
468
469 self.data.push(item);
470 }
471
472 pub fn consolidate(&mut self) {
474 consolidate(&mut self.data);
475
476 if self.data.len() < self.data.capacity() / 4 {
481 self.data.shrink_to(self.min_capacity);
482 }
483 }
484
485 pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
487 self.data.iter()
488 }
489}
490
491impl<D> IntoIterator for ConsolidatingVec<D> {
492 type Item = (D, Diff);
493 type IntoIter = std::vec::IntoIter<(D, Diff)>;
494
495 fn into_iter(self) -> Self::IntoIter {
496 self.data.into_iter()
497 }
498}
499
500impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
501 fn from_iter<I>(iter: I) -> Self
502 where
503 I: IntoIterator<Item = (D, Diff)>,
504 {
505 Self {
506 data: Vec::from_iter(iter),
507 min_capacity: 0,
508 growth_dampener: 0,
509 }
510 }
511}
512
513impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
514 fn extend<I>(&mut self, iter: I)
515 where
516 I: IntoIterator<Item = (D, Diff)>,
517 {
518 for item in iter {
519 self.push(item);
520 }
521 }
522}
523
524#[derive(Clone, Copy, Debug, Default)]
526pub(super) struct SizeMetrics {
527 pub size: usize,
528 pub capacity: usize,
529 pub allocations: usize,
530}
531
532impl AddAssign<Self> for SizeMetrics {
533 fn add_assign(&mut self, other: Self) {
534 self.size += other.size;
535 self.capacity += other.capacity;
536 self.allocations += other.allocations;
537 }
538}
539
/// An event sent through a [`ChannelLogging`] sender.
#[derive(Debug)]
pub enum LoggingEvent {
    /// A chain was created; the payload is the `updates` count passed by the reporter.
    ChainCreated(usize),
    /// A chain was dropped; the payload is the `updates` count passed by the reporter.
    ChainDropped(usize),
    /// The reported size changed by the given non-zero amount.
    SizeDiff(NonZeroIsize),
    /// The reported capacity changed by the given non-zero amount.
    CapacityDiff(NonZeroIsize),
    /// The reported number of allocations changed by the given non-zero amount.
    AllocationsDiff(NonZeroIsize),
}
554
555#[derive(Clone, Debug)]
561pub struct ChannelLogging(mpsc::UnboundedSender<LoggingEvent>);
562
563impl ChannelLogging {
564 pub fn new(tx: mpsc::UnboundedSender<LoggingEvent>) -> Self {
566 Self(tx)
567 }
568
569 pub fn chain_created(&self, updates: usize) {
571 let _ = self.0.send(LoggingEvent::ChainCreated(updates));
572 }
573
574 pub fn chain_dropped(&self, updates: usize) {
576 let _ = self.0.send(LoggingEvent::ChainDropped(updates));
577 }
578
579 pub fn report_size_diff(&self, diff: isize) {
581 if let Some(diff) = NonZeroIsize::new(diff) {
582 let _ = self.0.send(LoggingEvent::SizeDiff(diff));
583 }
584 }
585
586 pub fn report_capacity_diff(&self, diff: isize) {
588 if let Some(diff) = NonZeroIsize::new(diff) {
589 let _ = self.0.send(LoggingEvent::CapacityDiff(diff));
590 }
591 }
592
593 pub fn report_allocations_diff(&self, diff: isize) {
595 if let Some(diff) = NonZeroIsize::new(diff) {
596 let _ = self.0.send(LoggingEvent::AllocationsDiff(diff));
597 }
598 }
599}
600
601pub(super) struct CorrectionLogger {
610 compute_logger: ComputeLogger,
611 differential_logger: differential_dataflow::logging::Logger,
612 operator_id: usize,
613 rx: mpsc::UnboundedReceiver<LoggingEvent>,
614 net_batches: isize,
616 net_records: isize,
618 net_size: isize,
620 net_capacity: isize,
622 net_allocations: isize,
624}
625
626impl fmt::Debug for CorrectionLogger {
627 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
628 f.debug_struct("CorrectionLogger")
629 .field("operator_id", &self.operator_id)
630 .finish_non_exhaustive()
631 }
632}
633
634impl CorrectionLogger {
635 pub fn new(
636 compute_logger: ComputeLogger,
637 differential_logger: differential_dataflow::logging::Logger,
638 operator_id: usize,
639 address: Vec<usize>,
640 rx: mpsc::UnboundedReceiver<LoggingEvent>,
641 ) -> Self {
642 compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
643 ArrangementHeapSizeOperator {
644 operator_id,
645 address,
646 },
647 ));
648
649 Self {
650 compute_logger,
651 differential_logger,
652 operator_id,
653 rx,
654 net_batches: 0,
655 net_records: 0,
656 net_size: 0,
657 net_capacity: 0,
658 net_allocations: 0,
659 }
660 }
661
662 pub fn apply_events(&mut self) {
664 use LoggingEvent::*;
665
666 while let Ok(event) = self.rx.try_recv() {
667 match event {
668 ChainCreated(length) => {
669 self.net_batches += 1;
670 self.net_records += isize::try_from(length).expect("must fit");
671 self.differential_logger.log(BatchEvent {
672 operator: self.operator_id,
673 length,
674 });
675 }
676 ChainDropped(length) => {
677 self.net_batches -= 1;
678 self.net_records -= isize::try_from(length).expect("must fit");
679 self.differential_logger.log(DropEvent {
680 operator: self.operator_id,
681 length,
682 });
683 }
684 SizeDiff(delta_size) => {
685 self.net_size += delta_size.get();
686 self.compute_logger.log(&ComputeEvent::ArrangementHeapSize(
687 ArrangementHeapSize {
688 operator_id: self.operator_id,
689 delta_size: delta_size.get(),
690 },
691 ));
692 }
693 CapacityDiff(delta_capacity) => {
694 self.net_capacity += delta_capacity.get();
695 self.compute_logger
696 .log(&ComputeEvent::ArrangementHeapCapacity(
697 ArrangementHeapCapacity {
698 operator_id: self.operator_id,
699 delta_capacity: delta_capacity.get(),
700 },
701 ));
702 }
703 AllocationsDiff(delta_allocations) => {
704 self.net_allocations += delta_allocations.get();
705 self.compute_logger
706 .log(&ComputeEvent::ArrangementHeapAllocations(
707 ArrangementHeapAllocations {
708 operator_id: self.operator_id,
709 delta_allocations: delta_allocations.get(),
710 },
711 ));
712 }
713 }
714 }
715 }
716}
717
718impl Drop for CorrectionLogger {
719 fn drop(&mut self) {
720 self.apply_events();
724
725 for i in 0..self.net_batches {
732 let length = if i == 0 {
733 usize::try_from(self.net_records).unwrap_or(0)
734 } else {
735 0
736 };
737 self.differential_logger.log(DropEvent {
738 operator: self.operator_id,
739 length,
740 });
741 }
742
743 if self.net_size != 0 {
745 self.compute_logger
746 .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
747 operator_id: self.operator_id,
748 delta_size: -self.net_size,
749 }));
750 }
751 if self.net_capacity != 0 {
752 self.compute_logger
753 .log(&ComputeEvent::ArrangementHeapCapacity(
754 ArrangementHeapCapacity {
755 operator_id: self.operator_id,
756 delta_capacity: -self.net_capacity,
757 },
758 ));
759 }
760 if self.net_allocations != 0 {
761 self.compute_logger
762 .log(&ComputeEvent::ArrangementHeapAllocations(
763 ArrangementHeapAllocations {
764 operator_id: self.operator_id,
765 delta_allocations: -self.net_allocations,
766 },
767 ));
768 }
769
770 self.compute_logger
771 .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
772 ArrangementHeapSizeOperatorDrop {
773 operator_id: self.operator_id,
774 },
775 ));
776 }
777}