mz_compute/logging/
compute.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by clusterd.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Display, Write};
15use std::rc::Rc;
16use std::time::{Duration, Instant};
17
18use columnar::{Columnar, Index, Ref};
19use differential_dataflow::Collection;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::trace::{BatchReader, Cursor};
22use mz_compute_types::plan::LirId;
23use mz_ore::cast::CastFrom;
24use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
25use mz_timely_util::columnar::builder::ColumnBuilder;
26use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
27use mz_timely_util::containers::ProvidedBuilder;
28use mz_timely_util::replay::MzReplay;
29use timely::Data;
30use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
31use timely::dataflow::operators::Operator;
32use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
33use timely::dataflow::{Scope, Stream};
34use timely::scheduling::Scheduler;
35use tracing::error;
36use uuid::Uuid;
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::logging::{
40    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
41    SharedLoggingState, Update,
42};
43use crate::row_spine::RowRowBuilder;
44use crate::typedefs::RowRowSpine;
45
46/// Type alias for a logger of compute events.
47pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
48pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
49
50/// A dataflow exports a global ID.
51#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
52pub struct Export {
53    /// Identifier of the export.
54    pub export_id: GlobalId,
55    /// Timely worker index of the exporting dataflow.
56    pub dataflow_index: usize,
57}
58
59/// The export for a global id was dropped.
60#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
61pub struct ExportDropped {
62    /// Identifier of the export.
63    pub export_id: GlobalId,
64}
65
66/// A peek event with a [`PeekType`], and an installation status.
67#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
68pub struct PeekEvent {
69    /// The identifier of the view the peek targets.
70    pub id: GlobalId,
71    /// The logical timestamp requested.
72    pub time: Timestamp,
73    /// The ID of the peek.
74    pub uuid: uuid::Bytes,
75    /// The relevant _type_ of peek: index or persist.
76    // Note that this is not stored on the Peek event for data-packing reasons only.
77    pub peek_type: PeekType,
78    /// True if the peek is being installed; false if it's being removed.
79    pub installed: bool,
80}
81
82/// Frontier change event.
83#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
84pub struct Frontier {
85    pub export_id: GlobalId,
86    pub time: Timestamp,
87    pub diff: i8,
88}
89
90/// An import frontier change.
91#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
92pub struct ImportFrontier {
93    pub import_id: GlobalId,
94    pub export_id: GlobalId,
95    pub time: Timestamp,
96    pub diff: i8,
97}
98
99/// A change in an arrangement's heap size.
100#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
101pub struct ArrangementHeapSize {
102    /// Operator index
103    pub operator_id: usize,
104    /// Delta of the heap size in bytes of the arrangement.
105    pub delta_size: isize,
106}
107
108/// A change in an arrangement's heap capacity.
109#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
110pub struct ArrangementHeapCapacity {
111    /// Operator index
112    pub operator_id: usize,
113    /// Delta of the heap capacity in bytes of the arrangement.
114    pub delta_capacity: isize,
115}
116
117/// A change in an arrangement's heap allocation count.
118#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
119pub struct ArrangementHeapAllocations {
120    /// Operator index
121    pub operator_id: usize,
122    /// Delta of distinct heap allocations backing the arrangement.
123    pub delta_allocations: isize,
124}
125
126/// Announcing an operator that manages an arrangement.
127#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
128pub struct ArrangementHeapSizeOperator {
129    /// Operator index
130    pub operator_id: usize,
131    /// The address of the operator.
132    pub address: Vec<usize>,
133}
134
135/// Drop event for an operator managing an arrangement.
136#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
137pub struct ArrangementHeapSizeOperatorDrop {
138    /// Operator index
139    pub operator_id: usize,
140}
141
142/// Dataflow shutdown event.
143#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
144pub struct DataflowShutdown {
145    /// Timely worker index of the dataflow.
146    pub dataflow_index: usize,
147}
148
149/// Error count update event.
150#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
151pub struct ErrorCount {
152    /// Identifier of the export.
153    pub export_id: GlobalId,
154    /// The change in error count.
155    pub diff: Diff,
156}
157
158/// An export is hydrated.
159#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
160pub struct Hydration {
161    /// Identifier of the export.
162    pub export_id: GlobalId,
163}
164
165/// An operator's hydration status changed.
166#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
167pub struct OperatorHydration {
168    /// Identifier of the export.
169    pub export_id: GlobalId,
170    /// Identifier of the operator's LIR node.
171    pub lir_id: LirId,
172    /// Whether the operator is hydrated.
173    pub hydrated: bool,
174}
175
176/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
177#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
178pub struct LirMapping {
179    /// The `GlobalId` in which the LIR operator is rendered.
180    ///
181    /// NB a single a dataflow may have many `GlobalId`s inside it.
182    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
183    /// tracks the many-to-one relationship between `GlobalId`s and
184    /// dataflows.
185    pub global_id: GlobalId,
186    /// The actual mapping.
187    /// Represented this way to reduce the size of `ComputeEvent`.
188    pub mapping: Vec<(LirId, LirMetadata)>,
189}
190
191/// Announce that a dataflow supports a specific global ID.
192#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
193pub struct DataflowGlobal {
194    /// The identifier of the dataflow.
195    pub dataflow_index: usize,
196    /// A `GlobalId` that is rendered as part of this dataflow.
197    pub global_id: GlobalId,
198}
199
200/// A logged compute event.
201#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
202pub enum ComputeEvent {
203    /// A dataflow export was created.
204    Export(Export),
205    /// A dataflow export was dropped.
206    ExportDropped(ExportDropped),
207    /// Peek command.
208    Peek(PeekEvent),
209    /// Available frontier information for dataflow exports.
210    Frontier(Frontier),
211    /// Available frontier information for dataflow imports.
212    ImportFrontier(ImportFrontier),
213    /// Arrangement heap size update
214    ArrangementHeapSize(ArrangementHeapSize),
215    /// Arrangement heap size update
216    ArrangementHeapCapacity(ArrangementHeapCapacity),
217    /// Arrangement heap size update
218    ArrangementHeapAllocations(ArrangementHeapAllocations),
219    /// Arrangement size operator address
220    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
221    /// Arrangement size operator dropped
222    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
223    /// All operators of a dataflow have shut down.
224    DataflowShutdown(DataflowShutdown),
225    /// The number of errors in a dataflow export has changed.
226    ErrorCount(ErrorCount),
227    /// A dataflow export was hydrated.
228    Hydration(Hydration),
229    /// A dataflow operator's hydration status changed.
230    OperatorHydration(OperatorHydration),
231    /// An LIR operator was mapped to some particular dataflow operator.
232    ///
233    /// Cf. `ComputeLog::LirMaping`
234    LirMapping(LirMapping),
235    DataflowGlobal(DataflowGlobal),
236}
237
238/// A peek type distinguishing between index and persist peeks.
239#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
240pub enum PeekType {
241    /// A peek against an index.
242    Index,
243    /// A peek against persist.
244    Persist,
245}
246
247impl PeekType {
248    /// A human-readable name for a peek type.
249    fn name(self) -> &'static str {
250        match self {
251            PeekType::Index => "index",
252            PeekType::Persist => "persist",
253        }
254    }
255}
256
257/// Metadata for LIR operators.
258#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
259pub struct LirMetadata {
260    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
261    operator: String,
262    /// The LIR identifier of the parent (if any).
263    parent_lir_id: Option<LirId>,
264    /// How nested the operator is (for nice indentation).
265    nesting: u8,
266    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
267    /// If `start == end`, then no operators were used.
268    operator_span: (usize, usize),
269}
270
271impl LirMetadata {
272    /// Construct a new LIR metadata object.
273    pub fn new(
274        operator: String,
275        parent_lir_id: Option<LirId>,
276        nesting: u8,
277        operator_span: (usize, usize),
278    ) -> Self {
279        Self {
280            operator,
281            parent_lir_id,
282            nesting,
283            operator_span,
284        }
285    }
286}
287
288/// The return type of the [`construct`] function.
289pub(super) struct Return {
290    /// Collections returned by [`construct`].
291    pub collections: BTreeMap<LogVariant, LogCollection>,
292}
293
294/// Constructs the logging dataflow fragment for compute logs.
295///
296/// Params
297/// * `scope`: The Timely scope hosting the log analysis dataflow.
298/// * `scheduler`: The timely scheduler to obtainer activators.
299/// * `config`: Logging configuration.
300/// * `event_queue`: The source to read compute log events from.
301/// * `compute_event_streams`: Additional compute event streams to absorb.
302/// * `shared_state`: Shared state between logging dataflow fragments.
303pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
304    mut scope: G,
305    scheduler: S,
306    config: &mz_compute_client::logging::LoggingConfig,
307    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
308    shared_state: Rc<RefCell<SharedLoggingState>>,
309) -> Return {
310    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
311
312    scope.scoped("compute logging", move |scope| {
313        let enable_logging = config.enable_logging;
314        let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
315            scope,
316            "compute logs",
317            config.interval,
318            event_queue.activator,
319            move |mut session, mut data| {
320                // If logging is disabled, we still need to install the indexes, but we can leave them
321                // empty. We do so by immediately filtering all logs events.
322                if enable_logging {
323                    session.give_container(data.to_mut())
324                }
325            },
326        );
327
328        // Build a demux operator that splits the replayed event stream up into the separate
329        // logging streams.
330        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
331        let mut input = demux.new_input(&logs, Pipeline);
332        let (mut export_out, export) = demux.new_output();
333        let (mut frontier_out, frontier) = demux.new_output();
334        let (mut import_frontier_out, import_frontier) = demux.new_output();
335        let (mut peek_out, peek) = demux.new_output();
336        let (mut peek_duration_out, peek_duration) = demux.new_output();
337        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
338        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
339        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
340        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
341            demux.new_output();
342        let (mut error_count_out, error_count) = demux.new_output();
343        let (mut hydration_time_out, hydration_time) = demux.new_output();
344        let (mut operator_hydration_status_out, operator_hydration_status) = demux.new_output();
345        let (mut lir_mapping_out, lir_mapping) = demux.new_output();
346        let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
347
348        let mut demux_state = DemuxState::new(scheduler, scope.index());
349        demux.build(move |_capability| {
350            move |_frontiers| {
351                let mut export = export_out.activate();
352                let mut frontier = frontier_out.activate();
353                let mut import_frontier = import_frontier_out.activate();
354                let mut peek = peek_out.activate();
355                let mut peek_duration = peek_duration_out.activate();
356                let mut shutdown_duration = shutdown_duration_out.activate();
357                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
358                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
359                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
360                let mut error_count = error_count_out.activate();
361                let mut hydration_time = hydration_time_out.activate();
362                let mut operator_hydration_status = operator_hydration_status_out.activate();
363                let mut lir_mapping = lir_mapping_out.activate();
364                let mut dataflow_global_ids = dataflow_global_ids_out.activate();
365
366                input.for_each(|cap, data| {
367                    let mut output_sessions = DemuxOutput {
368                        export: export.session_with_builder(&cap),
369                        frontier: frontier.session_with_builder(&cap),
370                        import_frontier: import_frontier.session_with_builder(&cap),
371                        peek: peek.session_with_builder(&cap),
372                        peek_duration: peek_duration.session_with_builder(&cap),
373                        shutdown_duration: shutdown_duration.session_with_builder(&cap),
374                        arrangement_heap_allocations: arrangement_heap_allocations.session_with_builder(&cap),
375                        arrangement_heap_capacity: arrangement_heap_capacity.session_with_builder(&cap),
376                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
377                        error_count: error_count.session_with_builder(&cap),
378                        hydration_time: hydration_time.session_with_builder(&cap),
379                        operator_hydration_status: operator_hydration_status.session_with_builder(&cap),
380                        lir_mapping: lir_mapping.session_with_builder(&cap),
381                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
382                    };
383
384                    let shared_state = &mut shared_state.borrow_mut();
385                    for (time, event) in data.borrow().into_index_iter() {
386                        DemuxHandler {
387                            state: &mut demux_state,
388                            shared_state,
389                            output: &mut output_sessions,
390                            logging_interval_ms,
391                            time,
392                        }
393                        .handle(event);
394                    }
395                });
396            }
397        });
398
399        use ComputeLog::*;
400        let logs = [
401            (ArrangementHeapAllocations, arrangement_heap_allocations),
402            (ArrangementHeapCapacity, arrangement_heap_capacity),
403            (ArrangementHeapSize, arrangement_heap_size),
404            (DataflowCurrent, export),
405            (DataflowGlobal, dataflow_global_ids),
406            (ErrorCount, error_count),
407            (FrontierCurrent, frontier),
408            (HydrationTime, hydration_time),
409            (ImportFrontierCurrent, import_frontier),
410            (LirMapping, lir_mapping),
411            (OperatorHydrationStatus, operator_hydration_status),
412            (PeekCurrent, peek),
413            (PeekDuration, peek_duration),
414            (ShutdownDuration, shutdown_duration),
415        ];
416
417        // Build the output arrangements.
418        let mut collections = BTreeMap::new();
419        for (variant, stream) in logs {
420            let variant = LogVariant::Compute(variant);
421            if config.index_logs.contains_key(&variant) {
422                let trace = stream
423                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
424                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
425                        &format!("Arrange {variant:?}"),
426                    )
427                    .trace;
428                let collection = LogCollection {
429                    trace,
430                    token: Rc::clone(&token),
431                };
432                collections.insert(variant, collection);
433            }
434        }
435
436        Return { collections }
437    })
438}
439
440/// Format the given value and pack it into a `Datum::String`.
441///
442/// The `scratch` buffer is used to perform the string conversion without an allocation.
443/// Callers should not assume anything about the contents of this buffer after this function
444/// returns.
445fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
446where
447    V: Display,
448{
449    scratch.clear();
450    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
451    Datum::String(scratch)
452}
453
454/// State maintained by the demux operator.
455struct DemuxState<A> {
456    /// The timely scheduler.
457    scheduler: A,
458    /// The index of this worker.
459    worker_id: usize,
460    /// A reusable scratch string for formatting IDs.
461    scratch_string_a: String,
462    /// A reusable scratch string for formatting IDs.
463    scratch_string_b: String,
464    /// State tracked per dataflow export.
465    exports: BTreeMap<GlobalId, ExportState>,
466    /// Maps live dataflows to counts of their exports.
467    dataflow_export_counts: BTreeMap<usize, u32>,
468    /// Maps dropped dataflows to their drop time.
469    dataflow_drop_times: BTreeMap<usize, Duration>,
470    /// Contains dataflows that have shut down but not yet been dropped.
471    shutdown_dataflows: BTreeSet<usize>,
472    /// Maps pending peeks to their installation time.
473    peek_stash: BTreeMap<Uuid, Duration>,
474    /// Arrangement size stash.
475    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
476    /// LIR -> operator span mapping.
477    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
478    /// Dataflow -> `GlobalId` mapping (many-to-one).
479    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
480    /// A row packer for the arrangement heap allocations output.
481    arrangement_heap_allocations_packer: PermutedRowPacker,
482    /// A row packer for the arrangement heap capacity output.
483    arrangement_heap_capacity_packer: PermutedRowPacker,
484    /// A row packer for the arrangement heap size output.
485    arrangement_heap_size_packer: PermutedRowPacker,
486    /// A row packer for the dataflow global output.
487    dataflow_global_packer: PermutedRowPacker,
488    /// A row packer for the error count output.
489    error_count_packer: PermutedRowPacker,
490    /// A row packer for the exports output.
491    export_packer: PermutedRowPacker,
492    /// A row packer for the frontier output.
493    frontier_packer: PermutedRowPacker,
494    /// A row packer for the exports output.
495    import_frontier_packer: PermutedRowPacker,
496    /// A row packer for the LIR mapping output.
497    lir_mapping_packer: PermutedRowPacker,
498    /// A row packer for the operator hydration status output.
499    operator_hydration_status_packer: PermutedRowPacker,
500    /// A row packer for the peek durations output.
501    peek_duration_packer: PermutedRowPacker,
502    /// A row packer for the peek output.
503    peek_packer: PermutedRowPacker,
504    /// A row packer for the shutdown duration output.
505    shutdown_duration_packer: PermutedRowPacker,
506    /// A row packer for the hydration time output.
507    hydration_time_packer: PermutedRowPacker,
508}
509
510impl<A: Scheduler> DemuxState<A> {
511    fn new(scheduler: A, worker_id: usize) -> Self {
512        Self {
513            scheduler,
514            worker_id,
515            scratch_string_a: String::new(),
516            scratch_string_b: String::new(),
517            exports: Default::default(),
518            dataflow_export_counts: Default::default(),
519            dataflow_drop_times: Default::default(),
520            shutdown_dataflows: Default::default(),
521            peek_stash: Default::default(),
522            arrangement_size: Default::default(),
523            lir_mapping: Default::default(),
524            dataflow_global_ids: Default::default(),
525            arrangement_heap_allocations_packer: PermutedRowPacker::new(
526                ComputeLog::ArrangementHeapAllocations,
527            ),
528            arrangement_heap_capacity_packer: PermutedRowPacker::new(
529                ComputeLog::ArrangementHeapCapacity,
530            ),
531            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
532            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
533            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
534            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
535            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
536            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
537            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
538            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
539            operator_hydration_status_packer: PermutedRowPacker::new(
540                ComputeLog::OperatorHydrationStatus,
541            ),
542            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
543            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
544            shutdown_duration_packer: PermutedRowPacker::new(ComputeLog::ShutdownDuration),
545        }
546    }
547
548    /// Pack an arrangement heap allocations update key-value for the given operator.
549    fn pack_arrangement_heap_allocations_update(
550        &mut self,
551        operator_id: usize,
552    ) -> (&RowRef, &RowRef) {
553        self.arrangement_heap_allocations_packer.pack_slice(&[
554            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
555            Datum::UInt64(u64::cast_from(self.worker_id)),
556        ])
557    }
558
559    /// Pack an arrangement heap capacity update key-value for the given operator.
560    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
561        self.arrangement_heap_capacity_packer.pack_slice(&[
562            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
563            Datum::UInt64(u64::cast_from(self.worker_id)),
564        ])
565    }
566
567    /// Pack an arrangement heap size update key-value for the given operator.
568    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
569        self.arrangement_heap_size_packer.pack_slice(&[
570            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
571            Datum::UInt64(u64::cast_from(self.worker_id)),
572        ])
573    }
574
575    /// Pack a dataflow global update key-value for the given dataflow index and global ID.
576    fn pack_dataflow_global_update(
577        &mut self,
578        dataflow_index: usize,
579        global_id: GlobalId,
580    ) -> (&RowRef, &RowRef) {
581        self.dataflow_global_packer.pack_slice(&[
582            Datum::UInt64(u64::cast_from(dataflow_index)),
583            Datum::UInt64(u64::cast_from(self.worker_id)),
584            make_string_datum(global_id, &mut self.scratch_string_a),
585        ])
586    }
587
588    /// Pack an error count update key-value for the given export ID and count.
589    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
590        // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
591        // per-worker error count might be negative and at the SQL level having negative multiplicities
592        // is treated as an error.
593        self.error_count_packer.pack_slice(&[
594            make_string_datum(export_id, &mut self.scratch_string_a),
595            Datum::UInt64(u64::cast_from(self.worker_id)),
596            Datum::Int64(count.into_inner()),
597        ])
598    }
599
600    /// Pack an export update key-value for the given export ID and dataflow index.
601    fn pack_export_update(
602        &mut self,
603        export_id: GlobalId,
604        dataflow_index: usize,
605    ) -> (&RowRef, &RowRef) {
606        self.export_packer.pack_slice(&[
607            make_string_datum(export_id, &mut self.scratch_string_a),
608            Datum::UInt64(u64::cast_from(self.worker_id)),
609            Datum::UInt64(u64::cast_from(dataflow_index)),
610        ])
611    }
612
613    /// Pack a hydration time update key-value for the given export ID and hydration time.
614    fn pack_hydration_time_update(
615        &mut self,
616        export_id: GlobalId,
617        time_ns: Option<u64>,
618    ) -> (&RowRef, &RowRef) {
619        self.hydration_time_packer.pack_slice(&[
620            make_string_datum(export_id, &mut self.scratch_string_a),
621            Datum::UInt64(u64::cast_from(self.worker_id)),
622            Datum::from(time_ns),
623        ])
624    }
625
626    /// Pack an import frontier update key-value for the given export ID and dataflow index.
627    fn pack_import_frontier_update(
628        &mut self,
629        export_id: GlobalId,
630        import_id: GlobalId,
631        time: Timestamp,
632    ) -> (&RowRef, &RowRef) {
633        self.import_frontier_packer.pack_slice(&[
634            make_string_datum(export_id, &mut self.scratch_string_a),
635            make_string_datum(import_id, &mut self.scratch_string_b),
636            Datum::UInt64(u64::cast_from(self.worker_id)),
637            Datum::MzTimestamp(time),
638        ])
639    }
640
641    /// Pack an LIR mapping update key-value for the given LIR operator metadata.
642    fn pack_lir_mapping_update(
643        &mut self,
644        global_id: GlobalId,
645        lir_id: LirId,
646        operator: String,
647        parent_lir_id: Option<LirId>,
648        nesting: u8,
649        operator_span: (usize, usize),
650    ) -> (&RowRef, &RowRef) {
651        self.lir_mapping_packer.pack_slice(&[
652            make_string_datum(global_id, &mut self.scratch_string_a),
653            Datum::UInt64(lir_id.into()),
654            Datum::UInt64(u64::cast_from(self.worker_id)),
655            make_string_datum(operator, &mut self.scratch_string_b),
656            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
657            Datum::UInt16(u16::cast_from(nesting)),
658            Datum::UInt64(u64::cast_from(operator_span.0)),
659            Datum::UInt64(u64::cast_from(operator_span.1)),
660        ])
661    }
662
663    /// Pack an operator hydration status update key-value for the given export ID, LIR ID, and
664    /// hydration status.
665    fn pack_operator_hydration_status_update(
666        &mut self,
667        export_id: GlobalId,
668        lir_id: LirId,
669        hydrated: bool,
670    ) -> (&RowRef, &RowRef) {
671        self.operator_hydration_status_packer.pack_slice(&[
672            make_string_datum(export_id, &mut self.scratch_string_a),
673            Datum::UInt64(lir_id.into()),
674            Datum::UInt64(u64::cast_from(self.worker_id)),
675            Datum::from(hydrated),
676        ])
677    }
678
679    /// Pack a peek duration update key-value for the given peek and peek type.
680    fn pack_peek_duration_update(
681        &mut self,
682        peek_type: PeekType,
683        bucket: u128,
684    ) -> (&RowRef, &RowRef) {
685        self.peek_duration_packer.pack_slice(&[
686            Datum::UInt64(u64::cast_from(self.worker_id)),
687            Datum::String(peek_type.name()),
688            Datum::UInt64(bucket.try_into().expect("bucket too big")),
689        ])
690    }
691
692    /// Pack a peek update key-value for the given peek and peek type.
693    fn pack_peek_update(
694        &mut self,
695        id: GlobalId,
696        time: Timestamp,
697        uuid: Uuid,
698        peek_type: PeekType,
699    ) -> (&RowRef, &RowRef) {
700        self.peek_packer.pack_slice(&[
701            Datum::Uuid(uuid),
702            Datum::UInt64(u64::cast_from(self.worker_id)),
703            make_string_datum(id, &mut self.scratch_string_a),
704            Datum::String(peek_type.name()),
705            Datum::MzTimestamp(time),
706        ])
707    }
708
709    /// Pack a frontier update key-value for the given export ID and time.
710    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
711        self.frontier_packer.pack_slice(&[
712            make_string_datum(export_id, &mut self.scratch_string_a),
713            Datum::UInt64(u64::cast_from(self.worker_id)),
714            Datum::MzTimestamp(time),
715        ])
716    }
717
718    /// Pack a shutdown duration update key-value for the given time bucket.
719    fn pack_shutdown_duration_update(&mut self, bucket: u128) -> (&RowRef, &RowRef) {
720        self.shutdown_duration_packer.pack_slice(&[
721            Datum::UInt64(u64::cast_from(self.worker_id)),
722            Datum::UInt64(bucket.try_into().expect("bucket too big")),
723        ])
724    }
725}
726
/// State tracked for each dataflow export.
struct ExportState {
    /// The index of the dataflow maintaining this export.
    dataflow_index: usize,
    /// Number of errors in this export.
    ///
    /// This must be a signed integer, since per-worker error counts can be negative, only the
    /// cross-worker total has to sum up to a non-negative value.
    error_count: Diff,
    /// When this export was created.
    created_at: Instant,
    /// Time it took the exported collection to hydrate, in nanoseconds since `created_at`.
    ///
    /// `None` as long as no hydration event has been handled for this export.
    hydration_time_ns: Option<u64>,
    /// Hydration status of operators feeding this export.
    operator_hydration: BTreeMap<LirId, bool>,
}
743
744impl ExportState {
745    fn new(dataflow_index: usize) -> Self {
746        Self {
747            dataflow_index,
748            error_count: Diff::ZERO,
749            created_at: Instant::now(),
750            hydration_time_ns: None,
751            operator_hydration: BTreeMap::new(),
752        }
753    }
754}
755
/// State for tracking arrangement sizes.
///
/// Each field is the running sum of the deltas reported for one arrangement operator. The sums
/// are kept so that the logged totals can be retracted when the operator is dropped.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    /// Sum of the reported heap size deltas.
    size: isize,
    /// Sum of the reported heap capacity deltas.
    capacity: isize,
    /// Sum of the reported heap allocation count deltas.
    count: isize,
}
763
/// Bundled output sessions used by the demux operator.
///
/// There is one session per log output. The event handlers write
/// `((key, value), timestamp, diff)` updates into the session matching the kind of event they
/// process.
struct DemuxOutput<'a> {
    export: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    shutdown_duration: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, Update<(Row, Row)>>,
}
781
/// Event handler of the demux operator.
///
/// A handler bundles everything needed to process one compute event: the operator state, the
/// output sessions to write to, and the time of the event being handled.
struct DemuxHandler<'a, 'b, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates, in milliseconds.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}
795
796impl<A: Scheduler> DemuxHandler<'_, '_, A> {
797    /// Return the timestamp associated with the current event, based on the event time and the
798    /// logging interval.
799    fn ts(&self) -> Timestamp {
800        let time_ms = self.time.as_millis();
801        let interval = self.logging_interval_ms;
802        let rounded = (time_ms / interval + 1) * interval;
803        rounded.try_into().expect("must fit")
804    }
805
806    /// Handle the given compute event.
807    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
808        use ComputeEventReference::*;
809        match event {
810            Export(export) => self.handle_export(export),
811            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
812            Peek(peek) if peek.installed => self.handle_peek_install(peek),
813            Peek(peek) => self.handle_peek_retire(peek),
814            Frontier(frontier) => self.handle_frontier(frontier),
815            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
816            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
817            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
818            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
819            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
820            ArrangementHeapSizeOperatorDrop(inner) => {
821                self.handle_arrangement_heap_size_operator_dropped(inner)
822            }
823            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
824            ErrorCount(error_count) => self.handle_error_count(error_count),
825            Hydration(hydration) => self.handle_hydration(hydration),
826            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
827            LirMapping(mapping) => self.handle_lir_mapping(mapping),
828            DataflowGlobal(global) => self.handle_dataflow_global(global),
829        }
830    }
831
832    fn handle_export(
833        &mut self,
834        ExportReference {
835            export_id,
836            dataflow_index,
837        }: Ref<'_, Export>,
838    ) {
839        let export_id = Columnar::into_owned(export_id);
840        let ts = self.ts();
841        let datum = self.state.pack_export_update(export_id, dataflow_index);
842        self.output.export.give((datum, ts, Diff::ONE));
843
844        let existing = self
845            .state
846            .exports
847            .insert(export_id, ExportState::new(dataflow_index));
848        if existing.is_some() {
849            error!(%export_id, "export already registered");
850        }
851
852        *self
853            .state
854            .dataflow_export_counts
855            .entry(dataflow_index)
856            .or_default() += 1;
857
858        // Insert hydration time logging for this export.
859        let datum = self.state.pack_hydration_time_update(export_id, None);
860        self.output.hydration_time.give((datum, ts, Diff::ONE));
861    }
862
    /// Unregister a dropped export and retract everything that was logged for it.
    ///
    /// Retracts the export row, the error count row (if the count is non-zero), the hydration
    /// time row, and all operator hydration status rows. If this was the last export of its
    /// dataflow, the dataflow is treated as dropped as well.
    ///
    /// Logs an error and does nothing else if the export is not registered.
    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        // Decrement the number of live exports of the dataflow. A registered export implies a
        // count of at least one, so a missing or zero count is reported as an error.
        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
            entry @ Some(0) | entry @ None => {
                error!(
                    %export_id,
                    %dataflow_index,
                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
                );
            }
            // The last export is gone; the callee also removes the count entry.
            Some(1) => self.handle_dataflow_dropped(dataflow_index),
            Some(count) => *count -= 1,
        }

        // Remove error count logging for this export.
        // A zero count has no logged row, so there is nothing to retract in that case.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Remove hydration time logging for this export.
        // The retracted row carries the last logged hydration time, which may be `None`.
        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        // Remove operator hydration logging for this export.
        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }
