Skip to main content

mz_compute/logging/
compute.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by clusterd.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Display, Write};
15use std::rc::Rc;
16use std::time::{Duration, Instant};
17
18use columnar::{Columnar, Index, Ref};
19use differential_dataflow::VecCollection;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::trace::{BatchReader, Cursor};
22use mz_compute_types::plan::LirId;
23use mz_ore::cast::CastFrom;
24use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
25use mz_timely_util::columnar::batcher;
26use mz_timely_util::columnar::builder::ColumnBuilder;
27use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
28use mz_timely_util::replay::MzReplay;
29use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
30use timely::dataflow::operators::Operator;
31use timely::dataflow::operators::generic::OutputBuilder;
32use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
33use timely::dataflow::operators::generic::operator::empty;
34use timely::dataflow::{Scope, StreamVec};
35use timely::scheduling::activate::{Activations, Activator};
36use tracing::error;
37use uuid::Uuid;
38
39use crate::extensions::arrange::MzArrangeCore;
40use crate::logging::{
41    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
42    SharedLoggingState, Update,
43};
44use crate::typedefs::RowRowSpine;
45use mz_row_spine::RowRowBuilder;
46
47/// Type alias for a logger of compute events.
48pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
49pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
50
51/// A dataflow exports a global ID.
52#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
53pub struct Export {
54    /// Identifier of the export.
55    pub export_id: GlobalId,
56    /// Timely worker index of the exporting dataflow.
57    pub dataflow_index: usize,
58}
59
60/// The export for a global id was dropped.
61#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
62pub struct ExportDropped {
63    /// Identifier of the export.
64    pub export_id: GlobalId,
65}
66
67/// A peek event with a [`PeekType`], and an installation status.
68#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
69pub struct PeekEvent {
70    /// The identifier of the view the peek targets.
71    pub id: GlobalId,
72    /// The logical timestamp requested.
73    pub time: Timestamp,
74    /// The ID of the peek.
75    pub uuid: uuid::Bytes,
76    /// The relevant _type_ of peek: index or persist.
77    // Note that this is not stored on the Peek event for data-packing reasons only.
78    pub peek_type: PeekType,
79    /// True if the peek is being installed; false if it's being removed.
80    pub installed: bool,
81}
82
83/// Frontier change event.
84#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
85pub struct Frontier {
86    pub export_id: GlobalId,
87    pub time: Timestamp,
88    pub diff: i8,
89}
90
91/// An import frontier change.
92#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
93pub struct ImportFrontier {
94    pub import_id: GlobalId,
95    pub export_id: GlobalId,
96    pub time: Timestamp,
97    pub diff: i8,
98}
99
100/// A change in an arrangement's heap size.
101#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
102pub struct ArrangementHeapSize {
103    /// Operator index
104    pub operator_id: usize,
105    /// Delta of the heap size in bytes of the arrangement.
106    pub delta_size: isize,
107}
108
109/// A change in an arrangement's heap capacity.
110#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
111pub struct ArrangementHeapCapacity {
112    /// Operator index
113    pub operator_id: usize,
114    /// Delta of the heap capacity in bytes of the arrangement.
115    pub delta_capacity: isize,
116}
117
118/// A change in an arrangement's heap allocation count.
119#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
120pub struct ArrangementHeapAllocations {
121    /// Operator index
122    pub operator_id: usize,
123    /// Delta of distinct heap allocations backing the arrangement.
124    pub delta_allocations: isize,
125}
126
127/// Announcing an operator that manages an arrangement.
128#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
129pub struct ArrangementHeapSizeOperator {
130    /// Operator index
131    pub operator_id: usize,
132    /// The address of the operator.
133    pub address: Vec<usize>,
134}
135
136/// Drop event for an operator managing an arrangement.
137#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
138pub struct ArrangementHeapSizeOperatorDrop {
139    /// Operator index
140    pub operator_id: usize,
141}
142
143/// Dataflow shutdown event.
144#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
145pub struct DataflowShutdown {
146    /// Timely worker index of the dataflow.
147    pub dataflow_index: usize,
148}
149
150/// Error count update event.
151#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
152pub struct ErrorCount {
153    /// Identifier of the export.
154    pub export_id: GlobalId,
155    /// The change in error count.
156    pub diff: Diff,
157}
158
159/// An export is hydrated.
160#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
161pub struct Hydration {
162    /// Identifier of the export.
163    pub export_id: GlobalId,
164}
165
166/// An operator's hydration status changed.
167#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
168pub struct OperatorHydration {
169    /// Identifier of the export.
170    pub export_id: GlobalId,
171    /// Identifier of the operator's LIR node.
172    pub lir_id: LirId,
173    /// Whether the operator is hydrated.
174    pub hydrated: bool,
175}
176
177/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
178#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
179pub struct LirMapping {
180    /// The `GlobalId` in which the LIR operator is rendered.
181    ///
182    /// NB a single a dataflow may have many `GlobalId`s inside it.
183    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
184    /// tracks the many-to-one relationship between `GlobalId`s and
185    /// dataflows.
186    pub global_id: GlobalId,
187    /// The actual mapping.
188    /// Represented this way to reduce the size of `ComputeEvent`.
189    pub mapping: Vec<(LirId, LirMetadata)>,
190}
191
192/// Announce that a dataflow supports a specific global ID.
193#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
194pub struct DataflowGlobal {
195    /// The identifier of the dataflow.
196    pub dataflow_index: usize,
197    /// A `GlobalId` that is rendered as part of this dataflow.
198    pub global_id: GlobalId,
199}
200
201/// A logged compute event.
202#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
203pub enum ComputeEvent {
204    /// A dataflow export was created.
205    Export(Export),
206    /// A dataflow export was dropped.
207    ExportDropped(ExportDropped),
208    /// Peek command.
209    Peek(PeekEvent),
210    /// Available frontier information for dataflow exports.
211    Frontier(Frontier),
212    /// Available frontier information for dataflow imports.
213    ImportFrontier(ImportFrontier),
214    /// Arrangement heap size update
215    ArrangementHeapSize(ArrangementHeapSize),
216    /// Arrangement heap size update
217    ArrangementHeapCapacity(ArrangementHeapCapacity),
218    /// Arrangement heap size update
219    ArrangementHeapAllocations(ArrangementHeapAllocations),
220    /// Arrangement size operator address
221    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
222    /// Arrangement size operator dropped
223    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
224    /// All operators of a dataflow have shut down.
225    DataflowShutdown(DataflowShutdown),
226    /// The number of errors in a dataflow export has changed.
227    ErrorCount(ErrorCount),
228    /// A dataflow export was hydrated.
229    Hydration(Hydration),
230    /// A dataflow operator's hydration status changed.
231    OperatorHydration(OperatorHydration),
232    /// An LIR operator was mapped to some particular dataflow operator.
233    ///
234    /// Cf. `ComputeLog::LirMaping`
235    LirMapping(LirMapping),
236    DataflowGlobal(DataflowGlobal),
237}
238
239/// A peek type distinguishing between index and persist peeks.
240#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
241pub enum PeekType {
242    /// A peek against an index.
243    Index,
244    /// A peek against persist.
245    Persist,
246}
247
248impl PeekType {
249    /// A human-readable name for a peek type.
250    fn name(self) -> &'static str {
251        match self {
252            PeekType::Index => "index",
253            PeekType::Persist => "persist",
254        }
255    }
256}
257
258/// Metadata for LIR operators.
259#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
260pub struct LirMetadata {
261    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
262    operator: String,
263    /// The LIR identifier of the parent (if any).
264    parent_lir_id: Option<LirId>,
265    /// How nested the operator is (for nice indentation).
266    nesting: u8,
267    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
268    /// If `start == end`, then no operators were used.
269    operator_span: (usize, usize),
270}
271
272impl LirMetadata {
273    /// Construct a new LIR metadata object.
274    pub fn new(
275        operator: String,
276        parent_lir_id: Option<LirId>,
277        nesting: u8,
278        operator_span: (usize, usize),
279    ) -> Self {
280        Self {
281            operator,
282            parent_lir_id,
283            nesting,
284            operator_span,
285        }
286    }
287}
288
/// The return type of the [`construct`] function.
pub(super) struct Return {
    /// Collections returned by [`construct`], keyed by the log variant they contain.
    ///
    /// Only variants present in the logging configuration's `index_logs` get an entry.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
294
295/// Constructs the logging dataflow fragment for compute logs.
296///
297/// Params
298/// * `scope`: The Timely scope hosting the log analysis dataflow.
299/// * `scheduler`: The timely scheduler to obtainer activators.
300/// * `config`: Logging configuration.
301/// * `event_queue`: The source to read compute log events from.
302/// * `compute_event_streams`: Additional compute event streams to absorb.
303/// * `shared_state`: Shared state between logging dataflow fragments.
304pub(super) fn construct<'scope>(
305    scope: Scope<'scope, Timestamp>,
306    activations: Rc<RefCell<Activations>>,
307    config: &mz_compute_client::logging::LoggingConfig,
308    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
309    shared_state: Rc<RefCell<SharedLoggingState>>,
310) -> Return {
311    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
312
313    scope.scoped("compute logging", move |scope| {
314        let enable_logging = config.enable_logging;
315        let (logs, token) = if enable_logging {
316            event_queue.links.mz_replay(
317                scope,
318                "compute logs",
319                config.interval,
320                event_queue.activator,
321            )
322        } else {
323            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
324            (empty(scope), token)
325        };
326
327        // Build a demux operator that splits the replayed event stream up into the separate
328        // logging streams.
329        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
330        let mut input = demux.new_input(logs, Pipeline);
331        let (export_out, export) = demux.new_output();
332        let mut export_out = OutputBuilder::from(export_out);
333        let (frontier_out, frontier) = demux.new_output();
334        let mut frontier_out = OutputBuilder::from(frontier_out);
335        let (import_frontier_out, import_frontier) = demux.new_output();
336        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
337        let (peek_out, peek) = demux.new_output();
338        let mut peek_out = OutputBuilder::from(peek_out);
339        let (peek_duration_out, peek_duration) = demux.new_output();
340        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
341        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
342        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
343        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
344        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
345        let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
346        let mut arrangement_heap_allocations_out =
347            OutputBuilder::from(arrangement_heap_allocations_out);
348        let (error_count_out, error_count) = demux.new_output();
349        let mut error_count_out = OutputBuilder::from(error_count_out);
350        let (hydration_time_out, hydration_time) = demux.new_output();
351        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
352        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
353        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
354        let (lir_mapping_out, lir_mapping) = demux.new_output();
355        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
356        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
357        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);
358
359        let mut demux_state = DemuxState::new(activations, scope.index());
360        demux.build(move |_capability| {
361            move |_frontiers| {
362                let mut export = export_out.activate();
363                let mut frontier = frontier_out.activate();
364                let mut import_frontier = import_frontier_out.activate();
365                let mut peek = peek_out.activate();
366                let mut peek_duration = peek_duration_out.activate();
367                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
368                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
369                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
370                let mut error_count = error_count_out.activate();
371                let mut hydration_time = hydration_time_out.activate();
372                let mut operator_hydration_status = operator_hydration_status_out.activate();
373                let mut lir_mapping = lir_mapping_out.activate();
374                let mut dataflow_global_ids = dataflow_global_ids_out.activate();
375
376                input.for_each(|cap, data| {
377                    let mut output_sessions = DemuxOutput {
378                        export: export.session_with_builder(&cap),
379                        frontier: frontier.session_with_builder(&cap),
380                        import_frontier: import_frontier.session_with_builder(&cap),
381                        peek: peek.session_with_builder(&cap),
382                        peek_duration: peek_duration.session_with_builder(&cap),
383                        arrangement_heap_allocations: arrangement_heap_allocations
384                            .session_with_builder(&cap),
385                        arrangement_heap_capacity: arrangement_heap_capacity
386                            .session_with_builder(&cap),
387                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
388                        error_count: error_count.session_with_builder(&cap),
389                        hydration_time: hydration_time.session_with_builder(&cap),
390                        operator_hydration_status: operator_hydration_status
391                            .session_with_builder(&cap),
392                        lir_mapping: lir_mapping.session_with_builder(&cap),
393                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
394                    };
395
396                    let shared_state = &mut shared_state.borrow_mut();
397                    for (time, event) in data.borrow().into_index_iter() {
398                        DemuxHandler {
399                            state: &mut demux_state,
400                            shared_state,
401                            output: &mut output_sessions,
402                            logging_interval_ms,
403                            time,
404                        }
405                        .handle(event);
406                    }
407                });
408            }
409        });
410
411        use ComputeLog::*;
412        let logs = [
413            (ArrangementHeapAllocations, arrangement_heap_allocations),
414            (ArrangementHeapCapacity, arrangement_heap_capacity),
415            (ArrangementHeapSize, arrangement_heap_size),
416            (DataflowCurrent, export),
417            (DataflowGlobal, dataflow_global_ids),
418            (ErrorCount, error_count),
419            (FrontierCurrent, frontier),
420            (HydrationTime, hydration_time),
421            (ImportFrontierCurrent, import_frontier),
422            (LirMapping, lir_mapping),
423            (OperatorHydrationStatus, operator_hydration_status),
424            (PeekCurrent, peek),
425            (PeekDuration, peek_duration),
426        ];
427
428        // Build the output arrangements.
429        let mut collections = BTreeMap::new();
430        for (variant, stream) in logs {
431            let variant = LogVariant::Compute(variant);
432            if config.index_logs.contains_key(&variant) {
433                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
434                    columnar_exchange::<Row, Row, Timestamp, Diff>,
435                );
436                let trace = stream
437                    .mz_arrange_core::<
438                        _,
439                        batcher::Chunker<_>,
440                        Col2ValBatcher<_, _, _, _>,
441                        RowRowBuilder<_, _>,
442                        RowRowSpine<_, _>,
443                    >(exchange, &format!("Arrange {variant:?}"))
444                    .trace;
445                let collection = LogCollection {
446                    trace,
447                    token: Rc::clone(&token),
448                };
449                collections.insert(variant, collection);
450            }
451        }
452
453        Return { collections }
454    })
455}
456
457/// Format the given value and pack it into a `Datum::String`.
458///
459/// The `scratch` buffer is used to perform the string conversion without an allocation.
460/// Callers should not assume anything about the contents of this buffer after this function
461/// returns.
462fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
463where
464    V: Display,
465{
466    scratch.clear();
467    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
468    Datum::String(scratch)
469}
470
471/// State maintained by the demux operator.
472struct DemuxState {
473    /// The timely activations handle.
474    activations: Rc<RefCell<Activations>>,
475    /// The index of this worker.
476    worker_id: usize,
477    /// A reusable scratch string for formatting IDs.
478    scratch_string_a: String,
479    /// A reusable scratch string for formatting IDs.
480    scratch_string_b: String,
481    /// State tracked per dataflow export.
482    exports: BTreeMap<GlobalId, ExportState>,
483    /// Maps pending peeks to their installation time.
484    peek_stash: BTreeMap<Uuid, Duration>,
485    /// Arrangement size stash.
486    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
487    /// LIR -> operator span mapping.
488    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
489    /// Dataflow -> `GlobalId` mapping (many-to-one).
490    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
491    /// A row packer for the arrangement heap allocations output.
492    arrangement_heap_allocations_packer: PermutedRowPacker,
493    /// A row packer for the arrangement heap capacity output.
494    arrangement_heap_capacity_packer: PermutedRowPacker,
495    /// A row packer for the arrangement heap size output.
496    arrangement_heap_size_packer: PermutedRowPacker,
497    /// A row packer for the dataflow global output.
498    dataflow_global_packer: PermutedRowPacker,
499    /// A row packer for the error count output.
500    error_count_packer: PermutedRowPacker,
501    /// A row packer for the exports output.
502    export_packer: PermutedRowPacker,
503    /// A row packer for the frontier output.
504    frontier_packer: PermutedRowPacker,
505    /// A row packer for the exports output.
506    import_frontier_packer: PermutedRowPacker,
507    /// A row packer for the LIR mapping output.
508    lir_mapping_packer: PermutedRowPacker,
509    /// A row packer for the operator hydration status output.
510    operator_hydration_status_packer: PermutedRowPacker,
511    /// A row packer for the peek durations output.
512    peek_duration_packer: PermutedRowPacker,
513    /// A row packer for the peek output.
514    peek_packer: PermutedRowPacker,
515    /// A row packer for the hydration time output.
516    hydration_time_packer: PermutedRowPacker,
517}
518
519impl DemuxState {
520    fn new(activations: Rc<RefCell<Activations>>, worker_id: usize) -> Self {
521        Self {
522            activations,
523            worker_id,
524            scratch_string_a: String::new(),
525            scratch_string_b: String::new(),
526            exports: Default::default(),
527            peek_stash: Default::default(),
528            arrangement_size: Default::default(),
529            lir_mapping: Default::default(),
530            dataflow_global_ids: Default::default(),
531            arrangement_heap_allocations_packer: PermutedRowPacker::new(
532                ComputeLog::ArrangementHeapAllocations,
533            ),
534            arrangement_heap_capacity_packer: PermutedRowPacker::new(
535                ComputeLog::ArrangementHeapCapacity,
536            ),
537            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
538            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
539            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
540            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
541            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
542            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
543            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
544            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
545            operator_hydration_status_packer: PermutedRowPacker::new(
546                ComputeLog::OperatorHydrationStatus,
547            ),
548            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
549            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
550        }
551    }
552
553    /// Pack an arrangement heap allocations update key-value for the given operator.
554    fn pack_arrangement_heap_allocations_update(
555        &mut self,
556        operator_id: usize,
557    ) -> (&RowRef, &RowRef) {
558        self.arrangement_heap_allocations_packer.pack_slice(&[
559            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
560            Datum::UInt64(u64::cast_from(self.worker_id)),
561        ])
562    }
563
564    /// Pack an arrangement heap capacity update key-value for the given operator.
565    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
566        self.arrangement_heap_capacity_packer.pack_slice(&[
567            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
568            Datum::UInt64(u64::cast_from(self.worker_id)),
569        ])
570    }
571
572    /// Pack an arrangement heap size update key-value for the given operator.
573    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
574        self.arrangement_heap_size_packer.pack_slice(&[
575            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
576            Datum::UInt64(u64::cast_from(self.worker_id)),
577        ])
578    }
579
580    /// Pack a dataflow global update key-value for the given dataflow index and global ID.
581    fn pack_dataflow_global_update(
582        &mut self,
583        dataflow_index: usize,
584        global_id: GlobalId,
585    ) -> (&RowRef, &RowRef) {
586        self.dataflow_global_packer.pack_slice(&[
587            Datum::UInt64(u64::cast_from(dataflow_index)),
588            Datum::UInt64(u64::cast_from(self.worker_id)),
589            make_string_datum(global_id, &mut self.scratch_string_a),
590        ])
591    }
592
593    /// Pack an error count update key-value for the given export ID and count.
594    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
595        // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
596        // per-worker error count might be negative and at the SQL level having negative multiplicities
597        // is treated as an error.
598        self.error_count_packer.pack_slice(&[
599            make_string_datum(export_id, &mut self.scratch_string_a),
600            Datum::UInt64(u64::cast_from(self.worker_id)),
601            Datum::Int64(count.into_inner()),
602        ])
603    }
604
605    /// Pack an export update key-value for the given export ID and dataflow index.
606    fn pack_export_update(
607        &mut self,
608        export_id: GlobalId,
609        dataflow_index: usize,
610    ) -> (&RowRef, &RowRef) {
611        self.export_packer.pack_slice(&[
612            make_string_datum(export_id, &mut self.scratch_string_a),
613            Datum::UInt64(u64::cast_from(self.worker_id)),
614            Datum::UInt64(u64::cast_from(dataflow_index)),
615        ])
616    }
617
618    /// Pack a hydration time update key-value for the given export ID and hydration time.
619    fn pack_hydration_time_update(
620        &mut self,
621        export_id: GlobalId,
622        time_ns: Option<u64>,
623    ) -> (&RowRef, &RowRef) {
624        self.hydration_time_packer.pack_slice(&[
625            make_string_datum(export_id, &mut self.scratch_string_a),
626            Datum::UInt64(u64::cast_from(self.worker_id)),
627            Datum::from(time_ns),
628        ])
629    }
630
631    /// Pack an import frontier update key-value for the given export ID and dataflow index.
632    fn pack_import_frontier_update(
633        &mut self,
634        export_id: GlobalId,
635        import_id: GlobalId,
636        time: Timestamp,
637    ) -> (&RowRef, &RowRef) {
638        self.import_frontier_packer.pack_slice(&[
639            make_string_datum(export_id, &mut self.scratch_string_a),
640            make_string_datum(import_id, &mut self.scratch_string_b),
641            Datum::UInt64(u64::cast_from(self.worker_id)),
642            Datum::MzTimestamp(time),
643        ])
644    }
645
646    /// Pack an LIR mapping update key-value for the given LIR operator metadata.
647    fn pack_lir_mapping_update(
648        &mut self,
649        global_id: GlobalId,
650        lir_id: LirId,
651        operator: String,
652        parent_lir_id: Option<LirId>,
653        nesting: u8,
654        operator_span: (usize, usize),
655    ) -> (&RowRef, &RowRef) {
656        self.lir_mapping_packer.pack_slice(&[
657            make_string_datum(global_id, &mut self.scratch_string_a),
658            Datum::UInt64(lir_id.into()),
659            Datum::UInt64(u64::cast_from(self.worker_id)),
660            make_string_datum(operator, &mut self.scratch_string_b),
661            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
662            Datum::UInt16(u16::cast_from(nesting)),
663            Datum::UInt64(u64::cast_from(operator_span.0)),
664            Datum::UInt64(u64::cast_from(operator_span.1)),
665        ])
666    }
667
668    /// Pack an operator hydration status update key-value for the given export ID, LIR ID, and
669    /// hydration status.
670    fn pack_operator_hydration_status_update(
671        &mut self,
672        export_id: GlobalId,
673        lir_id: LirId,
674        hydrated: bool,
675    ) -> (&RowRef, &RowRef) {
676        self.operator_hydration_status_packer.pack_slice(&[
677            make_string_datum(export_id, &mut self.scratch_string_a),
678            Datum::UInt64(lir_id.into()),
679            Datum::UInt64(u64::cast_from(self.worker_id)),
680            Datum::from(hydrated),
681        ])
682    }
683
684    /// Pack a peek duration update key-value for the given peek and peek type.
685    fn pack_peek_duration_update(
686        &mut self,
687        peek_type: PeekType,
688        bucket: u128,
689    ) -> (&RowRef, &RowRef) {
690        self.peek_duration_packer.pack_slice(&[
691            Datum::UInt64(u64::cast_from(self.worker_id)),
692            Datum::String(peek_type.name()),
693            Datum::UInt64(bucket.try_into().expect("bucket too big")),
694        ])
695    }
696
697    /// Pack a peek update key-value for the given peek and peek type.
698    fn pack_peek_update(
699        &mut self,
700        id: GlobalId,
701        time: Timestamp,
702        uuid: Uuid,
703        peek_type: PeekType,
704    ) -> (&RowRef, &RowRef) {
705        self.peek_packer.pack_slice(&[
706            Datum::Uuid(uuid),
707            Datum::UInt64(u64::cast_from(self.worker_id)),
708            make_string_datum(id, &mut self.scratch_string_a),
709            Datum::String(peek_type.name()),
710            Datum::MzTimestamp(time),
711        ])
712    }
713
714    /// Pack a frontier update key-value for the given export ID and time.
715    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
716        self.frontier_packer.pack_slice(&[
717            make_string_datum(export_id, &mut self.scratch_string_a),
718            Datum::UInt64(u64::cast_from(self.worker_id)),
719            Datum::MzTimestamp(time),
720        ])
721    }
722}
723
/// State tracked for each dataflow export.
struct ExportState {
    /// The ID of the dataflow maintaining this export.
    dataflow_index: usize,
    /// Number of errors in this export.
    ///
    /// This must be a signed integer, since per-worker error counts can be negative, only the
    /// cross-worker total has to sum up to a non-negative value.
    error_count: Diff,
    /// When this export was created.
    created_at: Instant,
    /// Hydration time of the exported collection in nanoseconds; starts out as `None`.
    // NOTE(review): presumably measured from `created_at` and set once the export is
    // hydrated — confirm in the event handler.
    hydration_time_ns: Option<u64>,
    /// Hydration status of operators feeding this export.
    operator_hydration: BTreeMap<LirId, bool>,
}
740
741impl ExportState {
742    fn new(dataflow_index: usize) -> Self {
743        Self {
744            dataflow_index,
745            error_count: Diff::ZERO,
746            created_at: Instant::now(),
747            hydration_time_ns: None,
748            operator_hydration: BTreeMap::new(),
749        }
750    }
751}
752
/// State for tracking arrangement sizes.
///
/// Each field is a running sum of the deltas reported for one arrangement operator. The sums
/// are used to retract the logged values when the operator is dropped.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    /// Sum of the reported heap size deltas.
    size: isize,
    /// Sum of the reported heap capacity deltas.
    capacity: isize,
    /// Sum of the reported heap allocation count deltas.
    count: isize,
}
760
/// Bundled output sessions used by the demux operator.
///
/// There is one session per kind of logged update. The event handlers write
/// `(key-value rows, time, diff)` triples into them.
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}
777
/// Event handler of the demux operator.
///
/// Bundles everything needed to turn a single compute event into logging updates.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates, in milliseconds.
    ///
    /// Update timestamps are multiples of this interval, see `ts`.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}
