mz_compute/logging/
compute.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by clusterd.
11
12use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Display, Write};
15use std::rc::Rc;
16use std::time::{Duration, Instant};
17
18use columnar::{Columnar, Ref};
19use differential_dataflow::Collection;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::trace::{BatchReader, Cursor};
22use mz_compute_types::plan::LirId;
23use mz_ore::cast::CastFrom;
24use mz_repr::{Datum, Diff, GlobalId, Timestamp};
25use mz_timely_util::columnar::Column;
26use mz_timely_util::columnar::builder::ColumnBuilder;
27use mz_timely_util::containers::ProvidedBuilder;
28use mz_timely_util::replay::MzReplay;
29use timely::dataflow::channels::pact::Pipeline;
30use timely::dataflow::operators::Operator;
31use timely::dataflow::operators::core::Map;
32use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
33use timely::dataflow::{Scope, Stream};
34use timely::scheduling::Scheduler;
35use timely::{Container, Data};
36use tracing::error;
37use uuid::Uuid;
38
39use crate::extensions::arrange::MzArrange;
40use crate::logging::{
41    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec,
42    PermutedRowPacker, SharedLoggingState, Update,
43};
44use crate::row_spine::{RowRowBatcher, RowRowBuilder};
45use crate::typedefs::RowRowSpine;
46
/// Type alias for a logger of compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// Container builder used by [`Logger`] to batch `(event time, ComputeEvent)` pairs into
/// columnar containers.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
50
/// A dataflow exports a global ID.
///
/// Logged when a dataflow export is created; feeds the `DataflowCurrent` log.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Timely's index for the exporting dataflow (a dataflow index, not a worker index).
    pub dataflow_index: usize,
}
59
/// The export for a global id was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// Identifier of the export that was dropped.
    pub export_id: GlobalId,
}
66
/// A peek event with a [`Peek`], a [`PeekType`], and an installation status.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The data for the peek itself.
    pub peek: Peek,
    /// The relevant _type_ of peek: index or persist.
    // The type lives here rather than on `Peek` purely for data-packing reasons.
    pub peek_type: PeekType,
    /// True if the peek is being installed; false if it's being removed.
    pub installed: bool,
}
78
/// Frontier change event for a dataflow export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// Identifier of the export whose frontier changed.
    pub export_id: GlobalId,
    /// The frontier time affected by this change.
    pub time: Timestamp,
    /// Change in multiplicity of `time` in the frontier (presumably `+1`/`-1` — TODO confirm
    /// against the emitting code).
    pub diff: i8,
}
86
/// An import frontier change.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// Identifier of the imported collection.
    pub import_id: GlobalId,
    /// Identifier of the export this import frontier is reported for.
    pub export_id: GlobalId,
    /// The frontier time affected by this change.
    pub time: Timestamp,
    /// Change in multiplicity of `time` in the import frontier (presumably `+1`/`-1` — TODO
    /// confirm against the emitting code).
    pub diff: i8,
}
95
/// A change in an arrangement's heap size.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// Index of the operator maintaining the arrangement.
    pub operator_id: usize,
    /// Delta of the heap size in bytes of the arrangement.
    pub delta_size: isize,
}
104
/// A change in an arrangement's heap capacity.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// Index of the operator maintaining the arrangement.
    pub operator_id: usize,
    /// Delta of the heap capacity in bytes of the arrangement.
    pub delta_capacity: isize,
}
113
/// A change in an arrangement's heap allocation count.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// Index of the operator maintaining the arrangement.
    pub operator_id: usize,
    /// Delta of distinct heap allocations backing the arrangement.
    pub delta_allocations: isize,
}
122
/// Announcing an operator that manages an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// Index of the operator managing the arrangement.
    pub operator_id: usize,
    /// The address of the operator.
    pub address: Vec<usize>,
}
131
/// Drop event for an operator managing an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// Index of the operator that was dropped.
    pub operator_id: usize,
}
138
/// Dataflow shutdown event: all operators of the dataflow have shut down.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// Timely's index for the dataflow (a dataflow index, not a worker index).
    pub dataflow_index: usize,
}
145
/// Error count update event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The change in error count. May be negative on a single worker.
    pub diff: Diff,
}
154
/// An export is hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// Identifier of the export that became hydrated.
    pub export_id: GlobalId,
}
160
/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The `GlobalId` in which the LIR operator is rendered.
    ///
    /// NB a single dataflow may have many `GlobalId`s inside it.
    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
    /// tracks the many-to-one relationship between `GlobalId`s and
    /// dataflows.
    pub global_id: GlobalId,
    /// The actual mapping.
    /// Represented this way to reduce the size of `ComputeEvent`.
    pub mapping: Vec<(LirId, LirMetadata)>,
}
175
/// Announce that a dataflow supports a specific global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// Timely's index for the dataflow.
    pub dataflow_index: usize,
    /// A `GlobalId` that is rendered as part of this dataflow.
    pub global_id: GlobalId,
}
184
/// A logged compute event.
///
/// Variant order is significant: it determines the derived `PartialOrd` ordering.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// Peek command.
    Peek(PeekEvent),
    /// Available frontier information for dataflow exports.
    Frontier(Frontier),
    /// Available frontier information for dataflow imports.
    ImportFrontier(ImportFrontier),
    /// Arrangement heap size update
    ArrangementHeapSize(ArrangementHeapSize),
    /// Arrangement heap capacity update
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// Arrangement heap allocation count update
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// Arrangement size operator address
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// Arrangement size operator dropped
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// All operators of a dataflow have shut down.
    DataflowShutdown(DataflowShutdown),
    /// The number of errors in a dataflow export has changed.
    ErrorCount(ErrorCount),
    /// A dataflow export was hydrated.
    Hydration(Hydration),
    /// An LIR operator was mapped to some particular dataflow operator.
    ///
    /// Cf. `ComputeLog::LirMapping`
    LirMapping(LirMapping),
    /// A dataflow announced that it renders a particular `GlobalId`.
    DataflowGlobal(DataflowGlobal),
}
220
/// A peek type distinguishing between index and persist peeks.
///
/// Variant order is significant for the derived `Ord`/`PartialOrd`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index.
    Index,
    /// A peek against persist.
    Persist,
}
229
230impl PeekType {
231    /// A human-readable name for a peek type.
232    fn name(self) -> &'static str {
233        match self {
234            PeekType::Index => "index",
235            PeekType::Persist => "persist",
236        }
237    }
238}
239
/// A logged peek event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Peek {
    /// The identifier of the view the peek targets.
    id: GlobalId,
    /// The logical timestamp requested.
    time: Timestamp,
    /// The ID of the peek, stored as raw bytes; turned back into a `Uuid` with
    /// `Uuid::from_bytes` when packed into a row.
    uuid: uuid::Bytes,
}
250
251impl Peek {
252    /// Create a new peek from its arguments.
253    pub fn new(id: GlobalId, time: Timestamp, uuid: Uuid) -> Self {
254        let uuid = uuid.into_bytes();
255        Self { id, time, uuid }
256    }
257}
258
/// Metadata for LIR operators.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
    operator: String,
    /// The LIR identifier of the parent (if any).
    parent_lir_id: Option<LirId>,
    /// How nested the operator is (for nice indentation).
    nesting: u8,
    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
    /// If `start == end`, then no operators were used.
    operator_span: (usize, usize),
}
272
273impl LirMetadata {
274    /// Construct a new LIR metadata object.
275    pub fn new(
276        operator: String,
277        parent_lir_id: Option<LirId>,
278        nesting: u8,
279        operator_span: (usize, usize),
280    ) -> Self {
281        Self {
282            operator,
283            parent_lir_id,
284            nesting,
285            operator_span,
286        }
287    }
288}
289
/// The return type of the [`construct`] function.
pub(super) struct Return {
    /// Collections returned by [`construct`], keyed by log variant.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
295
296/// Constructs the logging dataflow fragment for compute logs.
297///
298/// Params
299/// * `scope`: The Timely scope hosting the log analysis dataflow.
300/// * `scheduler`: The timely scheduler to obtainer activators.
301/// * `config`: Logging configuration.
302/// * `event_queue`: The source to read compute log events from.
303/// * `compute_event_streams`: Additional compute event streams to absorb.
304/// * `shared_state`: Shared state between logging dataflow fragments.
305pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
306    mut scope: G,
307    scheduler: S,
308    config: &mz_compute_client::logging::LoggingConfig,
309    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
310    shared_state: Rc<RefCell<SharedLoggingState>>,
311) -> Return {
312    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
313
314    scope.scoped("compute logging", move |scope| {
315        let enable_logging = config.enable_logging;
316        let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
317            scope,
318            "compute logs",
319            config.interval,
320            event_queue.activator,
321            move |mut session, mut data| {
322                // If logging is disabled, we still need to install the indexes, but we can leave them
323                // empty. We do so by immediately filtering all logs events.
324                if enable_logging {
325                    session.give_container(data.to_mut())
326                }
327            },
328        );
329
330        // Build a demux operator that splits the replayed event stream up into the separate
331        // logging streams.
332        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
333        let mut input = demux.new_input(&logs, Pipeline);
334        let (mut export_out, export) = demux.new_output();
335        let (mut frontier_out, frontier) = demux.new_output();
336        let (mut import_frontier_out, import_frontier) = demux.new_output();
337        let (mut peek_out, peek) = demux.new_output();
338        let (mut peek_duration_out, peek_duration) = demux.new_output();
339        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
340        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
341        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
342        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
343            demux.new_output();
344        let (mut error_count_out, error_count) = demux.new_output();
345        let (mut hydration_time_out, hydration_time) = demux.new_output();
346        let (mut lir_mapping_out, lir_mapping) = demux.new_output();
347        let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
348
349        let mut demux_state = DemuxState::new(scheduler);
350        demux.build(move |_capability| {
351            move |_frontiers| {
352                let mut export = export_out.activate();
353                let mut frontier = frontier_out.activate();
354                let mut import_frontier = import_frontier_out.activate();
355                let mut peek = peek_out.activate();
356                let mut peek_duration = peek_duration_out.activate();
357                let mut shutdown_duration = shutdown_duration_out.activate();
358                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
359                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
360                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
361                let mut error_count = error_count_out.activate();
362                let mut hydration_time = hydration_time_out.activate();
363                let mut lir_mapping = lir_mapping_out.activate();
364                let mut dataflow_global_ids = dataflow_global_ids_out.activate();
365
366                input.for_each(|cap, data| {
367                    let mut output_sessions = DemuxOutput {
368                        export: export.session(&cap),
369                        frontier: frontier.session(&cap),
370                        import_frontier: import_frontier.session(&cap),
371                        peek: peek.session(&cap),
372                        peek_duration: peek_duration.session(&cap),
373                        shutdown_duration: shutdown_duration.session(&cap),
374                        arrangement_heap_size: arrangement_heap_size.session(&cap),
375                        arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
376                        arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
377                        error_count: error_count.session(&cap),
378                        hydration_time: hydration_time.session(&cap),
379                        lir_mapping: lir_mapping.session_with_builder(&cap),
380                        dataflow_global_ids: dataflow_global_ids.session(&cap),
381                    };
382
383                    let shared_state = &mut shared_state.borrow_mut();
384                    for (time, event) in data.drain() {
385                        DemuxHandler {
386                            state: &mut demux_state,
387                            shared_state,
388                            output: &mut output_sessions,
389                            logging_interval_ms,
390                            time,
391                        }
392                        .handle(event);
393                    }
394                });
395            }
396        });
397
398        let worker_id = scope.index();
399
400        // Encode the contents of each logging stream into its expected `Row` format.
401        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
402        let dataflow_current = export.as_collection().map({
403            let mut scratch = String::new();
404            move |datum| {
405                packer.pack_slice_owned(&[
406                    make_string_datum(datum.export_id, &mut scratch),
407                    Datum::UInt64(u64::cast_from(worker_id)),
408                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
409                ])
410            }
411        });
412        let mut packer = PermutedRowPacker::new(ComputeLog::FrontierCurrent);
413        let frontier_current = frontier.as_collection().map({
414            let mut scratch = String::new();
415            move |datum| {
416                packer.pack_slice_owned(&[
417                    make_string_datum(datum.export_id, &mut scratch),
418                    Datum::UInt64(u64::cast_from(worker_id)),
419                    Datum::MzTimestamp(datum.time),
420                ])
421            }
422        });
423        let mut packer = PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent);
424        let import_frontier_current = import_frontier.as_collection().map({
425            let mut scratch1 = String::new();
426            let mut scratch2 = String::new();
427            move |datum| {
428                packer.pack_slice_owned(&[
429                    make_string_datum(datum.export_id, &mut scratch1),
430                    make_string_datum(datum.import_id, &mut scratch2),
431                    Datum::UInt64(u64::cast_from(worker_id)),
432                    Datum::MzTimestamp(datum.time),
433                ])
434            }
435        });
436        let mut packer = PermutedRowPacker::new(ComputeLog::PeekCurrent);
437        let peek_current = peek.as_collection().map({
438            let mut scratch = String::new();
439            move |PeekDatum { peek, peek_type }| {
440                packer.pack_slice_owned(&[
441                    Datum::Uuid(Uuid::from_bytes(peek.uuid)),
442                    Datum::UInt64(u64::cast_from(worker_id)),
443                    make_string_datum(peek.id, &mut scratch),
444                    Datum::String(peek_type.name()),
445                    Datum::MzTimestamp(peek.time),
446                ])
447            }
448        });
449        let mut packer = PermutedRowPacker::new(ComputeLog::PeekDuration);
450        let peek_duration =
451            peek_duration
452                .as_collection()
453                .map(move |PeekDurationDatum { peek_type, bucket }| {
454                    packer.pack_slice_owned(&[
455                        Datum::UInt64(u64::cast_from(worker_id)),
456                        Datum::String(peek_type.name()),
457                        Datum::UInt64(bucket.try_into().expect("bucket too big")),
458                    ])
459                });
460        let mut packer = PermutedRowPacker::new(ComputeLog::ShutdownDuration);
461        let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
462            packer.pack_slice_owned(&[
463                Datum::UInt64(u64::cast_from(worker_id)),
464                Datum::UInt64(bucket.try_into().expect("bucket too big")),
465            ])
466        });
467
468        let arrangement_heap_datum_to_row =
469            move |packer: &mut PermutedRowPacker, ArrangementHeapDatum { operator_id }| {
470                packer.pack_slice_owned(&[
471                    Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
472                    Datum::UInt64(u64::cast_from(worker_id)),
473                ])
474            };
475
476        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
477        let arrangement_heap_size = arrangement_heap_size
478            .as_collection()
479            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));
480
481        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapCapacity);
482        let arrangement_heap_capacity = arrangement_heap_capacity
483            .as_collection()
484            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));
485
486        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
487        let arrangement_heap_allocations = arrangement_heap_allocations
488            .as_collection()
489            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));
490
491        let mut packer = PermutedRowPacker::new(ComputeLog::ErrorCount);
492        let error_count = error_count.as_collection().map({
493            let mut scratch = String::new();
494            move |datum| {
495                packer.pack_slice_owned(&[
496                    make_string_datum(datum.export_id, &mut scratch),
497                    Datum::UInt64(u64::cast_from(worker_id)),
498                    Datum::Int64(datum.count.into_inner()),
499                ])
500            }
501        });
502
503        let mut packer = PermutedRowPacker::new(ComputeLog::HydrationTime);
504        let hydration_time = hydration_time.as_collection().map({
505            let mut scratch = String::new();
506            move |datum| {
507                packer.pack_slice_owned(&[
508                    make_string_datum(datum.export_id, &mut scratch),
509                    Datum::UInt64(u64::cast_from(worker_id)),
510                    Datum::from(datum.time_ns),
511                ])
512            }
513        });
514
515        let mut scratch1 = String::new();
516        let mut scratch2 = String::new();
517        let mut packer = PermutedRowPacker::new(ComputeLog::LirMapping);
518        let lir_mapping = lir_mapping
519            .map(move |(datum, time, diff)| {
520                let row = packer.pack_slice_owned(&[
521                    make_string_datum(GlobalId::into_owned(datum.global_id), &mut scratch1),
522                    Datum::UInt64(<LirId as Columnar>::into_owned(datum.lir_id).into()),
523                    Datum::UInt64(u64::cast_from(worker_id)),
524                    make_string_datum(datum.operator, &mut scratch2),
525                    datum
526                        .parent_lir_id
527                        .map(|lir_id| Datum::UInt64(LirId::into_owned(lir_id).into()))
528                        .unwrap_or_else(|| Datum::Null),
529                    Datum::UInt16(u16::cast_from(*datum.nesting)),
530                    Datum::UInt64(u64::cast_from(datum.operator_span.0)),
531                    Datum::UInt64(u64::cast_from(datum.operator_span.1)),
532                ]);
533                (row, Timestamp::into_owned(time), diff)
534            })
535            .as_collection();
536
537        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
538        let dataflow_global_ids = dataflow_global_ids.as_collection().map({
539            let mut scratch = String::new();
540            move |datum| {
541                packer.pack_slice_owned(&[
542                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
543                    Datum::UInt64(u64::cast_from(worker_id)),
544                    make_string_datum(datum.global_id, &mut scratch),
545                ])
546            }
547        });
548
549        use ComputeLog::*;
550        let logs = [
551            (DataflowCurrent, dataflow_current),
552            (FrontierCurrent, frontier_current),
553            (ImportFrontierCurrent, import_frontier_current),
554            (PeekCurrent, peek_current),
555            (PeekDuration, peek_duration),
556            (ShutdownDuration, shutdown_duration),
557            (ArrangementHeapSize, arrangement_heap_size),
558            (ArrangementHeapCapacity, arrangement_heap_capacity),
559            (ArrangementHeapAllocations, arrangement_heap_allocations),
560            (ErrorCount, error_count),
561            (HydrationTime, hydration_time),
562            (LirMapping, lir_mapping),
563            (DataflowGlobal, dataflow_global_ids),
564        ];
565
566        // Build the output arrangements.
567        let mut collections = BTreeMap::new();
568        for (variant, collection) in logs {
569            let variant = LogVariant::Compute(variant);
570            if config.index_logs.contains_key(&variant) {
571                let trace = collection
572                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
573                        &format!("Arrange {variant:?}"),
574                    )
575                    .trace;
576                let collection = LogCollection {
577                    trace,
578                    token: Rc::clone(&token),
579                };
580                collections.insert(variant, collection);
581            }
582        }
583
584        Return { collections }
585    })
586}
587
588/// Format the given value and pack it into a `Datum::String`.
589///
590/// The `scratch` buffer is used to perform the string conversion without an allocation.
591/// Callers should not assume anything about the contents of this buffer after this function
592/// returns.
593fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
594where
595    V: Display,
596{
597    scratch.clear();
598    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
599    Datum::String(scratch)
600}
601
/// State maintained by the demux operator.
///
/// `A` is the timely scheduler type supplied to [`construct`].
struct DemuxState<A> {
    /// The timely scheduler.
    scheduler: A,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps live dataflows to counts of their exports.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Maps dropped dataflows to their drop time.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Contains dataflows that have shut down but not yet been dropped.
    shutdown_dataflows: BTreeSet<usize>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size stash, keyed by operator index.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR -> operator span mapping, per `GlobalId`.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Dataflow -> `GlobalId` mapping (many-to-one).
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}
623
624impl<A: Scheduler> DemuxState<A> {
625    fn new(scheduler: A) -> Self {
626        Self {
627            scheduler,
628            exports: Default::default(),
629            dataflow_export_counts: Default::default(),
630            dataflow_drop_times: Default::default(),
631            shutdown_dataflows: Default::default(),
632            peek_stash: Default::default(),
633            arrangement_size: Default::default(),
634            lir_mapping: Default::default(),
635            dataflow_global_ids: Default::default(),
636        }
637    }
638}
639
/// State tracked for each dataflow export.
struct ExportState {
    /// The ID of the dataflow maintaining this export.
    dataflow_index: usize,
    /// Number of errors in this export.
    ///
    /// This must be a signed integer, since per-worker error counts can be negative, only the
    /// cross-worker total has to sum up to a non-negative value.
    error_count: Diff,
    /// When this export was created.
    created_at: Instant,
    /// Hydration time in nanoseconds, or `None` while the export is not yet hydrated.
    // NOTE(review): presumably measured from `created_at` — confirm in `handle_hydration`.
    hydration_time_ns: Option<u64>,
}
654
655impl ExportState {
656    fn new(dataflow_index: usize) -> Self {
657        Self {
658            dataflow_index,
659            error_count: Diff::ZERO,
660            created_at: Instant::now(),
661            hydration_time_ns: None,
662        }
663    }
664}
665
/// State for tracking arrangement sizes.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    /// Heap size in bytes (presumably the running sum of `delta_size` updates — TODO confirm).
    size: isize,
    /// Heap capacity in bytes (presumably the running sum of `delta_capacity` updates).
    capacity: isize,
    /// Number of heap allocations (presumably the running sum of `delta_allocations` updates).
    count: isize,
}
673
/// Bundled output sessions used by the demux operator.
///
/// One session per logging stream; all are `Vec`-based except `lir_mapping`, which uses a
/// columnar session.
struct DemuxOutput<'a> {
    export: OutputSessionVec<'a, Update<ExportDatum>>,
    frontier: OutputSessionVec<'a, Update<FrontierDatum>>,
    import_frontier: OutputSessionVec<'a, Update<ImportFrontierDatum>>,
    peek: OutputSessionVec<'a, Update<PeekDatum>>,
    peek_duration: OutputSessionVec<'a, Update<PeekDurationDatum>>,
    shutdown_duration: OutputSessionVec<'a, Update<u128>>,
    arrangement_heap_size: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_capacity: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_allocations: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    hydration_time: OutputSessionVec<'a, Update<HydrationTimeDatum>>,
    error_count: OutputSessionVec<'a, Update<ErrorCountDatum>>,
    lir_mapping: OutputSessionColumnar<'a, Update<LirMappingDatum>>,
    dataflow_global_ids: OutputSessionVec<'a, Update<DataflowGlobalDatum>>,
}
690
691#[derive(Clone)]
692struct ExportDatum {
693    export_id: GlobalId,
694    dataflow_index: usize,
695}
696
697#[derive(Clone)]
698struct FrontierDatum {
699    export_id: GlobalId,
700    time: Timestamp,
701}
702
703#[derive(Clone)]
704struct ImportFrontierDatum {
705    export_id: GlobalId,
706    import_id: GlobalId,
707    time: Timestamp,
708}
709
710#[derive(Clone)]
711struct PeekDatum {
712    peek: Peek,
713    peek_type: PeekType,
714}
715
716#[derive(Clone)]
717struct PeekDurationDatum {
718    peek_type: PeekType,
719    bucket: u128,
720}
721
/// Payload shared by the arrangement heap size, capacity, and allocations logs: the operator
/// whose arrangement changed.
#[derive(Copy, Clone)]
struct ArrangementHeapDatum {
    /// Index of the operator maintaining the arrangement.
    operator_id: usize,
}
726
727#[derive(Clone)]
728struct HydrationTimeDatum {
729    export_id: GlobalId,
730    time_ns: Option<u64>,
731}
732
733#[derive(Clone)]
734struct ErrorCountDatum {
735    export_id: GlobalId,
736    // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
737    // per-worker error count might be negative and at the SQL level having negative multiplicities
738    // is treated as an error.
739    count: Diff,
740}
741
/// Payload of the `LirMapping` log: one LIR operator rendered for a `GlobalId`.
#[derive(Clone, Columnar)]
struct LirMappingDatum {
    /// The `GlobalId` the LIR operator is rendered for.
    global_id: GlobalId,
    /// The LIR operator's identifier.
    lir_id: LirId,
    /// The LIR operator, as a string.
    operator: String,
    /// The LIR identifier of the parent operator, if any.
    parent_lir_id: Option<LirId>,
    /// How nested the operator is.
    nesting: u8,
    /// Dataflow operator ids as start (inclusive) and end (exclusive).
    operator_span: (usize, usize),
}
751
752#[derive(Clone)]
753struct DataflowGlobalDatum {
754    dataflow_index: usize,
755    global_id: GlobalId,
756}
757
/// Event handler of the demux operator.
///
/// A short-lived value built per event; `'b` is the lifetime of the output sessions.
struct DemuxHandler<'a, 'b, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}
771
772impl<A: Scheduler> DemuxHandler<'_, '_, A> {
773    /// Return the timestamp associated with the current event, based on the event time and the
774    /// logging interval.
775    fn ts(&self) -> Timestamp {
776        let time_ms = self.time.as_millis();
777        let interval = self.logging_interval_ms;
778        let rounded = (time_ms / interval + 1) * interval;
779        rounded.try_into().expect("must fit")
780    }
781
782    /// Handle the given compute event.
783    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
784        use ComputeEventReference::*;
785        match event {
786            Export(export) => self.handle_export(export),
787            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
788            Peek(peek) if peek.installed => self.handle_peek_install(peek),
789            Peek(peek) => self.handle_peek_retire(peek),
790            Frontier(frontier) => self.handle_frontier(frontier),
791            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
792            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
793            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
794            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
795            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
796            ArrangementHeapSizeOperatorDrop(inner) => {
797                self.handle_arrangement_heap_size_operator_dropped(inner)
798            }
799            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
800            ErrorCount(error_count) => self.handle_error_count(error_count),
801            Hydration(hydration) => self.handle_hydration(hydration),
802            LirMapping(mapping) => self.handle_lir_mapping(mapping),
803            DataflowGlobal(global) => self.handle_dataflow_global(global),
804        }
805    }
806
807    fn handle_export(
808        &mut self,
809        ExportReference {
810            export_id,
811            dataflow_index,
812        }: Ref<'_, Export>,
813    ) {
814        let export_id = Columnar::into_owned(export_id);
815        let ts = self.ts();
816        let datum = ExportDatum {
817            export_id,
818            dataflow_index,
819        };
820        self.output.export.give((datum, ts, Diff::ONE));
821
822        let existing = self
823            .state
824            .exports
825            .insert(export_id, ExportState::new(dataflow_index));
826        if existing.is_some() {
827            error!(%export_id, "export already registered");
828        }
829
830        *self
831            .state
832            .dataflow_export_counts
833            .entry(dataflow_index)
834            .or_default() += 1;
835
836        // Insert hydration time logging for this export.
837        let datum = HydrationTimeDatum {
838            export_id,
839            time_ns: None,
840        };
841        self.output.hydration_time.give((datum, ts, Diff::ONE));
842    }
843
844    fn handle_export_dropped(
845        &mut self,
846        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
847    ) {
848        let export_id = Columnar::into_owned(export_id);
849        let Some(export) = self.state.exports.remove(&export_id) else {
850            error!(%export_id, "missing exports entry at time of export drop");
851            return;
852        };
853
854        let ts = self.ts();
855        let dataflow_index = export.dataflow_index;
856
857        let datum = ExportDatum {
858            export_id,
859            dataflow_index,
860        };
861        self.output.export.give((datum, ts, Diff::MINUS_ONE));
862
863        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
864            entry @ Some(0) | entry @ None => {
865                error!(
866                    %export_id,
867                    %dataflow_index,
868                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
869                );
870            }
871            Some(1) => self.handle_dataflow_dropped(dataflow_index),
872            Some(count) => *count -= 1,
873        }
874
875        // Remove error count logging for this export.
876        if export.error_count != Diff::ZERO {
877            let datum = ErrorCountDatum {
878                export_id,
879                count: export.error_count,
880            };
881            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
882        }
883
884        // Remove hydration time logging for this export.
885        let datum = HydrationTimeDatum {
886            export_id,
887            time_ns: export.hydration_time_ns,
888        };
889        self.output
890            .hydration_time
891            .give((datum, ts, Diff::MINUS_ONE));
892    }
893
894    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
895        self.state.dataflow_export_counts.remove(&dataflow_index);
896
897        if self.state.shutdown_dataflows.remove(&dataflow_index) {
898            // Dataflow has already shut down before it was dropped.
899            self.output
900                .shutdown_duration
901                .give((0, self.ts(), Diff::ONE));
902        } else {
903            // Dataflow has not yet shut down.
904            let existing = self
905                .state
906                .dataflow_drop_times
907                .insert(dataflow_index, self.time);
908            if existing.is_some() {
909                error!(%dataflow_index, "dataflow already dropped");
910            }
911        }
912    }
913
914    fn handle_dataflow_shutdown(
915        &mut self,
916        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
917    ) {
918        let ts = self.ts();
919
920        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
921            // Dataflow has already been dropped.
922            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
923            let elapsed_pow = elapsed_ns.next_power_of_two();
924            self.output
925                .shutdown_duration
926                .give((elapsed_pow, ts, Diff::ONE));
927        } else {
928            // Dataflow has not yet been dropped.
929            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
930            if !was_new {
931                error!(%dataflow_index, "dataflow already shutdown");
932            }
933        }
934
935        // We deal with any `GlobalId` based mappings in this event.
936        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
937            for global_id in global_ids {
938                // Remove dataflow/`GlobalID` mapping.
939                let datum = DataflowGlobalDatum {
940                    dataflow_index,
941                    global_id,
942                };
943                self.output
944                    .dataflow_global_ids
945                    .give((datum, ts, Diff::MINUS_ONE));
946
947                // Remove LIR mapping.
948                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
949                    for (
950                        lir_id,
951                        LirMetadata {
952                            operator,
953                            parent_lir_id,
954                            nesting,
955                            operator_span,
956                        },
957                    ) in mappings
958                    {
959                        let datum = LirMappingDatum {
960                            global_id,
961                            lir_id,
962                            operator,
963                            parent_lir_id,
964                            nesting,
965                            operator_span,
966                        };
967                        self.output.lir_mapping.give(&(datum, ts, Diff::MINUS_ONE));
968                    }
969                }
970            }
971        }
972    }
973
974    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
975        let ts = self.ts();
976        let export_id = Columnar::into_owned(export_id);
977
978        let Some(export) = self.state.exports.get_mut(&export_id) else {
979            // The export might have already been dropped, in which case we are no longer
980            // interested in its errors.
981            return;
982        };
983
984        let old_count = export.error_count;
985        let new_count = old_count + diff;
986
987        if old_count != Diff::ZERO {
988            let datum = ErrorCountDatum {
989                export_id,
990                count: old_count,
991            };
992            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
993        }
994        if new_count != Diff::ZERO {
995            let datum = ErrorCountDatum {
996                export_id,
997                count: new_count,
998            };
999            self.output.error_count.give((datum, ts, Diff::ONE));
1000        }
1001
1002        export.error_count = new_count;
1003    }
1004
1005    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
1006        let ts = self.ts();
1007        let export_id = Columnar::into_owned(export_id);
1008
1009        let Some(export) = self.state.exports.get_mut(&export_id) else {
1010            error!(%export_id, "hydration event for unknown export");
1011            return;
1012        };
1013        if export.hydration_time_ns.is_some() {
1014            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
1015            // after reconciliation. We can simply ignore these.
1016            return;
1017        }
1018
1019        let duration = export.created_at.elapsed();
1020        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
1021
1022        let retraction = HydrationTimeDatum {
1023            export_id,
1024            time_ns: None,
1025        };
1026        let insertion = HydrationTimeDatum {
1027            export_id,
1028            time_ns: Some(nanos),
1029        };
1030        self.output
1031            .hydration_time
1032            .give((retraction, ts, Diff::MINUS_ONE));
1033        self.output.hydration_time.give((insertion, ts, Diff::ONE));
1034
1035        export.hydration_time_ns = Some(nanos);
1036    }
1037
1038    fn handle_peek_install(
1039        &mut self,
1040        PeekEventReference {
1041            peek,
1042            peek_type,
1043            installed: _,
1044        }: Ref<'_, PeekEvent>,
1045    ) {
1046        let peek = Peek::into_owned(peek);
1047        let uuid = Uuid::from_bytes(peek.uuid);
1048        let ts = self.ts();
1049        self.output
1050            .peek
1051            .give((PeekDatum { peek, peek_type }, ts, Diff::ONE));
1052
1053        let existing = self.state.peek_stash.insert(uuid, self.time);
1054        if existing.is_some() {
1055            error!(%uuid, "peek already registered");
1056        }
1057    }
1058
1059    fn handle_peek_retire(
1060        &mut self,
1061        PeekEventReference {
1062            peek,
1063            peek_type,
1064            installed: _,
1065        }: Ref<'_, PeekEvent>,
1066    ) {
1067        let peek = Peek::into_owned(peek);
1068        let uuid = Uuid::from_bytes(peek.uuid);
1069        let ts = self.ts();
1070        self.output
1071            .peek
1072            .give((PeekDatum { peek, peek_type }, ts, Diff::MINUS_ONE));
1073
1074        if let Some(start) = self.state.peek_stash.remove(&uuid) {
1075            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
1076            let bucket = elapsed_ns.next_power_of_two();
1077            self.output.peek_duration.give((
1078                PeekDurationDatum { peek_type, bucket },
1079                ts,
1080                Diff::ONE,
1081            ));
1082        } else {
1083            error!(%uuid, "peek not yet registered");
1084        }
1085    }
1086
1087    fn handle_frontier(
1088        &mut self,
1089        FrontierReference {
1090            export_id,
1091            time,
1092            diff,
1093        }: Ref<'_, Frontier>,
1094    ) {
1095        let export_id = Columnar::into_owned(export_id);
1096        let diff = Diff::from(*diff);
1097        let ts = self.ts();
1098        let time = Columnar::into_owned(time);
1099        let datum = FrontierDatum { export_id, time };
1100        self.output.frontier.give((datum, ts, diff));
1101    }
1102
1103    fn handle_import_frontier(
1104        &mut self,
1105        ImportFrontierReference {
1106            import_id,
1107            export_id,
1108            time,
1109            diff,
1110        }: Ref<'_, ImportFrontier>,
1111    ) {
1112        let import_id = Columnar::into_owned(import_id);
1113        let export_id = Columnar::into_owned(export_id);
1114        let ts = self.ts();
1115        let time = Columnar::into_owned(time);
1116        let datum = ImportFrontierDatum {
1117            export_id,
1118            import_id,
1119            time,
1120        };
1121        self.output
1122            .import_frontier
1123            .give((datum, ts, (*diff).into()));
1124    }
1125
1126    /// Update the allocation size for an arrangement.
1127    fn handle_arrangement_heap_size(
1128        &mut self,
1129        ArrangementHeapSizeReference {
1130            operator_id,
1131            delta_size,
1132        }: Ref<'_, ArrangementHeapSize>,
1133    ) {
1134        let ts = self.ts();
1135        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1136            return;
1137        };
1138
1139        let datum = ArrangementHeapDatum { operator_id };
1140        self.output
1141            .arrangement_heap_size
1142            .give((datum, ts, Diff::cast_from(delta_size)));
1143
1144        state.size += delta_size;
1145    }
1146
1147    /// Update the allocation capacity for an arrangement.
1148    fn handle_arrangement_heap_capacity(
1149        &mut self,
1150        ArrangementHeapCapacityReference {
1151            operator_id,
1152            delta_capacity,
1153        }: Ref<'_, ArrangementHeapCapacity>,
1154    ) {
1155        let ts = self.ts();
1156        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1157            return;
1158        };
1159
1160        let datum = ArrangementHeapDatum { operator_id };
1161        self.output
1162            .arrangement_heap_capacity
1163            .give((datum, ts, Diff::cast_from(delta_capacity)));
1164
1165        state.capacity += delta_capacity;
1166    }
1167
1168    /// Update the allocation count for an arrangement.
1169    fn handle_arrangement_heap_allocations(
1170        &mut self,
1171        ArrangementHeapAllocationsReference {
1172            operator_id,
1173            delta_allocations,
1174        }: Ref<'_, ArrangementHeapAllocations>,
1175    ) {
1176        let ts = self.ts();
1177        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1178            return;
1179        };
1180
1181        let datum = ArrangementHeapDatum { operator_id };
1182        let diff = Diff::cast_from(delta_allocations);
1183        self.output
1184            .arrangement_heap_allocations
1185            .give((datum, ts, diff));
1186
1187        state.count += delta_allocations;
1188    }
1189
1190    /// Indicate that a new arrangement exists, start maintaining the heap size state.
1191    fn handle_arrangement_heap_size_operator(
1192        &mut self,
1193        ArrangementHeapSizeOperatorReference {
1194            operator_id,
1195            address,
1196        }: Ref<'_, ArrangementHeapSizeOperator>,
1197    ) {
1198        let activator = self
1199            .state
1200            .scheduler
1201            .activator_for(address.into_iter().collect());
1202        let existing = self
1203            .state
1204            .arrangement_size
1205            .insert(operator_id, Default::default());
1206        if existing.is_some() {
1207            error!(%operator_id, "arrangement size operator already registered");
1208        }
1209        let existing = self
1210            .shared_state
1211            .arrangement_size_activators
1212            .insert(operator_id, activator);
1213        if existing.is_some() {
1214            error!(%operator_id, "arrangement size activator already registered");
1215        }
1216    }
1217
1218    /// Indicate that an arrangement has been dropped and we can cleanup the heap size state.
1219    fn handle_arrangement_heap_size_operator_dropped(
1220        &mut self,
1221        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
1222    ) {
1223        let operator_id = event.operator_id;
1224        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
1225            let ts = self.ts();
1226            let datum = ArrangementHeapDatum { operator_id };
1227
1228            let diff = -Diff::cast_from(state.size);
1229            self.output.arrangement_heap_size.give((datum, ts, diff));
1230
1231            let diff = -Diff::cast_from(state.capacity);
1232            self.output
1233                .arrangement_heap_capacity
1234                .give((datum, ts, diff));
1235
1236            let diff = -Diff::cast_from(state.count);
1237            self.output
1238                .arrangement_heap_allocations
1239                .give((datum, ts, diff));
1240        }
1241        self.shared_state
1242            .arrangement_size_activators
1243            .remove(&operator_id);
1244    }
1245
1246    /// Indicate that a new LIR operator exists; record the dataflow address it maps to.
1247    fn handle_lir_mapping(
1248        &mut self,
1249        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
1250    ) {
1251        let global_id = Columnar::into_owned(global_id);
1252        // record the state (for the later drop)
1253        let mappings = || mapping.into_iter().map(Columnar::into_owned);
1254        self.state
1255            .lir_mapping
1256            .entry(global_id)
1257            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
1258            .or_insert_with(|| mappings().collect());
1259
1260        // send the datum out
1261        let ts = self.ts();
1262        for (lir_id, meta) in mapping.into_iter() {
1263            let datum = LirMappingDatumReference {
1264                global_id,
1265                lir_id,
1266                operator: meta.operator,
1267                parent_lir_id: meta.parent_lir_id,
1268                nesting: meta.nesting,
1269                operator_span: meta.operator_span,
1270            };
1271            self.output.lir_mapping.give((datum, ts, Diff::ONE));
1272        }
1273    }
1274
1275    fn handle_dataflow_global(
1276        &mut self,
1277        DataflowGlobalReference {
1278            dataflow_index,
1279            global_id,
1280        }: Ref<'_, DataflowGlobal>,
1281    ) {
1282        let global_id = Columnar::into_owned(global_id);
1283        self.state
1284            .dataflow_global_ids
1285            .entry(dataflow_index)
1286            .and_modify(|globals| {
1287                // NB BTreeSet::insert() returns `false` when the element was already in the set
1288                if !globals.insert(global_id) {
1289                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
1290                }
1291            })
1292            .or_insert_with(|| BTreeSet::from([global_id]));
1293
1294        let ts = self.ts();
1295        let datum = DataflowGlobalDatum {
1296            dataflow_index,
1297            global_id,
1298        };
1299        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
1300    }
1301}
1302
/// Logging state maintained for a compute collection.
///
/// This type is used to produce appropriate log events in response to changes of logged collection
/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
pub struct CollectionLogging {
    /// The ID of the logged collection.
    export_id: GlobalId,
    /// The logger that receives the emitted compute events.
    logger: Logger,

