mz_compute/logging/compute.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by clusterd.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Display, Write};
15use std::rc::Rc;
16use std::time::{Duration, Instant};
17
18use columnar::{Columnar, Index, Ref};
19use differential_dataflow::VecCollection;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::trace::{BatchReader, Cursor};
22use mz_compute_types::plan::LirId;
23use mz_ore::cast::CastFrom;
24use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
25use mz_timely_util::columnar::builder::ColumnBuilder;
26use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
27use mz_timely_util::replay::MzReplay;
28use timely::Data;
29use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
30use timely::dataflow::operators::Operator;
31use timely::dataflow::operators::generic::OutputBuilder;
32use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
33use timely::dataflow::operators::generic::operator::empty;
34use timely::dataflow::{Scope, Stream};
35use timely::scheduling::Scheduler;
36use tracing::error;
37use uuid::Uuid;
38
39use crate::extensions::arrange::MzArrangeCore;
40use crate::logging::{
41    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
42    SharedLoggingState, Update,
43};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::RowRowSpine;
46
47/// Type alias for a logger of compute events.
48pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// A builder for columnar containers of timestamped compute events, used by [`Logger`].
49pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
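//
// A minimal sketch of how events reach this logger (identifiers are placeholders; see
// `CollectionLogging::new` below for a real call site):
//
//     logger.log(&ComputeEvent::Export(Export { export_id, dataflow_index }));
//
// The logged `(Duration, ComputeEvent)` pairs are later replayed into the logging
// dataflow built by `construct`.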
50
51/// A dataflow exports a global ID.
52#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
53pub struct Export {
54    /// Identifier of the export.
55    pub export_id: GlobalId,
56    /// Index of the exporting dataflow within its timely worker.
57    pub dataflow_index: usize,
58}
59
60/// The export for a global id was dropped.
61#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
62pub struct ExportDropped {
63    /// Identifier of the export.
64    pub export_id: GlobalId,
65}
66
67/// A peek event with a [`PeekType`], and an installation status.
68#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
69pub struct PeekEvent {
70    /// The identifier of the view the peek targets.
71    pub id: GlobalId,
72    /// The logical timestamp requested.
73    pub time: Timestamp,
74    /// The ID of the peek.
75    pub uuid: uuid::Bytes,
76    /// The relevant _type_ of peek: index or persist.
77    // Note that this is not stored on the Peek event for data-packing reasons only.
78    pub peek_type: PeekType,
79    /// True if the peek is being installed; false if it's being removed.
80    pub installed: bool,
81}
82
83/// Frontier change event.
84#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
85pub struct Frontier {
    /// Identifier of the export the frontier belongs to.
86    pub export_id: GlobalId,
    /// The frontier timestamp.
87    pub time: Timestamp,
    /// The change: +1 for a newly reported frontier, -1 for a retracted one.
88    pub diff: i8,
89}
90
91/// An import frontier change.
92#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
93pub struct ImportFrontier {
    /// Identifier of the import whose frontier changed.
94    pub import_id: GlobalId,
    /// Identifier of the export that depends on the import.
95    pub export_id: GlobalId,
    /// The frontier timestamp.
96    pub time: Timestamp,
    /// The change: +1 for a newly reported frontier, -1 for a retracted one.
97    pub diff: i8,
98}
99
100/// A change in an arrangement's heap size.
101#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
102pub struct ArrangementHeapSize {
103    /// Operator index
104    pub operator_id: usize,
105    /// Delta of the heap size in bytes of the arrangement.
106    pub delta_size: isize,
107}
108
109/// A change in an arrangement's heap capacity.
110#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
111pub struct ArrangementHeapCapacity {
112    /// Operator index
113    pub operator_id: usize,
114    /// Delta of the heap capacity in bytes of the arrangement.
115    pub delta_capacity: isize,
116}
117
118/// A change in an arrangement's heap allocation count.
119#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
120pub struct ArrangementHeapAllocations {
121    /// Operator index
122    pub operator_id: usize,
123    /// Delta of distinct heap allocations backing the arrangement.
124    pub delta_allocations: isize,
125}
126
127/// Announcing an operator that manages an arrangement.
128#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
129pub struct ArrangementHeapSizeOperator {
130    /// Operator index
131    pub operator_id: usize,
132    /// The address of the operator.
133    pub address: Vec<usize>,
134}
135
136/// Drop event for an operator managing an arrangement.
137#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
138pub struct ArrangementHeapSizeOperatorDrop {
139    /// Operator index
140    pub operator_id: usize,
141}
142
143/// Dataflow shutdown event.
144#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
145pub struct DataflowShutdown {
146    /// Index of the dataflow within its timely worker.
147    pub dataflow_index: usize,
148}
149
150/// Error count update event.
151#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
152pub struct ErrorCount {
153    /// Identifier of the export.
154    pub export_id: GlobalId,
155    /// The change in error count.
156    pub diff: Diff,
157}
158
159/// An export is hydrated.
160#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
161pub struct Hydration {
162    /// Identifier of the export.
163    pub export_id: GlobalId,
164}
165
166/// An operator's hydration status changed.
167#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
168pub struct OperatorHydration {
169    /// Identifier of the export.
170    pub export_id: GlobalId,
171    /// Identifier of the operator's LIR node.
172    pub lir_id: LirId,
173    /// Whether the operator is hydrated.
174    pub hydrated: bool,
175}
176
177/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
178#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
179pub struct LirMapping {
180    /// The `GlobalId` in which the LIR operator is rendered.
181    ///
182    /// NB: a single dataflow may have many `GlobalId`s inside it.
183    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
184    /// tracks the many-to-one relationship between `GlobalId`s and
185    /// dataflows.
186    pub global_id: GlobalId,
187    /// The actual mapping.
188    /// Represented this way to reduce the size of `ComputeEvent`.
189    pub mapping: Vec<(LirId, LirMetadata)>,
190}
191
192/// Announce that a dataflow supports a specific global ID.
193#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
194pub struct DataflowGlobal {
195    /// The identifier of the dataflow.
196    pub dataflow_index: usize,
197    /// A `GlobalId` that is rendered as part of this dataflow.
198    pub global_id: GlobalId,
199}
200
201/// A logged compute event.
202#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
203pub enum ComputeEvent {
204    /// A dataflow export was created.
205    Export(Export),
206    /// A dataflow export was dropped.
207    ExportDropped(ExportDropped),
208    /// Peek command.
209    Peek(PeekEvent),
210    /// Available frontier information for dataflow exports.
211    Frontier(Frontier),
212    /// Available frontier information for dataflow imports.
213    ImportFrontier(ImportFrontier),
214    /// Arrangement heap size update
215    ArrangementHeapSize(ArrangementHeapSize),
216    /// Arrangement heap capacity update
217    ArrangementHeapCapacity(ArrangementHeapCapacity),
218    /// Arrangement heap allocations update
219    ArrangementHeapAllocations(ArrangementHeapAllocations),
220    /// Arrangement size operator address
221    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
222    /// Arrangement size operator dropped
223    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
224    /// All operators of a dataflow have shut down.
225    DataflowShutdown(DataflowShutdown),
226    /// The number of errors in a dataflow export has changed.
227    ErrorCount(ErrorCount),
228    /// A dataflow export was hydrated.
229    Hydration(Hydration),
230    /// A dataflow operator's hydration status changed.
231    OperatorHydration(OperatorHydration),
232    /// An LIR operator was mapped to some particular dataflow operator.
233    ///
234    /// Cf. `ComputeLog::LirMapping`
235    LirMapping(LirMapping),
    /// A `GlobalId` was associated with a dataflow.
236    DataflowGlobal(DataflowGlobal),
237}
238
239/// A peek type distinguishing between index and persist peeks.
240#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
241pub enum PeekType {
242    /// A peek against an index.
243    Index,
244    /// A peek against persist.
245    Persist,
246}
247
248impl PeekType {
249    /// A human-readable name for a peek type.
250    fn name(self) -> &'static str {
251        match self {
252            PeekType::Index => "index",
253            PeekType::Persist => "persist",
254        }
255    }
256}
257
258/// Metadata for LIR operators.
259#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
260pub struct LirMetadata {
261    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
262    operator: String,
263    /// The LIR identifier of the parent (if any).
264    parent_lir_id: Option<LirId>,
265    /// How nested the operator is (for nice indentation).
266    nesting: u8,
267    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
268    /// If `start == end`, then no operators were used.
269    operator_span: (usize, usize),
270}
271
272impl LirMetadata {
273    /// Construct a new LIR metadata object.
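    ///
    /// ```ignore
    /// // Illustrative sketch; `operator_name`, `parent_lir_id`, `nesting`, `start`, and
    /// // `end` are placeholders for values produced during rendering.
    /// let meta = LirMetadata::new(operator_name, parent_lir_id, nesting, (start, end));
    /// ```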
274    pub fn new(
275        operator: String,
276        parent_lir_id: Option<LirId>,
277        nesting: u8,
278        operator_span: (usize, usize),
279    ) -> Self {
280        Self {
281            operator,
282            parent_lir_id,
283            nesting,
284            operator_span,
285        }
286    }
287}
288
289/// The return type of the [`construct`] function.
290pub(super) struct Return {
291    /// Collections returned by [`construct`].
292    pub collections: BTreeMap<LogVariant, LogCollection>,
293}
294
295/// Constructs the logging dataflow fragment for compute logs.
296///
297/// Params
298/// * `scope`: The Timely scope hosting the log analysis dataflow.
299/// * `scheduler`: The timely scheduler used to obtain activators.
300/// * `config`: Logging configuration.
301/// * `event_queue`: The source to read compute log events from.
303/// * `shared_state`: Shared state between logging dataflow fragments.
304pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
305    mut scope: G,
306    scheduler: S,
307    config: &mz_compute_client::logging::LoggingConfig,
308    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
309    shared_state: Rc<RefCell<SharedLoggingState>>,
310) -> Return {
311    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
312
313    scope.scoped("compute logging", move |scope| {
314        let enable_logging = config.enable_logging;
315        let (logs, token) = if enable_logging {
316            event_queue.links.mz_replay(
317                scope,
318                "compute logs",
319                config.interval,
320                event_queue.activator,
321            )
322        } else {
323            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
324            (empty(scope), token)
325        };
326
327        // Build a demux operator that splits the replayed event stream up into the separate
328        // logging streams.
329        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
330        let mut input = demux.new_input(&logs, Pipeline);
331        let (export_out, export) = demux.new_output();
332        let mut export_out = OutputBuilder::from(export_out);
333        let (frontier_out, frontier) = demux.new_output();
334        let mut frontier_out = OutputBuilder::from(frontier_out);
335        let (import_frontier_out, import_frontier) = demux.new_output();
336        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
337        let (peek_out, peek) = demux.new_output();
338        let mut peek_out = OutputBuilder::from(peek_out);
339        let (peek_duration_out, peek_duration) = demux.new_output();
340        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
341        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
342        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
343        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
344        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
345        let (arrangement_heap_allocations_out, arrangement_heap_allocations) =
346            demux.new_output();
347        let mut arrangement_heap_allocations_out = OutputBuilder::from(arrangement_heap_allocations_out);
348        let (error_count_out, error_count) = demux.new_output();
349        let mut error_count_out = OutputBuilder::from(error_count_out);
350        let (hydration_time_out, hydration_time) = demux.new_output();
351        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
352        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
353        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
354        let (lir_mapping_out, lir_mapping) = demux.new_output();
355        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
356        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
357        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);
358
359        let mut demux_state = DemuxState::new(scheduler, scope.index());
360        demux.build(move |_capability| {
361            move |_frontiers| {
362                let mut export = export_out.activate();
363                let mut frontier = frontier_out.activate();
364                let mut import_frontier = import_frontier_out.activate();
365                let mut peek = peek_out.activate();
366                let mut peek_duration = peek_duration_out.activate();
367                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
368                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
369                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
370                let mut error_count = error_count_out.activate();
371                let mut hydration_time = hydration_time_out.activate();
372                let mut operator_hydration_status = operator_hydration_status_out.activate();
373                let mut lir_mapping = lir_mapping_out.activate();
374                let mut dataflow_global_ids = dataflow_global_ids_out.activate();
375
376                input.for_each(|cap, data| {
377                    let mut output_sessions = DemuxOutput {
378                        export: export.session_with_builder(&cap),
379                        frontier: frontier.session_with_builder(&cap),
380                        import_frontier: import_frontier.session_with_builder(&cap),
381                        peek: peek.session_with_builder(&cap),
382                        peek_duration: peek_duration.session_with_builder(&cap),
383                        arrangement_heap_allocations: arrangement_heap_allocations.session_with_builder(&cap),
384                        arrangement_heap_capacity: arrangement_heap_capacity.session_with_builder(&cap),
385                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
386                        error_count: error_count.session_with_builder(&cap),
387                        hydration_time: hydration_time.session_with_builder(&cap),
388                        operator_hydration_status: operator_hydration_status.session_with_builder(&cap),
389                        lir_mapping: lir_mapping.session_with_builder(&cap),
390                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
391                    };
392
393                    let shared_state = &mut shared_state.borrow_mut();
394                    for (time, event) in data.borrow().into_index_iter() {
395                        DemuxHandler {
396                            state: &mut demux_state,
397                            shared_state,
398                            output: &mut output_sessions,
399                            logging_interval_ms,
400                            time,
401                        }
402                        .handle(event);
403                    }
404                });
405            }
406        });
407
408        use ComputeLog::*;
409        let logs = [
410            (ArrangementHeapAllocations, arrangement_heap_allocations),
411            (ArrangementHeapCapacity, arrangement_heap_capacity),
412            (ArrangementHeapSize, arrangement_heap_size),
413            (DataflowCurrent, export),
414            (DataflowGlobal, dataflow_global_ids),
415            (ErrorCount, error_count),
416            (FrontierCurrent, frontier),
417            (HydrationTime, hydration_time),
418            (ImportFrontierCurrent, import_frontier),
419            (LirMapping, lir_mapping),
420            (OperatorHydrationStatus, operator_hydration_status),
421            (PeekCurrent, peek),
422            (PeekDuration, peek_duration),
423        ];
424
425        // Build the output arrangements.
426        let mut collections = BTreeMap::new();
427        for (variant, stream) in logs {
428            let variant = LogVariant::Compute(variant);
429            if config.index_logs.contains_key(&variant) {
430                let trace = stream
431                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
432                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
433                        &format!("Arrange {variant:?}"),
434                    )
435                    .trace;
436                let collection = LogCollection {
437                    trace,
438                    token: Rc::clone(&token),
439                };
440                collections.insert(variant, collection);
441            }
442        }
443
444        Return { collections }
445    })
446}
447
448/// Format the given value and pack it into a `Datum::String`.
449///
450/// The `scratch` buffer is used to perform the string conversion without an allocation.
451/// Callers should not assume anything about the contents of this buffer after this function
452/// returns.
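///
/// ```ignore
/// // Illustrative sketch; `export_id` stands in for any `Display` value.
/// let mut scratch = String::new();
/// let datum = make_string_datum(export_id, &mut scratch);
/// ```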
453fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
454where
455    V: Display,
456{
457    scratch.clear();
458    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
459    Datum::String(scratch)
460}
461
462/// State maintained by the demux operator.
463struct DemuxState<A> {
464    /// The timely scheduler.
465    scheduler: A,
466    /// The index of this worker.
467    worker_id: usize,
468    /// A reusable scratch string for formatting IDs.
469    scratch_string_a: String,
470    /// A reusable scratch string for formatting IDs.
471    scratch_string_b: String,
472    /// State tracked per dataflow export.
473    exports: BTreeMap<GlobalId, ExportState>,
474    /// Maps pending peeks to their installation time.
475    peek_stash: BTreeMap<Uuid, Duration>,
476    /// Arrangement size stash.
477    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
478    /// LIR -> operator span mapping.
479    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
480    /// Dataflow -> `GlobalId` mapping (many-to-one).
481    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
482    /// A row packer for the arrangement heap allocations output.
483    arrangement_heap_allocations_packer: PermutedRowPacker,
484    /// A row packer for the arrangement heap capacity output.
485    arrangement_heap_capacity_packer: PermutedRowPacker,
486    /// A row packer for the arrangement heap size output.
487    arrangement_heap_size_packer: PermutedRowPacker,
488    /// A row packer for the dataflow global output.
489    dataflow_global_packer: PermutedRowPacker,
490    /// A row packer for the error count output.
491    error_count_packer: PermutedRowPacker,
492    /// A row packer for the exports output.
493    export_packer: PermutedRowPacker,
494    /// A row packer for the frontier output.
495    frontier_packer: PermutedRowPacker,
496    /// A row packer for the import frontier output.
497    import_frontier_packer: PermutedRowPacker,
498    /// A row packer for the LIR mapping output.
499    lir_mapping_packer: PermutedRowPacker,
500    /// A row packer for the operator hydration status output.
501    operator_hydration_status_packer: PermutedRowPacker,
502    /// A row packer for the peek durations output.
503    peek_duration_packer: PermutedRowPacker,
504    /// A row packer for the peek output.
505    peek_packer: PermutedRowPacker,
506    /// A row packer for the hydration time output.
507    hydration_time_packer: PermutedRowPacker,
508}
509
510impl<A: Scheduler> DemuxState<A> {
511    fn new(scheduler: A, worker_id: usize) -> Self {
512        Self {
513            scheduler,
514            worker_id,
515            scratch_string_a: String::new(),
516            scratch_string_b: String::new(),
517            exports: Default::default(),
518            peek_stash: Default::default(),
519            arrangement_size: Default::default(),
520            lir_mapping: Default::default(),
521            dataflow_global_ids: Default::default(),
522            arrangement_heap_allocations_packer: PermutedRowPacker::new(
523                ComputeLog::ArrangementHeapAllocations,
524            ),
525            arrangement_heap_capacity_packer: PermutedRowPacker::new(
526                ComputeLog::ArrangementHeapCapacity,
527            ),
528            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
529            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
530            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
531            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
532            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
533            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
534            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
535            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
536            operator_hydration_status_packer: PermutedRowPacker::new(
537                ComputeLog::OperatorHydrationStatus,
538            ),
539            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
540            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
541        }
542    }
543
544    /// Pack an arrangement heap allocations update key-value for the given operator.
545    fn pack_arrangement_heap_allocations_update(
546        &mut self,
547        operator_id: usize,
548    ) -> (&RowRef, &RowRef) {
549        self.arrangement_heap_allocations_packer.pack_slice(&[
550            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
551            Datum::UInt64(u64::cast_from(self.worker_id)),
552        ])
553    }
554
555    /// Pack an arrangement heap capacity update key-value for the given operator.
556    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
557        self.arrangement_heap_capacity_packer.pack_slice(&[
558            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
559            Datum::UInt64(u64::cast_from(self.worker_id)),
560        ])
561    }
562
563    /// Pack an arrangement heap size update key-value for the given operator.
564    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
565        self.arrangement_heap_size_packer.pack_slice(&[
566            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
567            Datum::UInt64(u64::cast_from(self.worker_id)),
568        ])
569    }
570
571    /// Pack a dataflow global update key-value for the given dataflow index and global ID.
572    fn pack_dataflow_global_update(
573        &mut self,
574        dataflow_index: usize,
575        global_id: GlobalId,
576    ) -> (&RowRef, &RowRef) {
577        self.dataflow_global_packer.pack_slice(&[
578            Datum::UInt64(u64::cast_from(dataflow_index)),
579            Datum::UInt64(u64::cast_from(self.worker_id)),
580            make_string_datum(global_id, &mut self.scratch_string_a),
581        ])
582    }
583
584    /// Pack an error count update key-value for the given export ID and count.
585    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
586        // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
587        // per-worker error count might be negative and at the SQL level having negative multiplicities
588        // is treated as an error.
589        self.error_count_packer.pack_slice(&[
590            make_string_datum(export_id, &mut self.scratch_string_a),
591            Datum::UInt64(u64::cast_from(self.worker_id)),
592            Datum::Int64(count.into_inner()),
593        ])
594    }
595
596    /// Pack an export update key-value for the given export ID and dataflow index.
597    fn pack_export_update(
598        &mut self,
599        export_id: GlobalId,
600        dataflow_index: usize,
601    ) -> (&RowRef, &RowRef) {
602        self.export_packer.pack_slice(&[
603            make_string_datum(export_id, &mut self.scratch_string_a),
604            Datum::UInt64(u64::cast_from(self.worker_id)),
605            Datum::UInt64(u64::cast_from(dataflow_index)),
606        ])
607    }
608
609    /// Pack a hydration time update key-value for the given export ID and hydration time.
610    fn pack_hydration_time_update(
611        &mut self,
612        export_id: GlobalId,
613        time_ns: Option<u64>,
614    ) -> (&RowRef, &RowRef) {
615        self.hydration_time_packer.pack_slice(&[
616            make_string_datum(export_id, &mut self.scratch_string_a),
617            Datum::UInt64(u64::cast_from(self.worker_id)),
618            Datum::from(time_ns),
619        ])
620    }
621
622    /// Pack an import frontier update key-value for the given export ID, import ID, and time.
623    fn pack_import_frontier_update(
624        &mut self,
625        export_id: GlobalId,
626        import_id: GlobalId,
627        time: Timestamp,
628    ) -> (&RowRef, &RowRef) {
629        self.import_frontier_packer.pack_slice(&[
630            make_string_datum(export_id, &mut self.scratch_string_a),
631            make_string_datum(import_id, &mut self.scratch_string_b),
632            Datum::UInt64(u64::cast_from(self.worker_id)),
633            Datum::MzTimestamp(time),
634        ])
635    }
636
637    /// Pack an LIR mapping update key-value for the given LIR operator metadata.
638    fn pack_lir_mapping_update(
639        &mut self,
640        global_id: GlobalId,
641        lir_id: LirId,
642        operator: String,
643        parent_lir_id: Option<LirId>,
644        nesting: u8,
645        operator_span: (usize, usize),
646    ) -> (&RowRef, &RowRef) {
647        self.lir_mapping_packer.pack_slice(&[
648            make_string_datum(global_id, &mut self.scratch_string_a),
649            Datum::UInt64(lir_id.into()),
650            Datum::UInt64(u64::cast_from(self.worker_id)),
651            make_string_datum(operator, &mut self.scratch_string_b),
652            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
653            Datum::UInt16(u16::cast_from(nesting)),
654            Datum::UInt64(u64::cast_from(operator_span.0)),
655            Datum::UInt64(u64::cast_from(operator_span.1)),
656        ])
657    }
658
659    /// Pack an operator hydration status update key-value for the given export ID, LIR ID, and
660    /// hydration status.
661    fn pack_operator_hydration_status_update(
662        &mut self,
663        export_id: GlobalId,
664        lir_id: LirId,
665        hydrated: bool,
666    ) -> (&RowRef, &RowRef) {
667        self.operator_hydration_status_packer.pack_slice(&[
668            make_string_datum(export_id, &mut self.scratch_string_a),
669            Datum::UInt64(lir_id.into()),
670            Datum::UInt64(u64::cast_from(self.worker_id)),
671            Datum::from(hydrated),
672        ])
673    }
674
675    /// Pack a peek duration update key-value for the given peek type and duration bucket.
676    fn pack_peek_duration_update(
677        &mut self,
678        peek_type: PeekType,
679        bucket: u128,
680    ) -> (&RowRef, &RowRef) {
681        self.peek_duration_packer.pack_slice(&[
682            Datum::UInt64(u64::cast_from(self.worker_id)),
683            Datum::String(peek_type.name()),
684            Datum::UInt64(bucket.try_into().expect("bucket too big")),
685        ])
686    }
687
688    /// Pack a peek update key-value for the given peek and peek type.
689    fn pack_peek_update(
690        &mut self,
691        id: GlobalId,
692        time: Timestamp,
693        uuid: Uuid,
694        peek_type: PeekType,
695    ) -> (&RowRef, &RowRef) {
696        self.peek_packer.pack_slice(&[
697            Datum::Uuid(uuid),
698            Datum::UInt64(u64::cast_from(self.worker_id)),
699            make_string_datum(id, &mut self.scratch_string_a),
700            Datum::String(peek_type.name()),
701            Datum::MzTimestamp(time),
702        ])
703    }
704
705    /// Pack a frontier update key-value for the given export ID and time.
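    ///
    /// The packed datums cover the columns `(export_id, worker_id, time)`; how they are
    /// split into key and value is determined by the `PermutedRowPacker` constructed for
    /// `ComputeLog::FrontierCurrent`.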
706    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
707        self.frontier_packer.pack_slice(&[
708            make_string_datum(export_id, &mut self.scratch_string_a),
709            Datum::UInt64(u64::cast_from(self.worker_id)),
710            Datum::MzTimestamp(time),
711        ])
712    }
713}
714
715/// State tracked for each dataflow export.
716struct ExportState {
717    /// The ID of the dataflow maintaining this export.
718    dataflow_index: usize,
719    /// Number of errors in this export.
720    ///
721    /// This must be a signed integer, since per-worker error counts can be negative, only the
722    /// cross-worker total has to sum up to a non-negative value.
723    error_count: Diff,
724    /// When this export was created.
725    created_at: Instant,
726    /// The time it took for this export to hydrate, in nanoseconds, if it has hydrated yet.
727    hydration_time_ns: Option<u64>,
728    /// Hydration status of operators feeding this export.
729    operator_hydration: BTreeMap<LirId, bool>,
730}
731
732impl ExportState {
733    fn new(dataflow_index: usize) -> Self {
734        Self {
735            dataflow_index,
736            error_count: Diff::ZERO,
737            created_at: Instant::now(),
738            hydration_time_ns: None,
739            operator_hydration: BTreeMap::new(),
740        }
741    }
742}
743
744/// State for tracking arrangement sizes.
745#[derive(Default, Debug)]
746struct ArrangementSizeState {
747    size: isize,
748    capacity: isize,
749    count: isize,
750}
751
752/// Bundled output sessions used by the demux operator.
753struct DemuxOutput<'a, 'b> {
754    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
755    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
756    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
757    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
758    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
759    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
760    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
761    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
762    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
763    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
764    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
765    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
766    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
767}
768
769/// Event handler of the demux operator.
770struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
771    /// State kept by the demux operator.
772    state: &'a mut DemuxState<A>,
773    /// State shared across log receivers.
774    shared_state: &'a mut SharedLoggingState,
775    /// Demux output sessions.
776    output: &'a mut DemuxOutput<'b, 'c>,
777    /// The logging interval specifying the time granularity for the updates.
778    logging_interval_ms: u128,
779    /// The current event time.
780    time: Duration,
781}
782
783impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
784    /// Return the timestamp associated with the current event, based on the event time and the
785    /// logging interval.
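    ///
    /// For example, with a 1000 ms logging interval an event at 1500 ms is assigned
    /// timestamp 2000, and an event at exactly 2000 ms is assigned 3000: the result is
    /// always rounded strictly up to the next interval boundary.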
786    fn ts(&self) -> Timestamp {
787        let time_ms = self.time.as_millis();
788        let interval = self.logging_interval_ms;
789        let rounded = (time_ms / interval + 1) * interval;
790        rounded.try_into().expect("must fit")
791    }
792
793    /// Handle the given compute event.
794    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
795        use ComputeEventReference::*;
796        match event {
797            Export(export) => self.handle_export(export),
798            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
799            Peek(peek) if peek.installed => self.handle_peek_install(peek),
800            Peek(peek) => self.handle_peek_retire(peek),
801            Frontier(frontier) => self.handle_frontier(frontier),
802            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
803            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
804            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
805            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
806            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
807            ArrangementHeapSizeOperatorDrop(inner) => {
808                self.handle_arrangement_heap_size_operator_dropped(inner)
809            }
810            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
811            ErrorCount(error_count) => self.handle_error_count(error_count),
812            Hydration(hydration) => self.handle_hydration(hydration),
813            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
814            LirMapping(mapping) => self.handle_lir_mapping(mapping),
815            DataflowGlobal(global) => self.handle_dataflow_global(global),
816        }
817    }
818
819    fn handle_export(
820        &mut self,
821        ExportReference {
822            export_id,
823            dataflow_index,
824        }: Ref<'_, Export>,
825    ) {
826        let export_id = Columnar::into_owned(export_id);
827        let ts = self.ts();
828        let datum = self.state.pack_export_update(export_id, dataflow_index);
829        self.output.export.give((datum, ts, Diff::ONE));
830
831        let existing = self
832            .state
833            .exports
834            .insert(export_id, ExportState::new(dataflow_index));
835        if existing.is_some() {
836            error!(%export_id, "export already registered");
837        }
838
839        // Insert hydration time logging for this export.
840        let datum = self.state.pack_hydration_time_update(export_id, None);
841        self.output.hydration_time.give((datum, ts, Diff::ONE));
842    }
843
844    fn handle_export_dropped(
845        &mut self,
846        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
847    ) {
848        let export_id = Columnar::into_owned(export_id);
849        let Some(export) = self.state.exports.remove(&export_id) else {
850            error!(%export_id, "missing exports entry at time of export drop");
851            return;
852        };
853
854        let ts = self.ts();
855        let dataflow_index = export.dataflow_index;
856
857        let datum = self.state.pack_export_update(export_id, dataflow_index);
858        self.output.export.give((datum, ts, Diff::MINUS_ONE));
859
860        // Remove error count logging for this export.
861        if export.error_count != Diff::ZERO {
862            let datum = self
863                .state
864                .pack_error_count_update(export_id, export.error_count);
865            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
866        }
867
868        // Remove hydration time logging for this export.
869        let datum = self
870            .state
871            .pack_hydration_time_update(export_id, export.hydration_time_ns);
872        self.output
873            .hydration_time
874            .give((datum, ts, Diff::MINUS_ONE));
875
876        // Remove operator hydration logging for this export.
877        for (lir_id, hydrated) in export.operator_hydration {
878            let datum = self
879                .state
880                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
881            self.output
882                .operator_hydration_status
883                .give((datum, ts, Diff::MINUS_ONE));
884        }
885    }
886
887    fn handle_dataflow_shutdown(
888        &mut self,
889        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
890    ) {
891        let ts = self.ts();
892
893        // We deal with any `GlobalId` based mappings in this event.
894        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
895            for global_id in global_ids {
896                // Remove dataflow/`GlobalId` mapping.
897                let datum = self
898                    .state
899                    .pack_dataflow_global_update(dataflow_index, global_id);
900                self.output
901                    .dataflow_global_ids
902                    .give((datum, ts, Diff::MINUS_ONE));
903
904                // Remove LIR mapping.
905                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
906                    for (
907                        lir_id,
908                        LirMetadata {
909                            operator,
910                            parent_lir_id,
911                            nesting,
912                            operator_span,
913                        },
914                    ) in mappings
915                    {
916                        let datum = self.state.pack_lir_mapping_update(
917                            global_id,
918                            lir_id,
919                            operator,
920                            parent_lir_id,
921                            nesting,
922                            operator_span,
923                        );
924                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
925                    }
926                }
927            }
928        }
929    }
930
931    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
932        let ts = self.ts();
933        let export_id = Columnar::into_owned(export_id);
934
935        let Some(export) = self.state.exports.get_mut(&export_id) else {
936            // The export might have already been dropped, in which case we are no longer
937            // interested in its errors.
938            return;
939        };
940
941        let old_count = export.error_count;
942        let new_count = old_count + diff;
943        export.error_count = new_count;
944
945        if old_count != Diff::ZERO {
946            let datum = self.state.pack_error_count_update(export_id, old_count);
947            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
948        }
949        if new_count != Diff::ZERO {
950            let datum = self.state.pack_error_count_update(export_id, new_count);
951            self.output.error_count.give((datum, ts, Diff::ONE));
952        }
953    }
954
955    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
956        let ts = self.ts();
957        let export_id = Columnar::into_owned(export_id);
958
959        let Some(export) = self.state.exports.get_mut(&export_id) else {
960            error!(%export_id, "hydration event for unknown export");
961            return;
962        };
963        if export.hydration_time_ns.is_some() {
964            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
965            // after reconciliation. We can simply ignore these.
966            return;
967        }
968
969        let duration = export.created_at.elapsed();
970        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
971        export.hydration_time_ns = Some(nanos);
972
973        let retraction = self.state.pack_hydration_time_update(export_id, None);
974        self.output
975            .hydration_time
976            .give((retraction, ts, Diff::MINUS_ONE));
977        let insertion = self
978            .state
979            .pack_hydration_time_update(export_id, Some(nanos));
980        self.output.hydration_time.give((insertion, ts, Diff::ONE));
981    }
982
983    fn handle_operator_hydration(
984        &mut self,
985        OperatorHydrationReference {
986            export_id,
987            lir_id,
988            hydrated,
989        }: Ref<'_, OperatorHydration>,
990    ) {
991        let ts = self.ts();
992        let export_id = Columnar::into_owned(export_id);
993        let lir_id = Columnar::into_owned(lir_id);
994        let hydrated = Columnar::into_owned(hydrated);
995
996        let Some(export) = self.state.exports.get_mut(&export_id) else {
997            // The export might have already been dropped, in which case we are no longer
998            // interested in its operator hydration events.
999            return;
1000        };
1001
1002        let old_status = export.operator_hydration.get(&lir_id).copied();
1003        export.operator_hydration.insert(lir_id, hydrated);
1004
1005        if let Some(hydrated) = old_status {
1006            let retraction = self
1007                .state
1008                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1009            self.output
1010                .operator_hydration_status
1011                .give((retraction, ts, Diff::MINUS_ONE));
1012        }
1013
1014        let insertion = self
1015            .state
1016            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1017        self.output
1018            .operator_hydration_status
1019            .give((insertion, ts, Diff::ONE));
1020    }
1021
1022    fn handle_peek_install(
1023        &mut self,
1024        PeekEventReference {
1025            id,
1026            time,
1027            uuid,
1028            peek_type,
1029            installed: _,
1030        }: Ref<'_, PeekEvent>,
1031    ) {
1032        let id = Columnar::into_owned(id);
1033        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1034        let ts = self.ts();
1035        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1036        self.output.peek.give((datum, ts, Diff::ONE));
1037
1038        let existing = self.state.peek_stash.insert(uuid, self.time);
1039        if existing.is_some() {
1040            error!(%uuid, "peek already registered");
1041        }
1042    }
1043
1044    fn handle_peek_retire(
1045        &mut self,
1046        PeekEventReference {
1047            id,
1048            time,
1049            uuid,
1050            peek_type,
1051            installed: _,
1052        }: Ref<'_, PeekEvent>,
1053    ) {
1054        let id = Columnar::into_owned(id);
1055        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1056        let ts = self.ts();
1057        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1058        self.output.peek.give((datum, ts, Diff::MINUS_ONE));
1059
1060        if let Some(start) = self.state.peek_stash.remove(&uuid) {
1061            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
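            // Bucket peek latencies into a power-of-two histogram; e.g., an elapsed time
            // of 1_500 ns is recorded in the 2_048 ns bucket.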
1062            let bucket = elapsed_ns.next_power_of_two();
1063            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
1064            self.output.peek_duration.give((datum, ts, Diff::ONE));
1065        } else {
1066            error!(%uuid, "peek not yet registered");
1067        }
1068    }
1069
1070    fn handle_frontier(
1071        &mut self,
1072        FrontierReference {
1073            export_id,
1074            time,
1075            diff,
1076        }: Ref<'_, Frontier>,
1077    ) {
1078        let export_id = Columnar::into_owned(export_id);
1079        let diff = Diff::from(*diff);
1080        let ts = self.ts();
1081        let time = Columnar::into_owned(time);
1082        let datum = self.state.pack_frontier_update(export_id, time);
1083        self.output.frontier.give((datum, ts, diff));
1084    }
1085
1086    fn handle_import_frontier(
1087        &mut self,
1088        ImportFrontierReference {
1089            import_id,
1090            export_id,
1091            time,
1092            diff,
1093        }: Ref<'_, ImportFrontier>,
1094    ) {
1095        let import_id = Columnar::into_owned(import_id);
1096        let export_id = Columnar::into_owned(export_id);
1097        let diff = Diff::from(*diff);
1098        let ts = self.ts();
1099        let time = Columnar::into_owned(time);
1100        let datum = self
1101            .state
1102            .pack_import_frontier_update(export_id, import_id, time);
1103        self.output.import_frontier.give((datum, ts, diff));
1104    }
1105
1106    /// Update the allocation size for an arrangement.
1107    fn handle_arrangement_heap_size(
1108        &mut self,
1109        ArrangementHeapSizeReference {
1110            operator_id,
1111            delta_size,
1112        }: Ref<'_, ArrangementHeapSize>,
1113    ) {
1114        let ts = self.ts();
1115        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1116            return;
1117        };
1118
1119        state.size += delta_size;
1120
1121        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
1122        let diff = Diff::cast_from(delta_size);
1123        self.output.arrangement_heap_size.give((datum, ts, diff));
1124    }
1125
1126    /// Update the allocation capacity for an arrangement.
1127    fn handle_arrangement_heap_capacity(
1128        &mut self,
1129        ArrangementHeapCapacityReference {
1130            operator_id,
1131            delta_capacity,
1132        }: Ref<'_, ArrangementHeapCapacity>,
1133    ) {
1134        let ts = self.ts();
1135        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1136            return;
1137        };
1138
1139        state.capacity += delta_capacity;
1140
1141        let datum = self
1142            .state
1143            .pack_arrangement_heap_capacity_update(operator_id);
1144        let diff = Diff::cast_from(delta_capacity);
1145        self.output
1146            .arrangement_heap_capacity
1147            .give((datum, ts, diff));
1148    }
1149
1150    /// Update the allocation count for an arrangement.
1151    fn handle_arrangement_heap_allocations(
1152        &mut self,
1153        ArrangementHeapAllocationsReference {
1154            operator_id,
1155            delta_allocations,
1156        }: Ref<'_, ArrangementHeapAllocations>,
1157    ) {
1158        let ts = self.ts();
1159        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1160            return;
1161        };
1162
1163        state.count += delta_allocations;
1164
1165        let datum = self
1166            .state
1167            .pack_arrangement_heap_allocations_update(operator_id);
1168        let diff = Diff::cast_from(delta_allocations);
1169        self.output
1170            .arrangement_heap_allocations
1171            .give((datum, ts, diff));
1172    }
1173
1174    /// Indicate that a new arrangement exists and start maintaining its heap size state.
1175    fn handle_arrangement_heap_size_operator(
1176        &mut self,
1177        ArrangementHeapSizeOperatorReference {
1178            operator_id,
1179            address,
1180        }: Ref<'_, ArrangementHeapSizeOperator>,
1181    ) {
1182        let activator = self
1183            .state
1184            .scheduler
1185            .activator_for(address.into_iter().collect());
1186        let existing = self
1187            .state
1188            .arrangement_size
1189            .insert(operator_id, Default::default());
1190        if existing.is_some() {
1191            error!(%operator_id, "arrangement size operator already registered");
1192        }
1193        let existing = self
1194            .shared_state
1195            .arrangement_size_activators
1196            .insert(operator_id, activator);
1197        if existing.is_some() {
1198            error!(%operator_id, "arrangement size activator already registered");
1199        }
1200    }
1201
1202    /// Indicate that an arrangement has been dropped and we can clean up the heap size state.
1203    fn handle_arrangement_heap_size_operator_dropped(
1204        &mut self,
1205        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
1206    ) {
1207        let operator_id = event.operator_id;
1208        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
1209            let ts = self.ts();
1210            let allocations = self
1211                .state
1212                .pack_arrangement_heap_allocations_update(operator_id);
1213            let diff = -Diff::cast_from(state.count);
1214            self.output
1215                .arrangement_heap_allocations
1216                .give((allocations, ts, diff));
1217
1218            let capacity = self
1219                .state
1220                .pack_arrangement_heap_capacity_update(operator_id);
1221            let diff = -Diff::cast_from(state.capacity);
1222            self.output
1223                .arrangement_heap_capacity
1224                .give((capacity, ts, diff));
1225
1226            let size = self.state.pack_arrangement_heap_size_update(operator_id);
1227            let diff = -Diff::cast_from(state.size);
1228            self.output.arrangement_heap_size.give((size, ts, diff));
1229        }
1230        self.shared_state
1231            .arrangement_size_activators
1232            .remove(&operator_id);
1233    }
1234
1235    /// Indicate that a new LIR operator exists; record the dataflow operators it maps to.
1236    fn handle_lir_mapping(
1237        &mut self,
1238        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
1239    ) {
1240        let global_id = Columnar::into_owned(global_id);
1241        // record the state (for the later drop)
1242        let mappings = || mapping.into_iter().map(Columnar::into_owned);
1243        self.state
1244            .lir_mapping
1245            .entry(global_id)
1246            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
1247            .or_insert_with(|| mappings().collect());
1248
1249        // send the datum out
1250        let ts = self.ts();
1251        for (lir_id, meta) in mapping.into_iter() {
1252            let datum = self.state.pack_lir_mapping_update(
1253                global_id,
1254                Columnar::into_owned(lir_id),
1255                Columnar::into_owned(meta.operator),
1256                Columnar::into_owned(meta.parent_lir_id),
1257                Columnar::into_owned(meta.nesting),
1258                Columnar::into_owned(meta.operator_span),
1259            );
1260            self.output.lir_mapping.give((datum, ts, Diff::ONE));
1261        }
1262    }
1263
1264    fn handle_dataflow_global(
1265        &mut self,
1266        DataflowGlobalReference {
1267            dataflow_index,
1268            global_id,
1269        }: Ref<'_, DataflowGlobal>,
1270    ) {
1271        let global_id = Columnar::into_owned(global_id);
1272        self.state
1273            .dataflow_global_ids
1274            .entry(dataflow_index)
1275            .and_modify(|globals| {
1276                // NB BTreeSet::insert() returns `false` when the element was already in the set
1277                if !globals.insert(global_id) {
1278                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
1279                }
1280            })
1281            .or_insert_with(|| BTreeSet::from([global_id]));
1282
1283        let ts = self.ts();
1284        let datum = self
1285            .state
1286            .pack_dataflow_global_update(dataflow_index, global_id);
1287        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
1288    }
1289}
1290
1291/// Logging state maintained for a compute collection.
1292///
1293/// This type is used to produce appropriate log events in response to changes of logged collection
1294/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
1295pub struct CollectionLogging {
1296    export_id: GlobalId,
1297    logger: Logger,
1298
1299    logged_frontier: Option<Timestamp>,
1300    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
1301}
1302
1303impl CollectionLogging {
1304    /// Create new logging state for the identified collection and emit initial logging events.
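    ///
    /// ```ignore
    /// // Illustrative sketch; `export_id`, `logger`, `dataflow_index`, and `import_ids`
    /// // are placeholders provided by the rendering code.
    /// let logging = CollectionLogging::new(export_id, logger, dataflow_index, import_ids);
    /// logging.set_hydrated();
    /// // Dropping `logging` emits the corresponding retraction events.
    /// ```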
1305    pub fn new(
1306        export_id: GlobalId,
1307        logger: Logger,
1308        dataflow_index: usize,
1309        import_ids: impl Iterator<Item = GlobalId>,
1310    ) -> Self {
1311        logger.log(&ComputeEvent::Export(Export {
1312            export_id,
1313            dataflow_index,
1314        }));
1315
1316        let mut self_ = Self {
1317            export_id,
1318            logger,
1319            logged_frontier: None,
1320            logged_import_frontiers: Default::default(),
1321        };
1322
1323        // Initialize frontier logging.
1324        let initial_frontier = Some(Timestamp::MIN);
1325        self_.set_frontier(initial_frontier);
1326        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
1327
1328        self_
1329    }
1330
1331    /// Set the collection frontier to the given new time and emit corresponding logging events.
1332    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
1333        let old_time = self.logged_frontier;
1334        self.logged_frontier = new_time;
1335
1336        if old_time != new_time {
1337            let export_id = self.export_id;
1338            let retraction = old_time.map(|time| {
1339                ComputeEvent::Frontier(Frontier {
1340                    export_id,
1341                    time,
1342                    diff: -1,
1343                })
1344            });
1345            let insertion = new_time.map(|time| {
1346                ComputeEvent::Frontier(Frontier {
1347                    export_id,
1348                    time,
1349                    diff: 1,
1350                })
1351            });
1352            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1353            self.logger.log_many(events);
1354        }
1355    }
1356
1357    /// Set the frontier of the given import to the given new time and emit corresponding logging
1358    /// events.
1359    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
1360        let old_time = self.logged_import_frontiers.remove(&import_id);
1361        if let Some(time) = new_time {
1362            self.logged_import_frontiers.insert(import_id, time);
1363        }
1364
1365        if old_time != new_time {
1366            let export_id = self.export_id;
1367            let retraction = old_time.map(|time| {
1368                ComputeEvent::ImportFrontier(ImportFrontier {
1369                    import_id,
1370                    export_id,
1371                    time,
1372                    diff: -1,
1373                })
1374            });
1375            let insertion = new_time.map(|time| {
1376                ComputeEvent::ImportFrontier(ImportFrontier {
1377                    import_id,
1378                    export_id,
1379                    time,
1380                    diff: 1,
1381                })
1382            });
1383            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1384            self.logger.log_many(events);
1385        }
1386    }
1387
1388    /// Set the collection as hydrated.
1389    pub fn set_hydrated(&self) {
1390        self.logger.log(&ComputeEvent::Hydration(Hydration {
1391            export_id: self.export_id,
1392        }));
1393    }
1394
1395    /// The export global ID of this collection.
1396    pub fn export_id(&self) -> GlobalId {
1397        self.export_id
1398    }
1399}
1400
1401impl Drop for CollectionLogging {
1402    fn drop(&mut self) {
1403        // Emit retraction events to clean up events previously logged.
1404        self.set_frontier(None);
1405
1406        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1407        for import_id in import_ids {
1408            self.set_import_frontier(import_id, None);
1409        }
1410
1411        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1412            export_id: self.export_id,
1413        }));
1414    }
1415}
1416
1417/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
1418/// batch streams.
1419pub(crate) trait LogDataflowErrors {
1420    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
1421}
1422
1423impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
1424where
1425    G: Scope,
1426    D: Data,
1427{
1428    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1429        self.inner
1430            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1431                move |input, output| {
1432                    input.for_each(|cap, data| {
1433                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1434                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1435
1436                        output.session(&cap).give_container(data);
1437                    });
1438                }
1439            })
1440            .as_collection()
1441    }
1442}
1443
1444impl<G, B> LogDataflowErrors for Stream<G, B>
1445where
1446    G: Scope,
1447    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1448{
1449    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1450        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1451            move |input, output| {
1452                input.for_each(|cap, data| {
1453                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1454                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1455
1456                    output.session(&cap).give_container(data);
1457                });
1458            }
1459        })
1460    }
1461}
1462
1463/// Return the sum of all diffs within the given batch.
1464///
1465/// Note that this operation can be expensive: Its runtime is O(N) with N being the number of
1466/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
1467/// contain only a small number of records, so this doesn't matter much. But avoid using it when
1468/// batches might become large.
1469fn sum_batch_diffs<B>(batch: &B) -> Diff
1470where
1471    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1472{
1473    let mut sum = Diff::ZERO;
1474    let mut cursor = batch.cursor();
1475
1476    while cursor.key_valid(batch) {
1477        while cursor.val_valid(batch) {
1478            cursor.map_times(batch, |_t, r| sum += r);
1479            cursor.step_val(batch);
1480        }
1481        cursor.step_key(batch);
1482    }
1483
1484    sum
1485}
1486
1487#[cfg(test)]
1488mod tests {
1489    use super::*;
1490
1491    #[mz_ore::test]
1492    fn test_compute_event_size() {
1493        // This could be a static assertion, but we don't use those yet in this crate.
1494        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
1495    }
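
    // A small sanity check for the human-readable peek type names used in log rows.
    #[mz_ore::test]
    fn test_peek_type_name() {
        assert_eq!(PeekType::Index.name(), "index");
        assert_eq!(PeekType::Persist.name(), "persist");
    }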
1496}