mz_compute/logging/compute.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by clusterd.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::Columnar;
use differential_dataflow::Collection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use timely::{Container, Data};
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrange;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec,
    PermutedRowPacker, SharedLoggingState, Update,
};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::RowRowSpine;

/// Type alias for a logger of compute events.
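///
/// A minimal sketch of emitting an event through such a logger; the logger handle and the
/// dataflow index are assumed to come from the surrounding context:
///
/// ```ignore
/// logger.log(&ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index: 42 }));
/// ```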
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// A builder for columnar containers of compute events paired with their event times.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;

/// A dataflow exports a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Timely worker index of the exporting dataflow.
    pub dataflow_index: usize,
}

/// The export for a global id was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// Identifier of the export.
    pub export_id: GlobalId,
}

/// A peek event with a [`Peek`], a [`PeekType`], and an installation status.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The data for the peek itself.
    pub peek: Peek,
    /// The relevant _type_ of peek: index or persist.
    // Note that this is kept out of `Peek` itself for data-packing reasons only.
    pub peek_type: PeekType,
    /// True if the peek is being installed; false if it's being removed.
    pub installed: bool,
}

/// Frontier change event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The frontier timestamp.
    pub time: Timestamp,
    /// The change in the frontier: +1 for an insertion, -1 for a retraction.
    pub diff: i8,
}

/// An import frontier change.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// Identifier of the import.
    pub import_id: GlobalId,
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The frontier timestamp.
    pub time: Timestamp,
    /// The change in the frontier: +1 for an insertion, -1 for a retraction.
    pub diff: i8,
}

/// A change in an arrangement's heap size.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// Operator index
    pub operator_id: usize,
    /// Delta of the heap size in bytes of the arrangement.
    pub delta_size: isize,
}

/// A change in an arrangement's heap capacity.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// Operator index
    pub operator_id: usize,
    /// Delta of the heap capacity in bytes of the arrangement.
    pub delta_capacity: isize,
}

/// A change in an arrangement's heap allocation count.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// Operator index
    pub operator_id: usize,
    /// Delta of distinct heap allocations backing the arrangement.
    pub delta_allocations: isize,
}

/// Announcing an operator that manages an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// Operator index
    pub operator_id: usize,
    /// The address of the operator.
    pub address: Vec<usize>,
}

/// Drop event for an operator managing an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// Operator index
    pub operator_id: usize,
}

/// Dataflow shutdown event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// Timely worker index of the dataflow.
    pub dataflow_index: usize,
}

/// Error count update event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The change in error count.
    pub diff: Diff,
}

/// An export is hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
}

/// Announce a mapping of an LIR operator to a dataflow operator for a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The `GlobalId` in which the LIR operator is rendered.
    ///
    /// NB a single dataflow may have many `GlobalId`s inside it.
    /// A separate mapping (using `ComputeEvent::DataflowGlobal`)
    /// tracks the many-to-one relationship between `GlobalId`s and
    /// dataflows.
    pub global_id: GlobalId,
    /// The actual mapping.
    /// Represented this way to reduce the size of `ComputeEvent`.
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// Announce that a dataflow supports a specific global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The identifier of the dataflow.
    pub dataflow_index: usize,
    /// A `GlobalId` that is rendered as part of this dataflow.
    pub global_id: GlobalId,
}

/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// Peek command.
    Peek(PeekEvent),
    /// Available frontier information for dataflow exports.
    Frontier(Frontier),
    /// Available frontier information for dataflow imports.
    ImportFrontier(ImportFrontier),
    /// Arrangement heap size update
    ArrangementHeapSize(ArrangementHeapSize),
    /// Arrangement heap capacity update
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// Arrangement heap allocations update
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// Arrangement size operator address
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// Arrangement size operator dropped
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// All operators of a dataflow have shut down.
    DataflowShutdown(DataflowShutdown),
    /// The number of errors in a dataflow export has changed.
    ErrorCount(ErrorCount),
    /// A dataflow export was hydrated.
    Hydration(Hydration),
    /// An LIR operator was mapped to some particular dataflow operator.
    ///
    /// Cf. `ComputeLog::LirMapping`
    LirMapping(LirMapping),
    /// A `GlobalId` was announced as part of a dataflow.
    DataflowGlobal(DataflowGlobal),
}

/// A peek type distinguishing between index and persist peeks.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index.
    Index,
    /// A peek against persist.
    Persist,
}

impl PeekType {
    /// A human-readable name for a peek type.
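    ///
    /// A short illustration of the mapping:
    ///
    /// ```ignore
    /// assert_eq!(PeekType::Index.name(), "index");
    /// assert_eq!(PeekType::Persist.name(), "persist");
    /// ```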
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

/// A logged peek event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Peek {
    /// The identifier of the view the peek targets.
    id: GlobalId,
    /// The logical timestamp requested.
    time: Timestamp,
    /// The ID of the peek.
    uuid: uuid::Bytes,
}

impl Peek {
    /// Create a new peek from its arguments.
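    ///
    /// A minimal sketch; the target ID, timestamp, and UUID are illustrative:
    ///
    /// ```ignore
    /// let peek = Peek::new(GlobalId::User(1), Timestamp::MIN, Uuid::new_v4());
    /// ```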
    pub fn new(id: GlobalId, time: Timestamp, uuid: Uuid) -> Self {
        let uuid = uuid.into_bytes();
        Self { id, time, uuid }
    }
}

/// Metadata for LIR operators.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The LIR operator, as a string (see `FlatPlanNode::humanize`).
    operator: String,
    /// The LIR identifier of the parent (if any).
    parent_lir_id: Option<LirId>,
    /// How nested the operator is (for nice indentation).
    nesting: u8,
    /// The dataflow operator ids, given as start (inclusive) and end (exclusive).
    /// If `start == end`, then no operators were used.
    operator_span: (usize, usize),
}

impl LirMetadata {
    /// Construct a new LIR metadata object.
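    ///
    /// A minimal sketch, with a hypothetical operator string and span:
    ///
    /// ```ignore
    /// // A root operator (no parent) covering dataflow operators 10..12.
    /// let meta = LirMetadata::new("Join".into(), None, 0, (10, 12));
    /// ```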
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}

/// The return type of the [`construct`] function.
pub(super) struct Return {
    /// Collections returned by [`construct`].
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow fragment for compute logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `scheduler`: The timely scheduler from which to obtain activators.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read compute log events from.
/// * `shared_state`: Shared state between logging dataflow fragments.
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
            scope,
            "compute logs",
            config.interval,
            event_queue.activator,
            move |mut session, mut data| {
                // If logging is disabled, we still need to install the indexes, but we can leave
                // them empty. We do so by immediately filtering out all log events.
                if enable_logging {
                    session.give_container(data.to_mut())
                }
            },
        );

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut export_out, export) = demux.new_output();
        let (mut frontier_out, frontier) = demux.new_output();
        let (mut import_frontier_out, import_frontier) = demux.new_output();
        let (mut peek_out, peek) = demux.new_output();
        let (mut peek_duration_out, peek_duration) = demux.new_output();
        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
            demux.new_output();
        let (mut error_count_out, error_count) = demux.new_output();
        let (mut hydration_time_out, hydration_time) = demux.new_output();
        let (mut lir_mapping_out, lir_mapping) = demux.new_output();
        let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();

        let mut demux_state = DemuxState::new(scheduler);
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut shutdown_duration = shutdown_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session(&cap),
                        frontier: frontier.session(&cap),
                        import_frontier: import_frontier.session(&cap),
                        peek: peek.session(&cap),
                        peek_duration: peek_duration.session(&cap),
                        shutdown_duration: shutdown_duration.session(&cap),
                        arrangement_heap_size: arrangement_heap_size.session(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
                        error_count: error_count.session(&cap),
                        hydration_time: hydration_time.session(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.drain() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        let worker_id = scope.index();

        // Encode the contents of each logging stream into its expected `Row` format.
        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
        let dataflow_current = export.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::FrontierCurrent);
        let frontier_current = frontier.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent);
        let import_frontier_current = import_frontier.as_collection().map({
            let mut scratch1 = String::new();
            let mut scratch2 = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch1),
                    make_string_datum(datum.import_id, &mut scratch2),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::PeekCurrent);
        let peek_current = peek.as_collection().map({
            let mut scratch = String::new();
            move |PeekDatum { peek, peek_type }| {
                packer.pack_slice_owned(&[
                    Datum::Uuid(Uuid::from_bytes(peek.uuid)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(peek.id, &mut scratch),
                    Datum::String(peek_type.name()),
                    Datum::MzTimestamp(peek.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::PeekDuration);
        let peek_duration =
            peek_duration
                .as_collection()
                .map(move |PeekDurationDatum { peek_type, bucket }| {
                    packer.pack_slice_owned(&[
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::String(peek_type.name()),
                        Datum::UInt64(bucket.try_into().expect("bucket too big")),
                    ])
                });
        let mut packer = PermutedRowPacker::new(ComputeLog::ShutdownDuration);
        let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
            packer.pack_slice_owned(&[
                Datum::UInt64(u64::cast_from(worker_id)),
                Datum::UInt64(bucket.try_into().expect("bucket too big")),
            ])
        });

        let arrangement_heap_datum_to_row =
            move |packer: &mut PermutedRowPacker, ArrangementHeapDatum { operator_id }| {
                packer.pack_slice_owned(&[
                    Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ])
            };

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
        let arrangement_heap_size = arrangement_heap_size
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapCapacity);
        let arrangement_heap_capacity = arrangement_heap_capacity
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapAllocations);
        let arrangement_heap_allocations = arrangement_heap_allocations
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ErrorCount);
        let error_count = error_count.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::Int64(datum.count.into_inner()),
                ])
            }
        });

        let mut packer = PermutedRowPacker::new(ComputeLog::HydrationTime);
        let hydration_time = hydration_time.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::from(datum.time_ns),
                ])
            }
        });

        let mut scratch1 = String::new();
        let mut scratch2 = String::new();
        let mut packer = PermutedRowPacker::new(ComputeLog::LirMapping);
        let lir_mapping = lir_mapping
            .map(move |(datum, time, diff)| {
                let row = packer.pack_slice_owned(&[
                    make_string_datum(GlobalId::into_owned(datum.global_id), &mut scratch1),
                    Datum::UInt64(<LirId as Columnar>::into_owned(datum.lir_id).into()),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(datum.operator, &mut scratch2),
                    datum
                        .parent_lir_id
                        .map(|lir_id| Datum::UInt64(LirId::into_owned(lir_id).into()))
                        .unwrap_or_else(|| Datum::Null),
                    Datum::UInt16(u16::cast_from(*datum.nesting)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.0)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.1)),
                ]);
                (row, Timestamp::into_owned(time), diff)
            })
            .as_collection();

        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
        let dataflow_global_ids = dataflow_global_ids.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(datum.global_id, &mut scratch),
                ])
            }
        });

        use ComputeLog::*;
        let logs = [
            (DataflowCurrent, dataflow_current),
            (FrontierCurrent, frontier_current),
            (ImportFrontierCurrent, import_frontier_current),
            (PeekCurrent, peek_current),
            (PeekDuration, peek_duration),
            (ShutdownDuration, shutdown_duration),
            (ArrangementHeapSize, arrangement_heap_size),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ErrorCount, error_count),
            (HydrationTime, hydration_time),
            (LirMapping, lir_mapping),
            (DataflowGlobal, dataflow_global_ids),
        ];

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

/// Format the given value and pack it into a `Datum::String`.
///
/// The `scratch` buffer is used to perform the string conversion without an allocation.
/// Callers should not assume anything about the contents of this buffer after this function
/// returns.
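///
/// A sketch of the intended usage; `u7` is the `Display` form of `GlobalId::User(7)`:
///
/// ```ignore
/// let mut scratch = String::new();
/// let datum = make_string_datum(GlobalId::User(7), &mut scratch);
/// assert_eq!(datum, Datum::String("u7"));
/// ```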
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}

/// State maintained by the demux operator.
struct DemuxState<A> {
    /// The timely scheduler.
    scheduler: A,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps live dataflows to counts of their exports.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Maps dropped dataflows to their drop time.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Contains dataflows that have shut down but not yet been dropped.
    shutdown_dataflows: BTreeSet<usize>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size stash.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR -> operator span mapping.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Dataflow -> `GlobalId` mapping (many-to-one).
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A) -> Self {
        Self {
            scheduler,
            exports: Default::default(),
            dataflow_export_counts: Default::default(),
            dataflow_drop_times: Default::default(),
            shutdown_dataflows: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
        }
    }
}

/// State tracked for each dataflow export.
struct ExportState {
    /// The ID of the dataflow maintaining this export.
    dataflow_index: usize,
    /// Number of errors in this export.
    ///
    /// This must be a signed integer, since per-worker error counts can be negative; only the
    /// cross-worker total has to sum up to a non-negative value.
    error_count: Diff,
    /// When this export was created.
    created_at: Instant,
    /// Time from export creation to hydration in nanoseconds, or `None` if the export is not
    /// yet hydrated.
    hydration_time_ns: Option<u64>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
        }
    }
}

/// State for tracking arrangement sizes.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a> {
    export: OutputSessionVec<'a, Update<ExportDatum>>,
    frontier: OutputSessionVec<'a, Update<FrontierDatum>>,
    import_frontier: OutputSessionVec<'a, Update<ImportFrontierDatum>>,
    peek: OutputSessionVec<'a, Update<PeekDatum>>,
    peek_duration: OutputSessionVec<'a, Update<PeekDurationDatum>>,
    shutdown_duration: OutputSessionVec<'a, Update<u128>>,
    arrangement_heap_size: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_capacity: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_allocations: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    hydration_time: OutputSessionVec<'a, Update<HydrationTimeDatum>>,
    error_count: OutputSessionVec<'a, Update<ErrorCountDatum>>,
    lir_mapping: OutputSessionColumnar<'a, Update<LirMappingDatum>>,
    dataflow_global_ids: OutputSessionVec<'a, Update<DataflowGlobalDatum>>,
}

#[derive(Clone)]
struct ExportDatum {
    export_id: GlobalId,
    dataflow_index: usize,
}

#[derive(Clone)]
struct FrontierDatum {
    export_id: GlobalId,
    time: Timestamp,
}

#[derive(Clone)]
struct ImportFrontierDatum {
    export_id: GlobalId,
    import_id: GlobalId,
    time: Timestamp,
}

#[derive(Clone)]
struct PeekDatum {
    peek: Peek,
    peek_type: PeekType,
}

#[derive(Clone)]
struct PeekDurationDatum {
    peek_type: PeekType,
    bucket: u128,
}

#[derive(Clone, Copy)]
struct ArrangementHeapDatum {
    operator_id: usize,
}

#[derive(Clone)]
struct HydrationTimeDatum {
    export_id: GlobalId,
    time_ns: Option<u64>,
}

#[derive(Clone)]
struct ErrorCountDatum {
    export_id: GlobalId,
    // Normally we would use DD's diff field to encode counts, but in this case we can't: The total
    // per-worker error count might be negative and at the SQL level having negative multiplicities
    // is treated as an error.
    count: Diff,
}

#[derive(Clone, Columnar)]
struct LirMappingDatum {
    global_id: GlobalId,
    lir_id: LirId,
    operator: String,
    parent_lir_id: Option<LirId>,
    nesting: u8,
    operator_span: (usize, usize),
}

#[derive(Clone)]
struct DataflowGlobalDatum {
    dataflow_index: usize,
    global_id: GlobalId,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, A> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
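    ///
    /// Event times are rounded up to the next multiple of the logging interval, so nearby
    /// updates coalesce. A sketch of the arithmetic, assuming a 1000 ms interval:
    ///
    /// ```ignore
    /// // 1234 ms rounds up to 2000 ms; an exact multiple still advances by a full interval.
    /// assert_eq!((1234 / 1000 + 1) * 1000, 2000);
    /// assert_eq!((2000 / 1000 + 1) * 1000, 3000);
    /// ```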
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given compute event.
    fn handle(&mut self, event: <ComputeEvent as Columnar>::Ref<'_>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: <Export as Columnar>::Ref<'_>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = ExportDatum {
            export_id,
            dataflow_index,
        };
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        *self
            .state
            .dataflow_export_counts
            .entry(dataflow_index)
            .or_default() += 1;

        // Insert hydration time logging for this export.
        let datum = HydrationTimeDatum {
            export_id,
            time_ns: None,
        };
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: <ExportDropped as Columnar>::Ref<'_>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = ExportDatum {
            export_id,
            dataflow_index,
        };
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
            entry @ Some(0) | entry @ None => {
                error!(
                    %export_id,
                    %dataflow_index,
                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
                );
            }
            Some(1) => self.handle_dataflow_dropped(dataflow_index),
            Some(count) => *count -= 1,
        }

        // Remove error count logging for this export.
        if export.error_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: export.error_count,
            };
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Remove hydration time logging for this export.
        let datum = HydrationTimeDatum {
            export_id,
            time_ns: export.hydration_time_ns,
        };
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));
    }

    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
        self.state.dataflow_export_counts.remove(&dataflow_index);

        if self.state.shutdown_dataflows.remove(&dataflow_index) {
            // Dataflow has already shut down before it was dropped.
            self.output
                .shutdown_duration
                .give((0, self.ts(), Diff::ONE));
        } else {
            // Dataflow has not yet shut down.
            let existing = self
                .state
                .dataflow_drop_times
                .insert(dataflow_index, self.time);
            if existing.is_some() {
                error!(%dataflow_index, "dataflow already dropped");
            }
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: <DataflowShutdown as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();

        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
            // Dataflow has already been dropped.
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let elapsed_pow = elapsed_ns.next_power_of_two();
            self.output
                .shutdown_duration
                .give((elapsed_pow, ts, Diff::ONE));
        } else {
            // Dataflow has not yet been dropped.
            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
            if !was_new {
                error!(%dataflow_index, "dataflow already shutdown");
            }
        }

        // Clean up any `GlobalId`-based mappings for this dataflow.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                // Remove the dataflow/`GlobalId` mapping.
                let datum = DataflowGlobalDatum {
                    dataflow_index,
                    global_id,
                };
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                // Remove LIR mapping.
                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = LirMappingDatum {
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        };
                        self.output.lir_mapping.give(&(datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(
        &mut self,
        ErrorCountReference { export_id, diff }: <ErrorCount as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we are no longer
            // interested in its errors.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;

        if old_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: old_count,
            };
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: new_count,
            };
            self.output.error_count.give((datum, ts, Diff::ONE));
        }

        export.error_count = new_count;
    }

    fn handle_hydration(
        &mut self,
        HydrationReference { export_id }: <Hydration as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Hydration events for already hydrated dataflows can occur when a dataflow is reused
            // after reconciliation. We can simply ignore these.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");

        let retraction = HydrationTimeDatum {
            export_id,
            time_ns: None,
        };
        let insertion = HydrationTimeDatum {
            export_id,
            time_ns: Some(nanos),
        };
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));

        export.hydration_time_ns = Some(nanos);
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            peek,
            peek_type,
            installed: _,
        }: <PeekEvent as Columnar>::Ref<'_>,
    ) {
        let peek = Peek::into_owned(peek);
        let uuid = Uuid::from_bytes(peek.uuid);
        let ts = self.ts();
        self.output
            .peek
            .give((PeekDatum { peek, peek_type }, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            peek,
            peek_type,
            installed: _,
        }: <PeekEvent as Columnar>::Ref<'_>,
    ) {
        let peek = Peek::into_owned(peek);
        let uuid = Uuid::from_bytes(peek.uuid);
        let ts = self.ts();
        self.output
            .peek
            .give((PeekDatum { peek, peek_type }, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            self.output.peek_duration.give((
                PeekDurationDatum { peek_type, bucket },
                ts,
                Diff::ONE,
            ));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: <Frontier as Columnar>::Ref<'_>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = FrontierDatum { export_id, time };
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: <ImportFrontier as Columnar>::Ref<'_>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = ImportFrontierDatum {
            export_id,
            import_id,
            time,
        };
        self.output
            .import_frontier
            .give((datum, ts, (*diff).into()));
    }

    /// Update the allocation size for an arrangement.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: <ArrangementHeapSize as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        self.output
            .arrangement_heap_size
            .give((datum, ts, Diff::cast_from(delta_size)));

        state.size += delta_size;
    }

    /// Update the allocation capacity for an arrangement.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: <ArrangementHeapCapacity as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, Diff::cast_from(delta_capacity)));

        state.capacity += delta_capacity;
    }

    /// Update the allocation count for an arrangement.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: <ArrangementHeapAllocations as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));

        state.count += delta_allocations;
    }

    /// Indicate that a new arrangement exists, start maintaining the heap size state.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: <ArrangementHeapSizeOperator as Columnar>::Ref<'_>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Indicate that an arrangement has been dropped and we can clean up the heap size state.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        ArrangementHeapSizeOperatorDropReference { operator_id }: <ArrangementHeapSizeOperatorDrop as Columnar>::Ref<'_>,
    ) {
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let datum = ArrangementHeapDatum { operator_id };

            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((datum, ts, diff));

            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((datum, ts, diff));

            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((datum, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    /// Indicate that a new LIR operator exists; record the dataflow address it maps to.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: <LirMapping as Columnar>::Ref<'_>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // record the state (for the later drop)
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // send the datum out
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = LirMappingDatumReference {
                global_id,
                lir_id,
                operator: meta.operator,
                parent_lir_id: meta.parent_lir_id,
                nesting: meta.nesting,
                operator_span: meta.operator_span,
            };
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: <DataflowGlobal as Columnar>::Ref<'_>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                // NB BTreeSet::insert() returns `false` when the element was already in the set
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = DataflowGlobalDatum {
            dataflow_index,
            global_id,
        };
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

/// Logging state maintained for a compute collection.
///
/// This type is used to produce appropriate log events in response to changes of logged collection
/// state, e.g. frontiers, and to produce cleanup events when a collection is dropped.
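///
/// A minimal usage sketch, assuming a logger and identifiers from the surrounding context:
///
/// ```ignore
/// let mut logging = CollectionLogging::new(export_id, logger, dataflow_index, import_ids);
/// logging.set_frontier(Some(Timestamp::from(100u64)));
/// drop(logging); // emits retractions for everything logged above
/// ```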
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    logged_frontier: Option<Timestamp>,
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
    /// Create new logging state for the identified collection and emit initial logging events.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize frontier logging.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

    /// Set the collection frontier to the given new time and emit corresponding logging events.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the frontier of the given import to the given new time and emit corresponding logging
    /// events.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the collection as hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }
}

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Emit retraction events to clean up events previously logged.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
/// batch streams.
pub(crate) trait LogDataflowErrors {
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}

impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Return the sum of all diffs within the given batch.
///
/// Note that this operation can be expensive: Its runtime is O(N) with N being the number of
/// unique (key, value, time) tuples. We only use it on error streams, which are expected to
/// contain only a small number of records, so this doesn't matter much. But avoid using it when
/// batches might become large.
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // This could be a static assertion, but we don't use those yet in this crate.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
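
    // A small sketch of the duration bucketing used for peek and shutdown durations above:
    // elapsed nanoseconds are rounded up to the next power of two.
    #[mz_ore::test]
    fn test_duration_bucketing() {
        let elapsed_ns = 1_500u128;
        assert_eq!(elapsed_ns.next_power_of_two(), 2_048);
    }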
}