917
918    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
919        let ts = self.ts();
920        self.state.dataflow_export_counts.remove(&dataflow_index);
921
922        if self.state.shutdown_dataflows.remove(&dataflow_index) {
923            // Dataflow has already shut down before it was dropped.
924            let datum = self.state.pack_shutdown_duration_update(0);
925            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
926        } else {
927            // Dataflow has not yet shut down.
928            let existing = self
929                .state
930                .dataflow_drop_times
931                .insert(dataflow_index, self.time);
932            if existing.is_some() {
933                error!(%dataflow_index, "dataflow already dropped");
934            }
935        }
936    }
937
    /// Handle the shutdown of a dataflow.
    ///
    /// If the dataflow was dropped before, the time between drop and shutdown is logged as a
    /// shutdown duration, rounded up to a power of two of nanoseconds. Otherwise the dataflow is
    /// remembered as shut down, to be accounted for when it is dropped.
    ///
    /// In both cases, all `GlobalId` mappings and LIR mappings recorded for the dataflow are
    /// retracted.
    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
            // Dataflow has already been dropped.
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            // Durations are bucketed by rounding up to the next power of two.
            let elapsed_pow = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_shutdown_duration_update(elapsed_pow);
            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
        } else {
            // Dataflow has not yet been dropped.
            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
            if !was_new {
                error!(%dataflow_index, "dataflow already shutdown");
            }
        }

        // We deal with any `GlobalId` based mappings in this event.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                // Remove dataflow/`GlobalID` mapping.
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                // Remove LIR mapping.
                // Every recorded operator of this `GlobalId` gets its own retraction.
                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }
