1use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::fmt;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17use std::rc::Rc;
18
19use differential_dataflow::consolidation::{consolidate, consolidate_updates};
20use differential_dataflow::logging::{BatchEvent, DropEvent};
21use itertools::Itertools;
22use mz_compute_types::dyncfgs::{CONSOLIDATING_VEC_GROWTH_DAMPENER, ENABLE_CORRECTION_V2};
23use mz_dyncfg::ConfigSet;
24use mz_ore::iter::IteratorExt;
25use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
26use mz_repr::{Diff, Timestamp};
27use timely::PartialOrder;
28use timely::progress::Antichain;
29
30use crate::logging::compute::{
31 ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
32 ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
33 Logger as ComputeLogger,
34};
35use crate::sink::correction_v2::{CorrectionV2, Data};
36
/// A correction buffer for the persist sink, dispatching between the two
/// available implementations.
///
/// Which implementation is used is decided once, at construction time, based
/// on the `ENABLE_CORRECTION_V2` dyncfg.
pub(super) enum Correction<D: Data> {
    /// The original implementation, based on a `BTreeMap` of
    /// [`ConsolidatingVec`]s (defined in this module).
    V1(CorrectionV1<D>),
    /// The newer implementation from the `correction_v2` module.
    V2(CorrectionV2<D>),
}
47
48impl<D: Data> Correction<D> {
49 pub fn new(
51 metrics: SinkMetrics,
52 worker_metrics: SinkWorkerMetrics,
53 logging: Option<Logging>,
54 config: &ConfigSet,
55 ) -> Self {
56 if ENABLE_CORRECTION_V2.get(config) {
57 Self::V2(CorrectionV2::new(metrics, worker_metrics, logging))
58 } else {
59 let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
60 Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
61 }
62 }
63
64 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
66 match self {
67 Self::V1(c) => c.insert(updates),
68 Self::V2(c) => c.insert(updates),
69 }
70 }
71
72 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
74 match self {
75 Self::V1(c) => c.insert_negated(updates),
76 Self::V2(c) => c.insert_negated(updates),
77 }
78 }
79
80 pub fn updates_before(
82 &mut self,
83 upper: &Antichain<Timestamp>,
84 ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + '_> {
85 match self {
86 Self::V1(c) => Box::new(c.updates_before(upper)),
87 Self::V2(c) => Box::new(c.updates_before(upper)),
88 }
89 }
90
91 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
97 match self {
98 Self::V1(c) => c.advance_since(since),
99 Self::V2(c) => c.advance_since(since),
100 }
101 }
102
103 pub fn consolidate_at_since(&mut self) {
105 match self {
106 Self::V1(c) => c.consolidate_at_since(),
107 Self::V2(c) => c.consolidate_at_since(),
108 }
109 }
110}
111
/// The original correction buffer implementation.
///
/// Stashes updates in a `BTreeMap` keyed by timestamp, with one
/// [`ConsolidatingVec`] of `(data, diff)` pairs per timestamp.
pub(super) struct CorrectionV1<D> {
    /// Stashed updates, keyed by their timestamp.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier the contained update times have been advanced by
    /// (see `advance_since`).
    since: Antichain<Timestamp>,

