1use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17use std::rc::Rc;
18
19use differential_dataflow::consolidation::{consolidate, consolidate_updates};
20use differential_dataflow::logging::{BatchEvent, DropEvent};
21use itertools::Itertools;
22use mz_compute_types::dyncfgs::{
23 CONSOLIDATING_VEC_GROWTH_DAMPENER, CORRECTION_V2_CHAIN_PROPORTIONALITY,
24 CORRECTION_V2_CHUNK_SIZE, ENABLE_CORRECTION_V2,
25};
26use mz_dyncfg::ConfigSet;
27use mz_ore::iter::IteratorExt;
28use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
29use mz_repr::{Diff, Timestamp};
30use timely::PartialOrder;
31use timely::progress::Antichain;
32
33use crate::logging::compute::{
34 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
35 ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
36 Logger as ComputeLogger,
37};
38use crate::sink::correction_v2::{CorrectionV2, Data};
39
/// A correction buffer in one of two implementations, selected at
/// construction time via the `ENABLE_CORRECTION_V2` dyncfg (see
/// [`Correction::new`]).
pub(super) enum Correction<D: Data> {
    /// The original implementation: per-timestamp `ConsolidatingVec`s (defined in this file).
    V1(CorrectionV1<D>),
    /// The newer implementation, defined in the `correction_v2` module.
    V2(CorrectionV2<D>),
}
50
51impl<D: Data> Correction<D> {
52 pub fn new(
54 metrics: SinkMetrics,
55 worker_metrics: SinkWorkerMetrics,
56 logging: Option<Logging>,
57 config: &ConfigSet,
58 ) -> Self {
59 if ENABLE_CORRECTION_V2.get(config) {
60 let prop = CORRECTION_V2_CHAIN_PROPORTIONALITY.get(config);
61 let chunk_size = CORRECTION_V2_CHUNK_SIZE.get(config);
62 Self::V2(CorrectionV2::new(
63 metrics,
64 worker_metrics,
65 logging,
66 prop,
67 chunk_size,
68 ))
69 } else {
70 let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
71 Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
72 }
73 }
74
75 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
77 match self {
78 Self::V1(c) => c.insert(updates),
79 Self::V2(c) => c.insert(updates),
80 }
81 }
82
83 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
85 match self {
86 Self::V1(c) => c.insert_negated(updates),
87 Self::V2(c) => c.insert_negated(updates),
88 }
89 }
90
91 pub fn updates_before(
93 &mut self,
94 upper: &Antichain<Timestamp>,
95 ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
96 match self {
97 Self::V1(c) => Box::new(c.updates_before(upper)),
98 Self::V2(c) => Box::new(c.updates_before(upper)),
99 }
100 }
101
102 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
108 match self {
109 Self::V1(c) => c.advance_since(since),
110 Self::V2(c) => c.advance_since(since),
111 }
112 }
113
114 pub fn consolidate_at_since(&mut self) {
116 match self {
117 Self::V1(c) => c.consolidate_at_since(),
118 Self::V2(c) => c.consolidate_at_since(),
119 }
120 }
121}
122
/// The original correction buffer implementation: updates bucketed by
/// timestamp in a `BTreeMap`, with each bucket consolidated on demand.
pub(super) struct CorrectionV1<D> {
    /// Updates indexed by their timestamp.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier to which all contained update times have been advanced
    /// (see `insert`/`advance_since`).
    since: Antichain<Timestamp>,
    /// The total length and capacity across all buckets, as last reported to
    /// metrics via `update_metrics`.
    total_size: LengthAndCapacity,
    /// Sink-level metrics, used to report size deltas.
    metrics: SinkMetrics,
    /// Per-worker metrics, used to report size totals.
    worker_metrics: SinkWorkerMetrics,
    /// Growth dampener copied into each newly created `ConsolidatingVec`
    /// (see `ConsolidatingVec::growth_dampener`).
    growth_dampener: usize,
}
151
152impl<D> CorrectionV1<D> {
153 pub fn new(
155 metrics: SinkMetrics,
156 worker_metrics: SinkWorkerMetrics,
157 growth_dampener: usize,
158 ) -> Self {
159 Self {
160 updates: Default::default(),
161 since: Antichain::from_elem(Timestamp::MIN),
162 total_size: Default::default(),
163 metrics,
164 worker_metrics,
165 growth_dampener,
166 }
167 }
168
169 fn update_metrics(&mut self, new_size: LengthAndCapacity) {
171 let old_size = self.total_size;
172 let len_delta = UpdateDelta::new(new_size.length, old_size.length);
173 let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
174 self.metrics
175 .report_correction_update_deltas(len_delta, cap_delta);
176 self.worker_metrics
177 .report_correction_update_totals(new_size.length, new_size.capacity);
178
179 self.total_size = new_size;
180 }
181}
182
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates, draining the given vector.
    ///
    /// Update times are advanced to the current `since` frontier before
    /// insertion, so the invariant checked in `insert_inner` holds.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // An empty since frontier means all future times are invalid, so
            // no updates can ever be reported; discard them.
            updates.clear();
            return;
        };

        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates with negated diffs, draining the given
    /// vector. Otherwise identical to `insert`.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // Empty since frontier: nothing can ever be reported, discard.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert the given updates into the per-timestamp buckets, then report
    /// the resulting size change to metrics.
    ///
    /// Expects all update times to have been advanced by `since` already.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        consolidate_updates(updates);
        // Re-sort by time so each run of equal times forms one bucket below.
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // All consecutive updates at this time, stripped of it.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    // `collect` leaves growth_dampener at 0; fix it up.
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    // Subtract the bucket's old size, extend, re-add the new
                    // size, so `new_size` tracks the net change.
                    let vec = entry.get_mut();
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Return an iterator over updates at times in `[lower, upper)`.
    ///
    /// Panics if `lower` is not less or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            // Empty lower frontier: produce an empty range. (`upper` must
            // then be empty too, making `end` Unbounded, so the range
            // (Excluded(MAX), Unbounded) contains no timestamps.)
            None => Bound::Excluded(Timestamp::MAX),
        };
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidate first so the bucket lengths summed here are exactly the
        // number of items the iterator below will yield.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Return an iterator over all updates at times before `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate all buckets within the given time range, reporting the
    /// size change to metrics. Returns the number of updates remaining in the
    /// range after consolidation.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            // Track the net size change caused by consolidation.
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the since frontier, forwarding all contained updates at
    /// earlier times to it.
    ///
    /// Panics if the given frontier regresses the current since.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance the times of all updates not yet beyond `frontier` to the
    /// frontier, merging their buckets into the bucket at the frontier time.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            // Empty frontier: no update can ever be reported; drop everything
            // and zero the metrics.
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        // Buckets are popped in ascending time order, so the first bucket at
        // or beyond the frontier ends the loop.
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // Already advanced; put it back and stop.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // Moving the bucket wholesale changes nothing size-wise.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    // The merged-in bucket's allocation is dropped and the
                    // target bucket may reallocate; account for both.
                    let vec = entry.get_mut();
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate the bucket at the current since time, if any.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        let start = Bound::Included(*since_ts);
        // Range covers exactly the since timestamp; if it cannot be stepped
        // forward it is the maximum time, so Unbounded is equivalent.
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
376
377impl<D> Drop for CorrectionV1<D> {
378 fn drop(&mut self) {
379 self.update_metrics(Default::default());
380 }
381}
382
/// A pair of length and capacity of some update storage, used for metrics
/// accounting in `CorrectionV1`.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    // Number of stored updates.
    pub length: usize,
    // Number of allocated update slots.
    pub capacity: usize,
}
389
390impl AddAssign<Self> for LengthAndCapacity {
391 fn add_assign(&mut self, size: Self) {
392 self.length += size.length;
393 self.capacity += size.capacity;
394 }
395}
396
397impl AddAssign<(usize, usize)> for LengthAndCapacity {
398 fn add_assign(&mut self, (len, cap): (usize, usize)) {
399 self.length += len;
400 self.capacity += cap;
401 }
402}
403
404impl SubAssign<(usize, usize)> for LengthAndCapacity {
405 fn sub_assign(&mut self, (len, cap): (usize, usize)) {
406 self.length -= len;
407 self.capacity -= cap;
408 }
409}
410
/// A vector of `(data, diff)` updates that consolidates (merges updates with
/// equal data, dropping zero diffs) when it would otherwise have to grow,
/// bounding memory usage.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    // The contained updates.
    data: Vec<(D, Diff)>,
    // Lower bound on the allocation when shrinking after consolidation
    // (see `consolidate`).
    min_capacity: usize,
    // Dampener for allocation growth: `push` grows the capacity by the
    // fraction `1 / (growth_dampener + 1)` instead of doubling, trading more
    // frequent consolidation for lower memory use.
    growth_dampener: usize,
}
429
430impl<D: Ord> ConsolidatingVec<D> {
431 pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
433 ConsolidatingVec {
434 data: Vec::new(),
435 min_capacity,
436 growth_dampener,
437 }
438 }
439
440 pub fn len(&self) -> usize {
442 self.data.len()
443 }
444
445 pub fn capacity(&self) -> usize {
447 self.data.capacity()
448 }
449
450 pub fn push(&mut self, item: (D, Diff)) {
458 let capacity = self.data.capacity();
459 if self.data.len() == capacity {
460 self.consolidate();
462
463 let length = self.data.len();
466 let dampener = self.growth_dampener;
467 if capacity < length + length / (dampener + 1) {
468 let new_cap = capacity + capacity / (dampener + 1);
472 self.data.reserve_exact(new_cap - length);
473 }
474 }
475
476 self.data.push(item);
477 }
478
479 pub fn consolidate(&mut self) {
481 consolidate(&mut self.data);
482
483 if self.data.len() < self.data.capacity() / 4 {
488 self.data.shrink_to(self.min_capacity);
489 }
490 }
491
492 pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
494 self.data.iter()
495 }
496
497 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
499 self.data.iter_mut()
500 }
501}
502
impl<D> IntoIterator for ConsolidatingVec<D> {
    type Item = (D, Diff);
    type IntoIter = std::vec::IntoIter<(D, Diff)>;