995
996    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
997        let ts = self.ts();
998        let export_id = Columnar::into_owned(export_id);
999
1000        let Some(export) = self.state.exports.get_mut(&export_id) else {
1001            // The export might have already been dropped, in which case we are no longer
1002            // interested in its errors.
1003            return;
1004        };
1005
1006        let old_count = export.error_count;
1007        let new_count = old_count + diff;
1008        export.error_count = new_count;
1009
1010        if old_count != Diff::ZERO {
1011            let datum = self.state.pack_error_count_update(export_id, old_count);
1012            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
1013        }
1014        if new_count != Diff::ZERO {
1015            let datum = self.state.pack_error_count_update(export_id, new_count);
1016            self.output.error_count.give((datum, ts, Diff::ONE));
1017        }
1018    }
1019
1020    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
1021        let ts = self.ts();
1022        let export_id = Columnar::into_owned(export_id);
1023
1024        let Some(export) = self.state.exports.get_mut(&export_id) else {
1025            error!(%export_id, "hydration event for unknown export");
1026            return;
1027        };
1028        if export.hydration_time_ns.is_some() {
1029            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
1030            // after reconciliation. We can simply ignore these.
1031            return;
1032        }
1033
1034        let duration = export.created_at.elapsed();
1035        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
1036        export.hydration_time_ns = Some(nanos);
1037
1038        let retraction = self.state.pack_hydration_time_update(export_id, None);
1039        self.output
1040            .hydration_time
1041            .give((retraction, ts, Diff::MINUS_ONE));
1042        let insertion = self
1043            .state
1044            .pack_hydration_time_update(export_id, Some(nanos));
1045        self.output.hydration_time.give((insertion, ts, Diff::ONE));
1046    }
1047
1048    fn handle_operator_hydration(
1049        &mut self,
1050        OperatorHydrationReference {
1051            export_id,
1052            lir_id,
1053            hydrated,
1054        }: Ref<'_, OperatorHydration>,
1055    ) {
1056        let ts = self.ts();
1057        let export_id = Columnar::into_owned(export_id);
1058        let lir_id = Columnar::into_owned(lir_id);
1059        let hydrated = Columnar::into_owned(hydrated);
1060
1061        let Some(export) = self.state.exports.get_mut(&export_id) else {
1062            // The export might have already been dropped, in which case we are no longer
1063            // interested in its operator hydration events.
1064            return;
1065        };
1066
1067        let old_status = export.operator_hydration.get(&lir_id).copied();
1068        export.operator_hydration.insert(lir_id, hydrated);
1069
1070        if let Some(hydrated) = old_status {
1071            let retraction = self
1072                .state
1073                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1074            self.output
1075                .operator_hydration_status
1076                .give((retraction, ts, Diff::MINUS_ONE));
1077        }
1078
1079        let insertion = self
1080            .state
1081            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1082        self.output
1083            .operator_hydration_status
1084            .give((insertion, ts, Diff::ONE));
1085    }
1086
1087    fn handle_peek_install(
1088        &mut self,
1089        PeekEventReference {
1090            id,
1091            time,
1092            uuid,
1093            peek_type,
1094            installed: _,
1095        }: Ref<'_, PeekEvent>,
1096    ) {
1097        let id = Columnar::into_owned(id);
1098        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1099        let ts = self.ts();
1100        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1101        self.output.peek.give((datum, ts, Diff::ONE));
1102
1103        let existing = self.state.peek_stash.insert(uuid, self.time);
1104        if existing.is_some() {
1105            error!(%uuid, "peek already registered");
1106        }
1107    }
1108
1109    fn handle_peek_retire(
1110        &mut self,
1111        PeekEventReference {
1112            id,
1113            time,
1114            uuid,
1115            peek_type,
1116            installed: _,
1117        }: Ref<'_, PeekEvent>,
1118    ) {
1119        let id = Columnar::into_owned(id);
1120        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1121        let ts = self.ts();
1122        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1123        self.output.peek.give((datum, ts, Diff::MINUS_ONE));
1124
1125        if let Some(start) = self.state.peek_stash.remove(&uuid) {
1126            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
1127            let bucket = elapsed_ns.next_power_of_two();
1128            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
1129            self.output.peek_duration.give((datum, ts, Diff::ONE));
1130        } else {
1131            error!(%uuid, "peek not yet registered");
1132        }
1133    }
1134
1135    fn handle_frontier(
1136        &mut self,
1137        FrontierReference {
1138            export_id,
1139            time,
1140            diff,
1141        }: Ref<'_, Frontier>,
1142    ) {
1143        let export_id = Columnar::into_owned(export_id);
1144        let diff = Diff::from(*diff);
1145        let ts = self.ts();
1146        let time = Columnar::into_owned(time);
1147        let datum = self.state.pack_frontier_update(export_id, time);
1148        self.output.frontier.give((datum, ts, diff));
1149    }
1150
1151    fn handle_import_frontier(
1152        &mut self,
1153        ImportFrontierReference {
1154            import_id,
1155            export_id,
1156            time,
1157            diff,
1158        }: Ref<'_, ImportFrontier>,
1159    ) {
1160        let import_id = Columnar::into_owned(import_id);
1161        let export_id = Columnar::into_owned(export_id);
1162        let diff = Diff::from(*diff);
1163        let ts = self.ts();
1164        let time = Columnar::into_owned(time);
1165        let datum = self
1166            .state
1167            .pack_import_frontier_update(export_id, import_id, time);
1168        self.output.import_frontier.give((datum, ts, diff));
1169    }
1170
1171    /// Update the allocation size for an arrangement.
1172    fn handle_arrangement_heap_size(
1173        &mut self,
1174        ArrangementHeapSizeReference {
1175            operator_id,
1176            delta_size,
1177        }: Ref<'_, ArrangementHeapSize>,
1178    ) {
1179        let ts = self.ts();
1180        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1181            return;
1182        };
1183
1184        state.size += delta_size;
1185
1186        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
1187        let diff = Diff::cast_from(delta_size);
1188        self.output.arrangement_heap_size.give((datum, ts, diff));
1189    }
1190
1191    /// Update the allocation capacity for an arrangement.
1192    fn handle_arrangement_heap_capacity(
1193        &mut self,
1194        ArrangementHeapCapacityReference {
1195            operator_id,
1196            delta_capacity,
1197        }: Ref<'_, ArrangementHeapCapacity>,
1198    ) {
1199        let ts = self.ts();
1200        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1201            return;
1202        };
1203
1204        state.capacity += delta_capacity;
1205
1206        let datum = self
1207            .state
1208            .pack_arrangement_heap_capacity_update(operator_id);
1209        let diff = Diff::cast_from(delta_capacity);
1210        self.output
1211            .arrangement_heap_capacity
1212            .give((datum, ts, diff));
1213    }
1214
1215    /// Update the allocation count for an arrangement.
1216    fn handle_arrangement_heap_allocations(
1217        &mut self,
1218        ArrangementHeapAllocationsReference {
1219            operator_id,
1220            delta_allocations,
1221        }: Ref<'_, ArrangementHeapAllocations>,
1222    ) {
1223        let ts = self.ts();
1224        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1225            return;
1226        };
1227
1228        state.count += delta_allocations;
1229
1230        let datum = self
1231            .state
1232            .pack_arrangement_heap_allocations_update(operator_id);
1233        let diff = Diff::cast_from(delta_allocations);
1234        self.output
1235            .arrangement_heap_allocations
1236            .give((datum, ts, diff));
1237    }
1238
1239    /// Indicate that a new arrangement exists, start maintaining the heap size state.
1240    fn handle_arrangement_heap_size_operator(
1241        &mut self,
1242        ArrangementHeapSizeOperatorReference {
1243            operator_id,
1244            address,
1245        }: Ref<'_, ArrangementHeapSizeOperator>,
1246    ) {
1247        let activator = self
1248            .state
1249            .scheduler
1250            .activator_for(address.into_iter().collect());
1251        let existing = self
1252            .state
1253            .arrangement_size
1254            .insert(operator_id, Default::default());
1255        if existing.is_some() {
1256            error!(%operator_id, "arrangement size operator already registered");
1257        }
1258        let existing = self
1259            .shared_state
1260            .arrangement_size_activators
1261            .insert(operator_id, activator);
1262        if existing.is_some() {
1263            error!(%operator_id, "arrangement size activator already registered");
1264        }
1265    }
1266
1267    /// Indicate that an arrangement has been dropped and we can cleanup the heap size state.
1268    fn handle_arrangement_heap_size_operator_dropped(
1269        &mut self,
1270        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
1271    ) {
1272        let operator_id = event.operator_id;
1273        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
1274            let ts = self.ts();
1275            let allocations = self
1276                .state
1277                .pack_arrangement_heap_allocations_update(operator_id);
1278            let diff = -Diff::cast_from(state.count);
1279            self.output
1280                .arrangement_heap_allocations
1281                .give((allocations, ts, diff));
1282
1283            let capacity = self
1284                .state
1285                .pack_arrangement_heap_capacity_update(operator_id);
1286            let diff = -Diff::cast_from(state.capacity);
1287            self.output
1288                .arrangement_heap_capacity
1289                .give((capacity, ts, diff));
1290
1291            let size = self.state.pack_arrangement_heap_size_update(operator_id);
1292            let diff = -Diff::cast_from(state.size);
1293            self.output.arrangement_heap_size.give((size, ts, diff));
1294        }
1295        self.shared_state
1296            .arrangement_size_activators
1297            .remove(&operator_id);
1298    }
1299
    /// Record LIR operator metadata for a `GlobalId` and log one mapping row per operator.
    ///
    /// The metadata is merged into any mappings already recorded for the same `GlobalId`, so
    /// that the logged rows can be retracted when the owning dataflow shuts down.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // record the state (for the later drop)
        // The closure yields owned copies of the mapping entries; only one of the two entry
        // callbacks below ends up invoking it.
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // send the datum out
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }
1328
1329    fn handle_dataflow_global(
1330        &mut self,
1331        DataflowGlobalReference {
1332            dataflow_index,
1333            global_id,
1334        }: Ref<'_, DataflowGlobal>,
1335    ) {
1336        let global_id = Columnar::into_owned(global_id);
1337        self.state
1338            .dataflow_global_ids
1339            .entry(dataflow_index)
1340            .and_modify(|globals| {
1341                // NB BTreeSet::insert() returns `false` when the element was already in the set
1342                if !globals.insert(global_id) {
1343                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
1344                }
1345            })
1346            .or_insert_with(|| BTreeSet::from([global_id]));
1347
1348        let ts = self.ts();
1349        let datum = self
1350            .state
1351            .pack_dataflow_global_update(dataflow_index, global_id);
1352        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
1353    }
1354}
1355
/// Logging state maintained for a compute collection.
///
/// This type is used to produce appropriate log events in response to changes of logged collection
/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
pub struct CollectionLogging {
    /// The ID of the collection this state belongs to.
    export_id: GlobalId,
    /// The logger that receives the produced compute events.
    logger: Logger,

