mz_compute/logging/compute.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by clusterd.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::{Columnar, Index, Ref};
use differential_dataflow::VecCollection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
use mz_timely_util::replay::MzReplay;
use timely::Data;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
    SharedLoggingState, Update,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;

/// Type alias for a logger of compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// A builder for columnar containers of compute events.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
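// A minimal usage sketch (the index value is hypothetical): a `Logger` built from this
// event builder records events via
// `logger.log(&ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index: 0 }))`,
// and `construct` below replays the recorded stream and demuxes it into log collections.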

/// A dataflow exports a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Timely worker index of the exporting dataflow.
    pub dataflow_index: usize,
}

/// The export for a global id was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// Identifier of the export.
    pub export_id: GlobalId,
}

/// A peek event with a [`PeekType`], and an installation status.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The identifier of the view the peek targets.
    pub id: GlobalId,
    /// The logical timestamp requested.
    pub time: Timestamp,
    /// The ID of the peek.
    pub uuid: uuid::Bytes,
    /// The relevant _type_ of peek: index or persist.
    // Note that this is not stored on the Peek event for data-packing reasons only.
    pub peek_type: PeekType,
    /// True if the peek is being installed; false if it's being removed.
    pub installed: bool,
}

/// Frontier change event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// Identifier of the export whose frontier changed.
    pub export_id: GlobalId,
    /// The frontier element being added or removed.
    pub time: Timestamp,
    /// +1 when the element is added to the frontier, -1 when it is removed.
    pub diff: i8,
}

/// An import frontier change.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// Identifier of the import whose frontier changed.
    pub import_id: GlobalId,
    /// Identifier of the export whose dataflow contains the import.
    pub export_id: GlobalId,
    /// The frontier element being added or removed.
    pub time: Timestamp,
    /// +1 when the element is added to the frontier, -1 when it is removed.
    pub diff: i8,
}

/// A change in an arrangement's heap size.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// Operator index
    pub operator_id: usize,
    /// Delta of the heap size in bytes of the arrangement.
    pub delta_size: isize,
}

/// A change in an arrangement's heap capacity.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// Operator index
    pub operator_id: usize,
    /// Delta of the heap capacity in bytes of the arrangement.
    pub delta_capacity: isize,
}

/// A change in an arrangement's heap allocation count.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// Operator index
    pub operator_id: usize,
    /// Delta of distinct heap allocations backing the arrangement.
    pub delta_allocations: isize,
}

/// Announcing an operator that manages an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// Operator index
    pub operator_id: usize,
    /// The address of the operator.
    pub address: Vec<usize>,
}

/// Drop event for an operator managing an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// Operator index
    pub operator_id: usize,
}

/// Dataflow shutdown event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// Timely worker index of the dataflow.
    pub dataflow_index: usize,
}

/// Error count update event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The change in error count.
    pub diff: Diff,
}

/// An export is hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
}

/// An operator's hydration status changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Identifier of the operator's LIR node.
    pub lir_id: LirId,
    /// Whether the operator is hydrated.
    pub hydrated: bool,
}

/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The `GlobalId` in which the LIR operator is rendered.
    ///
    /// NB: a single dataflow may have many `GlobalId`s inside it.
    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
    /// tracks the many-to-one relationship between `GlobalId`s and
    /// dataflows.
    pub global_id: GlobalId,
    /// The actual mapping.
    /// Represented this way to reduce the size of `ComputeEvent`.
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// Announce that a dataflow supports a specific global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The identifier of the dataflow.
    pub dataflow_index: usize,
    /// A `GlobalId` that is rendered as part of this dataflow.
    pub global_id: GlobalId,
}

/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// Peek command.
    Peek(PeekEvent),
    /// Available frontier information for dataflow exports.
    Frontier(Frontier),
    /// Available frontier information for dataflow imports.
    ImportFrontier(ImportFrontier),
    /// Arrangement heap size update
    ArrangementHeapSize(ArrangementHeapSize),
    /// Arrangement heap capacity update
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// Arrangement heap allocations update
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// Arrangement size operator address
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// Arrangement size operator dropped
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// All operators of a dataflow have shut down.
    DataflowShutdown(DataflowShutdown),
    /// The number of errors in a dataflow export has changed.
    ErrorCount(ErrorCount),
    /// A dataflow export was hydrated.
    Hydration(Hydration),
    /// A dataflow operator's hydration status changed.
    OperatorHydration(OperatorHydration),
    /// An LIR operator was mapped to some particular dataflow operator.
    ///
    /// Cf. `ComputeLog::LirMapping`
    LirMapping(LirMapping),
    /// A dataflow was associated with a `GlobalId` that it renders.
    DataflowGlobal(DataflowGlobal),
}

/// A peek type distinguishing between index and persist peeks.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index.
    Index,
    /// A peek against persist.
    Persist,
}

impl PeekType {
    /// A human-readable name for a peek type.
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

/// Metadata for LIR operators.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
    operator: String,
    /// The LIR identifier of the parent (if any).
    parent_lir_id: Option<LirId>,
    /// How nested the operator is (for nice indentation).
    nesting: u8,
    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
    /// If `start == end`, then no operators were used.
    operator_span: (usize, usize),
}

impl LirMetadata {
    /// Construct a new LIR metadata object.
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}
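
// An illustrative (hypothetical) mapping entry: an LIR `Join` node with no parent, at
// nesting level 0, rendered as dataflow operators 10 (inclusive) through 15 (exclusive),
// would be recorded as `LirMetadata::new("Join".into(), None, 0, (10, 15))`.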

/// The return type of the [`construct`] function.
pub(super) struct Return {
    /// Collections returned by [`construct`].
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow fragment for compute logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `scheduler`: The timely scheduler to obtain activators.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read compute log events from.
/// * `shared_state`: Shared state between logging dataflow fragments.
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "compute logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (export_out, export) = demux.new_output();
        let mut export_out = OutputBuilder::from(export_out);
        let (frontier_out, frontier) = demux.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        let (import_frontier_out, import_frontier) = demux.new_output();
        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
        let (peek_out, peek) = demux.new_output();
        let mut peek_out = OutputBuilder::from(peek_out);
        let (peek_duration_out, peek_duration) = demux.new_output();
        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
        let (shutdown_duration_out, shutdown_duration) = demux.new_output();
        let mut shutdown_duration_out = OutputBuilder::from(shutdown_duration_out);
        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
        let (arrangement_heap_allocations_out, arrangement_heap_allocations) =
            demux.new_output();
        let mut arrangement_heap_allocations_out = OutputBuilder::from(arrangement_heap_allocations_out);
        let (error_count_out, error_count) = demux.new_output();
        let mut error_count_out = OutputBuilder::from(error_count_out);
        let (hydration_time_out, hydration_time) = demux.new_output();
        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
        let (lir_mapping_out, lir_mapping) = demux.new_output();
        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);
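        // Each demux output above corresponds to one `ComputeLog` variant handled below and
        // carries `(Row, Row)` key/value update triples produced by the demux handler.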

        let mut demux_state = DemuxState::new(scheduler, scope.index());
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut shutdown_duration = shutdown_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut operator_hydration_status = operator_hydration_status_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session_with_builder(&cap),
                        frontier: frontier.session_with_builder(&cap),
                        import_frontier: import_frontier.session_with_builder(&cap),
                        peek: peek.session_with_builder(&cap),
                        peek_duration: peek_duration.session_with_builder(&cap),
                        shutdown_duration: shutdown_duration.session_with_builder(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations.session_with_builder(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity.session_with_builder(&cap),
                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
                        error_count: error_count.session_with_builder(&cap),
                        hydration_time: hydration_time.session_with_builder(&cap),
                        operator_hydration_status: operator_hydration_status.session_with_builder(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.borrow().into_index_iter() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        use ComputeLog::*;
        let logs = [
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapSize, arrangement_heap_size),
            (DataflowCurrent, export),
            (DataflowGlobal, dataflow_global_ids),
            (ErrorCount, error_count),
            (FrontierCurrent, frontier),
            (HydrationTime, hydration_time),
            (ImportFrontierCurrent, import_frontier),
            (LirMapping, lir_mapping),
            (OperatorHydrationStatus, operator_hydration_status),
            (PeekCurrent, peek),
            (PeekDuration, peek_duration),
            (ShutdownDuration, shutdown_duration),
        ];

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, stream) in logs {
            let variant = LogVariant::Compute(variant);
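            // Only variants that the logging configuration asks to be indexed are arranged;
            // streams for other variants remain unused.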
            if config.index_logs.contains_key(&variant) {
                let trace = stream
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

/// Format the given value and pack it into a `Datum::String`.
///
/// The `scratch` buffer is used to perform the string conversion without an allocation.
/// Callers should not assume anything about the contents of this buffer after this function
/// returns.
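///
/// For example (assuming `GlobalId`'s `Display` implementation), calling
/// `make_string_datum(GlobalId::User(1), &mut scratch)` yields `Datum::String("u1")`
/// while reusing `scratch`'s allocation.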
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}

/// State maintained by the demux operator.
struct DemuxState<A> {
    /// The timely scheduler.
    scheduler: A,
    /// The index of this worker.
    worker_id: usize,
    /// A reusable scratch string for formatting IDs.
    scratch_string_a: String,
    /// A reusable scratch string for formatting IDs.
    scratch_string_b: String,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps live dataflows to counts of their exports.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Maps dropped dataflows to their drop time.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Contains dataflows that have shut down but not yet been dropped.
    shutdown_dataflows: BTreeSet<usize>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size stash.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR -> operator span mapping.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Dataflow -> `GlobalId` mapping (many-to-one).
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    /// A row packer for the arrangement heap allocations output.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    /// A row packer for the arrangement heap capacity output.
    arrangement_heap_capacity_packer: PermutedRowPacker,
    /// A row packer for the arrangement heap size output.
    arrangement_heap_size_packer: PermutedRowPacker,
    /// A row packer for the dataflow global output.
    dataflow_global_packer: PermutedRowPacker,
    /// A row packer for the error count output.
    error_count_packer: PermutedRowPacker,
    /// A row packer for the exports output.
    export_packer: PermutedRowPacker,
    /// A row packer for the frontier output.
    frontier_packer: PermutedRowPacker,
    /// A row packer for the import frontier output.
    import_frontier_packer: PermutedRowPacker,
    /// A row packer for the LIR mapping output.
    lir_mapping_packer: PermutedRowPacker,
    /// A row packer for the operator hydration status output.
    operator_hydration_status_packer: PermutedRowPacker,
    /// A row packer for the peek durations output.
    peek_duration_packer: PermutedRowPacker,
    /// A row packer for the peek output.
    peek_packer: PermutedRowPacker,
    /// A row packer for the shutdown duration output.
    shutdown_duration_packer: PermutedRowPacker,
    /// A row packer for the hydration time output.
    hydration_time_packer: PermutedRowPacker,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            dataflow_export_counts: Default::default(),
            dataflow_drop_times: Default::default(),
            shutdown_dataflows: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
            shutdown_duration_packer: PermutedRowPacker::new(ComputeLog::ShutdownDuration),
        }
    }

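    // Each `pack_*` helper below renders one introspection row; `pack_slice` returns the
    // datums split into a (key, value) row pair according to the log variant's key
    // permutation (see `PermutedRowPacker` in the parent module).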
    /// Pack an arrangement heap allocations update key-value for the given operator.
    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack an arrangement heap capacity update key-value for the given operator.
    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack an arrangement heap size update key-value for the given operator.
    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack a dataflow global update key-value for the given dataflow index and global ID.
    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    /// Pack an error count update key-value for the given export ID and count.
    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
        // per-worker error count might be negative and at the SQL level having negative multiplicities
        // is treated as an error.
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    /// Pack an export update key-value for the given export ID and dataflow index.
    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    /// Pack a hydration time update key-value for the given export ID and hydration time.
    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    /// Pack an import frontier update key-value for the given export ID, import ID, and frontier time.
    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    /// Pack an LIR mapping update key-value for the given LIR operator metadata.
    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    /// Pack an operator hydration status update key-value for the given export ID, LIR ID, and
    /// hydration status.
    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    /// Pack a peek duration update key-value for the given peek type and duration bucket.
    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    /// Pack a peek update key-value for the given peek and peek type.
    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    /// Pack a frontier update key-value for the given export ID and time.
    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    /// Pack a shutdown duration update key-value for the given time bucket.
    fn pack_shutdown_duration_update(&mut self, bucket: u128) -> (&RowRef, &RowRef) {
        self.shutdown_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }
}

/// State tracked for each dataflow export.
struct ExportState {
    /// The ID of the dataflow maintaining this export.
    dataflow_index: usize,
    /// Number of errors in this export.
    ///
    /// This must be a signed integer, since per-worker error counts can be negative; only the
    /// cross-worker total has to sum up to a non-negative value.
    error_count: Diff,
    /// When this export was created.
    created_at: Instant,
    /// Time from export creation until hydration, in nanoseconds, if the export has hydrated.
    hydration_time_ns: Option<u64>,
    /// Hydration status of operators feeding this export.
    operator_hydration: BTreeMap<LirId, bool>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
            operator_hydration: BTreeMap::new(),
        }
    }
}

/// State for tracking arrangement sizes.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    shutdown_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
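        // Advance the event time to the next interval boundary strictly greater than it,
        // e.g. with a 1000 ms interval an event at 1500 ms is assigned the 2000 ms
        // timestamp. This groups updates into interval-sized batches.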
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given compute event.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        *self
            .state
            .dataflow_export_counts
            .entry(dataflow_index)
            .or_default() += 1;

        // Insert hydration time logging for this export.
        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

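        // Dropping the last export of a dataflow also marks the dataflow itself as dropped.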
        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
            entry @ Some(0) | entry @ None => {
                error!(
                    %export_id,
                    %dataflow_index,
                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
                );
            }
            Some(1) => self.handle_dataflow_dropped(dataflow_index),
            Some(count) => *count -= 1,
        }

        // Remove error count logging for this export.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Remove hydration time logging for this export.
        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        // Remove operator hydration logging for this export.
        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }

    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
        let ts = self.ts();
        self.state.dataflow_export_counts.remove(&dataflow_index);

        if self.state.shutdown_dataflows.remove(&dataflow_index) {
            // Dataflow has already shut down before it was dropped.
            let datum = self.state.pack_shutdown_duration_update(0);
            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
        } else {
            // Dataflow has not yet shut down.
            let existing = self
                .state
                .dataflow_drop_times
                .insert(dataflow_index, self.time);
            if existing.is_some() {
                error!(%dataflow_index, "dataflow already dropped");
            }
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
            // Dataflow has already been dropped.
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
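            // Durations are reported as a histogram of power-of-two nanosecond buckets
            // rather than exact values.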
            let elapsed_pow = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_shutdown_duration_update(elapsed_pow);
            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
        } else {
            // Dataflow has not yet been dropped.
            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
            if !was_new {
                error!(%dataflow_index, "dataflow already shutdown");
            }
        }

        // We deal with any `GlobalId`-based mappings in this event.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                // Remove dataflow/`GlobalId` mapping.
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                // Remove LIR mapping.
                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we are no longer
            // interested in its errors.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

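        // Retract the previously logged count and log the new one; zero counts are not
        // logged at all, so error-free exports contribute no row to the error count log.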
        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
            // after reconciliation. We can simply ignore these.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }

    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we are no longer
            // interested in its operator hydration events.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self.state.pack_frontier_update(export_id, time);
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }

    /// Update the allocation size for an arrangement.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }

    /// Update the allocation capacity for an arrangement.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }

    /// Update the allocation count for an arrangement.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.count += delta_allocations;

        let datum = self
            .state
            .pack_arrangement_heap_allocations_update(operator_id);
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));
    }

    /// Indicate that a new arrangement exists and start maintaining the heap size state.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Indicate that an arrangement has been dropped and we can clean up the heap size state.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let allocations = self
                .state
                .pack_arrangement_heap_allocations_update(operator_id);
            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((allocations, ts, diff));

            let capacity = self
                .state
                .pack_arrangement_heap_capacity_update(operator_id);
            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((capacity, ts, diff));

            let size = self.state.pack_arrangement_heap_size_update(operator_id);
            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((size, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    /// Indicate that a new LIR operator exists; record the dataflow operators it maps to.
1314    fn handle_lir_mapping(
1315        &mut self,
1316        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
1317    ) {
1318        let global_id = Columnar::into_owned(global_id);
1319        // record the state (for the later drop)
1320        let mappings = || mapping.into_iter().map(Columnar::into_owned);
1321        self.state
1322            .lir_mapping
1323            .entry(global_id)
1324            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
1325            .or_insert_with(|| mappings().collect());
1326
1327        // send the datum out
1328        let ts = self.ts();
1329        for (lir_id, meta) in mapping.into_iter() {
1330            let datum = self.state.pack_lir_mapping_update(
1331                global_id,
1332                Columnar::into_owned(lir_id),
1333                Columnar::into_owned(meta.operator),
1334                Columnar::into_owned(meta.parent_lir_id),
1335                Columnar::into_owned(meta.nesting),
1336                Columnar::into_owned(meta.operator_span),
1337            );
1338            self.output.lir_mapping.give((datum, ts, Diff::ONE));
1339        }
1340    }
1341
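    /// Record that the dataflow with the given index exports the given `GlobalId`, and emit a
    /// corresponding `dataflow_global_ids` update.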
1342    fn handle_dataflow_global(
1343        &mut self,
1344        DataflowGlobalReference {
1345            dataflow_index,
1346            global_id,
1347        }: Ref<'_, DataflowGlobal>,
1348    ) {
1349        let global_id = Columnar::into_owned(global_id);
1350        self.state
1351            .dataflow_global_ids
1352            .entry(dataflow_index)
1353            .and_modify(|globals| {
1354                // NB BTreeSet::insert() returns `false` when the element was already in the set
1355                if !globals.insert(global_id) {
1356                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
1357                }
1358            })
1359            .or_insert_with(|| BTreeSet::from([global_id]));
1360
1361        let ts = self.ts();
1362        let datum = self
1363            .state
1364            .pack_dataflow_global_update(dataflow_index, global_id);
1365        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
1366    }
1367}
1368
1369/// Logging state maintained for a compute collection.
1370///
1371/// This type is used to produce appropriate log events in response to changes of logged collection
1372/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
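///
/// A sketch of the intended usage, assuming an `export_id`, a `Logger` handle, the dataflow's
/// worker-local index, and the ids of its imports are already in scope (their construction is
/// elided here):
///
/// ```ignore
/// // Registers the export and logs initial frontiers for it and all of its imports.
/// let mut logging = CollectionLogging::new(export_id, logger, dataflow_index, import_ids);
///
/// // Logs a retraction of the previous frontier and an insertion of the new one.
/// logging.set_frontier(Some(new_time));
///
/// // Dropping the value retracts all remaining frontier events and logs an `ExportDropped`.
/// drop(logging);
/// ```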
1373pub struct CollectionLogging {
1374    export_id: GlobalId,
1375    logger: Logger,
1376
1377    logged_frontier: Option<Timestamp>,
1378    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
1379}
1380
1381impl CollectionLogging {
1382    /// Create new logging state for the identified collection and emit initial logging events.
1383    pub fn new(
1384        export_id: GlobalId,
1385        logger: Logger,
1386        dataflow_index: usize,
1387        import_ids: impl Iterator<Item = GlobalId>,
1388    ) -> Self {
1389        logger.log(&ComputeEvent::Export(Export {
1390            export_id,
1391            dataflow_index,
1392        }));
1393
1394        let mut self_ = Self {
1395            export_id,
1396            logger,
1397            logged_frontier: None,
1398            logged_import_frontiers: Default::default(),
1399        };
1400
1401        // Initialize frontier logging.
1402        let initial_frontier = Some(Timestamp::MIN);
1403        self_.set_frontier(initial_frontier);
1404        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
1405
1406        self_
1407    }
1408
1409    /// Set the collection frontier to the given new time and emit corresponding logging events.
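    ///
    /// Passing `None` retracts the previously logged frontier without logging a new one; the
    /// [`Drop`] implementation uses this to clean up the collection's frontier logging.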
1410    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
1411        let old_time = self.logged_frontier;
1412        self.logged_frontier = new_time;
1413
1414        if old_time != new_time {
1415            let export_id = self.export_id;
1416            let retraction = old_time.map(|time| {
1417                ComputeEvent::Frontier(Frontier {
1418                    export_id,
1419                    time,
1420                    diff: -1,
1421                })
1422            });
1423            let insertion = new_time.map(|time| {
1424                ComputeEvent::Frontier(Frontier {
1425                    export_id,
1426                    time,
1427                    diff: 1,
1428                })
1429            });
1430            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1431            self.logger.log_many(events);
1432        }
1433    }
1434
1435    /// Set the frontier of the given import to the given new time and emit corresponding logging
1436    /// events.
1437    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
1438        let old_time = self.logged_import_frontiers.remove(&import_id);
1439        if let Some(time) = new_time {
1440            self.logged_import_frontiers.insert(import_id, time);
1441        }
1442
1443        if old_time != new_time {
1444            let export_id = self.export_id;
1445            let retraction = old_time.map(|time| {
1446                ComputeEvent::ImportFrontier(ImportFrontier {
1447                    import_id,
1448                    export_id,
1449                    time,
1450                    diff: -1,
1451                })
1452            });
1453            let insertion = new_time.map(|time| {
1454                ComputeEvent::ImportFrontier(ImportFrontier {
1455                    import_id,
1456                    export_id,
1457                    time,
1458                    diff: 1,
1459                })
1460            });
1461            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1462            self.logger.log_many(events);
1463        }
1464    }
1465
1466    /// Set the collection as hydrated.
1467    pub fn set_hydrated(&self) {
1468        self.logger.log(&ComputeEvent::Hydration(Hydration {
1469            export_id: self.export_id,
1470        }));
1471    }
1472}
1473
1474impl Drop for CollectionLogging {
1475    fn drop(&mut self) {
1476        // Emit retraction events to clean up events previously logged.
1477        self.set_frontier(None);
1478
1479        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1480        for import_id in import_ids {
1481            self.set_import_frontier(import_id, None);
1482        }
1483
1484        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1485            export_id: self.export_id,
1486        }));
1487    }
1488}
1489
1490/// Extension trait to attach operators that log dataflow error counts, as
1491/// `ComputeEvent::ErrorCount` updates, to collections and batch streams.
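///
/// A minimal usage sketch, assuming an error collection `errors`, a `logger` handle, and the
/// dataflow's `export_id` are already in scope (all names here are hypothetical):
///
/// ```ignore
/// // Every container flowing past the operator contributes the sum of its diffs as an
/// // `ErrorCount` update for `export_id`; the data itself is forwarded unchanged.
/// let errors = errors.log_dataflow_errors(logger.clone(), export_id);
/// ```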
1492pub(crate) trait LogDataflowErrors {
1493    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
1494}
1495
1496impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
1497where
1498    G: Scope,
1499    D: Data,
1500{
1501    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1502        self.inner
1503            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1504                move |input, output| {
1505                    input.for_each(|cap, data| {
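                        // Sum the diffs of all updates in this container and log the net change
                        // in the error count before forwarding the data unchanged.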
1506                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1507                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1508
1509                        output.session(&cap).give_container(data);
1510                    });
1511                }
1512            })
1513            .as_collection()
1514    }
1515}
1516
1517impl<G, B> LogDataflowErrors for Stream<G, B>
1518where
1519    G: Scope,
1520    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1521{
1522    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1523        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1524            move |input, output| {
1525                input.for_each(|cap, data| {
1526                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1527                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1528
1529                    output.session(&cap).give_container(data);
1530                });
1531            }
1532        })
1533    }
1534}
1535
1536/// Return the sum of all diffs within the given batch.
1537///
1538/// Note that this operation can be expensive: its runtime is O(N), where N is the number of
1539/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
1540/// contain only a small number of records, so this doesn't matter much. But avoid using it when
1541/// batches might become large.
1542fn sum_batch_diffs<B>(batch: &B) -> Diff
1543where
1544    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1545{
1546    let mut sum = Diff::ZERO;
1547    let mut cursor = batch.cursor();
1548
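    // Walk every key and value in the batch; `map_times` visits each (time, diff) pair for the
    // current (key, value) and accumulates the diffs into `sum`.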
1549    while cursor.key_valid(batch) {
1550        while cursor.val_valid(batch) {
1551            cursor.map_times(batch, |_t, r| sum += r);
1552            cursor.step_val(batch);
1553        }
1554        cursor.step_key(batch);
1555    }
1556
1557    sum
1558}
1559
1560#[cfg(test)]
1561mod tests {
1562    use super::*;
1563
1564    #[mz_ore::test]
1565    fn test_compute_event_size() {
1566        // This could be a static assertion, but we don't use those yet in this crate.
1567        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
1568    }
1569}