// mz_compute/sink/correction.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The `Correction` data structure used by `persist_sink::write_batches` to stash updates before
11//! they are written into batches.
12
13use std::collections::BTreeMap;
14use std::fmt;
15use std::num::NonZeroIsize;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17
18use differential_dataflow::consolidation::{consolidate, consolidate_updates};
19use differential_dataflow::logging::{BatchEvent, DropEvent};
20use itertools::Itertools;
21use mz_compute_types::dyncfgs::{
22    CONSOLIDATING_VEC_GROWTH_DAMPENER, CORRECTION_V2_CHAIN_PROPORTIONALITY,
23    CORRECTION_V2_CHUNK_SIZE, ENABLE_CORRECTION_V2,
24};
25use mz_dyncfg::ConfigSet;
26use mz_ore::iter::IteratorExt;
27use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
28use mz_repr::{Diff, Timestamp};
29use timely::PartialOrder;
30use timely::progress::Antichain;
31use tokio::sync::mpsc;
32
33use crate::logging::compute::{
34    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
35    ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
36    Logger as ComputeLogger,
37};
38use crate::sink::correction_v2::{CorrectionV2, Data};
39
/// A data structure suitable for storing updates in a self-correcting persist sink.
///
/// Selects one of two correction buffer implementations. `V1` is the original simple
/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
/// spill-to-disk but is less battle-tested so for now we want to keep the option of reverting to
/// `V1` in a pinch. The plan is to remove `V1` eventually.
pub(super) enum Correction<D: Data> {
    /// The original, in-memory implementation; see [`CorrectionV1`].
    V1(CorrectionV1<D>),
    /// The newer implementation with spill-to-disk support; see [`CorrectionV2`].
    V2(CorrectionV2<D>),
}
50
51impl<D: Data> Correction<D> {
52    /// Construct a new `Correction` instance.
53    pub fn new(
54        metrics: SinkMetrics,
55        worker_metrics: SinkWorkerMetrics,
56        logging: Option<ChannelLogging>,
57        config: &ConfigSet,
58    ) -> Self {
59        if ENABLE_CORRECTION_V2.get(config) {
60            let prop = CORRECTION_V2_CHAIN_PROPORTIONALITY.get(config);
61            let chunk_size = CORRECTION_V2_CHUNK_SIZE.get(config);
62            Self::V2(CorrectionV2::new(
63                metrics,
64                worker_metrics,
65                logging,
66                prop,
67                chunk_size,
68            ))
69        } else {
70            let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
71            Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
72        }
73    }
74
75    /// Insert a batch of updates.
76    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
77        match self {
78            Self::V1(c) => c.insert(updates),
79            Self::V2(c) => c.insert(updates),
80        }
81    }
82
83    /// Insert a batch of updates, after negating their diffs.
84    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
85        match self {
86            Self::V1(c) => c.insert_negated(updates),
87            Self::V2(c) => c.insert_negated(updates),
88        }
89    }
90
91    /// Consolidate and return updates before the given `upper`.
92    pub fn updates_before(
93        &mut self,
94        upper: &Antichain<Timestamp>,
95    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + Send + '_> {
96        match self {
97            Self::V1(c) => Box::new(c.updates_before(upper)),
98            Self::V2(c) => Box::new(c.updates_before(upper)),
99        }
100    }
101
102    /// Advance the since frontier.
103    ///
104    /// # Panics
105    ///
106    /// Panics if the given `since` is less than the current since frontier.
107    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
108        match self {
109            Self::V1(c) => c.advance_since(since),
110            Self::V2(c) => c.advance_since(since),
111        }
112    }
113
114    /// Consolidate all updates at the current `since`.
115    pub fn consolidate_at_since(&mut self) {
116        match self {
117            Self::V1(c) => c.consolidate_at_since(),
118            Self::V2(c) => c.consolidate_at_since(),
119        }
120    }
121}
122
/// A collection holding `persist_sink` updates.
///
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
///  * It stores updates by time, to enable efficient separation between updates that should
///    be written to a batch and updates whose time has not yet arrived.
///  * It eschews an interface for directly removing previously inserted updates. Instead, updates
///    are removed by inserting them again, with negated diffs. Stored updates are continuously
///    consolidated to give them opportunity to cancel each other out.
///  * It provides an interface for advancing all contained updates to a given frontier.
pub(super) struct CorrectionV1<D> {
    /// Stashed updates by time.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier to which all update times are advanced.
    since: Antichain<Timestamp>,