    /// Running total of length and capacity across all vecs in `updates`,
    /// kept current so metrics can be reported incrementally.
    total_size: LengthAndCapacity,
    /// Sink metrics, updated with size deltas via `update_metrics`.
    metrics: SinkMetrics,
    /// Per-worker sink metrics, updated with size totals via
    /// `update_metrics`.
    worker_metrics: SinkWorkerMetrics,
    /// Growth dampener applied to each newly created `ConsolidatingVec`.
    growth_dampener: usize,
}
140
141impl<D> CorrectionV1<D> {
142 pub fn new(
144 metrics: SinkMetrics,
145 worker_metrics: SinkWorkerMetrics,
146 growth_dampener: usize,
147 ) -> Self {
148 Self {
149 updates: Default::default(),
150 since: Antichain::from_elem(Timestamp::MIN),
151 total_size: Default::default(),
152 metrics,
153 worker_metrics,
154 growth_dampener,
155 }
156 }
157
158 fn update_metrics(&mut self, new_size: LengthAndCapacity) {
160 let old_size = self.total_size;
161 let len_delta = UpdateDelta::new(new_size.length, old_size.length);
162 let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
163 self.metrics
164 .report_correction_update_deltas(len_delta, cap_delta);
165 self.worker_metrics
166 .report_correction_update_totals(new_size.length, new_size.capacity);
167
168 self.total_size = new_size;
169 }
170}
171
172impl<D: Data> CorrectionV1<D> {
173 pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
175 let Some(since_ts) = self.since.as_option() else {
176 return;
178 };
179
180 for (_, time, _) in &mut *updates {
181 *time = std::cmp::max(*time, *since_ts);
182 }
183 self.insert_inner(updates);
184 }
185
186 pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
188 let Some(since_ts) = self.since.as_option() else {
189 updates.clear();
191 return;
192 };
193
194 for (_, time, diff) in &mut *updates {
195 *time = std::cmp::max(*time, *since_ts);
196 *diff = -*diff;
197 }
198 self.insert_inner(updates);
199 }
200
201 fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
205 consolidate_updates(updates);
206 updates.sort_unstable_by_key(|(_, time, _)| *time);
207
208 let mut new_size = self.total_size;
209 let mut updates = updates.drain(..).peekable();
210 while let Some(&(_, time, _)) = updates.peek() {
211 debug_assert!(
212 self.since.less_equal(&time),
213 "update not advanced by `since`"
214 );
215
216 let data = updates
217 .peeking_take_while(|(_, t, _)| *t == time)
218 .map(|(d, _, r)| (d, r));
219
220 use std::collections::btree_map::Entry;
221 match self.updates.entry(time) {
222 Entry::Vacant(entry) => {
223 let mut vec: ConsolidatingVec<_> = data.collect();
224 vec.growth_dampener = self.growth_dampener;
225 new_size += (vec.len(), vec.capacity());
226 entry.insert(vec);
227 }
228 Entry::Occupied(mut entry) => {
229 let vec = entry.get_mut();
230 new_size -= (vec.len(), vec.capacity());
231 vec.extend(data);
232 new_size += (vec.len(), vec.capacity());
233 }
234 }
235 }
236
237 self.update_metrics(new_size);
238 }
239
240 pub fn updates_within<'a>(
246 &'a mut self,
247 lower: &Antichain<Timestamp>,
248 upper: &Antichain<Timestamp>,
249 ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
250 assert!(PartialOrder::less_equal(lower, upper));
251
252 let start = match lower.as_option() {
253 Some(ts) => Bound::Included(*ts),
254 None => Bound::Excluded(Timestamp::MAX),
255 };
256 let end = match upper.as_option() {
257 Some(ts) => Bound::Excluded(*ts),
258 None => Bound::Unbounded,
259 };
260
261 let update_count = self.consolidate((start, end));
262
263 let range = self.updates.range((start, end));
264 range
265 .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
266 .exact_size(update_count)
267 }
268
269 pub fn updates_before<'a>(
271 &'a mut self,
272 upper: &Antichain<Timestamp>,
273 ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
274 let lower = Antichain::from_elem(Timestamp::MIN);
275 self.updates_within(&lower, upper)
276 }
277
278 fn consolidate<R>(&mut self, range: R) -> usize
282 where
283 R: RangeBounds<Timestamp>,
284 {
285 let mut new_size = self.total_size;
286
287 let updates = self.updates.range_mut(range);
288 let count = updates.fold(0, |acc, (_, data)| {
289 new_size -= (data.len(), data.capacity());
290 data.consolidate();
291 new_size += (data.len(), data.capacity());
292 acc + data.len()
293 });
294
295 self.update_metrics(new_size);
296 count
297 }
298
299 pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
305 assert!(PartialOrder::less_equal(&self.since, &since));
306
307 if since != self.since {
308 self.advance_by(&since);
309 self.since = since;
310 }
311 }
312
313 pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
317 let Some(target_ts) = frontier.as_option() else {
318 self.updates.clear();
319 self.update_metrics(Default::default());
320 return;
321 };
322
323 let mut new_size = self.total_size;
324 while let Some((ts, data)) = self.updates.pop_first() {
325 if frontier.less_equal(&ts) {
326 self.updates.insert(ts, data);
328 break;
329 }
330
331 use std::collections::btree_map::Entry;
332 match self.updates.entry(*target_ts) {
333 Entry::Vacant(entry) => {
334 entry.insert(data);
335 }
336 Entry::Occupied(mut entry) => {
337 let vec = entry.get_mut();
338 new_size -= (data.len(), data.capacity());
339 new_size -= (vec.len(), vec.capacity());
340 vec.extend(data);
341 new_size += (vec.len(), vec.capacity());
342 }
343 }
344 }
345
346 self.update_metrics(new_size);
347 }
348
349 pub fn consolidate_at_since(&mut self) {
351 let Some(since_ts) = self.since.as_option() else {
352 return;
353 };
354
355 let start = Bound::Included(*since_ts);
356 let end = match since_ts.try_step_forward() {
357 Some(ts) => Bound::Excluded(ts),
358 None => Bound::Unbounded,
359 };
360
361 self.consolidate((start, end));
362 }
363}
364
365impl<D> Drop for CorrectionV1<D> {
366 fn drop(&mut self) {
367 self.update_metrics(Default::default());
368 }
369}
370
/// Helper type tracking the combined length and capacity of one or more
/// vectors, used for incremental size-metric accounting.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    /// Total number of elements.
    pub length: usize,
    /// Total allocated capacity.
    pub capacity: usize,
}
377
378impl AddAssign<Self> for LengthAndCapacity {
379 fn add_assign(&mut self, size: Self) {
380 self.length += size.length;
381 self.capacity += size.capacity;
382 }
383}
384
385impl AddAssign<(usize, usize)> for LengthAndCapacity {
386 fn add_assign(&mut self, (len, cap): (usize, usize)) {
387 self.length += len;
388 self.capacity += cap;
389 }
390}
391
392impl SubAssign<(usize, usize)> for LengthAndCapacity {
393 fn sub_assign(&mut self, (len, cap): (usize, usize)) {
394 self.length -= len;
395 self.capacity -= cap;
396 }
397}
398
/// A vector of `(data, diff)` updates that consolidates its contents when
/// it runs out of capacity, in an attempt to bound memory growth.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    /// The contained updates.
    data: Vec<(D, Diff)>,
    /// Lower bound passed to `shrink_to` during consolidation; prevents the
    /// allocation from collapsing below this capacity.
    min_capacity: usize,
    /// Dampener for capacity growth: larger values both raise the bar for
    /// growing at all and shrink the amount grown (see `push`). A value of
    /// zero allows doubling.
    growth_dampener: usize,
}
417
impl<D: Ord> ConsolidatingVec<D> {
    /// Create a new empty `ConsolidatingVec` with the given capacity floor
    /// and growth dampener.
    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
        ConsolidatingVec {
            data: Vec::new(),
            min_capacity,
            growth_dampener,
        }
    }

    /// Return the number of contained updates.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Return the capacity of the backing allocation.
    pub fn capacity(&self) -> usize {
        self.data.capacity()
    }

    /// Push an update, consolidating first if the vec is full.
    ///
    /// When the backing `Vec` has no spare capacity, the contents are
    /// consolidated before deciding whether to grow: capacity is increased
    /// (by `capacity / (dampener + 1)`) only if consolidation did not free
    /// at least a `1 / (dampener + 1)` fraction of it, amortizing
    /// consolidation work against allocation growth.
    pub fn push(&mut self, item: (D, Diff)) {
        let capacity = self.data.capacity();
        if self.data.len() == capacity {
            self.consolidate();

            let length = self.data.len();
            let dampener = self.growth_dampener;
            // NOTE: `capacity` is deliberately the pre-consolidation value;
            // `consolidate` may have shrunk the allocation in the meantime.
            if capacity < length + length / (dampener + 1) {
                let new_cap = capacity + capacity / (dampener + 1);
                // `reserve_exact` reserves relative to `len`, so this grows
                // the allocation to at least `new_cap`.
                self.data.reserve_exact(new_cap - length);
            }
        }

        self.data.push(item);
    }

    /// Consolidate the contained updates.
    ///
    /// If consolidation leaves the vec using less than a quarter of its
    /// capacity, the allocation is shrunk — but never below
    /// `min_capacity`.
    pub fn consolidate(&mut self) {
        consolidate(&mut self.data);

        if self.data.len() < self.data.capacity() / 4 {
            self.data.shrink_to(self.min_capacity);
        }
    }

    /// Return an iterator over references to the contained updates.
    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
        self.data.iter()
    }

    /// Return an iterator over mutable references to the contained updates.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (D, Diff)> {
        self.data.iter_mut()
    }
}
490
491impl<D> IntoIterator for ConsolidatingVec<D> {
492 type Item = (D, Diff);
493 type IntoIter = std::vec::IntoIter<(D, Diff)>;
494
495 fn into_iter(self) -> Self::IntoIter {
496 self.data.into_iter()
497 }
498}
499
500impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
501 fn from_iter<I>(iter: I) -> Self
502 where
503 I: IntoIterator<Item = (D, Diff)>,
504 {
505 Self {
506 data: Vec::from_iter(iter),
507 min_capacity: 0,
508 growth_dampener: 0,
509 }
510 }
511}
512
513impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
514 fn extend<I>(&mut self, iter: I)
515 where
516 I: IntoIterator<Item = (D, Diff)>,
517 {
518 for item in iter {
519 self.push(item);
520 }
521 }
522}
523
/// Heap-size metrics: size, capacity, and allocation count.
///
/// NOTE(review): only the `AddAssign` impl is visible in this chunk;
/// presumably these values feed the arrangement heap-size logging — confirm
/// against the rest of the file.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    pub size: usize,
    pub capacity: usize,
    pub allocations: usize,
}
531
532impl AddAssign<Self> for SizeMetrics {
533 fn add_assign(&mut self, other: Self) {
534 self.size += other.size;
535 self.capacity += other.capacity;
536 self.allocations += other.allocations;
537 }
538}
539
/// A cloneable, shared handle to the sink's logging state.
///
/// Clones share the same [`LoggingInner`] via `Rc<RefCell<_>>`, so all
/// handles report against the same operator.
#[derive(Clone, Debug)]
pub(super) struct Logging(Rc<RefCell<LoggingInner>>);
547
548impl Logging {
549 pub fn new(
550 compute_logger: ComputeLogger,
551 differential_logger: differential_dataflow::logging::Logger,
552 operator_id: usize,
553 address: Vec<usize>,
554 ) -> Self {
555 let inner = LoggingInner::new(compute_logger, differential_logger, operator_id, address);
556 Self(Rc::new(RefCell::new(inner)))
557 }
558
559 pub fn chain_created(&self, updates: usize) {
561 self.0.borrow_mut().chain_created(updates);
562 }
563
564 pub fn chain_dropped(&self, updates: usize) {
566 self.0.borrow_mut().chain_dropped(updates);
567 }
568
569 pub fn report_size_diff(&self, diff: isize) {
571 self.0.borrow_mut().report_size_diff(diff);
572 }
573
574 pub fn report_capacity_diff(&self, diff: isize) {
576 self.0.borrow_mut().report_capacity_diff(diff);
577 }
578
579 pub fn report_allocations_diff(&self, diff: isize) {
581 self.0.borrow_mut().report_allocations_diff(diff);
582 }
583}
584
/// The state behind a [`Logging`] handle.
struct LoggingInner {
    /// Logger for compute events (arrangement heap size/capacity/
    /// allocations).
    compute_logger: ComputeLogger,
    /// Logger for differential events (chain creation and drops).
    differential_logger: differential_dataflow::logging::Logger,
    /// ID of the operator the logged events are attributed to.
    operator_id: usize,
}
595
596impl fmt::Debug for LoggingInner {
597 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598 f.debug_struct("LoggingInner")
599 .field("operator_id", &self.operator_id)
600 .finish_non_exhaustive()
601 }
602}
603
604impl LoggingInner {
605 fn new(
606 compute_logger: ComputeLogger,
607 differential_logger: differential_dataflow::logging::Logger,
608 operator_id: usize,
609 address: Vec<usize>,
610 ) -> Self {
611 compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
612 ArrangementHeapSizeOperator {
613 operator_id,
614 address,
615 },
616 ));
617
618 Self {
619 compute_logger,
620 differential_logger,
621 operator_id,
622 }
623 }
624
625 fn chain_created(&self, updates: usize) {
626 self.differential_logger.log(BatchEvent {
627 operator: self.operator_id,
628 length: updates,
629 });
630 }
631
632 fn chain_dropped(&self, updates: usize) {
633 self.differential_logger.log(DropEvent {
634 operator: self.operator_id,
635 length: updates,
636 });
637 }
638
639 fn report_size_diff(&self, diff: isize) {
640 if diff != 0 {
641 self.compute_logger
642 .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
643 operator_id: self.operator_id,
644 delta_size: diff,
645 }));
646 }
647 }
648
649 fn report_capacity_diff(&self, diff: isize) {
650 if diff != 0 {
651 self.compute_logger
652 .log(&ComputeEvent::ArrangementHeapCapacity(
653 ArrangementHeapCapacity {
654 operator_id: self.operator_id,
655 delta_capacity: diff,
656 },
657 ));
658 }
659 }
660
661 fn report_allocations_diff(&self, diff: isize) {
662 if diff != 0 {
663 self.compute_logger
664 .log(&ComputeEvent::ArrangementHeapAllocations(
665 ArrangementHeapAllocations {
666 operator_id: self.operator_id,
667 delta_allocations: diff,
668 },
669 ));
670 }
671 }
672}
673
674impl Drop for LoggingInner {
675 fn drop(&mut self) {
676 self.compute_logger
677 .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
678 ArrangementHeapSizeOperatorDrop {
679 operator_id: self.operator_id,
680 },
681 ));
682 }
683}