Skip to main content

mz_compute/sink/
correction.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The `Correction` data structure used by `persist_sink::write_batches` to stash updates before
11//! they are written into batches.
12
13use std::collections::BTreeMap;
14use std::fmt;
15use std::num::NonZeroIsize;
16use std::ops::{AddAssign, Bound, RangeBounds, SubAssign};
17
18use differential_dataflow::consolidation::{consolidate, consolidate_updates};
19use differential_dataflow::logging::{BatchEvent, DropEvent};
20use itertools::Itertools;
21use mz_compute_types::dyncfgs::{
22    CONSOLIDATING_VEC_GROWTH_DAMPENER, CORRECTION_V2_CHAIN_PROPORTIONALITY,
23    CORRECTION_V2_CHUNK_SIZE, ENABLE_CORRECTION_V2,
24};
25use mz_dyncfg::ConfigSet;
26use mz_ore::iter::IteratorExt;
27use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
28use mz_repr::{Diff, Timestamp};
29use timely::PartialOrder;
30use timely::progress::Antichain;
31use tokio::sync::mpsc;
32
33use crate::logging::compute::{
34    ArrangementHeapAllocations, ArrangementHeapCapacity, ArrangementHeapSize,
35    ArrangementHeapSizeOperator, ArrangementHeapSizeOperatorDrop, ComputeEvent,
36    Logger as ComputeLogger,
37};
38use crate::sink::correction_v2::{CorrectionV2, Data};
39
/// A data structure suitable for storing updates in a self-correcting persist sink.
///
/// Selects one of two correction buffer implementations. `V1` is the original simple
/// implementation that stores updates in non-spillable memory. `V2` improves on `V1` by supporting
/// spill-to-disk but is less battle-tested, so for now we want to keep the option of reverting to
/// `V1` in a pinch. The plan is to remove `V1` eventually.
///
/// The variant is chosen once, at construction time, from the `ENABLE_CORRECTION_V2`
/// configuration flag. All other methods simply forward to the selected implementation.
pub enum Correction<D: Data> {
    /// Correction buffer based on a [`CorrectionV1`].
    V1(CorrectionV1<D>),
    /// Correction buffer based on a [`CorrectionV2`].
    V2(CorrectionV2<D>),
}
52
53impl<D: Data> Correction<D> {
54    /// Construct a new `Correction` instance.
55    pub fn new(
56        metrics: SinkMetrics,
57        worker_metrics: SinkWorkerMetrics,
58        logging: Option<ChannelLogging>,
59        config: &ConfigSet,
60    ) -> Self {
61        if ENABLE_CORRECTION_V2.get(config) {
62            let prop = CORRECTION_V2_CHAIN_PROPORTIONALITY.get(config);
63            let chunk_size = CORRECTION_V2_CHUNK_SIZE.get(config);
64            Self::V2(CorrectionV2::new(
65                metrics,
66                worker_metrics,
67                logging,
68                prop,
69                chunk_size,
70            ))
71        } else {
72            let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(config);
73            Self::V1(CorrectionV1::new(metrics, worker_metrics, growth_dampener))
74        }
75    }
76
77    /// Insert a batch of updates.
78    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
79        match self {
80            Self::V1(c) => c.insert(updates),
81            Self::V2(c) => c.insert(updates),
82        }
83    }
84
85    /// Insert a batch of updates, after negating their diffs.
86    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
87        match self {
88            Self::V1(c) => c.insert_negated(updates),
89            Self::V2(c) => c.insert_negated(updates),
90        }
91    }
92
93    /// Consolidate and return updates before the given `upper`.
94    pub fn updates_before(
95        &mut self,
96        upper: &Antichain<Timestamp>,
97    ) -> Box<dyn Iterator<Item = (D, Timestamp, Diff)> + Send + '_> {
98        match self {
99            Self::V1(c) => Box::new(c.updates_before(upper)),
100            Self::V2(c) => Box::new(c.updates_before(upper)),
101        }
102    }
103
104    /// Advance the since frontier.
105    ///
106    /// # Panics
107    ///
108    /// Panics if the given `since` is less than the current since frontier.
109    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
110        match self {
111            Self::V1(c) => c.advance_since(since),
112            Self::V2(c) => c.advance_since(since),
113        }
114    }
115
116    /// Consolidate all updates at the current `since`.
117    pub fn consolidate_at_since(&mut self) {
118        match self {
119            Self::V1(c) => c.consolidate_at_since(),
120            Self::V2(c) => c.consolidate_at_since(),
121        }
122    }
123}
124
/// A collection holding `persist_sink` updates.
///
/// The `CorrectionV1` data structure is purpose-built for the `persist_sink::write_batches`
/// operator:
///
///  * It stores updates by time, to enable efficient separation between updates that should
///    be written to a batch and updates whose time has not yet arrived.
///  * It eschews an interface for directly removing previously inserted updates. Instead, updates
///    are removed by inserting them again, with negated diffs. Stored updates are continuously
///    consolidated to give them opportunity to cancel each other out.
///  * It provides an interface for advancing all contained updates to a given frontier.
///
/// Every change to the stored updates is reflected in the sink metrics; dropping the value
/// reports a final size of zero.
pub struct CorrectionV1<D> {
    /// Stashed updates by time.
    updates: BTreeMap<Timestamp, ConsolidatingVec<D>>,
    /// Frontier to which all update times are advanced.
    ///
    /// Starts out at `Timestamp::MIN`. Once it becomes empty, inserted updates are discarded.
    since: Antichain<Timestamp>,