    /// Total length and capacity of vectors in `updates`.
    ///
    /// Tracked to maintain metrics.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
    growth_dampener: usize,
}
151
152impl<D> CorrectionV1<D> {
153    /// Construct a new `CorrectionV1` instance.
154    pub fn new(
155        metrics: SinkMetrics,
156        worker_metrics: SinkWorkerMetrics,
157        growth_dampener: usize,
158    ) -> Self {
159        Self {
160            updates: Default::default(),
161            since: Antichain::from_elem(Timestamp::MIN),
162            total_size: Default::default(),
163            metrics,
164            worker_metrics,
165            growth_dampener,
166        }
167    }
168
169    /// Update persist sink metrics to the given new length and capacity.
170    fn update_metrics(&mut self, new_size: LengthAndCapacity) {
171        let old_size = self.total_size;
172        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
173        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
174        self.metrics
175            .report_correction_update_deltas(len_delta, cap_delta);
176        self.worker_metrics
177            .report_correction_update_totals(new_size.length, new_size.capacity);
178
179        self.total_size = new_size;
180    }
181}
182
impl<D: Data> CorrectionV1<D> {
    /// Insert a batch of updates.
    ///
    /// Update times are first advanced to the current `since` frontier. If the `since`
    /// frontier is empty, the updates are discarded entirely.
    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        // Advance all update times to the since frontier.
        for (_, time, _) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates, after negating their diffs.
    ///
    /// Like [`CorrectionV1::insert`], but negates each diff, which is how previously inserted
    /// updates are logically removed.
    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        let Some(since_ts) = self.since.as_option() else {
            // If the since frontier is empty, discard all updates.
            updates.clear();
            return;
        };