    /// Consume the vector, yielding its updates as currently stored (no
    /// consolidation is performed here).
    fn into_iter(self) -> Self::IntoIter {
        self.data.into_iter()
    }
}
511
impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
    /// Collect updates into a vector with `min_capacity` and
    /// `growth_dampener` both zero; callers wanting other values must set
    /// them afterwards (as `CorrectionV1::insert_inner` does for the
    /// dampener).
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = (D, Diff)>,
    {
        Self {
            data: Vec::from_iter(iter),
            min_capacity: 0,
            growth_dampener: 0,
        }
    }
}
524
525impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
526 fn extend<I>(&mut self, iter: I)
527 where
528 I: IntoIterator<Item = (D, Diff)>,
529 {
530 for item in iter {
531 self.push(item);
532 }
533 }
534}
535
/// Heap size, capacity, and allocation counts of some update storage, as
/// reported to logging. (Not used within this file; presumably consumed by
/// the `correction_v2` module — confirm against its callers.)
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    // Heap size in bytes.
    pub size: usize,
    // Heap capacity in bytes.
    pub capacity: usize,
    // Number of distinct heap allocations.
    pub allocations: usize,
}
543
544impl AddAssign<Self> for SizeMetrics {
545 fn add_assign(&mut self, other: Self) {
546 self.size += other.size;
547 self.capacity += other.capacity;
548 self.allocations += other.allocations;
549 }
550}
551
/// A cheaply clonable, shared handle for reporting correction buffer events
/// and size changes to the compute and differential loggers.
#[derive(Clone, Debug)]
pub(super) struct Logging(Rc<RefCell<LoggingInner>>);
559
560impl Logging {
561 pub fn new(
562 compute_logger: ComputeLogger,
563 differential_logger: differential_dataflow::logging::Logger,
564 operator_id: usize,
565 address: Vec<usize>,
566 ) -> Self {
567 let inner = LoggingInner::new(compute_logger, differential_logger, operator_id, address);
568 Self(Rc::new(RefCell::new(inner)))
569 }
570
571 pub fn chain_created(&self, updates: usize) {
573 self.0.borrow_mut().chain_created(updates);
574 }
575
576 pub fn chain_dropped(&self, updates: usize) {
578 self.0.borrow_mut().chain_dropped(updates);
579 }
580
581 pub fn report_size_diff(&self, diff: isize) {
583 self.0.borrow_mut().report_size_diff(diff);
584 }
585
586 pub fn report_capacity_diff(&self, diff: isize) {
588 self.0.borrow_mut().report_capacity_diff(diff);
589 }
590
591 pub fn report_allocations_diff(&self, diff: isize) {
593 self.0.borrow_mut().report_allocations_diff(diff);
594 }
595}
596
/// The shared state behind a `Logging` handle. Emits an operator-creation
/// event when constructed and a matching drop event in its `Drop` impl.
struct LoggingInner {
    /// Logger for compute events (heap size/capacity/allocations).
    compute_logger: ComputeLogger,
    /// Logger for differential batch/drop events.
    differential_logger: differential_dataflow::logging::Logger,
    /// Id of the operator on whose behalf events are logged.
    operator_id: usize,
}
607
608impl fmt::Debug for LoggingInner {
609 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
610 f.debug_struct("LoggingInner")
611 .field("operator_id", &self.operator_id)
612 .finish_non_exhaustive()
613 }
614}
615
616impl LoggingInner {
617 fn new(
618 compute_logger: ComputeLogger,
619 differential_logger: differential_dataflow::logging::Logger,
620 operator_id: usize,
621 address: Vec<usize>,
622 ) -> Self {
623 compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
624 ArrangementHeapSizeOperator {
625 operator_id,
626 address,
627 },
628 ));
629
630 Self {
631 compute_logger,
632 differential_logger,
633 operator_id,
634 }
635 }
636
637 fn chain_created(&self, updates: usize) {
638 self.differential_logger.log(BatchEvent {
639 operator: self.operator_id,
640 length: updates,
641 });
642 }
643
644 fn chain_dropped(&self, updates: usize) {
645 self.differential_logger.log(DropEvent {
646 operator: self.operator_id,
647 length: updates,
648 });
649 }
650
651 fn report_size_diff(&self, diff: isize) {
652 if diff != 0 {
653 self.compute_logger
654 .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
655 operator_id: self.operator_id,
656 delta_size: diff,
657 }));
658 }
659 }
660
661 fn report_capacity_diff(&self, diff: isize) {
662 if diff != 0 {
663 self.compute_logger
664 .log(&ComputeEvent::ArrangementHeapCapacity(
665 ArrangementHeapCapacity {
666 operator_id: self.operator_id,
667 delta_capacity: diff,
668 },
669 ));
670 }
671 }
672
673 fn report_allocations_diff(&self, diff: isize) {
674 if diff != 0 {
675 self.compute_logger
676 .log(&ComputeEvent::ArrangementHeapAllocations(
677 ArrangementHeapAllocations {
678 operator_id: self.operator_id,
679 delta_allocations: diff,
680 },
681 ));
682 }
683 }
684}
685
686impl Drop for LoggingInner {
687 fn drop(&mut self) {
688 self.compute_logger
689 .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
690 ArrangementHeapSizeOperatorDrop {
691 operator_id: self.operator_id,
692 },
693 ));
694 }
695}