// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software is permitted only
// as described in the LICENSE file at the root of this repository.

//! Logging dataflows for events generated by compute.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::{Columnar, Ref};
use differential_dataflow::Collection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
use mz_timely_util::columnar::Column;
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::containers::ProvidedBuilder;
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use timely::{Container, Data};
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrange;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec,
    PermutedRowPacker, SharedLoggingState, Update,
};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::RowRowSpine;
/// A logger for compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;

/// A builder for columnar containers of logged compute events.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;

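// A minimal usage sketch, assuming a `logger: Logger` handle obtained from the worker's
// logging registry (event construction mirrors `CollectionLogging::new` below):
//
//     logger.log(&ComputeEvent::Export(Export { export_id, dataflow_index }));
//
// Events are logged by reference and batched into columnar containers by the
// `ComputeEventBuilder` before being replayed into the logging dataflow built in
// `construct`.
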
/// A dataflow export was created.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// The identifier of the export.
    pub export_id: GlobalId,
    /// The index of the dataflow maintaining the export.
    pub dataflow_index: usize,
}

/// A dataflow export was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// The identifier of the export.
    pub export_id: GlobalId,
}

/// A peek was installed or retired.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The data for the peek itself.
    pub peek: Peek,
    /// The type of the peek.
    pub peek_type: PeekType,
    /// Whether the peek is being installed (`true`) or retired (`false`).
    pub installed: bool,
}

/// An update to an export's frontier.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

/// An update to the frontier of an import, as tracked by an export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    pub import_id: GlobalId,
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

/// The heap size of an arrangement changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// The ID of the arrangement's size operator.
    pub operator_id: usize,
    /// The change in size, in bytes.
    pub delta_size: isize,
}

/// The heap capacity of an arrangement changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// The ID of the arrangement's size operator.
    pub operator_id: usize,
    /// The change in capacity, in bytes.
    pub delta_capacity: isize,
}

/// The number of heap allocations of an arrangement changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// The ID of the arrangement's size operator.
    pub operator_id: usize,
    /// The change in the number of allocations.
    pub delta_allocations: isize,
}

/// An operator reporting the heap size of an arrangement was created.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// The ID of the operator.
    pub operator_id: usize,
    /// The operator's address in the dataflow graph.
    pub address: Vec<usize>,
}

/// An operator reporting the heap size of an arrangement was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// The ID of the operator.
    pub operator_id: usize,
}

/// A dataflow has shut down.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// The index of the dataflow.
    pub dataflow_index: usize,
}

/// The number of errors in an export changed.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// The identifier of the export.
    pub export_id: GlobalId,
    /// The change in error count.
    pub diff: Diff,
}

/// An export has become hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    pub export_id: GlobalId,
}

/// A mapping of LIR operators to metadata about them became available.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    pub global_id: GlobalId,
    /// The mapping from LIR IDs to metadata.
    ///
    /// A `Vec` rather than individual events, so that a whole batch of mappings can be
    /// logged at once, amortizing the cost per event.
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// An association between a dataflow and a global ID it implements.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The index of the dataflow.
    pub dataflow_index: usize,
    /// A global ID the dataflow implements.
    pub global_id: GlobalId,
}

/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// A peek was installed or retired.
    Peek(PeekEvent),
    /// An export's frontier changed.
    Frontier(Frontier),
    /// An import's frontier, as tracked by an export, changed.
    ImportFrontier(ImportFrontier),
    /// The heap size of an arrangement changed.
    ArrangementHeapSize(ArrangementHeapSize),
    /// The heap capacity of an arrangement changed.
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// The number of heap allocations of an arrangement changed.
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// An arrangement size operator was created.
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// An arrangement size operator was dropped.
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// A dataflow shut down.
    DataflowShutdown(DataflowShutdown),
    /// The error count of an export changed.
    ErrorCount(ErrorCount),
    /// An export became hydrated.
    Hydration(Hydration),
    /// New LIR metadata became available.
    LirMapping(LirMapping),
    /// A dataflow was associated with a global ID.
    DataflowGlobal(DataflowGlobal),
}

/// The type of a peek.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index.
    Index,
    /// A peek against Persist.
    Persist,
}

impl PeekType {
    /// A human-readable name for the peek type.
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

/// A logged peek attempt.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Peek {
    /// The identifier of the collection targeted by the peek.
    id: GlobalId,
    /// The logical timestamp requested by the peek.
    time: Timestamp,
    /// The ID of the peek, stored as raw bytes so the type can derive `Columnar`.
    uuid: uuid::Bytes,
}

impl Peek {
    /// Create a new peek from its parts.
    pub fn new(id: GlobalId, time: Timestamp, uuid: Uuid) -> Self {
        let uuid = uuid.into_bytes();
        Self { id, time, uuid }
    }
}

/// Metadata for an LIR operator.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The rendered LIR operator, as a string.
    operator: String,
    /// The LIR identifier of the parent operator, if any.
    parent_lir_id: Option<LirId>,
    /// How deeply nested the operator is, for indented rendering.
    nesting: u8,
    /// The range of dataflow operator IDs implementing this LIR operator, given as start
    /// and end operator IDs.
    operator_span: (usize, usize),
}

impl LirMetadata {
    /// Create new LIR metadata from its parts.
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}

/// The return type of [`construct`].
pub(super) struct Return {
    /// The constructed log collections.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow for compute logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `scheduler`: A scheduler for obtaining operator activators.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read compute log events from.
/// * `shared_state`: Logging state shared between logging dataflows.
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
            scope,
            "compute logs",
            config.interval,
            event_queue.activator,
            move |mut session, mut data| {
                // If logging is disabled, we still need to install the indexes, but we can
                // leave them empty. We do so by immediately filtering all log events.
                if enable_logging {
                    session.give_container(data.to_mut())
                }
            },
        );

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut export_out, export) = demux.new_output();
        let (mut frontier_out, frontier) = demux.new_output();
        let (mut import_frontier_out, import_frontier) = demux.new_output();
        let (mut peek_out, peek) = demux.new_output();
        let (mut peek_duration_out, peek_duration) = demux.new_output();
        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
            demux.new_output();
        let (mut error_count_out, error_count) = demux.new_output();
        let (mut hydration_time_out, hydration_time) = demux.new_output();
        let (mut lir_mapping_out, lir_mapping) = demux.new_output();
        let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();

        let mut demux_state = DemuxState::new(scheduler);
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut shutdown_duration = shutdown_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session(&cap),
                        frontier: frontier.session(&cap),
                        import_frontier: import_frontier.session(&cap),
                        peek: peek.session(&cap),
                        peek_duration: peek_duration.session(&cap),
                        shutdown_duration: shutdown_duration.session(&cap),
                        arrangement_heap_size: arrangement_heap_size.session(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
                        error_count: error_count.session(&cap),
                        hydration_time: hydration_time.session(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.drain() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        let worker_id = scope.index();

        // Encode the contents of each logging stream into its expected `Row` format.
        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
        let dataflow_current = export.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::FrontierCurrent);
        let frontier_current = frontier.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent);
        let import_frontier_current = import_frontier.as_collection().map({
            let mut scratch1 = String::new();
            let mut scratch2 = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch1),
                    make_string_datum(datum.import_id, &mut scratch2),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::PeekCurrent);
        let peek_current = peek.as_collection().map({
            let mut scratch = String::new();
            move |PeekDatum { peek, peek_type }| {
                packer.pack_slice_owned(&[
                    Datum::Uuid(Uuid::from_bytes(peek.uuid)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(peek.id, &mut scratch),
                    Datum::String(peek_type.name()),
                    Datum::MzTimestamp(peek.time),
                ])
            }
        });
        let mut packer = PermutedRowPacker::new(ComputeLog::PeekDuration);
        let peek_duration =
            peek_duration
                .as_collection()
                .map(move |PeekDurationDatum { peek_type, bucket }| {
                    packer.pack_slice_owned(&[
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::String(peek_type.name()),
                        Datum::UInt64(bucket.try_into().expect("bucket too big")),
                    ])
                });
        let mut packer = PermutedRowPacker::new(ComputeLog::ShutdownDuration);
        let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
            packer.pack_slice_owned(&[
                Datum::UInt64(u64::cast_from(worker_id)),
                Datum::UInt64(bucket.try_into().expect("bucket too big")),
            ])
        });

        let arrangement_heap_datum_to_row =
            move |packer: &mut PermutedRowPacker, ArrangementHeapDatum { operator_id }| {
                packer.pack_slice_owned(&[
                    Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ])
            };

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
        let arrangement_heap_size = arrangement_heap_size
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapCapacity);
        let arrangement_heap_capacity = arrangement_heap_capacity
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapAllocations);
        let arrangement_heap_allocations = arrangement_heap_allocations
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ErrorCount);
        let error_count = error_count.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::Int64(datum.count.into_inner()),
                ])
            }
        });

        let mut packer = PermutedRowPacker::new(ComputeLog::HydrationTime);
        let hydration_time = hydration_time.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::from(datum.time_ns),
                ])
            }
        });

        let mut scratch1 = String::new();
        let mut scratch2 = String::new();
        let mut packer = PermutedRowPacker::new(ComputeLog::LirMapping);
        let lir_mapping = lir_mapping
            .map(move |(datum, time, diff)| {
                let row = packer.pack_slice_owned(&[
                    make_string_datum(GlobalId::into_owned(datum.global_id), &mut scratch1),
                    Datum::UInt64(<LirId as Columnar>::into_owned(datum.lir_id).into()),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(datum.operator, &mut scratch2),
                    datum
                        .parent_lir_id
                        .map(|lir_id| Datum::UInt64(LirId::into_owned(lir_id).into()))
                        .unwrap_or_else(|| Datum::Null),
                    Datum::UInt16(u16::cast_from(*datum.nesting)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.0)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.1)),
                ]);
                (row, Timestamp::into_owned(time), diff)
            })
            .as_collection();

        let mut packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
        let dataflow_global_ids = dataflow_global_ids.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice_owned(&[
                    Datum::UInt64(u64::cast_from(datum.dataflow_index)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(datum.global_id, &mut scratch),
                ])
            }
        });

        use ComputeLog::*;
        let logs = [
            (DataflowCurrent, dataflow_current),
            (FrontierCurrent, frontier_current),
            (ImportFrontierCurrent, import_frontier_current),
            (PeekCurrent, peek_current),
            (PeekDuration, peek_duration),
            (ShutdownDuration, shutdown_duration),
            (ArrangementHeapSize, arrangement_heap_size),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ErrorCount, error_count),
            (HydrationTime, hydration_time),
            (LirMapping, lir_mapping),
            (DataflowGlobal, dataflow_global_ids),
        ];

        // Arrange the logging collections, keeping only those for which indexes were
        // requested in the logging configuration.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

/// Constructs a `Datum::String` from a `Display` value, using the given scratch space.
///
/// Reusing the scratch `String` across calls avoids allocating a fresh buffer for every
/// formatted value.
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}
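
// For example, `make_string_datum(GlobalId::User(1), &mut scratch)` yields
// `Datum::String("u1")` (assuming the usual `Display` format of user `GlobalId`s), with the
// string bytes living in `scratch` rather than in a fresh allocation.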

/// Bookkeeping state maintained by the demux operator.
struct DemuxState<A> {
    /// The worker's scheduler, used to obtain operator activators.
    scheduler: A,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps dataflow indexes to the number of exports installed on the dataflow.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Maps dropped dataflows to the time of their drop, for shutdown duration logging.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Dataflows that have shut down before their drop was observed.
    shutdown_dataflows: BTreeSet<usize>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size accounting, keyed by operator ID.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR metadata, keyed by global ID.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Maps dataflow indexes to the global IDs they implement.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A) -> Self {
        Self {
            scheduler,
            exports: Default::default(),
            dataflow_export_counts: Default::default(),
            dataflow_drop_times: Default::default(),
            shutdown_dataflows: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
        }
    }
}

/// State tracked for each dataflow export.
struct ExportState {
    /// The index of the dataflow maintaining this export.
    dataflow_index: usize,
    /// The current number of errors in the export's error collection.
    ///
    /// Kept so the logged error count can be retracted when the export is dropped.
    error_count: Diff,
    /// The time when this export was created.
    created_at: Instant,
    /// The time it took for the export to hydrate, in nanoseconds, if it has hydrated.
    hydration_time_ns: Option<u64>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
        }
    }
}

/// State for tracking the heap size of an arrangement.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a> {
    export: OutputSessionVec<'a, Update<ExportDatum>>,
    frontier: OutputSessionVec<'a, Update<FrontierDatum>>,
    import_frontier: OutputSessionVec<'a, Update<ImportFrontierDatum>>,
    peek: OutputSessionVec<'a, Update<PeekDatum>>,
    peek_duration: OutputSessionVec<'a, Update<PeekDurationDatum>>,
    shutdown_duration: OutputSessionVec<'a, Update<u128>>,
    arrangement_heap_size: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_capacity: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    arrangement_heap_allocations: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
    hydration_time: OutputSessionVec<'a, Update<HydrationTimeDatum>>,
    error_count: OutputSessionVec<'a, Update<ErrorCountDatum>>,
    lir_mapping: OutputSessionColumnar<'a, Update<LirMappingDatum>>,
    dataflow_global_ids: OutputSessionVec<'a, Update<DataflowGlobalDatum>>,
}

#[derive(Clone)]
struct ExportDatum {
    export_id: GlobalId,
    dataflow_index: usize,
}

#[derive(Clone)]
struct FrontierDatum {
    export_id: GlobalId,
    time: Timestamp,
}

#[derive(Clone)]
struct ImportFrontierDatum {
    export_id: GlobalId,
    import_id: GlobalId,
    time: Timestamp,
}

#[derive(Clone)]
struct PeekDatum {
    peek: Peek,
    peek_type: PeekType,
}

#[derive(Clone)]
struct PeekDurationDatum {
    peek_type: PeekType,
    bucket: u128,
}

#[derive(Clone, Copy)]
struct ArrangementHeapDatum {
    operator_id: usize,
}

#[derive(Clone)]
struct HydrationTimeDatum {
    export_id: GlobalId,
    time_ns: Option<u64>,
}

#[derive(Clone)]
struct ErrorCountDatum {
    export_id: GlobalId,
    /// The absolute error count, not a delta.
    count: Diff,
}

#[derive(Clone, Columnar)]
struct LirMappingDatum {
    global_id: GlobalId,
    lir_id: LirId,
    operator: String,
    parent_lir_id: Option<LirId>,
    nesting: u8,
    operator_span: (usize, usize),
}

#[derive(Clone)]
struct DataflowGlobalDatum {
    dataflow_index: usize,
    global_id: GlobalId,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared with other logging dataflows.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, A> {
    /// Return the timestamp associated with the current event, based on the event time and
    /// the logging interval: event times are rounded up to the next interval boundary.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }
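
    // For example, with `logging_interval_ms = 1000`, an event arriving at 1234ms is
    // assigned timestamp 2000, so all updates produced within the same logging interval
    // consolidate onto a single logical time.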

    /// Handle the given compute event.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = ExportDatum {
            export_id,
            dataflow_index,
        };
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        *self
            .state
            .dataflow_export_counts
            .entry(dataflow_index)
            .or_default() += 1;

        // Log the export as unhydrated.
        let datum = HydrationTimeDatum {
            export_id,
            time_ns: None,
        };
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = ExportDatum {
            export_id,
            dataflow_index,
        };
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
            entry @ Some(0) | entry @ None => {
                error!(
                    %export_id,
                    %dataflow_index,
                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
                );
            }
            Some(1) => self.handle_dataflow_dropped(dataflow_index),
            Some(count) => *count -= 1,
        }

        // Retract the export's logged error count, if any.
        if export.error_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: export.error_count,
            };
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Retract the export's hydration time.
        let datum = HydrationTimeDatum {
            export_id,
            time_ns: export.hydration_time_ns,
        };
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));
    }

    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
        self.state.dataflow_export_counts.remove(&dataflow_index);

        if self.state.shutdown_dataflows.remove(&dataflow_index) {
            // Dataflow has already shut down before it was dropped.
            self.output
                .shutdown_duration
                .give((0, self.ts(), Diff::ONE));
        } else {
            // Dataflow has not yet shut down; record the drop time so the shutdown duration
            // can be computed later.
            let existing = self
                .state
                .dataflow_drop_times
                .insert(dataflow_index, self.time);
            if existing.is_some() {
                error!(%dataflow_index, "dataflow already dropped");
            }
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
            // Dataflow was dropped before it shut down; log the shutdown duration, bucketed
            // by the next power of two.
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let elapsed_pow = elapsed_ns.next_power_of_two();
            self.output
                .shutdown_duration
                .give((elapsed_pow, ts, Diff::ONE));
        } else {
            // Dataflow has not yet been dropped; remember that it has shut down.
            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
            if !was_new {
                error!(%dataflow_index, "dataflow already shutdown");
            }
        }

        // Retract the dataflow's global ID associations, along with any LIR mappings logged
        // for those global IDs.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                let datum = DataflowGlobalDatum {
                    dataflow_index,
                    global_id,
                };
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = LirMappingDatum {
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        };
                        self.output.lir_mapping.give(&(datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export was already dropped, and its error count retracted with it.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;

        if old_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: old_count,
            };
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = ErrorCountDatum {
                export_id,
                count: new_count,
            };
            self.output.error_count.give((datum, ts, Diff::ONE));
        }

        export.error_count = new_count;
    }

    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Hydration was already logged for this export; nothing to do.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");

        let retraction = HydrationTimeDatum {
            export_id,
            time_ns: None,
        };
        let insertion = HydrationTimeDatum {
            export_id,
            time_ns: Some(nanos),
        };
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));

        export.hydration_time_ns = Some(nanos);
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            peek,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let peek = Peek::into_owned(peek);
        let uuid = Uuid::from_bytes(peek.uuid);
        let ts = self.ts();
        self.output
            .peek
            .give((PeekDatum { peek, peek_type }, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            peek,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let peek = Peek::into_owned(peek);
        let uuid = Uuid::from_bytes(peek.uuid);
        let ts = self.ts();
        self.output
            .peek
            .give((PeekDatum { peek, peek_type }, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            self.output.peek_duration.give((
                PeekDurationDatum { peek_type, bucket },
                ts,
                Diff::ONE,
            ));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }
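
    // Peek durations are reported as a histogram: each retired peek contributes a count of 1
    // to the bucket given by the smallest power of two that is at least its duration in
    // nanoseconds. For example, a peek that took 1500ns falls into bucket 2048.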

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = FrontierDatum { export_id, time };
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = ImportFrontierDatum {
            export_id,
            import_id,
            time,
        };
        self.output
            .import_frontier
            .give((datum, ts, (*diff).into()));
    }

    /// Update the allocation size for an arrangement.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        self.output
            .arrangement_heap_size
            .give((datum, ts, Diff::cast_from(delta_size)));

        state.size += delta_size;
    }

    /// Update the allocation capacity for an arrangement.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, Diff::cast_from(delta_capacity)));

        state.capacity += delta_capacity;
    }

    /// Update the allocation count for an arrangement.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        let datum = ArrangementHeapDatum { operator_id };
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));

        state.count += delta_allocations;
    }

    /// Indicate that a new arrangement size operator exists; start tracking its state.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Indicate that an arrangement size operator was dropped; retract its logged state.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let datum = ArrangementHeapDatum { operator_id };

            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((datum, ts, diff));

            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((datum, ts, diff));

            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((datum, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    /// Indicate that a new LIR mapping became available for a global ID.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        let mappings = || mapping.into_iter().map(Columnar::into_owned);

        // Record the mapping in the state, so it can be retracted when the dataflow shuts
        // down.
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // Log each mapping entry.
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = LirMappingDatumReference {
                global_id,
                lir_id,
                operator: meta.operator,
                parent_lir_id: meta.parent_lir_id,
                nesting: meta.nesting,
                operator_span: meta.operator_span,
            };
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    /// Indicate that a dataflow implements the given global ID.
    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: Ref<'_, DataflowGlobal>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = DataflowGlobalDatum {
            dataflow_index,
            global_id,
        };
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

/// Logging state maintained for a compute collection.
///
/// Logs the creation of the export on construction, maintains the logged export and import
/// frontiers, and retracts all logged facts on drop.
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    logged_frontier: Option<Timestamp>,
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
    /// Create new logging state for the identified collection and log its creation.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize frontier logging.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

    /// Set the collection frontier to the given new time and log the change.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the frontier of the given import to the given new time and log the change.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Log that the collection has become hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }
}
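
// A hedged usage sketch (construction sites live elsewhere in this crate; `export_id`,
// `logger`, `dataflow_index`, and `import_ids` are assumed to be in scope):
//
//     let mut logging = CollectionLogging::new(export_id, logger, dataflow_index, import_ids);
//     logging.set_frontier(Some(Timestamp::from(100u64)));
//     logging.set_hydrated();
//     drop(logging); // retracts the logged export and frontier facts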

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Retract the logged frontiers.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
/// streams of error batches.
pub(crate) trait LogDataflowErrors {
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}

impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        // Sum the diffs in the container and log the change in error count.
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}
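
// The logged `diff` is the net sum of the container's diffs. For example, a container
// holding `{err_a: +2, err_b: -1}` contributes an error count change of +1 for the export.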

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Return the sum of all diffs within the given batch.
///
/// Note that this can be expensive for large batches: it visits every key, value, and time.
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // The size of `ComputeEvent` directly affects logging overhead; this assertion
        // guards against accidental growth.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
}