        for (_, time, diff) in &mut *updates {
            *time = std::cmp::max(*time, *since_ts);
            *diff = -*diff;
        }
        self.insert_inner(updates);
    }

    /// Insert a batch of updates.
    ///
    /// The given `updates` must all have been advanced by `self.since`.
    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
        consolidate_updates(updates);
        // Group the updates by time so each group can be appended to its time's vector at once.
        updates.sort_unstable_by_key(|(_, time, _)| *time);

        // `new_size` tracks the aggregate length/capacity change, reported once at the end.
        let mut new_size = self.total_size;
        let mut updates = updates.drain(..).peekable();
        while let Some(&(_, time, _)) = updates.peek() {
            debug_assert!(
                self.since.less_equal(&time),
                "update not advanced by `since`"
            );

            // All updates that share the current `time`, as (data, diff) pairs.
            let data = updates
                .peeking_take_while(|(_, t, _)| *t == time)
                .map(|(d, _, r)| (d, r));

            use std::collections::btree_map::Entry;
            match self.updates.entry(time) {
                Entry::Vacant(entry) => {
                    let mut vec: ConsolidatingVec<_> = data.collect();
                    vec.growth_dampener = self.growth_dampener;
                    new_size += (vec.len(), vec.capacity());
                    entry.insert(vec);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // Subtract the old size and re-add the new one, since `extend` may
                    // consolidate and reallocate the vector.
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate and return updates within the given bounds.
    ///
    /// # Panics
    ///
    /// Panics if `lower` is not less than or equal to `upper`.
    pub fn updates_within<'a>(
        &'a mut self,
        lower: &Antichain<Timestamp>,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
        assert!(PartialOrder::less_equal(lower, upper));

        // An empty `lower` frontier selects nothing, so produce an empty range in that case.
        let start = match lower.as_option() {
            Some(ts) => Bound::Included(*ts),
            None => Bound::Excluded(Timestamp::MAX),
        };
        // An empty `upper` frontier selects everything from `lower` onward.
        let end = match upper.as_option() {
            Some(ts) => Bound::Excluded(*ts),
            None => Bound::Unbounded,
        };

        // Consolidate first so the returned iterator yields consolidated updates and we know
        // its exact size up front.
        let update_count = self.consolidate((start, end));

        let range = self.updates.range((start, end));
        range
            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
            .exact_size(update_count)
    }

    /// Consolidate and return updates before the given `upper`.
    pub fn updates_before<'a>(
        &'a mut self,
        upper: &Antichain<Timestamp>,
    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + Send + use<'a, D> {
        let lower = Antichain::from_elem(Timestamp::MIN);
        self.updates_within(&lower, upper)
    }

    /// Consolidate the updates at the times in the given range.
    ///
    /// Returns the number of updates remaining in the range afterwards.
    fn consolidate<R>(&mut self, range: R) -> usize
    where
        R: RangeBounds<Timestamp>,
    {
        let mut new_size = self.total_size;

        // Consolidation may shrink each vector's length and capacity, so account for both
        // before and after.
        let updates = self.updates.range_mut(range);
        let count = updates.fold(0, |acc, (_, data)| {
            new_size -= (data.len(), data.capacity());
            data.consolidate();
            new_size += (data.len(), data.capacity());
            acc + data.len()
        });

        self.update_metrics(new_size);
        count
    }

    /// Advance the since frontier.
    ///
    /// # Panics
    ///
    /// Panics if the given `since` is less than the current since frontier.
    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
        assert!(PartialOrder::less_equal(&self.since, &since));

        if since != self.since {
            self.advance_by(&since);
            self.since = since;
        }
    }

    /// Advance all contained updates by the given frontier.
    ///
    /// If the given frontier is empty, all remaining updates are discarded.
    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
        let Some(target_ts) = frontier.as_option() else {
            self.updates.clear();
            self.update_metrics(Default::default());
            return;
        };

        let mut new_size = self.total_size;
        // Pop times in ascending order, folding everything before `frontier` into the vector
        // at `target_ts`.
        while let Some((ts, data)) = self.updates.pop_first() {
            if frontier.less_equal(&ts) {
                // We have advanced all updates that can advance.
                self.updates.insert(ts, data);
                break;
            }

            use std::collections::btree_map::Entry;
            match self.updates.entry(*target_ts) {
                Entry::Vacant(entry) => {
                    // The popped vector is moved as-is, so its size is already accounted for.
                    entry.insert(data);
                }
                Entry::Occupied(mut entry) => {
                    let vec = entry.get_mut();
                    // Both vectors' sizes change: `data` is consumed, `vec` may reallocate.
                    new_size -= (data.len(), data.capacity());
                    new_size -= (vec.len(), vec.capacity());
                    vec.extend(data);
                    new_size += (vec.len(), vec.capacity());
                }
            }
        }

        self.update_metrics(new_size);
    }

    /// Consolidate all updates at the current `since`.
    pub fn consolidate_at_since(&mut self) {
        let Some(since_ts) = self.since.as_option() else {
            return;
        };

        // All updates should sit exactly at the since time, so consolidate the range
        // `[since_ts, since_ts + 1)` (or everything from `since_ts` if there is no successor).
        let start = Bound::Included(*since_ts);
        let end = match since_ts.try_step_forward() {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };

        self.consolidate((start, end));
    }
}
376
impl<D> Drop for CorrectionV1<D> {
    /// Retract all previously reported metrics by resetting the totals to zero.
    fn drop(&mut self) {
        self.update_metrics(Default::default());
    }
}
382
/// Helper type for convenient tracking of length and capacity together.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    /// Combined length of the tracked vectors.
    pub length: usize,
    /// Combined capacity of the tracked vectors.
    pub capacity: usize,
}
389
390impl AddAssign<Self> for LengthAndCapacity {
391    fn add_assign(&mut self, size: Self) {
392        self.length += size.length;
393        self.capacity += size.capacity;
394    }
395}
396
397impl AddAssign<(usize, usize)> for LengthAndCapacity {
398    fn add_assign(&mut self, (len, cap): (usize, usize)) {
399        self.length += len;
400        self.capacity += cap;
401    }
402}
403
404impl SubAssign<(usize, usize)> for LengthAndCapacity {
405    fn sub_assign(&mut self, (len, cap): (usize, usize)) {
406        self.length -= len;
407        self.capacity -= cap;
408    }
409}
410
/// A vector that consolidates its contents.
///
/// The vector is filled with updates until it reaches capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the vector's capacity, at which point the capacity is doubled.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    /// The stored updates, as (data, diff) pairs.
    data: Vec<(D, Diff)>,
    /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
    /// might start smaller than this.
    min_capacity: usize,
    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
    ///
    /// If consolidation didn't free enough space, at least a linear amount, increase the capacity
    /// Setting this to 0 results in doubling whenever the list is at least half full.
    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
    growth_dampener: usize,
}
429
impl<D: Ord> ConsolidatingVec<D> {
    /// Return the length of the vector.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Return the capacity of the vector.
    pub fn capacity(&self) -> usize {
        self.data.capacity()
    }