    /// The most recently logged frontier of the collection, or `None` if none is logged.
    logged_frontier: Option<Timestamp>,
    /// The most recently logged frontier of each import of the collection. Imports without a
    /// logged frontier have no entry.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
1314
1315impl CollectionLogging {
1316    /// Create new logging state for the identified collection and emit initial logging events.
1317    pub fn new(
1318        export_id: GlobalId,
1319        logger: Logger,
1320        dataflow_index: usize,
1321        import_ids: impl Iterator<Item = GlobalId>,
1322    ) -> Self {
1323        logger.log(&ComputeEvent::Export(Export {
1324            export_id,
1325            dataflow_index,
1326        }));
1327
1328        let mut self_ = Self {
1329            export_id,
1330            logger,
1331            logged_frontier: None,
1332            logged_import_frontiers: Default::default(),
1333        };
1334
1335        // Initialize frontier logging.
1336        let initial_frontier = Some(Timestamp::MIN);
1337        self_.set_frontier(initial_frontier);
1338        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
1339
1340        self_
1341    }
1342
1343    /// Set the collection frontier to the given new time and emit corresponding logging events.
1344    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
1345        let old_time = self.logged_frontier;
1346        self.logged_frontier = new_time;
1347
1348        if old_time != new_time {
1349            let export_id = self.export_id;
1350            let retraction = old_time.map(|time| {
1351                ComputeEvent::Frontier(Frontier {
1352                    export_id,
1353                    time,
1354                    diff: -1,
1355                })
1356            });
1357            let insertion = new_time.map(|time| {
1358                ComputeEvent::Frontier(Frontier {
1359                    export_id,
1360                    time,
1361                    diff: 1,
1362                })
1363            });
1364            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1365            self.logger.log_many(events);
1366        }
1367    }
1368
1369    /// Set the frontier of the given import to the given new time and emit corresponding logging
1370    /// events.
1371    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
1372        let old_time = self.logged_import_frontiers.remove(&import_id);
1373        if let Some(time) = new_time {
1374            self.logged_import_frontiers.insert(import_id, time);
1375        }
1376
1377        if old_time != new_time {
1378            let export_id = self.export_id;
1379            let retraction = old_time.map(|time| {
1380                ComputeEvent::ImportFrontier(ImportFrontier {
1381                    import_id,
1382                    export_id,
1383                    time,
1384                    diff: -1,
1385                })
1386            });
1387            let insertion = new_time.map(|time| {
1388                ComputeEvent::ImportFrontier(ImportFrontier {
1389                    import_id,
1390                    export_id,
1391                    time,
1392                    diff: 1,
1393                })
1394            });
1395            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1396            self.logger.log_many(events);
1397        }
1398    }
1399
1400    /// Set the collection as hydrated.
1401    pub fn set_hydrated(&self) {
1402        self.logger.log(&ComputeEvent::Hydration(Hydration {
1403            export_id: self.export_id,
1404        }));
1405    }
1406}
1407
1408impl Drop for CollectionLogging {
1409    fn drop(&mut self) {
1410        // Emit retraction events to clean up events previously logged.
1411        self.set_frontier(None);
1412
1413        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1414        for import_id in import_ids {
1415            self.set_import_frontier(import_id, None);
1416        }
1417
1418        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1419            export_id: self.export_id,
1420        }));
1421    }
1422}
1423
/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
/// batch streams.
pub(crate) trait LogDataflowErrors {
    /// Pass the input through unchanged, logging the net diff of each received input container
    /// as an `ErrorCount` event for `export_id` to `logger`.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
1429
1430impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
1431where
1432    G: Scope,
1433    D: Data,
1434{
1435    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1436        self.inner
1437            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1438                move |input, output| {
1439                    input.for_each(|cap, data| {
1440                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1441                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1442
1443                        output.session(&cap).give_container(data);
1444                    });
1445                }
1446            })
1447            .as_collection()
1448    }
1449}
1450
1451impl<G, B> LogDataflowErrors for Stream<G, B>
1452where
1453    G: Scope,
1454    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1455{
1456    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1457        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1458            move |input, output| {
1459                input.for_each(|cap, data| {
1460                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1461                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1462
1463                    output.session(&cap).give_container(data);
1464                });
1465            }
1466        })
1467    }
1468}
1469
1470/// Return the sum of all diffs within the given batch.
1471///
1472/// Note that this operation can be expensive: Its runtime is O(N) with N being the number of
1473/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
1474/// contain only a small number of records, so this doesn't matter much. But avoid using it when
1475/// batches might become large.
1476fn sum_batch_diffs<B>(batch: &B) -> Diff
1477where
1478    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1479{
1480    let mut sum = Diff::ZERO;
1481    let mut cursor = batch.cursor();
1482
1483    while cursor.key_valid(batch) {
1484        while cursor.val_valid(batch) {
1485            cursor.map_times(batch, |_t, r| sum += r);
1486            cursor.step_val(batch);
1487        }
1488        cursor.step_key(batch);
1489    }
1490
1491    sum
1492}
1493
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the in-memory size of `ComputeEvent` to 56 bytes, so that any change to the size of
    /// the event type surfaces as a failing test.
    #[mz_ore::test]
    fn test_compute_event_size() {
        // This could be a static assertion, but we don't use those yet in this crate.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
}