mz_compute/logging/compute.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by clusterd.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::{Columnar, Index, Ref};
use differential_dataflow::VecCollection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
use mz_timely_util::replay::MzReplay;
use timely::Data;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
    SharedLoggingState, Update,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;

/// Type alias for a logger of compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// Type alias for the container builder that batches timestamped compute events.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
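
// A minimal usage sketch: given a `logger: Logger` (as held by `CollectionLogging`
// below), events are logged by reference, e.g.:
//
//     logger.log(&ComputeEvent::Hydration(Hydration { export_id }));
//
// where `export_id` is whatever `GlobalId` the caller tracks; see
// `CollectionLogging::set_hydrated` below for this exact call.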

/// A dataflow exports a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Timely worker index of the exporting dataflow.
    pub dataflow_index: usize,
}

/// The export for a global ID was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// Identifier of the export.
    pub export_id: GlobalId,
}

/// A peek event with a [`PeekType`] and an installation status.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The identifier of the view the peek targets.
    pub id: GlobalId,
    /// The logical timestamp requested.
    pub time: Timestamp,
    /// The ID of the peek.
    pub uuid: uuid::Bytes,
    /// The relevant _type_ of peek: index or persist.
    // Note that this is not stored on the Peek event for data-packing reasons only.
    pub peek_type: PeekType,
    /// True if the peek is being installed; false if it's being removed.
    pub installed: bool,
}

/// Frontier change event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// Identifier of the export whose frontier changed.
    pub export_id: GlobalId,
    /// The frontier timestamp.
    pub time: Timestamp,
    /// +1 when the timestamp enters the frontier, -1 when it leaves.
    pub diff: i8,
}

/// An import frontier change.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// Identifier of the imported collection.
    pub import_id: GlobalId,
    /// Identifier of the export whose import frontier changed.
    pub export_id: GlobalId,
    /// The frontier timestamp.
    pub time: Timestamp,
    /// +1 when the timestamp enters the frontier, -1 when it leaves.
    pub diff: i8,
}

/// A change in an arrangement's heap size.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// Operator index.
    pub operator_id: usize,
    /// Delta of the heap size in bytes of the arrangement.
    pub delta_size: isize,
}

/// A change in an arrangement's heap capacity.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// Operator index.
    pub operator_id: usize,
    /// Delta of the heap capacity in bytes of the arrangement.
    pub delta_capacity: isize,
}

/// A change in an arrangement's heap allocation count.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// Operator index.
    pub operator_id: usize,
    /// Delta of distinct heap allocations backing the arrangement.
    pub delta_allocations: isize,
}

/// Announcing an operator that manages an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// Operator index.
    pub operator_id: usize,
    /// The address of the operator.
    pub address: Vec<usize>,
}

/// Drop event for an operator managing an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// Operator index.
    pub operator_id: usize,
}

/// Dataflow shutdown event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// Timely worker index of the dataflow.
    pub dataflow_index: usize,
}

/// Error count update event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The change in error count.
    pub diff: Diff,
}

/// An export is hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
}

/// An operator's hydration status changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Identifier of the operator's LIR node.
    pub lir_id: LirId,
    /// Whether the operator is hydrated.
    pub hydrated: bool,
}

/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The `GlobalId` in which the LIR operator is rendered.
    ///
    /// NB: a single dataflow may have many `GlobalId`s inside it.
    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
    /// tracks the many-to-one relationship between `GlobalId`s and
    /// dataflows.
    pub global_id: GlobalId,
    /// The actual mapping.
    /// Represented this way to reduce the size of `ComputeEvent`.
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// Announce that a dataflow supports a specific global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The identifier of the dataflow.
    pub dataflow_index: usize,
    /// A `GlobalId` that is rendered as part of this dataflow.
    pub global_id: GlobalId,
}

/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// Peek command.
    Peek(PeekEvent),
    /// Available frontier information for dataflow exports.
    Frontier(Frontier),
    /// Available frontier information for dataflow imports.
    ImportFrontier(ImportFrontier),
    /// Arrangement heap size update.
    ArrangementHeapSize(ArrangementHeapSize),
    /// Arrangement heap capacity update.
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// Arrangement heap allocations update.
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// Arrangement size operator address.
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// Arrangement size operator dropped.
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// All operators of a dataflow have shut down.
    DataflowShutdown(DataflowShutdown),
    /// The number of errors in a dataflow export has changed.
    ErrorCount(ErrorCount),
    /// A dataflow export was hydrated.
    Hydration(Hydration),
    /// A dataflow operator's hydration status changed.
    OperatorHydration(OperatorHydration),
    /// An LIR operator was mapped to some particular dataflow operator.
    ///
    /// Cf. `ComputeLog::LirMapping`
    LirMapping(LirMapping),
    /// A dataflow was announced to render a specific `GlobalId`.
    DataflowGlobal(DataflowGlobal),
}
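
// As an illustrative example (values hypothetical), frontier tracking below logs
// events of the form
//
//     ComputeEvent::Frontier(Frontier { export_id, time, diff: 1 })
//
// paired with a `diff: -1` retraction of the previously logged time; see
// `CollectionLogging::set_frontier` below.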

/// A peek type distinguishing between index and persist peeks.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index.
    Index,
    /// A peek against persist.
    Persist,
}

impl PeekType {
    /// A human-readable name for a peek type.
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

/// Metadata for LIR operators.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
    operator: String,
    /// The LIR identifier of the parent (if any).
    parent_lir_id: Option<LirId>,
    /// How nested the operator is (for nice indentation).
    nesting: u8,
    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
    /// If `start == end`, then no operators were used.
    operator_span: (usize, usize),
}

impl LirMetadata {
    /// Construct a new LIR metadata object.
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}
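
// For example (hypothetical values): an LIR operator humanized as "Join::Differential",
// nested one level deep and rendered as dataflow operators 10 and 11, would be recorded
// as
//
//     LirMetadata::new("Join::Differential".into(), Some(parent_lir_id), 1, (10, 12))
//
// where the span end is exclusive, per the field documentation above.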

/// The return type of the [`construct`] function.
pub(super) struct Return {
    /// Collections returned by [`construct`].
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow fragment for compute logs.
///
/// Parameters:
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `scheduler`: The timely scheduler used to obtain activators.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read compute log events from.
/// * `shared_state`: Shared state between logging dataflow fragments.
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "compute logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (export_out, export) = demux.new_output();
        let mut export_out = OutputBuilder::from(export_out);
        let (frontier_out, frontier) = demux.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        let (import_frontier_out, import_frontier) = demux.new_output();
        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
        let (peek_out, peek) = demux.new_output();
        let mut peek_out = OutputBuilder::from(peek_out);
        let (peek_duration_out, peek_duration) = demux.new_output();
        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
        let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
        let mut arrangement_heap_allocations_out =
            OutputBuilder::from(arrangement_heap_allocations_out);
        let (error_count_out, error_count) = demux.new_output();
        let mut error_count_out = OutputBuilder::from(error_count_out);
        let (hydration_time_out, hydration_time) = demux.new_output();
        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
        let (lir_mapping_out, lir_mapping) = demux.new_output();
        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);

        let mut demux_state = DemuxState::new(scheduler, scope.index());
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut operator_hydration_status = operator_hydration_status_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session_with_builder(&cap),
                        frontier: frontier.session_with_builder(&cap),
                        import_frontier: import_frontier.session_with_builder(&cap),
                        peek: peek.session_with_builder(&cap),
                        peek_duration: peek_duration.session_with_builder(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations
                            .session_with_builder(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity
                            .session_with_builder(&cap),
                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
                        error_count: error_count.session_with_builder(&cap),
                        hydration_time: hydration_time.session_with_builder(&cap),
                        operator_hydration_status: operator_hydration_status
                            .session_with_builder(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.borrow().into_index_iter() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        use ComputeLog::*;
        let logs = [
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapSize, arrangement_heap_size),
            (DataflowCurrent, export),
            (DataflowGlobal, dataflow_global_ids),
            (ErrorCount, error_count),
            (FrontierCurrent, frontier),
            (HydrationTime, hydration_time),
            (ImportFrontierCurrent, import_frontier),
            (LirMapping, lir_mapping),
            (OperatorHydrationStatus, operator_hydration_status),
            (PeekCurrent, peek),
            (PeekDuration, peek_duration),
        ];

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, stream) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, Timestamp, Diff>,
                );
                let trace = stream
                    .mz_arrange_core::<
                        _,
                        Col2ValBatcher<_, _, _, _>,
                        RowRowBuilder<_, _>,
                        RowRowSpine<_, _>,
                    >(exchange, &format!("Arrange {variant:?}"))
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

/// Format the given value and pack it into a `Datum::String`.
///
/// The `scratch` buffer is used to perform the string conversion without an allocation.
/// Callers should not assume anything about the contents of this buffer after this function
/// returns.
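///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); `GlobalId::User(7)` displays as `u7`:
///
/// ```ignore
/// let mut scratch = String::new();
/// let datum = make_string_datum(GlobalId::User(7), &mut scratch);
/// assert_eq!(datum, Datum::String("u7"));
/// ```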
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}

/// State maintained by the demux operator.
struct DemuxState<A> {
    /// The timely scheduler.
    scheduler: A,
    /// The index of this worker.
    worker_id: usize,
    /// A reusable scratch string for formatting IDs.
    scratch_string_a: String,
    /// A reusable scratch string for formatting IDs.
    scratch_string_b: String,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size stash.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR -> operator span mapping.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Dataflow -> `GlobalId` mapping (many-to-one).
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    /// A row packer for the arrangement heap allocations output.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    /// A row packer for the arrangement heap capacity output.
    arrangement_heap_capacity_packer: PermutedRowPacker,
    /// A row packer for the arrangement heap size output.
    arrangement_heap_size_packer: PermutedRowPacker,
    /// A row packer for the dataflow global output.
    dataflow_global_packer: PermutedRowPacker,
    /// A row packer for the error count output.
    error_count_packer: PermutedRowPacker,
    /// A row packer for the exports output.
    export_packer: PermutedRowPacker,
    /// A row packer for the frontier output.
    frontier_packer: PermutedRowPacker,
    /// A row packer for the import frontier output.
    import_frontier_packer: PermutedRowPacker,
    /// A row packer for the LIR mapping output.
    lir_mapping_packer: PermutedRowPacker,
    /// A row packer for the operator hydration status output.
    operator_hydration_status_packer: PermutedRowPacker,
    /// A row packer for the peek durations output.
    peek_duration_packer: PermutedRowPacker,
    /// A row packer for the peek output.
    peek_packer: PermutedRowPacker,
    /// A row packer for the hydration time output.
    hydration_time_packer: PermutedRowPacker,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
        }
    }

    /// Pack an arrangement heap allocations update key-value for the given operator.
    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack an arrangement heap capacity update key-value for the given operator.
    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack an arrangement heap size update key-value for the given operator.
    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Pack a dataflow global update key-value for the given dataflow index and global ID.
    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    /// Pack an error count update key-value for the given export ID and count.
    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
        // per-worker error count might be negative and at the SQL level having negative multiplicities
        // is treated as an error.
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    /// Pack an export update key-value for the given export ID and dataflow index.
    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    /// Pack a hydration time update key-value for the given export ID and hydration time.
    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    /// Pack an import frontier update key-value for the given export ID, import ID, and time.
    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    /// Pack an LIR mapping update key-value for the given LIR operator metadata.
    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    /// Pack an operator hydration status update key-value for the given export ID, LIR ID, and
    /// hydration status.
    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    /// Pack a peek duration update key-value for the given peek type and duration bucket.
    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    /// Pack a peek update key-value for the given peek and peek type.
    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    /// Pack a frontier update key-value for the given export ID and time.
    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }
}

724struct ExportState {
725    /// The ID of the dataflow maintaining this export.
726    dataflow_index: usize,
727    /// Number of errors in this export.
728    ///
729    /// This must be a signed integer, since per-worker error counts can be negative, only the
730    /// cross-worker total has to sum up to a non-negative value.
731    error_count: Diff,
732    /// When this export was created.
733    created_at: Instant,
734    /// Whether the exported collection is hydrated.
735    hydration_time_ns: Option<u64>,
736    /// Hydration status of operators feeding this export.
737    operator_hydration: BTreeMap<LirId, bool>,
738}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
            operator_hydration: BTreeMap::new(),
        }
    }
}

/// State for tracking arrangement sizes.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
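    ///
    /// For example, with a logging interval of 1000 ms, an event at 1500 ms is assigned
    /// timestamp 2000 and an event at exactly 2000 ms is assigned 3000: event times are
    /// always advanced to the next interval boundary.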
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given compute event.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        // Insert hydration time logging for this export.
        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        // Remove error count logging for this export.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Remove hydration time logging for this export.
        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        // Remove operator hydration logging for this export.
        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        // We deal with any `GlobalId`-based mappings in this event.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                // Remove the dataflow/`GlobalId` mapping.
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                // Remove LIR mapping.
                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we are no longer
            // interested in its errors.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
            // after reconciliation. We can simply ignore these.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }

    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we are no longer
            // interested in its operator hydration events.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
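            // E.g., an elapsed time of 1500 ns lands in the 2048 ns histogram bucket.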
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self.state.pack_frontier_update(export_id, time);
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }

    /// Update the allocation size for an arrangement.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }

    /// Update the allocation capacity for an arrangement.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }

    /// Update the allocation count for an arrangement.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.count += delta_allocations;

        let datum = self
            .state
            .pack_arrangement_heap_allocations_update(operator_id);
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));
    }

    /// Indicate that a new arrangement exists; start maintaining the heap size state.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Indicate that an arrangement has been dropped and we can clean up the heap size state.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let allocations = self
                .state
                .pack_arrangement_heap_allocations_update(operator_id);
            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((allocations, ts, diff));

            let capacity = self
                .state
                .pack_arrangement_heap_capacity_update(operator_id);
            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((capacity, ts, diff));

            let size = self.state.pack_arrangement_heap_size_update(operator_id);
            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((size, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    /// Indicate that a new LIR operator exists; record the dataflow address it maps to.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // record the state (for the later drop)
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // send the datum out
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: Ref<'_, DataflowGlobal>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                // NB: `BTreeSet::insert()` returns `false` when the element was already in the set.
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = self
            .state
            .pack_dataflow_global_update(dataflow_index, global_id);
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

/// Logging state maintained for a compute collection.
///
/// This type is used to produce appropriate log events in response to changes of logged collection
/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    logged_frontier: Option<Timestamp>,
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
    /// Create new logging state for the identified collection and emit initial logging events.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize frontier logging.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }
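
    // A minimal usage sketch (illustrative identifiers): rendering code creates one
    // `CollectionLogging` per export, advances its frontiers as they change, and relies
    // on `Drop` to emit the matching retractions:
    //
    //     let mut logging = CollectionLogging::new(export_id, logger, dataflow_index, imports);
    //     logging.set_frontier(Some(new_upper));
    //     drop(logging); // retracts logged state and emits `ExportDropped`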

    /// Set the collection frontier to the given new time and emit corresponding logging events.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the frontier of the given import to the given new time and emit corresponding logging
    /// events.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the collection as hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }

    /// The export global ID of this collection.
    pub fn export_id(&self) -> GlobalId {
        self.export_id
    }
}

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Emit retraction events to clean up events previously logged.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
/// batch streams.
pub(crate) trait LogDataflowErrors {
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
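
// A minimal sketch of attaching the error-logging operator (`err_collection` is a
// hypothetical error collection for an export):
//
//     let err_collection = err_collection.log_dataflow_errors(logger.clone(), export_id);
//
// Each input batch's diffs are summed and logged as a `ComputeEvent::ErrorCount` update
// before the data is forwarded unchanged, as the two impls below show.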

impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Return the sum of all diffs within the given batch.
///
/// Note that this operation can be expensive: Its runtime is O(N) with N being the number of
/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
/// contain only a small number of records, so this doesn't matter much. But avoid using it when
/// batches might become large.
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // This could be a static assertion, but we don't use those yet in this crate.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
}