    /// Pushes `item` into the vector.
    ///
    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
    ///
    /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
    /// but amortizes to O(log n).
    pub fn push(&mut self, item: (D, Diff)) {
        // NOTE: `capacity` is sampled before `consolidate`, which may shrink the vector; the
        // growth decision below is made against this pre-consolidation capacity.
        let capacity = self.data.capacity();
        if self.data.len() == capacity {
            // The vector is full. First, consolidate to try to recover some space.
            self.consolidate();

            // We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
            let length = self.data.len();
            let dampener = self.growth_dampener;
            if capacity < length + length / (dampener + 1) {
                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
                // determining the target capacity, and then reserving an amount that achieves this
                // while working around the existing length.
                let new_cap = capacity + capacity / (dampener + 1);
                self.data.reserve_exact(new_cap - length);
            }
        }

        self.data.push(item);
    }

    /// Consolidate the contents.
    pub fn consolidate(&mut self) {
        consolidate(&mut self.data);

        // We may have the opportunity to reclaim allocated memory.
        // Given that `push` will at most double the capacity when the vector is more than half full, and
        // we want to avoid entering into a resizing cycle, we choose to only shrink if the
        // vector's length is less than one fourth of its capacity.
        if self.data.len() < self.data.capacity() / 4 {
            self.data.shrink_to(self.min_capacity);
        }
    }

