use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::Columnar;
use differential_dataflow::Collection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use timely::{Container, Data};
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrange;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec,
    PermutedRowPacker, SharedLoggingState, Update,
};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::RowRowSpine;

/// A logger of compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// A builder for containers of timestamped [`ComputeEvent`]s.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
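
// A hedged usage sketch: events are logged by handing a `ComputeEvent` to the
// `Logger`, which timestamps it. `logger` here stands in for a logger obtained
// from the worker's logging registry:
//
//     logger.log(&ComputeEvent::DataflowShutdown(DataflowShutdown {
//         dataflow_index: 0,
//     }));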

/// An export was created.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// The identifier of the export.
    pub export_id: GlobalId,
    /// The index of the dataflow maintaining the export.
    pub dataflow_index: usize,
}

/// An export was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// The identifier of the export.
    pub export_id: GlobalId,
}

/// A peek was installed or retired.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The data for the peek itself.
    pub peek: Peek,
    /// The type of the peek.
    pub peek_type: PeekType,
    /// True if the peek is being installed; false if it is being retired.
    pub installed: bool,
}

/// The output frontier of an export changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

/// The frontier of one of an export's imports changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    pub import_id: GlobalId,
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

/// The heap size of an arrangement changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// Operator index.
    pub operator_id: usize,
    /// Change in the arrangement size, in bytes.
    pub delta_size: isize,
}

/// The heap capacity of an arrangement changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// Operator index.
    pub operator_id: usize,
    /// Change in the arrangement capacity, in bytes.
    pub delta_capacity: isize,
}

/// The number of heap allocations backing an arrangement changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// Operator index.
    pub operator_id: usize,
    /// Change in the number of allocations.
    pub delta_allocations: isize,
}

/// An arrangement size logging operator was created.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// Operator index.
    pub operator_id: usize,
    /// The address of the operator in the dataflow graph.
    pub address: Vec<usize>,
}

/// An arrangement size logging operator was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// Operator index.
    pub operator_id: usize,
}

/// A dataflow was shut down.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// The index of the dataflow.
    pub dataflow_index: usize,
}

/// The error count of an export changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// The identifier of the export.
    pub export_id: GlobalId,
    /// The change in error count.
    pub diff: Diff,
}

/// An export became hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    pub export_id: GlobalId,
}

/// A mapping from LIR identifiers to metadata about their rendered operators.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    pub global_id: GlobalId,
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// An association of a dataflow with a `GlobalId` it computes.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    pub dataflow_index: usize,
    pub global_id: GlobalId,
}

/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// A peek was installed or retired.
    Peek(PeekEvent),
    /// An export's output frontier changed.
    Frontier(Frontier),
    /// An export's import frontier changed.
    ImportFrontier(ImportFrontier),
    /// An arrangement's heap size changed.
    ArrangementHeapSize(ArrangementHeapSize),
    /// An arrangement's heap capacity changed.
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// An arrangement's allocation count changed.
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// An arrangement size logging operator was created.
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// An arrangement size logging operator was dropped.
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// A dataflow was shut down.
    DataflowShutdown(DataflowShutdown),
    /// An export's error count changed.
    ErrorCount(ErrorCount),
    /// An export became hydrated.
    Hydration(Hydration),
    /// New LIR operator metadata became available.
    LirMapping(LirMapping),
    /// A dataflow was associated with a `GlobalId`.
    DataflowGlobal(DataflowGlobal),
}
/// The type of a peek.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index (arrangement).
    Index,
    /// A peek against persisted data.
    Persist,
}

impl PeekType {
    /// A human-readable name for this peek type.
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

/// A logged peek.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Peek {
    /// The identifier of the collection being peeked.
    id: GlobalId,
    /// The logical timestamp requested by the peek.
    time: Timestamp,
    /// The ID of the peek, stored as raw bytes.
    uuid: uuid::Bytes,
}

impl Peek {
    /// Create a new `Peek` from its parts, converting the UUID into raw bytes.
    pub fn new(id: GlobalId, time: Timestamp, uuid: Uuid) -> Self {
        let uuid = uuid.into_bytes();
        Self { id, time, uuid }
    }
}
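
// A hedged usage sketch: the stored UUID bytes round-trip losslessly through
// `Uuid::from_bytes`, which is how the demux operator below keys its peek stash:
//
//     let peek = Peek::new(GlobalId::User(1), Timestamp::MIN, Uuid::new_v4());
//     let uuid = Uuid::from_bytes(peek.uuid);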

/// Metadata about a rendered LIR operator.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The rendered operator name.
    operator: String,
    /// The LIR identifier of the parent operator, if any.
    parent_lir_id: Option<LirId>,
    /// The nesting level of the operator.
    nesting: u8,
    /// The span of the operator, as a (start, end) pair.
    operator_span: (usize, usize),
}

impl LirMetadata {
    /// Create new LIR metadata from its parts.
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}

/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export from the logging dataflow.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Construct the logging dataflow for compute log events.
///
/// The result contains one collection per requested log variant, ready to be
/// arranged and exported.
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
            scope,
            "compute logs",
            config.interval,
            event_queue.activator,
            move |mut session, mut data| {
                // If logging is disabled, we still build the dataflow but drop all
                // replayed events, leaving the log collections empty.
                if enable_logging {
                    session.give_container(data.to_mut())
                }
            },
        );

        // Build a demux operator that splits the replayed event stream up into one
        // output per log stream.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut export_out, export) = demux.new_output();
        let (mut frontier_out, frontier) = demux.new_output();
        let (mut import_frontier_out, import_frontier) = demux.new_output();
        let (mut peek_out, peek) = demux.new_output();
        let (mut peek_duration_out, peek_duration) = demux.new_output();
        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
            demux.new_output();
        let (mut error_count_out, error_count) = demux.new_output();
        let (mut hydration_time_out, hydration_time) = demux.new_output();
        let (mut lir_mapping_out, lir_mapping) = demux.new_output();
        let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();

        let mut demux_state = DemuxState::new(scheduler);
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut shutdown_duration = shutdown_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session(&cap),
                        frontier: frontier.session(&cap),
                        import_frontier: import_frontier.session(&cap),
                        peek: peek.session(&cap),
                        peek_duration: peek_duration.session(&cap),
                        shutdown_duration: shutdown_duration.session(&cap),
                        arrangement_heap_size: arrangement_heap_size.session(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
                        error_count: error_count.session(&cap),
                        hydration_time: hydration_time.session(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.drain() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        let worker_id = scope.index();

        // Encode the contents of each log stream into its expected `Row` format.
        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
        let dataflow_current = export.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::FrontierCurrent);
        let frontier_current = frontier.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent);
        let import_frontier_current = import_frontier.as_collection().map({
            let mut scratch1 = String::new();
            let mut scratch2 = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch1),
                    make_string_datum(datum.import_id, &mut scratch2),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::PeekCurrent);
        let peek_current = peek.as_collection().map({
            let mut scratch = String::new();
            move |PeekDatum { peek, peek_type }| {
                packer.pack_slice_owned(&[
                    Datum::Uuid(Uuid::from_bytes(peek.uuid)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(peek.id, &mut scratch),
                    Datum::String(peek_type.name()),
                    Datum::MzTimestamp(peek.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::PeekDuration);
        let peek_duration =
            peek_duration
                .as_collection()
                .map(move |PeekDurationDatum { peek_type, bucket }| {
                    packer.pack_slice_owned(&[
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::String(peek_type.name()),
                        Datum::UInt64(bucket.try_into().expect("bucket too big")),
                    ])
                });
        let mut packer = PermutedRowPacker::new(ComputeLog::ShutdownDuration);
        let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
            packer.pack_slice_owned(&[
                Datum::UInt64(u64::cast_from(worker_id)),
                Datum::UInt64(bucket.try_into().expect("bucket too big")),
            ])
        });

        let arrangement_heap_datum_to_row =
            move |packer: &mut PermutedRowPacker, ArrangementHeapDatum { operator_id }| {
                packer.pack_slice_owned(&[
                    Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ])
            };

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
        let arrangement_heap_size = arrangement_heap_size
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapCapacity);
        let arrangement_heap_capacity = arrangement_heap_capacity
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapAllocations);
        let arrangement_heap_allocations = arrangement_heap_allocations
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ErrorCount);
        let error_count = error_count.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::Int64(datum.count.into_inner()),
                ])
            }
        });

        let mut packer = PermutedRowPacker::new(ComputeLog::HydrationTime);
        let hydration_time = hydration_time.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::from(datum.time_ns),
                ])
            }
        });

        let mut scratch1 = String::new();
        let mut scratch2 = String::new();
        let mut packer = PermutedRowPacker::new(ComputeLog::LirMapping);
        let lir_mapping = lir_mapping
            .map(move |(datum, time, diff)| {
                let row = packer.pack_slice_owned(&[
                    make_string_datum(GlobalId::into_owned(datum.global_id), &mut scratch1),
                    Datum::UInt64(<LirId as Columnar>::into_owned(datum.lir_id).into()),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(datum.operator, &mut scratch2),
                    datum
                        .parent_lir_id
                        .map(|lir_id| Datum::UInt64(LirId::into_owned(lir_id).into()))
                        .unwrap_or_else(|| Datum::Null),
                    Datum::UInt16(u16::cast_from(*datum.nesting)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.0)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.1)),
                ]);
                (row, Timestamp::into_owned(time), diff)
            })
            .as_collection();

        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
        let dataflow_global_ids = dataflow_global_ids.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(datum.global_id, &mut scratch),
                ])
            }
        });

        use ComputeLog::*;
        let logs = [
            (DataflowCurrent, dataflow_current),
            (FrontierCurrent, frontier_current),
            (ImportFrontierCurrent, import_frontier_current),
            (PeekCurrent, peek_current),
            (PeekDuration, peek_duration),
            (ShutdownDuration, shutdown_duration),
            (ArrangementHeapSize, arrangement_heap_size),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ErrorCount, error_count),
            (HydrationTime, hydration_time),
            (LirMapping, lir_mapping),
            (DataflowGlobal, dataflow_global_ids),
        ];

        // Arrange the log collections requested by the config, and hand them back
        // together with the replay token.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

/// Pack a string datum from a `Display` value, reusing the given scratch space to
/// avoid allocations.
///
/// The returned datum borrows from `scratch`, so it is only valid until the scratch
/// string is reused.
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}
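
// A hedged usage sketch: `scratch` is reused across calls to avoid per-row
// allocations, and the returned datum borrows from it:
//
//     let mut scratch = String::new();
//     let datum = make_string_datum(GlobalId::User(7), &mut scratch);
//     assert_eq!(datum, Datum::String("u7"));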

/// State maintained by the demux operator.
struct DemuxState<A> {
    /// The worker's scheduler, used to obtain operator activators.
    scheduler: A,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps dataflow indexes to the number of exports each dataflow maintains.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Maps indexes of dropped dataflows to the time of the drop.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Indexes of dataflows that have shut down before their drop was observed.
    shutdown_dataflows: BTreeSet<usize>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size state, keyed by operator index.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR metadata, keyed by `GlobalId` and `LirId`.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Maps dataflow indexes to the `GlobalId`s they compute.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A) -> Self {
        Self {
            scheduler,
            exports: Default::default(),
            dataflow_export_counts: Default::default(),
            dataflow_drop_times: Default::default(),
            shutdown_dataflows: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
        }
    }
}

/// State tracked per dataflow export.
struct ExportState {
    /// The index of the dataflow maintaining this export.
    dataflow_index: usize,
    /// The current error count logged for this export.
    ///
    /// Kept up-to-date so the count can be retracted when the export is dropped.
    error_count: Diff,
    /// The time this export was created.
    created_at: Instant,
    /// The logged hydration time, in nanoseconds, if the export has hydrated.
    hydration_time_ns: Option<u64>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
        }
    }
}

/// State for tracking the heap size, capacity, and allocation count of an
/// arrangement.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a> {
    export: OutputSessionVec<'a, Update<ExportDatum>>,
    frontier: OutputSessionVec<'a, Update<FrontierDatum>>,
    import_frontier: OutputSessionVec<'a, Update<ImportFrontierDatum>>,
    peek: OutputSessionVec<'a, Update<PeekDatum>>,
    peek_duration: OutputSessionVec<'a, Update<PeekDurationDatum>>,
    shutdown_duration: OutputSessionVec<'a, Update<u128>>,
    arrangement_heap_size: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_capacity: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_allocations: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    hydration_time: OutputSessionVec<'a, Update<HydrationTimeDatum>>,
    error_count: OutputSessionVec<'a, Update<ErrorCountDatum>>,
    lir_mapping: OutputSessionColumnar<'a, Update<LirMappingDatum>>,
    dataflow_global_ids: OutputSessionVec<'a, Update<DataflowGlobalDatum>>,
}

#[derive(Clone)]
struct ExportDatum {
    export_id: GlobalId,
    dataflow_index: usize,
}

#[derive(Clone)]
struct FrontierDatum {
    export_id: GlobalId,
    time: Timestamp,
}

#[derive(Clone)]
struct ImportFrontierDatum {
    export_id: GlobalId,
    import_id: GlobalId,
    time: Timestamp,
}

#[derive(Clone)]
struct PeekDatum {
    peek: Peek,
    peek_type: PeekType,
}

#[derive(Clone)]
struct PeekDurationDatum {
    peek_type: PeekType,
    bucket: u128,
}

#[derive(Clone, Copy)]
struct ArrangementHeapDatum {
    operator_id: usize,
}

#[derive(Clone)]
struct HydrationTimeDatum {
    export_id: GlobalId,
    time_ns: Option<u64>,
}

#[derive(Clone)]
struct ErrorCountDatum {
    export_id: GlobalId,
    count: Diff,
}

#[derive(Clone, Columnar)]
struct LirMappingDatum {
    global_id: GlobalId,
    lir_id: LirId,
    operator: String,
    parent_lir_id: Option<LirId>,
    nesting: u8,
    operator_span: (usize, usize),
}

#[derive(Clone)]
struct DataflowGlobalDatum {
    dataflow_index: usize,
    global_id: GlobalId,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, A: Scheduler> {
    /// State maintained by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared with other logging code.
    shared_state: &'a mut SharedLoggingState,
    /// Output sessions for the individual log streams.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval, in milliseconds.
    logging_interval_ms: u128,
    /// The time of the current event.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, A> {
    /// Return the timestamp to attach to the current event: the smallest multiple
    /// of the logging interval that is strictly greater than the event time.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }
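
    // A worked sketch of the rounding above: with `logging_interval_ms = 1_000`,
    // an event at time 1_500ms maps to (1_500 / 1_000 + 1) * 1_000 = 2_000, so all
    // events within one interval coalesce onto the same output timestamp.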

    /// Handle the given compute event.
    fn handle(&mut self, event: <ComputeEvent as Columnar>::Ref<'_>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: <Export as Columnar>::Ref<'_>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = ExportDatum {
            export_id,
            dataflow_index,
        };
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        *self
            .state
            .dataflow_export_counts
            .entry(dataflow_index)
            .or_default() += 1;

        // Log the new export as not yet hydrated.
        let datum = HydrationTimeDatum {
            export_id,
            time_ns: None,
        };
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: <ExportDropped as Columnar>::Ref<'_>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = ExportDatum {
            export_id,
            dataflow_index,
        };
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
            entry @ Some(0) | entry @ None => {
                error!(
                    %export_id,
                    %dataflow_index,
                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
                );
            }
            Some(1) => self.handle_dataflow_dropped(dataflow_index),
            Some(count) => *count -= 1,
        }

        // Retract the export's logged error count, if any.
        if export.error_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: export.error_count,
            };
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Retract the export's hydration time logging.
        let datum = HydrationTimeDatum {
            export_id,
            time_ns: export.hydration_time_ns,
        };
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));
    }

    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
        self.state.dataflow_export_counts.remove(&dataflow_index);

        if self.state.shutdown_dataflows.remove(&dataflow_index) {
            // The dataflow already shut down before it was dropped.
            self.output
                .shutdown_duration
                .give((0, self.ts(), Diff::ONE));
        } else {
            // The dataflow has not yet shut down; record the drop time so the
            // shutdown duration can be logged once it does.
            let existing = self
                .state
                .dataflow_drop_times
                .insert(dataflow_index, self.time);
            if existing.is_some() {
                error!(%dataflow_index, "dataflow already dropped");
            }
        }
    }
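
    // A worked sketch of the drop/shutdown interplay above: if a dataflow's last
    // export is dropped at t=5s and the dataflow shuts down at t=7s, the drop
    // handler stashes t=5s and `handle_dataflow_shutdown` logs the bucket
    // `(7s - 5s).as_nanos().next_power_of_two()`. If shutdown is observed first,
    // the eventual drop logs a duration of 0.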

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: <DataflowShutdown as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();

        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            // Round up to the nearest power of two, matching the bucketing of the
            // other duration histograms.
            let elapsed_pow = elapsed_ns.next_power_of_two();
            self.output
                .shutdown_duration
                .give((elapsed_pow, ts, Diff::ONE));
        } else {
            // The dataflow shut down before it was dropped; note this so the drop
            // handler can report a zero shutdown duration.
            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
            if !was_new {
                error!(%dataflow_index, "dataflow already shutdown");
            }
        }

        // Retract the dataflow's global ID associations and any LIR mappings
        // logged for them.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                let datum = DataflowGlobalDatum {
                    dataflow_index,
                    global_id,
                };
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = LirMappingDatum {
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        };
                        self.output.lir_mapping.give(&(datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(
        &mut self,
        ErrorCountReference { export_id, diff }: <ErrorCount as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export may have already been dropped, in which case its error
            // count was already retracted and this event can be ignored.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;

        if old_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: old_count,
            };
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: new_count,
            };
            self.output.error_count.give((datum, ts, Diff::ONE));
        }

        export.error_count = new_count;
    }

    fn handle_hydration(
        &mut self,
        HydrationReference { export_id }: <Hydration as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Only the first hydration event for an export is logged.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");

        let retraction = HydrationTimeDatum {
            export_id,
            time_ns: None,
        };
        let insertion = HydrationTimeDatum {
            export_id,
            time_ns: Some(nanos),
        };
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));

        export.hydration_time_ns = Some(nanos);
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            peek,
            peek_type,
            installed: _,
        }: <PeekEvent as Columnar>::Ref<'_>,
    ) {
        let peek = Peek::into_owned(peek);
        let uuid = Uuid::from_bytes(peek.uuid);
        let ts = self.ts();
        self.output
            .peek
            .give((PeekDatum { peek, peek_type }, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            peek,
            peek_type,
            installed: _,
        }: <PeekEvent as Columnar>::Ref<'_>,
    ) {
        let peek = Peek::into_owned(peek);
        let uuid = Uuid::from_bytes(peek.uuid);
        let ts = self.ts();
        self.output
            .peek
            .give((PeekDatum { peek, peek_type }, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            self.output.peek_duration.give((
                PeekDurationDatum { peek_type, bucket },
                ts,
                Diff::ONE,
            ));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: <Frontier as Columnar>::Ref<'_>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = FrontierDatum { export_id, time };
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: <ImportFrontier as Columnar>::Ref<'_>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = ImportFrontierDatum {
            export_id,
            import_id,
            time,
        };
        self.output
            .import_frontier
            .give((datum, ts, (*diff).into()));
    }

    /// Update the logged heap size for an arrangement.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: <ArrangementHeapSize as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            // Ignore updates for operators that are not (or no longer) registered.
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        self.output
            .arrangement_heap_size
            .give((datum, ts, Diff::cast_from(delta_size)));

        state.size += delta_size;
    }

    /// Update the logged heap capacity for an arrangement.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: <ArrangementHeapCapacity as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            // Ignore updates for operators that are not (or no longer) registered.
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, Diff::cast_from(delta_capacity)));

        state.capacity += delta_capacity;
    }

    /// Update the logged heap allocation count for an arrangement.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: <ArrangementHeapAllocations as Columnar>::Ref<'_>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            // Ignore updates for operators that are not (or no longer) registered.
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));

        state.count += delta_allocations;
    }

    /// Register an arrangement size logging operator and an activator for it.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: <ArrangementHeapSizeOperator as Columnar>::Ref<'_>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Retract all logged heap size information for a dropped arrangement size
    /// operator.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        ArrangementHeapSizeOperatorDropReference { operator_id }: <ArrangementHeapSizeOperatorDrop as Columnar>::Ref<'_>,
    ) {
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let datum = ArrangementHeapDatum { operator_id };

            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((datum, ts, diff));

            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((datum, ts, diff));

            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((datum, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    /// Indicate that new LIR operator mappings exist for a global ID.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: <LirMapping as Columnar>::Ref<'_>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // Record the mappings so they can be retracted when the dataflow shuts down.
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // Output the mapping updates.
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = LirMappingDatumReference {
                global_id,
                lir_id,
                operator: meta.operator,
                parent_lir_id: meta.parent_lir_id,
                nesting: meta.nesting,
                operator_span: meta.operator_span,
            };
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    /// Associate a dataflow with a `GlobalId` it computes.
    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: <DataflowGlobal as Columnar>::Ref<'_>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = DataflowGlobalDatum {
            dataflow_index,
            global_id,
        };
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

/// Logging state for a compute collection (dataflow export).
///
/// Tracks the frontiers currently logged for the collection, and ensures that all
/// logged state is retracted on drop.
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    logged_frontier: Option<Timestamp>,
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
    /// Create new logging state for the identified collection and log its creation.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize all frontier logging with the minimum timestamp.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

    /// Set the collection's logged output frontier to the given new time.
    ///
    /// A `None` time indicates the empty frontier.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the collection's logged frontier for the given import to the given new
    /// time.
    ///
    /// A `None` time indicates the empty frontier.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Log that the collection has become hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }
}
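
// A hedged usage sketch (identifiers assumed): rendering code creates one
// `CollectionLogging` per export, feeds it frontier updates, and relies on the
// `Drop` impl below to retract everything:
//
//     let mut logging = CollectionLogging::new(export_id, logger, dataflow_index, imports);
//     logging.set_frontier(Some(Timestamp::from(100u64)));
//     drop(logging); // logs `ExportDropped` and retracts logged frontiers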

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Retract all logged frontiers and log the export drop.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

/// Extension trait to log the number of errors in a dataflow error stream or
/// collection.
pub(crate) trait LogDataflowErrors {
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}

impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        // Sum the diffs in the container and log the change.
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Sum the diffs of all updates contained in the given batch, by walking every
/// key, value, and time.
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // Guard against accidental growth of `ComputeEvent`, which is constructed
        // frequently on hot logging paths.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
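
    // Hedged sketches exercising helpers defined above; the names and values here
    // are illustrative, not part of any established test suite.

    /// The interval rounding used by `DemuxHandler::ts`: event times advance to the
    /// next multiple of the logging interval that is strictly greater, so updates
    /// within one interval coalesce onto the same timestamp.
    #[mz_ore::test]
    fn test_interval_rounding() {
        let interval = 1_000u128;
        let round = |time_ms: u128| (time_ms / interval + 1) * interval;
        assert_eq!(round(0), 1_000);
        assert_eq!(round(999), 1_000);
        assert_eq!(round(1_000), 2_000);
    }

    /// `Peek::new` stores the UUID as raw bytes; round-tripping through
    /// `Uuid::from_bytes` is lossless.
    #[mz_ore::test]
    fn test_peek_uuid_roundtrip() {
        let uuid = Uuid::from_u128(42);
        let peek = Peek::new(GlobalId::User(1), Timestamp::MIN, uuid);
        assert_eq!(uuid, Uuid::from_bytes(peek.uuid));
    }

    /// The power-of-two bucketing used for the peek and shutdown duration
    /// histograms.
    #[mz_ore::test]
    fn test_duration_bucketing() {
        assert_eq!(1_000u128.next_power_of_two(), 1_024);
        assert_eq!(1_024u128.next_power_of_two(), 1_024);
    }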
}