791
792impl DemuxHandler<'_, '_, '_> {
793    /// Return the timestamp associated with the current event, based on the event time and the
794    /// logging interval.
795    fn ts(&self) -> Timestamp {
796        let time_ms = self.time.as_millis();
797        let interval = self.logging_interval_ms;
798        let rounded = (time_ms / interval + 1) * interval;
799        rounded.try_into().expect("must fit")
800    }
801
802    /// Handle the given compute event.
803    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
804        use ComputeEventReference::*;
805        match event {
806            Export(export) => self.handle_export(export),
807            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
808            Peek(peek) if peek.installed => self.handle_peek_install(peek),
809            Peek(peek) => self.handle_peek_retire(peek),
810            Frontier(frontier) => self.handle_frontier(frontier),
811            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
812            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
813            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
814            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
815            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
816            ArrangementHeapSizeOperatorDrop(inner) => {
817                self.handle_arrangement_heap_size_operator_dropped(inner)
818            }
819            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
820            ErrorCount(error_count) => self.handle_error_count(error_count),
821            Hydration(hydration) => self.handle_hydration(hydration),
822            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
823            LirMapping(mapping) => self.handle_lir_mapping(mapping),
824            DataflowGlobal(global) => self.handle_dataflow_global(global),
825        }
826    }
827
828    fn handle_export(
829        &mut self,
830        ExportReference {
831            export_id,
832            dataflow_index,
833        }: Ref<'_, Export>,
834    ) {
835        let export_id = Columnar::into_owned(export_id);
836        let ts = self.ts();
837        let datum = self.state.pack_export_update(export_id, dataflow_index);
838        self.output.export.give((datum, ts, Diff::ONE));
839
840        let existing = self
841            .state
842            .exports
843            .insert(export_id, ExportState::new(dataflow_index));
844        if existing.is_some() {
845            error!(%export_id, "export already registered");
846        }
847
848        // Insert hydration time logging for this export.
849        let datum = self.state.pack_hydration_time_update(export_id, None);
850        self.output.hydration_time.give((datum, ts, Diff::ONE));
851    }
852
853    fn handle_export_dropped(
854        &mut self,
855        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
856    ) {
857        let export_id = Columnar::into_owned(export_id);
858        let Some(export) = self.state.exports.remove(&export_id) else {
859            error!(%export_id, "missing exports entry at time of export drop");
860            return;
861        };
862
863        let ts = self.ts();
864        let dataflow_index = export.dataflow_index;
865
866        let datum = self.state.pack_export_update(export_id, dataflow_index);
867        self.output.export.give((datum, ts, Diff::MINUS_ONE));
868
869        // Remove error count logging for this export.
870        if export.error_count != Diff::ZERO {
871            let datum = self
872                .state
873                .pack_error_count_update(export_id, export.error_count);
874            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
875        }
876
877        // Remove hydration time logging for this export.
878        let datum = self
879            .state
880            .pack_hydration_time_update(export_id, export.hydration_time_ns);
881        self.output
882            .hydration_time
883            .give((datum, ts, Diff::MINUS_ONE));
884
885        // Remove operator hydration logging for this export.
886        for (lir_id, hydrated) in export.operator_hydration {
887            let datum = self
888                .state
889                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
890            self.output
891                .operator_hydration_status
892                .give((datum, ts, Diff::MINUS_ONE));
893        }
894    }
895
896    fn handle_dataflow_shutdown(
897        &mut self,
898        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
899    ) {
900        let ts = self.ts();
901
902        // We deal with any `GlobalId` based mappings in this event.
903        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
904            for global_id in global_ids {
905                // Remove dataflow/`GlobalID` mapping.
906                let datum = self
907                    .state
908                    .pack_dataflow_global_update(dataflow_index, global_id);
909                self.output
910                    .dataflow_global_ids
911                    .give((datum, ts, Diff::MINUS_ONE));
912
913                // Remove LIR mapping.
914                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
915                    for (
916                        lir_id,
917                        LirMetadata {
918                            operator,
919                            parent_lir_id,
920                            nesting,
921                            operator_span,
922                        },
923                    ) in mappings
924                    {
925                        let datum = self.state.pack_lir_mapping_update(
926                            global_id,
927                            lir_id,
928                            operator,
929                            parent_lir_id,
930                            nesting,
931                            operator_span,
932                        );
933                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
934                    }
935                }
936            }
937        }
938    }
939
940    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
941        let ts = self.ts();
942        let export_id = Columnar::into_owned(export_id);
943
944        let Some(export) = self.state.exports.get_mut(&export_id) else {
945            // The export might have already been dropped, in which case we are no longer
946            // interested in its errors.
947            return;
948        };
949
950        let old_count = export.error_count;
951        let new_count = old_count + diff;
952        export.error_count = new_count;
953
954        if old_count != Diff::ZERO {
955            let datum = self.state.pack_error_count_update(export_id, old_count);
956            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
957        }
958        if new_count != Diff::ZERO {
959            let datum = self.state.pack_error_count_update(export_id, new_count);
960            self.output.error_count.give((datum, ts, Diff::ONE));
961        }
962    }
963
964    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
965        let ts = self.ts();
966        let export_id = Columnar::into_owned(export_id);
967
968        let Some(export) = self.state.exports.get_mut(&export_id) else {
969            error!(%export_id, "hydration event for unknown export");
970            return;
971        };
972        if export.hydration_time_ns.is_some() {
973            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
974            // after reconciliation. We can simply ignore these.
975            return;
976        }
977
978        let duration = export.created_at.elapsed();
979        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
980        export.hydration_time_ns = Some(nanos);
981
982        let retraction = self.state.pack_hydration_time_update(export_id, None);
983        self.output
984            .hydration_time
985            .give((retraction, ts, Diff::MINUS_ONE));
986        let insertion = self
987            .state
988            .pack_hydration_time_update(export_id, Some(nanos));
989        self.output.hydration_time.give((insertion, ts, Diff::ONE));
990    }
991
992    fn handle_operator_hydration(
993        &mut self,
994        OperatorHydrationReference {
995            export_id,
996            lir_id,
997            hydrated,
998        }: Ref<'_, OperatorHydration>,
999    ) {
1000        let ts = self.ts();
1001        let export_id = Columnar::into_owned(export_id);
1002        let lir_id = Columnar::into_owned(lir_id);
1003        let hydrated = Columnar::into_owned(hydrated);
1004
1005        let Some(export) = self.state.exports.get_mut(&export_id) else {
1006            // The export might have already been dropped, in which case we are no longer
1007            // interested in its operator hydration events.
1008            return;
1009        };
1010
1011        let old_status = export.operator_hydration.get(&lir_id).copied();
1012        export.operator_hydration.insert(lir_id, hydrated);
1013
1014        if let Some(hydrated) = old_status {
1015            let retraction = self
1016                .state
1017                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1018            self.output
1019                .operator_hydration_status
1020                .give((retraction, ts, Diff::MINUS_ONE));
1021        }
1022
1023        let insertion = self
1024            .state
1025            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1026        self.output
1027            .operator_hydration_status
1028            .give((insertion, ts, Diff::ONE));
1029    }
1030
1031    fn handle_peek_install(
1032        &mut self,
1033        PeekEventReference {
1034            id,
1035            time,
1036            uuid,
1037            peek_type,
1038            installed: _,
1039        }: Ref<'_, PeekEvent>,
1040    ) {
1041        let id = Columnar::into_owned(id);
1042        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1043        let ts = self.ts();
1044        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1045        self.output.peek.give((datum, ts, Diff::ONE));
1046
1047        let existing = self.state.peek_stash.insert(uuid, self.time);
1048        if existing.is_some() {
1049            error!(%uuid, "peek already registered");
1050        }
1051    }
1052
1053    fn handle_peek_retire(
1054        &mut self,
1055        PeekEventReference {
1056            id,
1057            time,
1058            uuid,
1059            peek_type,
1060            installed: _,
1061        }: Ref<'_, PeekEvent>,
1062    ) {
1063        let id = Columnar::into_owned(id);
1064        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1065        let ts = self.ts();
1066        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1067        self.output.peek.give((datum, ts, Diff::MINUS_ONE));
1068
1069        if let Some(start) = self.state.peek_stash.remove(&uuid) {
1070            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
1071            let bucket = elapsed_ns.next_power_of_two();
1072            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
1073            self.output.peek_duration.give((datum, ts, Diff::ONE));
1074        } else {
1075            error!(%uuid, "peek not yet registered");
1076        }
1077    }
1078
1079    fn handle_frontier(
1080        &mut self,
1081        FrontierReference {
1082            export_id,
1083            time,
1084            diff,
1085        }: Ref<'_, Frontier>,
1086    ) {
1087        let export_id = Columnar::into_owned(export_id);
1088        let diff = Diff::from(*diff);
1089        let ts = self.ts();
1090        let time = Columnar::into_owned(time);
1091        let datum = self.state.pack_frontier_update(export_id, time);
1092        self.output.frontier.give((datum, ts, diff));
1093    }
1094
1095    fn handle_import_frontier(
1096        &mut self,
1097        ImportFrontierReference {
1098            import_id,
1099            export_id,
1100            time,
1101            diff,
1102        }: Ref<'_, ImportFrontier>,
1103    ) {
1104        let import_id = Columnar::into_owned(import_id);
1105        let export_id = Columnar::into_owned(export_id);
1106        let diff = Diff::from(*diff);
1107        let ts = self.ts();
1108        let time = Columnar::into_owned(time);
1109        let datum = self
1110            .state
1111            .pack_import_frontier_update(export_id, import_id, time);
1112        self.output.import_frontier.give((datum, ts, diff));
1113    }
1114
1115    /// Update the allocation size for an arrangement.
1116    fn handle_arrangement_heap_size(
1117        &mut self,
1118        ArrangementHeapSizeReference {
1119            operator_id,
1120            delta_size,
1121        }: Ref<'_, ArrangementHeapSize>,
1122    ) {
1123        let ts = self.ts();
1124        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1125            return;
1126        };
1127
1128        state.size += delta_size;
1129
1130        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
1131        let diff = Diff::cast_from(delta_size);
1132        self.output.arrangement_heap_size.give((datum, ts, diff));
1133    }
1134
1135    /// Update the allocation capacity for an arrangement.
1136    fn handle_arrangement_heap_capacity(
1137        &mut self,
1138        ArrangementHeapCapacityReference {
1139            operator_id,
1140            delta_capacity,
1141        }: Ref<'_, ArrangementHeapCapacity>,
1142    ) {
1143        let ts = self.ts();
1144        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1145            return;
1146        };
1147
1148        state.capacity += delta_capacity;
1149
1150        let datum = self
1151            .state
1152            .pack_arrangement_heap_capacity_update(operator_id);
1153        let diff = Diff::cast_from(delta_capacity);
1154        self.output
1155            .arrangement_heap_capacity
1156            .give((datum, ts, diff));
1157    }
1158
1159    /// Update the allocation count for an arrangement.
1160    fn handle_arrangement_heap_allocations(
1161        &mut self,
1162        ArrangementHeapAllocationsReference {
1163            operator_id,
1164            delta_allocations,
1165        }: Ref<'_, ArrangementHeapAllocations>,
1166    ) {
1167        let ts = self.ts();
1168        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1169            return;
1170        };
1171
1172        state.count += delta_allocations;
1173
1174        let datum = self
1175            .state
1176            .pack_arrangement_heap_allocations_update(operator_id);
1177        let diff = Diff::cast_from(delta_allocations);
1178        self.output
1179            .arrangement_heap_allocations
1180            .give((datum, ts, diff));
1181    }
1182
1183    /// Indicate that a new arrangement exists, start maintaining the heap size state.
1184    fn handle_arrangement_heap_size_operator(
1185        &mut self,
1186        ArrangementHeapSizeOperatorReference {
1187            operator_id,
1188            address,
1189        }: Ref<'_, ArrangementHeapSizeOperator>,
1190    ) {
1191        let activator = Activator::new(
1192            address.into_iter().collect(),
1193            Rc::clone(&self.state.activations),
1194        );
1195        let existing = self
1196            .state
1197            .arrangement_size
1198            .insert(operator_id, Default::default());
1199        if existing.is_some() {
1200            error!(%operator_id, "arrangement size operator already registered");
1201        }
1202        let existing = self
1203            .shared_state
1204            .arrangement_size_activators
1205            .insert(operator_id, activator);
1206        if existing.is_some() {
1207            error!(%operator_id, "arrangement size activator already registered");
1208        }
1209    }
1210
1211    /// Indicate that an arrangement has been dropped and we can cleanup the heap size state.
1212    fn handle_arrangement_heap_size_operator_dropped(
1213        &mut self,
1214        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
1215    ) {
1216        let operator_id = event.operator_id;
1217        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
1218            let ts = self.ts();
1219            let allocations = self
1220                .state
1221                .pack_arrangement_heap_allocations_update(operator_id);
1222            let diff = -Diff::cast_from(state.count);
1223            self.output
1224                .arrangement_heap_allocations
1225                .give((allocations, ts, diff));
1226
1227            let capacity = self
1228                .state
1229                .pack_arrangement_heap_capacity_update(operator_id);
1230            let diff = -Diff::cast_from(state.capacity);
1231            self.output
1232                .arrangement_heap_capacity
1233                .give((capacity, ts, diff));
1234
1235            let size = self.state.pack_arrangement_heap_size_update(operator_id);
1236            let diff = -Diff::cast_from(state.size);
1237            self.output.arrangement_heap_size.give((size, ts, diff));
1238        }
1239        self.shared_state
1240            .arrangement_size_activators
1241            .remove(&operator_id);
1242    }
1243
    /// Indicate that a new LIR operator exists; record the dataflow address it maps to.
    ///
    /// The mappings are stored in the demux state under `global_id`, extending any mappings
    /// already recorded for that ID, and an insertion is emitted for each mapping in the event.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // record the state (for the later drop)
        // Built as a closure so that both entry callbacks below can create the iterator of
        // owned mappings.
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // send the datum out
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }
1272
1273    fn handle_dataflow_global(
1274        &mut self,
1275        DataflowGlobalReference {
1276            dataflow_index,
1277            global_id,
1278        }: Ref<'_, DataflowGlobal>,
1279    ) {
1280        let global_id = Columnar::into_owned(global_id);
1281        self.state
1282            .dataflow_global_ids
1283            .entry(dataflow_index)
1284            .and_modify(|globals| {
1285                // NB BTreeSet::insert() returns `false` when the element was already in the set
1286                if !globals.insert(global_id) {
1287                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
1288                }
1289            })
1290            .or_insert_with(|| BTreeSet::from([global_id]));
1291
1292        let ts = self.ts();
1293        let datum = self
1294            .state
1295            .pack_dataflow_global_update(dataflow_index, global_id);
1296        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
1297    }
1298}
1299
/// Logging state maintained for a compute collection.
///
/// This type is used to produce appropriate log events in response to changes of logged collection
/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
pub struct CollectionLogging {
    /// The export ID of the logged collection.
    export_id: GlobalId,
    /// The logger receiving the events emitted for the collection.
    logger: Logger,

