Skip to main content

mz_compute/logging/
compute.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by clusterd.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Display, Write};
15use std::rc::Rc;
16use std::time::{Duration, Instant};
17
18use columnar::{Columnar, Index, Ref};
19use differential_dataflow::VecCollection;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::trace::{BatchReader, Cursor};
22use mz_compute_types::plan::LirId;
23use mz_ore::cast::CastFrom;
24use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
25use mz_timely_util::columnar::builder::ColumnBuilder;
26use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
27use mz_timely_util::replay::MzReplay;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::operators::Operator;
30use timely::dataflow::operators::generic::OutputBuilder;
31use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
32use timely::dataflow::operators::generic::operator::empty;
33use timely::dataflow::{Scope, StreamVec};
34use timely::scheduling::Scheduler;
35use tracing::error;
36use uuid::Uuid;
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::logging::{
40    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
41    SharedLoggingState, Update,
42};
43use crate::row_spine::RowRowBuilder;
44use crate::typedefs::RowRowSpine;
45
/// Type alias for a logger of compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// Type alias for the container builder used to batch `(Duration, ComputeEvent)` log records.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
49
/// A dataflow exports a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Timely worker index of the exporting dataflow.
    pub dataflow_index: usize,
}

/// The export for a global id was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// Identifier of the export.
    pub export_id: GlobalId,
}

/// A peek event with a [`PeekType`], and an installation status.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The identifier of the view the peek targets.
    pub id: GlobalId,
    /// The logical timestamp requested.
    pub time: Timestamp,
    /// The ID of the peek, as raw UUID bytes.
    pub uuid: uuid::Bytes,
    /// The relevant _type_ of peek: index or persist.
    // Note that this is not stored on the Peek event for data-packing reasons only.
    pub peek_type: PeekType,
    /// True if the peek is being installed; false if it's being removed.
    pub installed: bool,
}
81
/// Frontier change event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// Identifier of the export whose frontier changed.
    pub export_id: GlobalId,
    /// The timestamp affected by the change.
    pub time: Timestamp,
    /// The change of `time` in the frontier: positive for addition, negative for removal.
    pub diff: i8,
}

/// An import frontier change.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// Identifier of the imported collection.
    pub import_id: GlobalId,
    /// Identifier of the export whose import frontier changed.
    pub export_id: GlobalId,
    /// The timestamp affected by the change.
    pub time: Timestamp,
    /// The change of `time` in the frontier: positive for addition, negative for removal.
    pub diff: i8,
}
98
/// A change in an arrangement's heap size.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// Operator index
    pub operator_id: usize,
    /// Delta of the heap size in bytes of the arrangement.
    pub delta_size: isize,
}

/// A change in an arrangement's heap capacity.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// Operator index
    pub operator_id: usize,
    /// Delta of the heap capacity in bytes of the arrangement.
    pub delta_capacity: isize,
}

/// A change in an arrangement's heap allocation count.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// Operator index
    pub operator_id: usize,
    /// Delta of distinct heap allocations backing the arrangement.
    pub delta_allocations: isize,
}

/// Announcing an operator that manages an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// Operator index
    pub operator_id: usize,
    /// The address of the operator.
    pub address: Vec<usize>,
}

/// Drop event for an operator managing an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// Operator index
    pub operator_id: usize,
}
141
/// Dataflow shutdown event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// The identifier of the dataflow that shut down.
    // NOTE(review): previous comment said "Timely worker index of the dataflow"; the field name
    // suggests this is the dataflow's index, matching `DataflowGlobal::dataflow_index` — confirm.
    pub dataflow_index: usize,
}

/// Error count update event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The change in error count.
    pub diff: Diff,
}

/// An export is hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
}
164
/// An operator's hydration status changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Identifier of the operator's LIR node.
    pub lir_id: LirId,
    /// Whether the operator is hydrated.
    pub hydrated: bool,
}