    /// The collection frontier that is currently logged, if any.
    logged_frontier: Option<Timestamp>,
    /// The import frontiers that are currently logged, keyed by import ID.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
1367
1368impl CollectionLogging {
1369    /// Create new logging state for the identified collection and emit initial logging events.
1370    pub fn new(
1371        export_id: GlobalId,
1372        logger: Logger,
1373        dataflow_index: usize,
1374        import_ids: impl Iterator<Item = GlobalId>,
1375    ) -> Self {
1376        logger.log(&ComputeEvent::Export(Export {
1377            export_id,
1378            dataflow_index,
1379        }));
1380
1381        let mut self_ = Self {
1382            export_id,
1383            logger,
1384            logged_frontier: None,
1385            logged_import_frontiers: Default::default(),
1386        };
1387
1388        // Initialize frontier logging.
1389        let initial_frontier = Some(Timestamp::MIN);
1390        self_.set_frontier(initial_frontier);
1391        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
1392
1393        self_
1394    }
1395
1396    /// Set the collection frontier to the given new time and emit corresponding logging events.
1397    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
1398        let old_time = self.logged_frontier;
1399        self.logged_frontier = new_time;
1400
1401        if old_time != new_time {
1402            let export_id = self.export_id;
1403            let retraction = old_time.map(|time| {
1404                ComputeEvent::Frontier(Frontier {
1405                    export_id,
1406                    time,
1407                    diff: -1,
1408                })
1409            });
1410            let insertion = new_time.map(|time| {
1411                ComputeEvent::Frontier(Frontier {
1412                    export_id,
1413                    time,
1414                    diff: 1,
1415                })
1416            });
1417            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1418            self.logger.log_many(events);
1419        }
1420    }
1421
1422    /// Set the frontier of the given import to the given new time and emit corresponding logging
1423    /// events.
1424    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
1425        let old_time = self.logged_import_frontiers.remove(&import_id);
1426        if let Some(time) = new_time {
1427            self.logged_import_frontiers.insert(import_id, time);
1428        }
1429
1430        if old_time != new_time {
1431            let export_id = self.export_id;
1432            let retraction = old_time.map(|time| {
1433                ComputeEvent::ImportFrontier(ImportFrontier {
1434                    import_id,
1435                    export_id,
1436                    time,
1437                    diff: -1,
1438                })
1439            });
1440            let insertion = new_time.map(|time| {
1441                ComputeEvent::ImportFrontier(ImportFrontier {
1442                    import_id,
1443                    export_id,
1444                    time,
1445                    diff: 1,
1446                })
1447            });
1448            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1449            self.logger.log_many(events);
1450        }
1451    }
1452
1453    /// Set the collection as hydrated.
1454    pub fn set_hydrated(&self) {
1455        self.logger.log(&ComputeEvent::Hydration(Hydration {
1456            export_id: self.export_id,
1457        }));
1458    }
1459}
1460
1461impl Drop for CollectionLogging {
1462    fn drop(&mut self) {
1463        // Emit retraction events to clean up events previously logged.
1464        self.set_frontier(None);
1465
1466        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1467        for import_id in import_ids {
1468            self.set_import_frontier(import_id, None);
1469        }
1470
1471        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1472            export_id: self.export_id,
1473        }));
1474    }
1475}
1476
/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
/// batch streams.
pub(crate) trait LogDataflowErrors {
    /// Attach an operator that logs error count changes for `export_id` to `logger` and
    /// otherwise passes its input through.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
1482
1483impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
1484where
1485    G: Scope,
1486    D: Data,
1487{
1488    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1489        self.inner
1490            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1491                move |input, output| {
1492                    input.for_each(|cap, data| {
1493                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1494                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1495
1496                        output.session(&cap).give_container(data);
1497                    });
1498                }
1499            })
1500            .as_collection()
1501    }
1502}
1503
1504impl<G, B> LogDataflowErrors for Stream<G, B>
1505where
1506    G: Scope,
1507    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1508{
1509    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1510        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1511            move |input, output| {
1512                input.for_each(|cap, data| {
1513                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1514                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1515
1516                    output.session(&cap).give_container(data);
1517                });
1518            }
1519        })
1520    }
1521}
1522
1523/// Return the sum of all diffs within the given batch.
1524///
1525/// Note that this operation can be expensive: Its runtime is O(N) with N being the number of
1526/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
1527/// contain only a small number of records, so this doesn't matter much. But avoid using it when
1528/// batches might become large.
1529fn sum_batch_diffs<B>(batch: &B) -> Diff
1530where
1531    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1532{
1533    let mut sum = Diff::ZERO;
1534    let mut cursor = batch.cursor();
1535
1536    while cursor.key_valid(batch) {
1537        while cursor.val_valid(batch) {
1538            cursor.map_times(batch, |_t, r| sum += r);
1539            cursor.step_val(batch);
1540        }
1541        cursor.step_key(batch);
1542    }
1543
1544    sum
1545}
1546
#[cfg(test)]
mod tests {
    use super::*;

    /// Guards against accidental growth of `ComputeEvent`.
    #[mz_ore::test]
    fn test_compute_event_size() {
        // This could be a static assertion, but we don't use those yet in this crate.
        let actual_size = std::mem::size_of::<ComputeEvent>();
        assert_eq!(56, actual_size)
    }
}