    /// The collection frontier that was logged last, if any.
    logged_frontier: Option<Timestamp>,
    /// The frontier that was logged last for each import that currently has one logged.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
1311
1312impl CollectionLogging {
1313    /// Create new logging state for the identified collection and emit initial logging events.
1314    pub fn new(
1315        export_id: GlobalId,
1316        logger: Logger,
1317        dataflow_index: usize,
1318        import_ids: impl Iterator<Item = GlobalId>,
1319    ) -> Self {
1320        logger.log(&ComputeEvent::Export(Export {
1321            export_id,
1322            dataflow_index,
1323        }));
1324
1325        let mut self_ = Self {
1326            export_id,
1327            logger,
1328            logged_frontier: None,
1329            logged_import_frontiers: Default::default(),
1330        };
1331
1332        // Initialize frontier logging.
1333        let initial_frontier = Some(Timestamp::MIN);
1334        self_.set_frontier(initial_frontier);
1335        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
1336
1337        self_
1338    }
1339
1340    /// Set the collection frontier to the given new time and emit corresponding logging events.
1341    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
1342        let old_time = self.logged_frontier;
1343        self.logged_frontier = new_time;
1344
1345        if old_time != new_time {
1346            let export_id = self.export_id;
1347            let retraction = old_time.map(|time| {
1348                ComputeEvent::Frontier(Frontier {
1349                    export_id,
1350                    time,
1351                    diff: -1,
1352                })
1353            });
1354            let insertion = new_time.map(|time| {
1355                ComputeEvent::Frontier(Frontier {
1356                    export_id,
1357                    time,
1358                    diff: 1,
1359                })
1360            });
1361            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1362            self.logger.log_many(events);
1363        }
1364    }
1365
1366    /// Set the frontier of the given import to the given new time and emit corresponding logging
1367    /// events.
1368    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
1369        let old_time = self.logged_import_frontiers.remove(&import_id);
1370        if let Some(time) = new_time {
1371            self.logged_import_frontiers.insert(import_id, time);
1372        }
1373
1374        if old_time != new_time {
1375            let export_id = self.export_id;
1376            let retraction = old_time.map(|time| {
1377                ComputeEvent::ImportFrontier(ImportFrontier {
1378                    import_id,
1379                    export_id,
1380                    time,
1381                    diff: -1,
1382                })
1383            });
1384            let insertion = new_time.map(|time| {
1385                ComputeEvent::ImportFrontier(ImportFrontier {
1386                    import_id,
1387                    export_id,
1388                    time,
1389                    diff: 1,
1390                })
1391            });
1392            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1393            self.logger.log_many(events);
1394        }
1395    }
1396
1397    /// Set the collection as hydrated.
1398    pub fn set_hydrated(&self) {
1399        self.logger.log(&ComputeEvent::Hydration(Hydration {
1400            export_id: self.export_id,
1401        }));
1402    }
1403
    /// The export global ID of this collection.
    ///
    /// This is the ID that was passed to `new` and that all events logged through this
    /// instance refer to.
    pub fn export_id(&self) -> GlobalId {
        self.export_id
    }