/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The `GlobalId` in which the LIR operator is rendered.
    ///
    /// NB: a single dataflow may have many `GlobalId`s inside it.
    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
    /// tracks the many-to-one relationship between `GlobalId`s and
    /// dataflows.
    pub global_id: GlobalId,
    /// The actual mapping.
    /// Represented this way to reduce the size of `ComputeEvent`.
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// Announce that a dataflow supports a specific global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The identifier of the dataflow.
    pub dataflow_index: usize,
    /// A `GlobalId` that is rendered as part of this dataflow.
    pub global_id: GlobalId,
}
199
/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// Peek command.
    Peek(PeekEvent),
    /// Available frontier information for dataflow exports.
    Frontier(Frontier),
    /// Available frontier information for dataflow imports.
    ImportFrontier(ImportFrontier),
    /// Arrangement heap size update.
    ArrangementHeapSize(ArrangementHeapSize),
    /// Arrangement heap capacity update.
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// Arrangement heap allocations update.
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// Arrangement size operator address
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// Arrangement size operator dropped
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// All operators of a dataflow have shut down.
    DataflowShutdown(DataflowShutdown),
    /// The number of errors in a dataflow export has changed.
    ErrorCount(ErrorCount),
    /// A dataflow export was hydrated.
    Hydration(Hydration),
    /// A dataflow operator's hydration status changed.
    OperatorHydration(OperatorHydration),
    /// An LIR operator was mapped to some particular dataflow operator.
    ///
    /// Cf. `ComputeLog::LirMapping`
    LirMapping(LirMapping),
    /// A dataflow was announced to support a specific `GlobalId`.
    DataflowGlobal(DataflowGlobal),
}
237
/// A peek type distinguishing between index and persist peeks.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index.
    Index,
    /// A peek against persist.
    Persist,
}
246
247impl PeekType {
248    /// A human-readable name for a peek type.
249    fn name(self) -> &'static str {
250        match self {
251            PeekType::Index => "index",
252            PeekType::Persist => "persist",
253        }
254    }
255}
256
/// Metadata for LIR operators.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
    operator: String,
    /// The LIR identifier of the parent (if any).
    parent_lir_id: Option<LirId>,
    /// How nested the operator is (for nice indentation).
    nesting: u8,
    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
    /// If `start == end`, then no operators were used.
    operator_span: (usize, usize),
}
270
271impl LirMetadata {
272    /// Construct a new LIR metadata object.
273    pub fn new(
274        operator: String,
275        parent_lir_id: Option<LirId>,
276        nesting: u8,
277        operator_span: (usize, usize),
278    ) -> Self {
279        Self {
280            operator,
281            parent_lir_id,
282            nesting,
283            operator_span,
284        }
285    }
286}
287
/// The return type of the [`construct`] function.
pub(super) struct Return {
    /// Collections returned by [`construct`], keyed by the log variant they serve.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
293
/// Constructs the logging dataflow fragment for compute logs.
///
/// Replays the captured compute events, demuxes them into one stream per log
/// variant, and arranges each stream that the configuration asks to be indexed.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `scheduler`: The timely scheduler to obtain activators.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read compute log events from.
/// * `shared_state`: Shared state between logging dataflow fragments.
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    // Updates are bucketed to multiples of the logging interval; clamp to at least 1ms.
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        // Replay the captured events into this scope, or produce an empty stream when
        // logging is disabled. The token keeps the replay alive; dropping it shuts
        // the dataflow down.
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "compute logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(logs, Pipeline);
        let (export_out, export) = demux.new_output();
        let mut export_out = OutputBuilder::from(export_out);
        let (frontier_out, frontier) = demux.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        let (import_frontier_out, import_frontier) = demux.new_output();
        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
        let (peek_out, peek) = demux.new_output();
        let mut peek_out = OutputBuilder::from(peek_out);
        let (peek_duration_out, peek_duration) = demux.new_output();
        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
        let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
        let mut arrangement_heap_allocations_out =
            OutputBuilder::from(arrangement_heap_allocations_out);
        let (error_count_out, error_count) = demux.new_output();
        let mut error_count_out = OutputBuilder::from(error_count_out);
        let (hydration_time_out, hydration_time) = demux.new_output();
        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
        let (lir_mapping_out, lir_mapping) = demux.new_output();
        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);

        let mut demux_state = DemuxState::new(scheduler, scope.index());
        demux.build(move |_capability| {
            move |_frontiers| {
                // Activate all outputs up front so we can open one session per output
                // under the capability of each input batch.
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut operator_hydration_status = operator_hydration_status_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session_with_builder(&cap),
                        frontier: frontier.session_with_builder(&cap),
                        import_frontier: import_frontier.session_with_builder(&cap),
                        peek: peek.session_with_builder(&cap),
                        peek_duration: peek_duration.session_with_builder(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations
                            .session_with_builder(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity
                            .session_with_builder(&cap),
                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
                        error_count: error_count.session_with_builder(&cap),
                        hydration_time: hydration_time.session_with_builder(&cap),
                        operator_hydration_status: operator_hydration_status
                            .session_with_builder(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    // Dispatch each event to the handler, which routes updates to the
                    // appropriate output session(s).
                    for (time, event) in data.borrow().into_index_iter() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        use ComputeLog::*;
        let logs = [
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapSize, arrangement_heap_size),
            (DataflowCurrent, export),
            (DataflowGlobal, dataflow_global_ids),
            (ErrorCount, error_count),
            (FrontierCurrent, frontier),
            (HydrationTime, hydration_time),
            (ImportFrontierCurrent, import_frontier),
            (LirMapping, lir_mapping),
            (OperatorHydrationStatus, operator_hydration_status),
            (PeekCurrent, peek),
        ];

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, stream) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, Timestamp, Diff>,
                );
                let trace = stream
                    .mz_arrange_core::<
                        _,
                        Col2ValBatcher<_, _, _, _>,
                        RowRowBuilder<_, _>,
                        RowRowSpine<_, _>,
                    >(exchange, &format!("Arrange {variant:?}"))
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
454
455/// Format the given value and pack it into a `Datum::String`.
456///
457/// The `scratch` buffer is used to perform the string conversion without an allocation.
458/// Callers should not assume anything about the contents of this buffer after this function
459/// returns.
460fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
461where
462    V: Display,
463{
464    scratch.clear();
465    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
466    Datum::String(scratch)
467}
468
/// State maintained by the demux operator.
///
/// `A` is the scheduler type used to obtain activators.
struct DemuxState<A> {
    /// The timely scheduler.
    scheduler: A,
    /// The index of this worker.
    worker_id: usize,
    /// A reusable scratch string for formatting IDs.
    scratch_string_a: String,
    /// A second reusable scratch string, for rows that format two IDs at once.
    scratch_string_b: String,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size stash.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR -> operator span mapping.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Dataflow -> `GlobalId` mapping (many-to-one).
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    /// A row packer for the arrangement heap allocations output.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    /// A row packer for the arrangement heap capacity output.
    arrangement_heap_capacity_packer: PermutedRowPacker,
    /// A row packer for the arrangement heap size output.
    arrangement_heap_size_packer: PermutedRowPacker,
    /// A row packer for the dataflow global output.
    dataflow_global_packer: PermutedRowPacker,
    /// A row packer for the error count output.
    error_count_packer: PermutedRowPacker,
    /// A row packer for the exports output.
    export_packer: PermutedRowPacker,
    /// A row packer for the frontier output.
    frontier_packer: PermutedRowPacker,
    /// A row packer for the import frontier output.
    import_frontier_packer: PermutedRowPacker,
    /// A row packer for the LIR mapping output.
    lir_mapping_packer: PermutedRowPacker,
    /// A row packer for the operator hydration status output.
    operator_hydration_status_packer: PermutedRowPacker,
    /// A row packer for the peek durations output.
    peek_duration_packer: PermutedRowPacker,
    /// A row packer for the peek output.
    peek_packer: PermutedRowPacker,
    /// A row packer for the hydration time output.
    hydration_time_packer: PermutedRowPacker,
}
516
impl<A: Scheduler> DemuxState<A> {
    /// Create fresh demux state for the worker with the given index.
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
        }
    }

    // NOTE: In all `pack_*` methods below, the order of datums must match the column
    // order of the corresponding log variant; the `PermutedRowPacker` applies the
    // key/value permutation.

    /// Pack an arrangement heap allocations update key-value for the given operator.
    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack an arrangement heap capacity update key-value for the given operator.
    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack an arrangement heap size update key-value for the given operator.
    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack a dataflow global update key-value for the given dataflow index and global ID.
    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    /// Pack an error count update key-value for the given export ID and count.
    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
        // per-worker error count might be negative and at the SQL level having negative multiplicities
        // is treated as an error.
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    /// Pack an export update key-value for the given export ID and dataflow index.
    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    /// Pack a hydration time update key-value for the given export ID and hydration time.
    ///
    /// A `time_ns` of `None` packs as a NULL datum.
    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    /// Pack an import frontier update key-value for the given export ID, import ID, and time.
    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    /// Pack an LIR mapping update key-value for the given LIR operator metadata.
    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            // A missing parent packs as NULL.
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    /// Pack an operator hydration status update key-value for the given export ID, LIR ID, and
    /// hydration status.
    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    /// Pack a peek duration update key-value for the given peek and peek type.
    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    /// Pack a peek update key-value for the given peek and peek type.
    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    /// Pack a frontier update key-value for the given export ID and time.
    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }
}
721
/// State tracked for each dataflow export.
struct ExportState {
    /// The ID of the dataflow maintaining this export.
    dataflow_index: usize,
    /// Number of errors in this export.
    ///
    /// This must be a signed integer, since per-worker error counts can be negative, only the
    /// cross-worker total has to sum up to a non-negative value.
    error_count: Diff,
    /// When this export was created.
    created_at: Instant,
    /// Time the export took to hydrate, in nanoseconds; `None` while it has not
    /// hydrated yet.
    hydration_time_ns: Option<u64>,
    /// Hydration status of operators feeding this export.
    operator_hydration: BTreeMap<LirId, bool>,
}
738
739impl ExportState {
740    fn new(dataflow_index: usize) -> Self {
741        Self {
742            dataflow_index,
743            error_count: Diff::ZERO,
744            created_at: Instant::now(),
745            hydration_time_ns: None,
746            operator_hydration: BTreeMap::new(),
747        }
748    }
749}
750
/// State for tracking arrangement sizes.
///
/// All fields are running totals, updated by applying the deltas reported in arrangement heap
/// size events. They are signed because individual deltas may be negative.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    /// Accumulated heap size of the arrangement.
    size: isize,
    /// Accumulated heap capacity of the arrangement.
    capacity: isize,
    /// Accumulated number of heap allocations backing the arrangement.
    count: isize,
}
758
/// Bundled output sessions used by the demux operator.
///
/// Each session feeds the logging collection of the same name. All sessions emit updates of
/// `(key, value)` row pairs, timestamped and with a diff.
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}
775
/// Event handler of the demux operator.
///
/// Bundles everything a single demux invocation needs to translate one `ComputeEvent` into
/// updates for the various logging collections.
struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}
789
impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        // Round up to the next multiple of the logging interval, so all updates within one
        // interval consolidate onto a single logical timestamp.
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given compute event.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            // Peek events carry an `installed` flag distinguishing installation from retirement.
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    /// Handle the creation of a new export: log it and start tracking its state.
    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        // Insert hydration time logging for this export. `None` means "not yet hydrated".
        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    /// Handle the dropping of an export: retract all logging state previously emitted for it.
    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        // Remove error count logging for this export.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Remove hydration time logging for this export.
        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        // Remove operator hydration logging for this export.
        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }

    /// Handle the shutdown of a dataflow: retract the `GlobalId` and LIR mappings recorded for
    /// that dataflow.
    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        // We deal with any `GlobalId` based mappings in this event.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                // Remove dataflow/`GlobalID` mapping.
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                // Remove LIR mapping.
                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    /// Handle a change to an export's error count: retract the old count (if any) and insert
    /// the new one (if non-zero).
    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we are no longer
            // interested in its errors.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

        // A zero count is represented by absence, so only non-zero counts are logged.
        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }

    /// Handle the hydration of an export: record the elapsed time since export creation and
    /// swap the logged hydration time from `None` to that duration.
    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
            // after reconciliation. We can simply ignore these.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }

    /// Handle a change to an operator's hydration status: retract the previously logged status
    /// (if any) and insert the new one.
    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we are no longer
            // interested in its operator hydration events.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }

    /// Handle the installation of a peek: log it and stash its start time for duration
    /// measurement at retirement.
    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    /// Handle the retirement of a peek: retract the peek logging and record its duration in the
    /// corresponding power-of-two histogram bucket.
    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            // Bucket durations by rounding up to the next power of two.
            let bucket = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    /// Handle an export frontier change by forwarding the update to the frontier log.
    ///
    /// The event carries the diff directly; retractions and insertions arrive as separate
    /// events.
    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self.state.pack_frontier_update(export_id, time);
        self.output.frontier.give((datum, ts, diff));
    }

    /// Handle an import frontier change by forwarding the update to the import frontier log.
    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }

    /// Update the allocation size for an arrangement.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }

    /// Update the allocation capacity for an arrangement.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }

    /// Update the allocation count for an arrangement.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.count += delta_allocations;

        let datum = self
            .state
            .pack_arrangement_heap_allocations_update(operator_id);
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));
    }

    /// Indicate that a new arrangement exists, start maintaining the heap size state.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Indicate that an arrangement has been dropped and we can cleanup the heap size state.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            // Retract the accumulated totals so the logged sums return to zero.
            let allocations = self
                .state
                .pack_arrangement_heap_allocations_update(operator_id);
            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((allocations, ts, diff));

            let capacity = self
                .state
                .pack_arrangement_heap_capacity_update(operator_id);
            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((capacity, ts, diff));

            let size = self.state.pack_arrangement_heap_size_update(operator_id);
            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((size, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    /// Indicate that a new LIR operator exists; record the dataflow address it maps to.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // Record the state (for the later retraction on dataflow shutdown).
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // Send the datum out.
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    /// Record that a dataflow maintains a given `GlobalId` and log the mapping.
    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: Ref<'_, DataflowGlobal>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                // NB BTreeSet::insert() returns `false` when the element was already in the set
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = self
            .state
            .pack_dataflow_global_update(dataflow_index, global_id);
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}
1297
/// Logging state maintained for a compute collection.
///
/// This type is used to produce appropriate log events in response to changes of logged collection
/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
pub struct CollectionLogging {
    /// The ID of the exported collection.
    export_id: GlobalId,
    /// Logger to emit compute events through.
    logger: Logger,

    /// The frontier time most recently logged for this collection, if any.
    logged_frontier: Option<Timestamp>,
    /// The frontier times most recently logged for each of this collection's imports.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
1309
impl CollectionLogging {
    /// Create new logging state for the identified collection and emit initial logging events.
    ///
    /// Logs the export itself, then initializes the export frontier and all import frontiers to
    /// `Timestamp::MIN`.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize frontier logging.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

    /// Set the collection frontier to the given new time and emit corresponding logging events.
    ///
    /// Logging is differential: the previously logged time (if any) is retracted and the new
    /// time (if any) is inserted. Passing `None` retracts without inserting.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the frontier of the given import to the given new time and emit corresponding logging
    /// events.
    ///
    /// As in [`CollectionLogging::set_frontier`], the old time is retracted and the new time is
    /// inserted; passing `None` removes the import's entry entirely.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the collection as hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }

    /// The export global ID of this collection.
    pub fn export_id(&self) -> GlobalId {
        self.export_id
    }
}
1407
1408impl Drop for CollectionLogging {
1409    fn drop(&mut self) {
1410        // Emit retraction events to clean up events previously logged.
1411        self.set_frontier(None);
1412
1413        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1414        for import_id in import_ids {
1415            self.set_import_frontier(import_id, None);
1416        }
1417
1418        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1419            export_id: self.export_id,
1420        }));
1421    }
1422}
1423
/// Extension trait to attach `ComputeEvent::DataflowError` logging operators to collections and
/// batch streams.
pub(crate) trait LogDataflowErrors {
    /// Log the error diffs flowing through `self` to the given logger, attributed to the given
    /// export, and return `self` unchanged otherwise.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
1429
1430impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
1431where
1432    G: Scope,
1433    D: Clone + 'static,
1434{
1435    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1436        self.inner
1437            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1438                move |input, output| {
1439                    input.for_each(|cap, data| {
1440                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1441                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1442
1443                        output.session(&cap).give_container(data);
1444                    });
1445                }
1446            })
1447            .as_collection()
1448    }
1449}
1450
1451impl<G, B> LogDataflowErrors for StreamVec<G, B>
1452where
1453    G: Scope,
1454    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1455{
1456    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1457        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1458            move |input, output| {
1459                input.for_each(|cap, data| {
1460                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1461                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1462
1463                    output.session(&cap).give_container(data);
1464                });
1465            }
1466        })
1467    }
1468}
1469
1470/// Return the sum of all diffs within the given batch.
1471///
1472/// Note that this operation can be expensive: Its runtime is O(N) with N being the number of
1473/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
1474/// contain only a small number of records, so this doesn't matter much. But avoid using it when
1475/// batches might become large.
1476fn sum_batch_diffs<B>(batch: &B) -> Diff
1477where
1478    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1479{
1480    let mut sum = Diff::ZERO;
1481    let mut cursor = batch.cursor();
1482
1483    while cursor.key_valid(batch) {
1484        while cursor.val_valid(batch) {
1485            cursor.map_times(batch, |_t, r| sum += r);
1486            cursor.step_val(batch);
1487        }
1488        cursor.step_key(batch);
1489    }
1490
1491    sum
1492}
1493
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // Guard against accidental growth of `ComputeEvent`, which is constructed for every
        // logged event. This could be a static assertion, but we don't use those yet in this
        // crate.
        let actual = std::mem::size_of::<ComputeEvent>();
        assert_eq!(actual, 56)
    }
}