    /// Return an iterator over the borrowed items.
    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
        self.data.iter()
    }
}
488
489impl<D> IntoIterator for ConsolidatingVec<D> {
490    type Item = (D, Diff);
491    type IntoIter = std::vec::IntoIter<(D, Diff)>;
492
493    fn into_iter(self) -> Self::IntoIter {
494        self.data.into_iter()
495    }
496}
497
498impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
499    fn from_iter<I>(iter: I) -> Self
500    where
501        I: IntoIterator<Item = (D, Diff)>,
502    {
503        Self {
504            data: Vec::from_iter(iter),
505            min_capacity: 0,
506            growth_dampener: 0,
507        }
508    }
509}
510
511impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
512    fn extend<I>(&mut self, iter: I)
513    where
514        I: IntoIterator<Item = (D, Diff)>,
515    {
516        for item in iter {
517            self.push(item);
518        }
519    }
520}
521
/// Helper type for convenient tracking of various size metrics together.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    /// Heap size of the tracked data.
    pub size: usize,
    /// Heap capacity of the tracked data.
    pub capacity: usize,
    /// Number of heap allocations backing the tracked data.
    pub allocations: usize,
}
529
530impl AddAssign<Self> for SizeMetrics {
531    fn add_assign(&mut self, other: Self) {
532        self.size += other.size;
533        self.capacity += other.capacity;
534        self.allocations += other.allocations;
535    }
536}
537
/// A logging event sent from the Tokio task back to the Timely thread.
#[derive(Debug)]
pub(super) enum LoggingEvent {
    /// A chain with the given number of updates was created.
    ChainCreated(usize),
    /// A chain with the given number of updates was dropped.
    ChainDropped(usize),
    /// The heap size changed by the given (nonzero) amount.
    SizeDiff(NonZeroIsize),
    /// The heap capacity changed by the given (nonzero) amount.
    CapacityDiff(NonZeroIsize),
    /// The number of heap allocations changed by the given (nonzero) amount.
    AllocationsDiff(NonZeroIsize),
}
547
/// Channel-based logging for corrections on a Tokio task. `Send`-safe.
///
/// Sends logging events to the Timely thread, where they are applied to the real `Logging`
/// instance. This allows corrections on the Tokio task to participate in introspection logging
/// without holding `Rc<RefCell<..>>`.
///
/// The wrapped sender's receiving end is drained by a [`CorrectionLogger`].
#[derive(Clone, Debug)]
pub(super) struct ChannelLogging(mpsc::UnboundedSender<LoggingEvent>);
555
556impl ChannelLogging {
557    pub fn new(tx: mpsc::UnboundedSender<LoggingEvent>) -> Self {
558        Self(tx)
559    }
560
561    pub fn chain_created(&self, updates: usize) {
562        let _ = self.0.send(LoggingEvent::ChainCreated(updates));
563    }
564
565    pub fn chain_dropped(&self, updates: usize) {
566        let _ = self.0.send(LoggingEvent::ChainDropped(updates));
567    }
568
569    pub fn report_size_diff(&self, diff: isize) {
570        if let Some(diff) = NonZeroIsize::new(diff) {
571            let _ = self.0.send(LoggingEvent::SizeDiff(diff));
572        }
573    }
574
575    pub fn report_capacity_diff(&self, diff: isize) {
576        if let Some(diff) = NonZeroIsize::new(diff) {
577            let _ = self.0.send(LoggingEvent::CapacityDiff(diff));
578        }
579    }
580
581    pub fn report_allocations_diff(&self, diff: isize) {
582        if let Some(diff) = NonZeroIsize::new(diff) {
583            let _ = self.0.send(LoggingEvent::AllocationsDiff(diff));
584        }
585    }
586}
587
/// State for correction buffer logging on the Timely thread.
///
/// Drains [`LoggingEvent`]s sent by [`ChannelLogging`] from the Tokio task and applies them
/// to the compute and differential loggers. Emits `ArrangementHeapSizeOperator` on construction
/// and `ArrangementHeapSizeOperatorDrop` on drop.
// TODO: Correction buffer logging currently reuses the arrangement batch and size logging. This
// isn't strictly correct as a correction buffer is not an arrangement. Consider refactoring this
// to be about "operator sizes" instead.
pub(super) struct CorrectionLogger {
    /// Logger for compute events (heap size/capacity/allocations).
    compute_logger: ComputeLogger,
    /// Logger for differential events (batch creation/drops).
    differential_logger: differential_dataflow::logging::Logger,
    /// ID of the operator this logger reports for.
    operator_id: usize,
    /// Receiving end of the channel written by [`ChannelLogging`].
    rx: mpsc::UnboundedReceiver<LoggingEvent>,
    /// Net number of batches logged (BatchEvent - DropEvent).
    net_batches: isize,
    /// Net number of records logged across all batch/drop/merge events.
    net_records: isize,
    /// Cumulative heap size delta, for retraction on drop.
    net_size: isize,
    /// Cumulative heap capacity delta, for retraction on drop.
    net_capacity: isize,
    /// Cumulative heap allocations delta, for retraction on drop.
    net_allocations: isize,
}
612
impl fmt::Debug for CorrectionLogger {
    // Manual impl: only the operator ID is shown; the remaining fields are elided via
    // `finish_non_exhaustive`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CorrectionLogger")
            .field("operator_id", &self.operator_id)
            .finish_non_exhaustive()
    }
}
620
impl CorrectionLogger {
    /// Construct a `CorrectionLogger` for the operator with the given ID and address.
    ///
    /// Immediately logs an `ArrangementHeapSizeOperator` event to register the operator; the
    /// matching drop event is emitted by the `Drop` impl.
    pub fn new(
        compute_logger: ComputeLogger,
        differential_logger: differential_dataflow::logging::Logger,
        operator_id: usize,
        address: Vec<usize>,
        rx: mpsc::UnboundedReceiver<LoggingEvent>,
    ) -> Self {
        compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
            ArrangementHeapSizeOperator {
                operator_id,
                address,
            },
        ));

        Self {
            compute_logger,
            differential_logger,
            operator_id,
            rx,
            net_batches: 0,
            net_records: 0,
            net_size: 0,
            net_capacity: 0,
            net_allocations: 0,
        }
    }

    /// Drain logging events from the channel and apply them locally.
    ///
    /// Each event is forwarded to the appropriate logger, while the `net_*` counters track the
    /// cumulative effect so outstanding amounts can be retracted on drop.
    pub fn apply_events(&mut self) {
        use LoggingEvent::*;

        // `try_recv` drains without blocking; events sent after this call are picked up by the
        // next invocation (or by `drop`).
        while let Ok(event) = self.rx.try_recv() {
            match event {
                ChainCreated(length) => {
                    self.net_batches += 1;
                    self.net_records += isize::try_from(length).expect("must fit");
                    self.differential_logger.log(BatchEvent {
                        operator: self.operator_id,
                        length,
                    });
                }
                ChainDropped(length) => {
                    self.net_batches -= 1;
                    self.net_records -= isize::try_from(length).expect("must fit");
                    self.differential_logger.log(DropEvent {
                        operator: self.operator_id,
                        length,
                    });
                }
                SizeDiff(delta_size) => {
                    self.net_size += delta_size.get();
                    self.compute_logger.log(&ComputeEvent::ArrangementHeapSize(
                        ArrangementHeapSize {
                            operator_id: self.operator_id,
                            delta_size: delta_size.get(),
                        },
                    ));
                }
                CapacityDiff(delta_capacity) => {
                    self.net_capacity += delta_capacity.get();
                    self.compute_logger
                        .log(&ComputeEvent::ArrangementHeapCapacity(
                            ArrangementHeapCapacity {
                                operator_id: self.operator_id,
                                delta_capacity: delta_capacity.get(),
                            },
                        ));
                }
                AllocationsDiff(delta_allocations) => {
                    self.net_allocations += delta_allocations.get();
                    self.compute_logger
                        .log(&ComputeEvent::ArrangementHeapAllocations(
                            ArrangementHeapAllocations {
                                operator_id: self.operator_id,
                                delta_allocations: delta_allocations.get(),
                            },
                        ));
                }
            }
        }
    }
}
704
impl Drop for CorrectionLogger {
    /// Retract all outstanding logged amounts, then emit the operator-drop event.
    fn drop(&mut self) {
        // Drain any events that arrived before the drop. Note that the Tokio task
        // may still be running (abort is async), so some events may not have arrived
        // yet. We retract any remaining batch/record counts below.
        self.apply_events();

        // Retract any outstanding batch and record counts that weren't balanced by
        // ChainDropped events. This handles the case where the Tokio task is aborted
        // and its Correction destructors haven't run yet (abort is async).
        //
        // Each DropEvent retracts one batch and `length` records, so we emit one per
        // outstanding batch, with the first carrying all outstanding records.
        //
        // NOTE(review): if `net_batches` is zero or negative while `net_records` is nonzero
        // (conceivable when created and dropped chains had different lengths), the remaining
        // record count is not retracted here — confirm this imbalance cannot occur.
        for i in 0..self.net_batches {
            let length = if i == 0 {
                usize::try_from(self.net_records).unwrap_or(0)
            } else {
                0
            };
            self.differential_logger.log(DropEvent {
                operator: self.operator_id,
                length,
            });
        }

        // Retract any outstanding heap size/capacity/allocations deltas.
        if self.net_size != 0 {
            self.compute_logger
                .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
                    operator_id: self.operator_id,
                    delta_size: -self.net_size,
                }));
        }
        if self.net_capacity != 0 {
            self.compute_logger
                .log(&ComputeEvent::ArrangementHeapCapacity(
                    ArrangementHeapCapacity {
                        operator_id: self.operator_id,
                        delta_capacity: -self.net_capacity,
                    },
                ));
        }
        if self.net_allocations != 0 {
            self.compute_logger
                .log(&ComputeEvent::ArrangementHeapAllocations(
                    ArrangementHeapAllocations {
                        operator_id: self.operator_id,
                        delta_allocations: -self.net_allocations,
                    },
                ));
        }

        // Balance the `ArrangementHeapSizeOperator` event emitted in `new`.
        self.compute_logger
            .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
                ArrangementHeapSizeOperatorDrop {
                    operator_id: self.operator_id,
                },
            ));
    }
}
764}