1408}
1409
1410impl Drop for CollectionLogging {
1411    fn drop(&mut self) {
1412        // Emit retraction events to clean up events previously logged.
1413        self.set_frontier(None);
1414
1415        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1416        for import_id in import_ids {
1417            self.set_import_frontier(import_id, None);
1418        }
1419
1420        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1421            export_id: self.export_id,
1422        }));
1423    }
1424}
1425
/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
/// batch streams.
pub(crate) trait LogDataflowErrors {
    /// Attach an operator that reports the summed diffs of the data passing through to
    /// `logger`, as error count changes of `export_id`, and forwards the data unchanged.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
1431
1432impl<'scope, T, D> LogDataflowErrors for VecCollection<'scope, T, D, Diff>
1433where
1434    T: timely::progress::Timestamp,
1435    D: Clone + 'static,
1436{
1437    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1438        self.inner
1439            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1440                move |input, output| {
1441                    input.for_each(|cap, data| {
1442                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1443                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1444
1445                        output.session(&cap).give_container(data);
1446                    });
1447                }
1448            })
1449            .as_collection()
1450    }
1451}
1452
1453impl<'scope, T, B> LogDataflowErrors for StreamVec<'scope, T, B>
1454where
1455    T: timely::progress::Timestamp,
1456    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1457{
1458    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1459        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1460            move |input, output| {
1461                input.for_each(|cap, data| {
1462                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1463                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1464
1465                    output.session(&cap).give_container(data);
1466                });
1467            }
1468        })
1469    }
1470}
1471
1472/// Return the sum of all diffs within the given batch.
1473///
1474/// Note that this operation can be expensive: Its runtime is O(N) with N being the number of
1475/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
1476/// contain only a small number of records, so this doesn't matter much. But avoid using it when
1477/// batches might become large.
1478fn sum_batch_diffs<B>(batch: &B) -> Diff
1479where
1480    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1481{
1482    let mut sum = Diff::ZERO;
1483    let mut cursor = batch.cursor();
1484
1485    while cursor.key_valid(batch) {
1486        while cursor.val_valid(batch) {
1487            cursor.map_times(batch, |_t, r| sum += r);
1488            cursor.step_val(batch);
1489        }
1490        cursor.step_key(batch);
1491    }
1492
1493    sum
1494}
1495
#[cfg(test)]
mod tests {
    use super::*;

    /// Pin the in-memory size of `ComputeEvent` so that changes to it are made deliberately.
    #[mz_ore::test]
    fn test_compute_event_size() {
        // This could be a static assertion, but we don't use those yet in this crate.
        let actual_size = std::mem::size_of::<ComputeEvent>();
        assert_eq!(56, actual_size)
    }
}
1505}