    /// Total length and capacity of vectors in `updates`.
    ///
    /// Tracked to maintain metrics.
    total_size: LengthAndCapacity,
    /// Global persist sink metrics.
    metrics: SinkMetrics,
    /// Per-worker persist sink metrics.
    worker_metrics: SinkWorkerMetrics,
    /// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
    ///
    /// Copied into every `ConsolidatingVec` created for a new time.
    growth_dampener: usize,
}
153
154impl<D> CorrectionV1<D> {
155    /// Construct a new `CorrectionV1` instance.
156    pub fn new(
157        metrics: SinkMetrics,
158        worker_metrics: SinkWorkerMetrics,
159        growth_dampener: usize,
160    ) -> Self {
161        Self {
162            updates: Default::default(),
163            since: Antichain::from_elem(Timestamp::MIN),
164            total_size: Default::default(),
165            metrics,
166            worker_metrics,
167            growth_dampener,
168        }
169    }
170
171    /// Update persist sink metrics to the given new length and capacity.
172    fn update_metrics(&mut self, new_size: LengthAndCapacity) {
173        let old_size = self.total_size;
174        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
175        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
176        self.metrics
177            .report_correction_update_deltas(len_delta, cap_delta);
178        self.worker_metrics
179            .report_correction_update_totals(new_size.length, new_size.capacity);
180
181        self.total_size = new_size;
182    }
183}
184
185impl<D: Data> CorrectionV1<D> {
186    /// Insert a batch of updates.
187    pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
188        let Some(since_ts) = self.since.as_option() else {
189            // If the since frontier is empty, discard all updates.
190            updates.clear();
191            return;
192        };
193
194        for (_, time, _) in &mut *updates {
195            *time = std::cmp::max(*time, *since_ts);
196        }
197        self.insert_inner(updates);
198    }
199
200    /// Insert a batch of updates, after negating their diffs.
201    pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
202        let Some(since_ts) = self.since.as_option() else {
203            // If the since frontier is empty, discard all updates.
204            updates.clear();
205            return;
206        };
207
208        for (_, time, diff) in &mut *updates {
209            *time = std::cmp::max(*time, *since_ts);
210            *diff = -*diff;
211        }
212        self.insert_inner(updates);
213    }
214
215    /// Insert a batch of updates.
216    ///
217    /// The given `updates` must all have been advanced by `self.since`.
218    fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
219        consolidate_updates(updates);
220        updates.sort_unstable_by_key(|(_, time, _)| *time);
221
222        let mut new_size = self.total_size;
223        let mut updates = updates.drain(..).peekable();
224        while let Some(&(_, time, _)) = updates.peek() {
225            debug_assert!(
226                self.since.less_equal(&time),
227                "update not advanced by `since`"
228            );
229
230            let data = updates
231                .peeking_take_while(|(_, t, _)| *t == time)
232                .map(|(d, _, r)| (d, r));
233
234            use std::collections::btree_map::Entry;
235            match self.updates.entry(time) {
236                Entry::Vacant(entry) => {
237                    let mut vec: ConsolidatingVec<_> = data.collect();
238                    vec.growth_dampener = self.growth_dampener;
239                    new_size += (vec.len(), vec.capacity());
240                    entry.insert(vec);
241                }
242                Entry::Occupied(mut entry) => {
243                    let vec = entry.get_mut();
244                    new_size -= (vec.len(), vec.capacity());
245                    vec.extend(data);
246                    new_size += (vec.len(), vec.capacity());
247                }
248            }
249        }
250
251        self.update_metrics(new_size);
252    }
253
254    /// Consolidate and return updates within the given bounds.
255    ///
256    /// # Panics
257    ///
258    /// Panics if `lower` is not less than or equal to `upper`.
259    pub fn updates_within<'a>(
260        &'a mut self,
261        lower: &Antichain<Timestamp>,
262        upper: &Antichain<Timestamp>,
263    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + use<'a, D> {
264        assert!(PartialOrder::less_equal(lower, upper));
265
266        let start = match lower.as_option() {
267            Some(ts) => Bound::Included(*ts),
268            None => Bound::Excluded(Timestamp::MAX),
269        };
270        let end = match upper.as_option() {
271            Some(ts) => Bound::Excluded(*ts),
272            None => Bound::Unbounded,
273        };
274
275        let update_count = self.consolidate((start, end));
276
277        let range = self.updates.range((start, end));
278        range
279            .flat_map(|(t, data)| data.iter().map(|(d, r)| (d.clone(), *t, *r)))
280            .exact_size(update_count)
281    }
282
283    /// Consolidate and return updates before the given `upper`.
284    pub fn updates_before<'a>(
285        &'a mut self,
286        upper: &Antichain<Timestamp>,
287    ) -> impl Iterator<Item = (D, Timestamp, Diff)> + ExactSizeIterator + Send + use<'a, D> {
288        let lower = Antichain::from_elem(Timestamp::MIN);
289        self.updates_within(&lower, upper)
290    }
291
292    /// Consolidate the updates at the times in the given range.
293    ///
294    /// Returns the number of updates remaining in the range afterwards.
295    fn consolidate<R>(&mut self, range: R) -> usize
296    where
297        R: RangeBounds<Timestamp>,
298    {
299        let mut new_size = self.total_size;
300
301        let updates = self.updates.range_mut(range);
302        let count = updates.fold(0, |acc, (_, data)| {
303            new_size -= (data.len(), data.capacity());
304            data.consolidate();
305            new_size += (data.len(), data.capacity());
306            acc + data.len()
307        });
308
309        self.update_metrics(new_size);
310        count
311    }
312
313    /// Advance the since frontier.
314    ///
315    /// # Panics
316    ///
317    /// Panics if the given `since` is less than the current since frontier.
318    pub fn advance_since(&mut self, since: Antichain<Timestamp>) {
319        assert!(PartialOrder::less_equal(&self.since, &since));
320
321        if since != self.since {
322            self.advance_by(&since);
323            self.since = since;
324        }
325    }
326
327    /// Advance all contained updates by the given frontier.
328    ///
329    /// If the given frontier is empty, all remaining updates are discarded.
330    pub fn advance_by(&mut self, frontier: &Antichain<Timestamp>) {
331        let Some(target_ts) = frontier.as_option() else {
332            self.updates.clear();
333            self.update_metrics(Default::default());
334            return;
335        };
336
337        let mut new_size = self.total_size;
338        while let Some((ts, data)) = self.updates.pop_first() {
339            if frontier.less_equal(&ts) {
340                // We have advanced all updates that can advance.
341                self.updates.insert(ts, data);
342                break;
343            }
344
345            use std::collections::btree_map::Entry;
346            match self.updates.entry(*target_ts) {
347                Entry::Vacant(entry) => {
348                    entry.insert(data);
349                }
350                Entry::Occupied(mut entry) => {
351                    let vec = entry.get_mut();
352                    new_size -= (data.len(), data.capacity());
353                    new_size -= (vec.len(), vec.capacity());
354                    vec.extend(data);
355                    new_size += (vec.len(), vec.capacity());
356                }
357            }
358        }
359
360        self.update_metrics(new_size);
361    }
362
363    /// Consolidate all updates at the current `since`.
364    pub fn consolidate_at_since(&mut self) {
365        let Some(since_ts) = self.since.as_option() else {
366            return;
367        };
368
369        let start = Bound::Included(*since_ts);
370        let end = match since_ts.try_step_forward() {
371            Some(ts) => Bound::Excluded(ts),
372            None => Bound::Unbounded,
373        };
374
375        self.consolidate((start, end));
376    }
377}
378
379impl<D> Drop for CorrectionV1<D> {
380    fn drop(&mut self) {
381        self.update_metrics(Default::default());
382    }
383}
384
/// Helper type for convenient tracking of length and capacity together.
///
/// Supports `+=` with another `LengthAndCapacity` and `+=`/`-=` with a `(length, capacity)`
/// tuple.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct LengthAndCapacity {
    /// Number of elements.
    pub length: usize,
    /// Number of elements that fit into the allocated space.
    pub capacity: usize,
}
391
392impl AddAssign<Self> for LengthAndCapacity {
393    fn add_assign(&mut self, size: Self) {
394        self.length += size.length;
395        self.capacity += size.capacity;
396    }
397}
398
399impl AddAssign<(usize, usize)> for LengthAndCapacity {
400    fn add_assign(&mut self, (len, cap): (usize, usize)) {
401        self.length += len;
402        self.capacity += cap;
403    }
404}
405
406impl SubAssign<(usize, usize)> for LengthAndCapacity {
407    fn sub_assign(&mut self, (len, cap): (usize, usize)) {
408        self.length -= len;
409        self.capacity -= cap;
410    }
411}
412
/// A vector that consolidates its contents.
///
/// The vector is filled with updates until it reaches capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the vector's capacity, at which point the capacity is doubled.
///
/// The "half" and "doubled" above describe a `growth_dampener` of 0; larger dampeners grow the
/// vector later and by a smaller factor.
#[derive(Debug)]
pub(crate) struct ConsolidatingVec<D> {
    /// The stored `(data, diff)` pairs, not necessarily consolidated.
    data: Vec<(D, Diff)>,
    /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
    /// might start smaller than this.
    min_capacity: usize,
    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
    ///
    /// If consolidation didn't free enough space, at least a linear amount, increase the capacity.
    /// Setting this to 0 results in doubling whenever the list is at least half full.
    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
    growth_dampener: usize,
}
431
432impl<D: Ord> ConsolidatingVec<D> {
433    /// Return the length of the vector.
434    pub fn len(&self) -> usize {
435        self.data.len()
436    }
437
438    /// Return the capacity of the vector.
439    pub fn capacity(&self) -> usize {
440        self.data.capacity()
441    }
442
443    /// Pushes `item` into the vector.
444    ///
445    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
446    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
447    ///
448    /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
449    /// but amortizes to O(log n).
450    pub fn push(&mut self, item: (D, Diff)) {
451        let capacity = self.data.capacity();
452        if self.data.len() == capacity {
453            // The vector is full. First, consolidate to try to recover some space.
454            self.consolidate();
455
456            // We may need more capacity if our current capacity is within `1+1/(n+1)` of the length.
457            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
458            let length = self.data.len();
459            let dampener = self.growth_dampener;
460            if capacity < length + length / (dampener + 1) {
461                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
462                // determining the target capacity, and then reserving an amount that achieves this
463                // while working around the existing length.
464                let new_cap = capacity + capacity / (dampener + 1);
465                self.data.reserve_exact(new_cap - length);
466            }
467        }
468
469        self.data.push(item);
470    }
471
472    /// Consolidate the contents.
473    pub fn consolidate(&mut self) {
474        consolidate(&mut self.data);
475
476        // We may have the opportunity to reclaim allocated memory.
477        // Given that `push` will at most double the capacity when the vector is more than half full, and
478        // we want to avoid entering into a resizing cycle, we choose to only shrink if the
479        // vector's length is less than one fourth of its capacity.
480        if self.data.len() < self.data.capacity() / 4 {
481            self.data.shrink_to(self.min_capacity);
482        }
483    }
484
485    /// Return an iterator over the borrowed items.
486    pub fn iter(&self) -> impl Iterator<Item = &(D, Diff)> {
487        self.data.iter()
488    }
489}
490
491impl<D> IntoIterator for ConsolidatingVec<D> {
492    type Item = (D, Diff);
493    type IntoIter = std::vec::IntoIter<(D, Diff)>;
494
495    fn into_iter(self) -> Self::IntoIter {
496        self.data.into_iter()
497    }
498}
499
500impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
501    fn from_iter<I>(iter: I) -> Self
502    where
503        I: IntoIterator<Item = (D, Diff)>,
504    {
505        Self {
506            data: Vec::from_iter(iter),
507            min_capacity: 0,
508            growth_dampener: 0,
509        }
510    }
511}
512
513impl<D: Ord> Extend<(D, Diff)> for ConsolidatingVec<D> {
514    fn extend<I>(&mut self, iter: I)
515    where
516        I: IntoIterator<Item = (D, Diff)>,
517    {
518        for item in iter {
519            self.push(item);
520        }
521    }
522}
523
/// Helper type for convenient tracking of various size metrics together.
///
/// Supports `+=` with another `SizeMetrics`, which adds the values field by field.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct SizeMetrics {
    /// The tracked size.
    pub size: usize,
    /// The tracked capacity.
    pub capacity: usize,
    /// The tracked number of allocations.
    pub allocations: usize,
}
531
532impl AddAssign<Self> for SizeMetrics {
533    fn add_assign(&mut self, other: Self) {
534        self.size += other.size;
535        self.capacity += other.capacity;
536        self.allocations += other.allocations;
537    }
538}
539
/// A logging event sent from the Tokio task back to the Timely thread.
///
/// Events are produced through [`ChannelLogging`] and consumed by `CorrectionLogger`, which
/// translates them into differential and compute logging events.
#[derive(Debug)]
pub enum LoggingEvent {
    /// A chain with the given number of updates was created.
    ChainCreated(usize),
    /// A chain with the given number of updates was dropped.
    ChainDropped(usize),
    /// The heap size of the correction buffer changed by the given amount.
    SizeDiff(NonZeroIsize),
    /// The heap capacity of the correction buffer changed by the given amount.
    CapacityDiff(NonZeroIsize),
    /// The number of allocations of the correction buffer changed by the given amount.
    AllocationsDiff(NonZeroIsize),
}
554
/// Channel-based logging for corrections on a Tokio task. `Send`-safe.
///
/// Sends logging events to the Timely thread, where they are applied to the real `Logging`
/// instance. This allows corrections on the Tokio task to participate in introspection logging
/// without holding `Rc<RefCell<..>>`.
///
/// Wraps the sending half of an unbounded channel of [`LoggingEvent`]s. Send errors are ignored
/// by all reporting methods.
#[derive(Clone, Debug)]
pub struct ChannelLogging(mpsc::UnboundedSender<LoggingEvent>);
562
563impl ChannelLogging {
564    /// Construct a new `ChannelLogging` sending events on the given channel.
565    pub fn new(tx: mpsc::UnboundedSender<LoggingEvent>) -> Self {
566        Self(tx)
567    }
568
569    /// Report the creation of a chain with the given number of updates.
570    pub fn chain_created(&self, updates: usize) {
571        let _ = self.0.send(LoggingEvent::ChainCreated(updates));
572    }
573
574    /// Report the dropping of a chain with the given number of updates.
575    pub fn chain_dropped(&self, updates: usize) {
576        let _ = self.0.send(LoggingEvent::ChainDropped(updates));
577    }
578
579    /// Report a change in heap size by the given amount.
580    pub fn report_size_diff(&self, diff: isize) {
581        if let Some(diff) = NonZeroIsize::new(diff) {
582            let _ = self.0.send(LoggingEvent::SizeDiff(diff));
583        }
584    }
585
586    /// Report a change in heap capacity by the given amount.
587    pub fn report_capacity_diff(&self, diff: isize) {
588        if let Some(diff) = NonZeroIsize::new(diff) {
589            let _ = self.0.send(LoggingEvent::CapacityDiff(diff));
590        }
591    }
592
593    /// Report a change in the number of allocations by the given amount.
594    pub fn report_allocations_diff(&self, diff: isize) {
595        if let Some(diff) = NonZeroIsize::new(diff) {
596            let _ = self.0.send(LoggingEvent::AllocationsDiff(diff));
597        }
598    }
599}
600
/// State for correction buffer logging on the Timely thread.
///
/// Drains [`LoggingEvent`]s sent by [`ChannelLogging`] from the Tokio task and applies them
/// to the compute and differential loggers. Emits `ArrangementHeapSizeOperator` on construction
/// and `ArrangementHeapSizeOperatorDrop` on drop.
///
/// The `net_*` fields accumulate everything that was logged so far, so that on drop whatever
/// is still outstanding can be retracted.
// TODO: Correction buffer logging currently reuses the arrangement batch and size logging. This
// isn't strictly correct as a correction buffer is not an arrangement. Consider refactoring this
// to be about "operator sizes" instead.
pub(super) struct CorrectionLogger {
    /// Logger receiving the heap size, capacity, and allocation events.
    compute_logger: ComputeLogger,
    /// Logger receiving the batch and drop events.
    differential_logger: differential_dataflow::logging::Logger,
    /// Operator ID attached to every logged event.
    operator_id: usize,
    /// Receiving end of the channel on which [`LoggingEvent`]s arrive.
    rx: mpsc::UnboundedReceiver<LoggingEvent>,
    /// Net number of batches logged (BatchEvent - DropEvent).
    net_batches: isize,
    /// Net number of records logged across all batch/drop/merge events.
    net_records: isize,
    /// Cumulative heap size delta, for retraction on drop.
    net_size: isize,
    /// Cumulative heap capacity delta, for retraction on drop.
    net_capacity: isize,
    /// Cumulative heap allocations delta, for retraction on drop.
    net_allocations: isize,
}
625
626impl fmt::Debug for CorrectionLogger {
627    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
628        f.debug_struct("CorrectionLogger")
629            .field("operator_id", &self.operator_id)
630            .finish_non_exhaustive()
631    }
632}
633
634impl CorrectionLogger {
635    pub fn new(
636        compute_logger: ComputeLogger,
637        differential_logger: differential_dataflow::logging::Logger,
638        operator_id: usize,
639        address: Vec<usize>,
640        rx: mpsc::UnboundedReceiver<LoggingEvent>,
641    ) -> Self {
642        compute_logger.log(&ComputeEvent::ArrangementHeapSizeOperator(
643            ArrangementHeapSizeOperator {
644                operator_id,
645                address,
646            },
647        ));
648
649        Self {
650            compute_logger,
651            differential_logger,
652            operator_id,
653            rx,
654            net_batches: 0,
655            net_records: 0,
656            net_size: 0,
657            net_capacity: 0,
658            net_allocations: 0,
659        }
660    }
661
662    /// Drain logging events from the channel and apply them locally.
663    pub fn apply_events(&mut self) {
664        use LoggingEvent::*;
665
666        while let Ok(event) = self.rx.try_recv() {
667            match event {
668                ChainCreated(length) => {
669                    self.net_batches += 1;
670                    self.net_records += isize::try_from(length).expect("must fit");
671                    self.differential_logger.log(BatchEvent {
672                        operator: self.operator_id,
673                        length,
674                    });
675                }
676                ChainDropped(length) => {
677                    self.net_batches -= 1;
678                    self.net_records -= isize::try_from(length).expect("must fit");
679                    self.differential_logger.log(DropEvent {
680                        operator: self.operator_id,
681                        length,
682                    });
683                }
684                SizeDiff(delta_size) => {
685                    self.net_size += delta_size.get();
686                    self.compute_logger.log(&ComputeEvent::ArrangementHeapSize(
687                        ArrangementHeapSize {
688                            operator_id: self.operator_id,
689                            delta_size: delta_size.get(),
690                        },
691                    ));
692                }
693                CapacityDiff(delta_capacity) => {
694                    self.net_capacity += delta_capacity.get();
695                    self.compute_logger
696                        .log(&ComputeEvent::ArrangementHeapCapacity(
697                            ArrangementHeapCapacity {
698                                operator_id: self.operator_id,
699                                delta_capacity: delta_capacity.get(),
700                            },
701                        ));
702                }
703                AllocationsDiff(delta_allocations) => {
704                    self.net_allocations += delta_allocations.get();
705                    self.compute_logger
706                        .log(&ComputeEvent::ArrangementHeapAllocations(
707                            ArrangementHeapAllocations {
708                                operator_id: self.operator_id,
709                                delta_allocations: delta_allocations.get(),
710                            },
711                        ));
712                }
713            }
714        }
715    }
716}
717
718impl Drop for CorrectionLogger {
719    fn drop(&mut self) {
720        // Drain any events that arrived before the drop. Note that the Tokio task
721        // may still be running (abort is async), so some events may not have arrived
722        // yet. We retract any remaining batch/record counts below.
723        self.apply_events();
724
725        // Retract any outstanding batch and record counts that weren't balanced by
726        // ChainDropped events. This handles the case where the Tokio task is aborted
727        // and its Correction destructors haven't run yet (abort is async).
728        //
729        // Each DropEvent retracts one batch and `length` records, so we emit one per
730        // outstanding batch, with the first carrying all outstanding records.
731        for i in 0..self.net_batches {
732            let length = if i == 0 {
733                usize::try_from(self.net_records).unwrap_or(0)
734            } else {
735                0
736            };
737            self.differential_logger.log(DropEvent {
738                operator: self.operator_id,
739                length,
740            });
741        }
742
743        // Retract any outstanding heap size/capacity/allocations deltas.
744        if self.net_size != 0 {
745            self.compute_logger
746                .log(&ComputeEvent::ArrangementHeapSize(ArrangementHeapSize {
747                    operator_id: self.operator_id,
748                    delta_size: -self.net_size,
749                }));
750        }
751        if self.net_capacity != 0 {
752            self.compute_logger
753                .log(&ComputeEvent::ArrangementHeapCapacity(
754                    ArrangementHeapCapacity {
755                        operator_id: self.operator_id,
756                        delta_capacity: -self.net_capacity,
757                    },
758                ));
759        }
760        if self.net_allocations != 0 {
761            self.compute_logger
762                .log(&ComputeEvent::ArrangementHeapAllocations(
763                    ArrangementHeapAllocations {
764                        operator_id: self.operator_id,
765                        delta_allocations: -self.net_allocations,
766                    },
767                ));
768        }
769
770        self.compute_logger
771            .log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
772                ArrangementHeapSizeOperatorDrop {
773                    operator_id: self.operator_id,
774                },
775            ));
776    }
777}