1use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Display, Write};
15use std::rc::Rc;
16use std::time::{Duration, Instant};
17
18use columnar::{Columnar, Index, Ref};
19use differential_dataflow::VecCollection;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::trace::{BatchReader, Cursor};
22use mz_compute_types::plan::LirId;
23use mz_ore::cast::CastFrom;
24use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
25use mz_timely_util::columnar::batcher;
26use mz_timely_util::columnar::builder::ColumnBuilder;
27use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
28use mz_timely_util::replay::MzReplay;
29use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
30use timely::dataflow::operators::Operator;
31use timely::dataflow::operators::generic::OutputBuilder;
32use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
33use timely::dataflow::operators::generic::operator::empty;
34use timely::dataflow::{Scope, StreamVec};
35use timely::scheduling::activate::{Activations, Activator};
36use tracing::error;
37use uuid::Uuid;
38
39use crate::extensions::arrange::MzArrangeCore;
40use crate::logging::{
41 ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
42 SharedLoggingState, Update,
43};
44use crate::typedefs::RowRowSpine;
45use mz_row_spine::RowRowBuilder;
46
/// Logger for compute events: a timely [`Logger`](timely::logging_core::Logger) specialized
/// to the [`ComputeEventBuilder`] container builder.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// Columnar container builder collecting `(Duration, ComputeEvent)` pairs, i.e. each logged
/// [`ComputeEvent`] together with the time at which it was recorded.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
50
/// Event announcing a dataflow export.
///
/// The demux operator reacts by inserting a row into the `DataflowCurrent` log and by
/// starting to track per-export state (error count, hydration).
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Index of the dataflow the export is logged under.
    pub dataflow_index: usize,
}
59
/// Event announcing that an export was dropped.
///
/// The demux operator reacts by retracting everything it logged for the export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// Identifier of the dropped export.
    pub export_id: GlobalId,
}
66
/// Event describing a peek being installed or retired.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// Identifier of the object the peek is logged against.
    pub id: GlobalId,
    /// Timestamp logged for the peek.
    pub time: Timestamp,
    /// Raw bytes of the peek's UUID; converted with `Uuid::from_bytes` when handled.
    pub uuid: uuid::Bytes,
    /// The kind of peek.
    pub peek_type: PeekType,
    /// `true` if this event installs the peek, `false` if it retires it.
    pub installed: bool,
}
82
/// Event describing a change to an export's logged frontier.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The frontier time the change applies to.
    pub time: Timestamp,
    /// Multiplicity change for the `(export_id, time)` row of the `FrontierCurrent` log.
    pub diff: i8,
}
90
/// Event describing a change to the logged frontier of an import of an export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// Identifier of the import.
    pub import_id: GlobalId,
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// The frontier time the change applies to.
    pub time: Timestamp,
    /// Multiplicity change for the corresponding row of the `ImportFrontierCurrent` log.
    pub diff: i8,
}
99
/// Event reporting a change in the heap size of an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// Identifier of the operator the change is attributed to.
    pub operator_id: usize,
    /// Signed change in size (presumably in bytes -- TODO confirm); added to the running
    /// total and emitted as the diff of the `ArrangementHeapSize` log row.
    pub delta_size: isize,
}
108
/// Event reporting a change in the heap capacity of an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// Identifier of the operator the change is attributed to.
    pub operator_id: usize,
    /// Signed change in capacity (presumably in bytes -- TODO confirm); added to the running
    /// total and emitted as the diff of the `ArrangementHeapCapacity` log row.
    pub delta_capacity: isize,
}
117
/// Event reporting a change in the number of heap allocations of an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// Identifier of the operator the change is attributed to.
    pub operator_id: usize,
    /// Signed change in the allocation count; added to the running total and emitted as the
    /// diff of the `ArrangementHeapAllocations` log row.
    pub delta_allocations: isize,
}
126
/// Event announcing an operator whose arrangement heap size is tracked.
///
/// Heap size, capacity and allocation events are only recorded for operators that were
/// announced through this event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// Identifier of the operator.
    pub operator_id: usize,
    /// Address used to construct an `Activator` for the operator.
    pub address: Vec<usize>,
}
135
/// Event announcing that an operator previously announced through
/// [`ArrangementHeapSizeOperator`] no longer needs its heap size tracked.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// Identifier of the operator.
    pub operator_id: usize,
}
142
/// Event announcing the shutdown of a dataflow.
///
/// The demux operator reacts by retracting the `GlobalId` and LIR mappings recorded for the
/// dataflow.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// Index of the dataflow that shut down.
    pub dataflow_index: usize,
}
149
/// Event reporting a change in the number of errors of an export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Change in the error count; added to the export's running count.
    pub diff: Diff,
}
158
/// Event announcing that an export has hydrated.
///
/// Only the first such event per export has an effect; it records the time elapsed since
/// the export was registered.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
}
165
/// Event reporting the hydration status of a single LIR operator of an export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    /// Identifier of the export.
    pub export_id: GlobalId,
    /// Identifier of the LIR operator.
    pub lir_id: LirId,
    /// The reported hydration status.
    pub hydrated: bool,
}
176
/// Event carrying LIR operator metadata for a `GlobalId`.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The `GlobalId` the metadata belongs to.
    pub global_id: GlobalId,
    /// Pairs of LIR operator identifier and the metadata describing that operator.
    pub mapping: Vec<(LirId, LirMetadata)>,
}
191
/// Event associating a `GlobalId` with a dataflow.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// Index of the dataflow.
    pub dataflow_index: usize,
    /// The `GlobalId` associated with the dataflow.
    pub global_id: GlobalId,
}
200
/// The events that can be logged through the compute [`Logger`].
///
/// Each variant wraps the struct of the same name, which documents its payload.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// A peek was installed or retired.
    Peek(PeekEvent),
    /// The logged frontier of an export changed.
    Frontier(Frontier),
    /// The logged frontier of an import of an export changed.
    ImportFrontier(ImportFrontier),
    /// The heap size of an arrangement changed.
    ArrangementHeapSize(ArrangementHeapSize),
    /// The heap capacity of an arrangement changed.
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// The number of heap allocations of an arrangement changed.
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// An operator started having its arrangement heap size tracked.
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// An operator stopped having its arrangement heap size tracked.
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// A dataflow shut down.
    DataflowShutdown(DataflowShutdown),
    /// The error count of an export changed.
    ErrorCount(ErrorCount),
    /// An export hydrated.
    Hydration(Hydration),
    /// The hydration status of an LIR operator was reported.
    OperatorHydration(OperatorHydration),
    /// LIR operator metadata for a `GlobalId` was reported.
    LirMapping(LirMapping),
    /// A `GlobalId` was associated with a dataflow.
    DataflowGlobal(DataflowGlobal),
}
238
/// The kind of a peek, as carried by [`PeekEvent::peek_type`].
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// Rendered as `"index"` in the peek logs.
    Index,
    /// Rendered as `"persist"` in the peek logs.
    Persist,
}
247
248impl PeekType {
249 fn name(self) -> &'static str {
251 match self {
252 PeekType::Index => "index",
253 PeekType::Persist => "persist",
254 }
255 }
256}
257
/// Metadata describing a single LIR operator, as logged in the `LirMapping` log.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The operator, rendered as a string.
    operator: String,
    /// Identifier of the parent LIR operator, if any; logged as `NULL` when absent.
    parent_lir_id: Option<LirId>,
    /// Nesting level of the operator (presumably its depth in the LIR tree -- TODO confirm).
    nesting: u8,
    /// A pair of values logged as two separate columns (presumably the range of dataflow
    /// operator identifiers rendered for this LIR operator -- TODO confirm).
    operator_span: (usize, usize),
}
271
272impl LirMetadata {
273 pub fn new(
275 operator: String,
276 parent_lir_id: Option<LirId>,
277 nesting: u8,
278 operator_span: (usize, usize),
279 ) -> Self {
280 Self {
281 operator,
282 parent_lir_id,
283 nesting,
284 operator_span,
285 }
286 }
287}
288
/// The return type of [`construct`].
pub(super) struct Return {
    /// The arranged log collections, keyed by the log variant they contain.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
294
295pub(super) fn construct<'scope>(
305 scope: Scope<'scope, Timestamp>,
306 activations: Rc<RefCell<Activations>>,
307 config: &mz_compute_client::logging::LoggingConfig,
308 event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
309 shared_state: Rc<RefCell<SharedLoggingState>>,
310) -> Return {
311 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
312
313 scope.scoped("compute logging", move |scope| {
314 let enable_logging = config.enable_logging;
315 let (logs, token) = if enable_logging {
316 event_queue.links.mz_replay(
317 scope,
318 "compute logs",
319 config.interval,
320 event_queue.activator,
321 )
322 } else {
323 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
324 (empty(scope), token)
325 };
326
327 let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
330 let mut input = demux.new_input(logs, Pipeline);
331 let (export_out, export) = demux.new_output();
332 let mut export_out = OutputBuilder::from(export_out);
333 let (frontier_out, frontier) = demux.new_output();
334 let mut frontier_out = OutputBuilder::from(frontier_out);
335 let (import_frontier_out, import_frontier) = demux.new_output();
336 let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
337 let (peek_out, peek) = demux.new_output();
338 let mut peek_out = OutputBuilder::from(peek_out);
339 let (peek_duration_out, peek_duration) = demux.new_output();
340 let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
341 let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
342 let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
343 let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
344 let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
345 let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
346 let mut arrangement_heap_allocations_out =
347 OutputBuilder::from(arrangement_heap_allocations_out);
348 let (error_count_out, error_count) = demux.new_output();
349 let mut error_count_out = OutputBuilder::from(error_count_out);
350 let (hydration_time_out, hydration_time) = demux.new_output();
351 let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
352 let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
353 let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
354 let (lir_mapping_out, lir_mapping) = demux.new_output();
355 let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
356 let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
357 let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);
358
359 let mut demux_state = DemuxState::new(activations, scope.index());
360 demux.build(move |_capability| {
361 move |_frontiers| {
362 let mut export = export_out.activate();
363 let mut frontier = frontier_out.activate();
364 let mut import_frontier = import_frontier_out.activate();
365 let mut peek = peek_out.activate();
366 let mut peek_duration = peek_duration_out.activate();
367 let mut arrangement_heap_size = arrangement_heap_size_out.activate();
368 let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
369 let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
370 let mut error_count = error_count_out.activate();
371 let mut hydration_time = hydration_time_out.activate();
372 let mut operator_hydration_status = operator_hydration_status_out.activate();
373 let mut lir_mapping = lir_mapping_out.activate();
374 let mut dataflow_global_ids = dataflow_global_ids_out.activate();
375
376 input.for_each(|cap, data| {
377 let mut output_sessions = DemuxOutput {
378 export: export.session_with_builder(&cap),
379 frontier: frontier.session_with_builder(&cap),
380 import_frontier: import_frontier.session_with_builder(&cap),
381 peek: peek.session_with_builder(&cap),
382 peek_duration: peek_duration.session_with_builder(&cap),
383 arrangement_heap_allocations: arrangement_heap_allocations
384 .session_with_builder(&cap),
385 arrangement_heap_capacity: arrangement_heap_capacity
386 .session_with_builder(&cap),
387 arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
388 error_count: error_count.session_with_builder(&cap),
389 hydration_time: hydration_time.session_with_builder(&cap),
390 operator_hydration_status: operator_hydration_status
391 .session_with_builder(&cap),
392 lir_mapping: lir_mapping.session_with_builder(&cap),
393 dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
394 };
395
396 let shared_state = &mut shared_state.borrow_mut();
397 for (time, event) in data.borrow().into_index_iter() {
398 DemuxHandler {
399 state: &mut demux_state,
400 shared_state,
401 output: &mut output_sessions,
402 logging_interval_ms,
403 time,
404 }
405 .handle(event);
406 }
407 });
408 }
409 });
410
411 use ComputeLog::*;
412 let logs = [
413 (ArrangementHeapAllocations, arrangement_heap_allocations),
414 (ArrangementHeapCapacity, arrangement_heap_capacity),
415 (ArrangementHeapSize, arrangement_heap_size),
416 (DataflowCurrent, export),
417 (DataflowGlobal, dataflow_global_ids),
418 (ErrorCount, error_count),
419 (FrontierCurrent, frontier),
420 (HydrationTime, hydration_time),
421 (ImportFrontierCurrent, import_frontier),
422 (LirMapping, lir_mapping),
423 (OperatorHydrationStatus, operator_hydration_status),
424 (PeekCurrent, peek),
425 (PeekDuration, peek_duration),
426 ];
427
428 let mut collections = BTreeMap::new();
430 for (variant, stream) in logs {
431 let variant = LogVariant::Compute(variant);
432 if config.index_logs.contains_key(&variant) {
433 let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
434 columnar_exchange::<Row, Row, Timestamp, Diff>,
435 );
436 let trace = stream
437 .mz_arrange_core::<
438 _,
439 batcher::Chunker<_>,
440 Col2ValBatcher<_, _, _, _>,
441 RowRowBuilder<_, _>,
442 RowRowSpine<_, _>,
443 >(exchange, &format!("Arrange {variant:?}"))
444 .trace;
445 let collection = LogCollection {
446 trace,
447 token: Rc::clone(&token),
448 };
449 collections.insert(variant, collection);
450 }
451 }
452
453 Return { collections }
454 })
455}
456
457fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
463where
464 V: Display,
465{
466 scratch.clear();
467 write!(scratch, "{}", value).expect("writing to a `String` can't fail");
468 Datum::String(scratch)
469}
470
/// State maintained by the demux operator across invocations.
struct DemuxState {
    /// Shared activations, used to build activators for arrangement-size operators.
    activations: Rc<RefCell<Activations>>,
    /// Identifier of the worker; written into every packed log row.
    worker_id: usize,
    /// Reusable buffer for rendering string datums.
    scratch_string_a: String,
    /// Second reusable buffer, for rows that need two string datums alive at once.
    scratch_string_b: String,
    /// State tracked per registered export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// For each installed peek, the event time at which it was installed.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Running arrangement size totals, per tracked operator.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR operator metadata recorded per `GlobalId`.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// The `GlobalId`s recorded per dataflow index.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    /// Row packer for the `ArrangementHeapAllocations` log.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    /// Row packer for the `ArrangementHeapCapacity` log.
    arrangement_heap_capacity_packer: PermutedRowPacker,
    /// Row packer for the `ArrangementHeapSize` log.
    arrangement_heap_size_packer: PermutedRowPacker,
    /// Row packer for the `DataflowGlobal` log.
    dataflow_global_packer: PermutedRowPacker,
    /// Row packer for the `ErrorCount` log.
    error_count_packer: PermutedRowPacker,
    /// Row packer for the `DataflowCurrent` log.
    export_packer: PermutedRowPacker,
    /// Row packer for the `FrontierCurrent` log.
    frontier_packer: PermutedRowPacker,
    /// Row packer for the `ImportFrontierCurrent` log.
    import_frontier_packer: PermutedRowPacker,
    /// Row packer for the `LirMapping` log.
    lir_mapping_packer: PermutedRowPacker,
    /// Row packer for the `OperatorHydrationStatus` log.
    operator_hydration_status_packer: PermutedRowPacker,
    /// Row packer for the `PeekDuration` log.
    peek_duration_packer: PermutedRowPacker,
    /// Row packer for the `PeekCurrent` log.
    peek_packer: PermutedRowPacker,
    /// Row packer for the `HydrationTime` log.
    hydration_time_packer: PermutedRowPacker,
}
518
519impl DemuxState {
520 fn new(activations: Rc<RefCell<Activations>>, worker_id: usize) -> Self {
521 Self {
522 activations,
523 worker_id,
524 scratch_string_a: String::new(),
525 scratch_string_b: String::new(),
526 exports: Default::default(),
527 peek_stash: Default::default(),
528 arrangement_size: Default::default(),
529 lir_mapping: Default::default(),
530 dataflow_global_ids: Default::default(),
531 arrangement_heap_allocations_packer: PermutedRowPacker::new(
532 ComputeLog::ArrangementHeapAllocations,
533 ),
534 arrangement_heap_capacity_packer: PermutedRowPacker::new(
535 ComputeLog::ArrangementHeapCapacity,
536 ),
537 arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
538 dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
539 error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
540 export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
541 frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
542 hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
543 import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
544 lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
545 operator_hydration_status_packer: PermutedRowPacker::new(
546 ComputeLog::OperatorHydrationStatus,
547 ),
548 peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
549 peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
550 }
551 }
552
553 fn pack_arrangement_heap_allocations_update(
555 &mut self,
556 operator_id: usize,
557 ) -> (&RowRef, &RowRef) {
558 self.arrangement_heap_allocations_packer.pack_slice(&[
559 Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
560 Datum::UInt64(u64::cast_from(self.worker_id)),
561 ])
562 }
563
564 fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
566 self.arrangement_heap_capacity_packer.pack_slice(&[
567 Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
568 Datum::UInt64(u64::cast_from(self.worker_id)),
569 ])
570 }
571
572 fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
574 self.arrangement_heap_size_packer.pack_slice(&[
575 Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
576 Datum::UInt64(u64::cast_from(self.worker_id)),
577 ])
578 }
579
580 fn pack_dataflow_global_update(
582 &mut self,
583 dataflow_index: usize,
584 global_id: GlobalId,
585 ) -> (&RowRef, &RowRef) {
586 self.dataflow_global_packer.pack_slice(&[
587 Datum::UInt64(u64::cast_from(dataflow_index)),
588 Datum::UInt64(u64::cast_from(self.worker_id)),
589 make_string_datum(global_id, &mut self.scratch_string_a),
590 ])
591 }
592
593 fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
595 self.error_count_packer.pack_slice(&[
599 make_string_datum(export_id, &mut self.scratch_string_a),
600 Datum::UInt64(u64::cast_from(self.worker_id)),
601 Datum::Int64(count.into_inner()),
602 ])
603 }
604
605 fn pack_export_update(
607 &mut self,
608 export_id: GlobalId,
609 dataflow_index: usize,
610 ) -> (&RowRef, &RowRef) {
611 self.export_packer.pack_slice(&[
612 make_string_datum(export_id, &mut self.scratch_string_a),
613 Datum::UInt64(u64::cast_from(self.worker_id)),
614 Datum::UInt64(u64::cast_from(dataflow_index)),
615 ])
616 }
617
618 fn pack_hydration_time_update(
620 &mut self,
621 export_id: GlobalId,
622 time_ns: Option<u64>,
623 ) -> (&RowRef, &RowRef) {
624 self.hydration_time_packer.pack_slice(&[
625 make_string_datum(export_id, &mut self.scratch_string_a),
626 Datum::UInt64(u64::cast_from(self.worker_id)),
627 Datum::from(time_ns),
628 ])
629 }
630
631 fn pack_import_frontier_update(
633 &mut self,
634 export_id: GlobalId,
635 import_id: GlobalId,
636 time: Timestamp,
637 ) -> (&RowRef, &RowRef) {
638 self.import_frontier_packer.pack_slice(&[
639 make_string_datum(export_id, &mut self.scratch_string_a),
640 make_string_datum(import_id, &mut self.scratch_string_b),
641 Datum::UInt64(u64::cast_from(self.worker_id)),
642 Datum::MzTimestamp(time),
643 ])
644 }
645
646 fn pack_lir_mapping_update(
648 &mut self,
649 global_id: GlobalId,
650 lir_id: LirId,
651 operator: String,
652 parent_lir_id: Option<LirId>,
653 nesting: u8,
654 operator_span: (usize, usize),
655 ) -> (&RowRef, &RowRef) {
656 self.lir_mapping_packer.pack_slice(&[
657 make_string_datum(global_id, &mut self.scratch_string_a),
658 Datum::UInt64(lir_id.into()),
659 Datum::UInt64(u64::cast_from(self.worker_id)),
660 make_string_datum(operator, &mut self.scratch_string_b),
661 parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
662 Datum::UInt16(u16::cast_from(nesting)),
663 Datum::UInt64(u64::cast_from(operator_span.0)),
664 Datum::UInt64(u64::cast_from(operator_span.1)),
665 ])
666 }
667
668 fn pack_operator_hydration_status_update(
671 &mut self,
672 export_id: GlobalId,
673 lir_id: LirId,
674 hydrated: bool,
675 ) -> (&RowRef, &RowRef) {
676 self.operator_hydration_status_packer.pack_slice(&[
677 make_string_datum(export_id, &mut self.scratch_string_a),
678 Datum::UInt64(lir_id.into()),
679 Datum::UInt64(u64::cast_from(self.worker_id)),
680 Datum::from(hydrated),
681 ])
682 }
683
684 fn pack_peek_duration_update(
686 &mut self,
687 peek_type: PeekType,
688 bucket: u128,
689 ) -> (&RowRef, &RowRef) {
690 self.peek_duration_packer.pack_slice(&[
691 Datum::UInt64(u64::cast_from(self.worker_id)),
692 Datum::String(peek_type.name()),
693 Datum::UInt64(bucket.try_into().expect("bucket too big")),
694 ])
695 }
696
697 fn pack_peek_update(
699 &mut self,
700 id: GlobalId,
701 time: Timestamp,
702 uuid: Uuid,
703 peek_type: PeekType,
704 ) -> (&RowRef, &RowRef) {
705 self.peek_packer.pack_slice(&[
706 Datum::Uuid(uuid),
707 Datum::UInt64(u64::cast_from(self.worker_id)),
708 make_string_datum(id, &mut self.scratch_string_a),
709 Datum::String(peek_type.name()),
710 Datum::MzTimestamp(time),
711 ])
712 }
713
714 fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
716 self.frontier_packer.pack_slice(&[
717 make_string_datum(export_id, &mut self.scratch_string_a),
718 Datum::UInt64(u64::cast_from(self.worker_id)),
719 Datum::MzTimestamp(time),
720 ])
721 }
722}
723
/// State tracked for each registered dataflow export.
struct ExportState {
    /// Index of the dataflow the export was registered with.
    dataflow_index: usize,
    /// Running error count, updated by `ErrorCount` events.
    error_count: Diff,
    /// When this state was created; hydration time is measured from this instant.
    created_at: Instant,
    /// Nanoseconds between `created_at` and the first hydration event, or `None` if no
    /// hydration event has been handled yet.
    hydration_time_ns: Option<u64>,
    /// Last reported hydration status per LIR operator.
    operator_hydration: BTreeMap<LirId, bool>,
}
740
741impl ExportState {
742 fn new(dataflow_index: usize) -> Self {
743 Self {
744 dataflow_index,
745 error_count: Diff::ZERO,
746 created_at: Instant::now(),
747 hydration_time_ns: None,
748 operator_hydration: BTreeMap::new(),
749 }
750 }
751}
752
/// Running totals of the arrangement size deltas reported for one operator.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    /// Sum of all reported size deltas.
    size: isize,
    /// Sum of all reported capacity deltas.
    capacity: isize,
    /// Sum of all reported allocation-count deltas.
    count: isize,
}
760
/// Bundle of the demux operator's output sessions, one per compute log.
///
/// Every session accepts `((key, value), time, diff)` updates of packed rows.
struct DemuxOutput<'a, 'b> {
    /// Session for the `DataflowCurrent` log.
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `FrontierCurrent` log.
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `ImportFrontierCurrent` log.
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `PeekCurrent` log.
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `PeekDuration` log.
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `ArrangementHeapAllocations` log.
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `ArrangementHeapCapacity` log.
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `ArrangementHeapSize` log.
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `HydrationTime` log.
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `OperatorHydrationStatus` log.
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `ErrorCount` log.
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `LirMapping` log.
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    /// Session for the `DataflowGlobal` log.
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}
777
/// Event handler of the demux operator, constructed afresh for every logged event.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept by the demux operator across events.
    state: &'a mut DemuxState,
    /// Logging state shared with the rest of the worker.
    shared_state: &'a mut SharedLoggingState,
    /// The output sessions updates are written to.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval, in milliseconds, that event times are rounded to.
    logging_interval_ms: u128,
    /// The time at which the event being handled was logged.
    time: Duration,
}
791
792impl DemuxHandler<'_, '_, '_> {
793 fn ts(&self) -> Timestamp {
796 let time_ms = self.time.as_millis();
797 let interval = self.logging_interval_ms;
798 let rounded = (time_ms / interval + 1) * interval;
799 rounded.try_into().expect("must fit")
800 }
801
802 fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
804 use ComputeEventReference::*;
805 match event {
806 Export(export) => self.handle_export(export),
807 ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
808 Peek(peek) if peek.installed => self.handle_peek_install(peek),
809 Peek(peek) => self.handle_peek_retire(peek),
810 Frontier(frontier) => self.handle_frontier(frontier),
811 ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
812 ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
813 ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
814 ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
815 ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
816 ArrangementHeapSizeOperatorDrop(inner) => {
817 self.handle_arrangement_heap_size_operator_dropped(inner)
818 }
819 DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
820 ErrorCount(error_count) => self.handle_error_count(error_count),
821 Hydration(hydration) => self.handle_hydration(hydration),
822 OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
823 LirMapping(mapping) => self.handle_lir_mapping(mapping),
824 DataflowGlobal(global) => self.handle_dataflow_global(global),
825 }
826 }
827
828 fn handle_export(
829 &mut self,
830 ExportReference {
831 export_id,
832 dataflow_index,
833 }: Ref<'_, Export>,
834 ) {
835 let export_id = Columnar::into_owned(export_id);
836 let ts = self.ts();
837 let datum = self.state.pack_export_update(export_id, dataflow_index);
838 self.output.export.give((datum, ts, Diff::ONE));
839
840 let existing = self
841 .state
842 .exports
843 .insert(export_id, ExportState::new(dataflow_index));
844 if existing.is_some() {
845 error!(%export_id, "export already registered");
846 }
847
848 let datum = self.state.pack_hydration_time_update(export_id, None);
850 self.output.hydration_time.give((datum, ts, Diff::ONE));
851 }
852
853 fn handle_export_dropped(
854 &mut self,
855 ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
856 ) {
857 let export_id = Columnar::into_owned(export_id);
858 let Some(export) = self.state.exports.remove(&export_id) else {
859 error!(%export_id, "missing exports entry at time of export drop");
860 return;
861 };
862
863 let ts = self.ts();
864 let dataflow_index = export.dataflow_index;
865
866 let datum = self.state.pack_export_update(export_id, dataflow_index);
867 self.output.export.give((datum, ts, Diff::MINUS_ONE));
868
869 if export.error_count != Diff::ZERO {
871 let datum = self
872 .state
873 .pack_error_count_update(export_id, export.error_count);
874 self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
875 }
876
877 let datum = self
879 .state
880 .pack_hydration_time_update(export_id, export.hydration_time_ns);
881 self.output
882 .hydration_time
883 .give((datum, ts, Diff::MINUS_ONE));
884
885 for (lir_id, hydrated) in export.operator_hydration {
887 let datum = self
888 .state
889 .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
890 self.output
891 .operator_hydration_status
892 .give((datum, ts, Diff::MINUS_ONE));
893 }
894 }
895
896 fn handle_dataflow_shutdown(
897 &mut self,
898 DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
899 ) {
900 let ts = self.ts();
901
902 if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
904 for global_id in global_ids {
905 let datum = self
907 .state
908 .pack_dataflow_global_update(dataflow_index, global_id);
909 self.output
910 .dataflow_global_ids
911 .give((datum, ts, Diff::MINUS_ONE));
912
913 if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
915 for (
916 lir_id,
917 LirMetadata {
918 operator,
919 parent_lir_id,
920 nesting,
921 operator_span,
922 },
923 ) in mappings
924 {
925 let datum = self.state.pack_lir_mapping_update(
926 global_id,
927 lir_id,
928 operator,
929 parent_lir_id,
930 nesting,
931 operator_span,
932 );
933 self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
934 }
935 }
936 }
937 }
938 }
939
940 fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
941 let ts = self.ts();
942 let export_id = Columnar::into_owned(export_id);
943
944 let Some(export) = self.state.exports.get_mut(&export_id) else {
945 return;
948 };
949
950 let old_count = export.error_count;
951 let new_count = old_count + diff;
952 export.error_count = new_count;
953
954 if old_count != Diff::ZERO {
955 let datum = self.state.pack_error_count_update(export_id, old_count);
956 self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
957 }
958 if new_count != Diff::ZERO {
959 let datum = self.state.pack_error_count_update(export_id, new_count);
960 self.output.error_count.give((datum, ts, Diff::ONE));
961 }
962 }
963
964 fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
965 let ts = self.ts();
966 let export_id = Columnar::into_owned(export_id);
967
968 let Some(export) = self.state.exports.get_mut(&export_id) else {
969 error!(%export_id, "hydration event for unknown export");
970 return;
971 };
972 if export.hydration_time_ns.is_some() {
973 return;
976 }
977
978 let duration = export.created_at.elapsed();
979 let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
980 export.hydration_time_ns = Some(nanos);
981
982 let retraction = self.state.pack_hydration_time_update(export_id, None);
983 self.output
984 .hydration_time
985 .give((retraction, ts, Diff::MINUS_ONE));
986 let insertion = self
987 .state
988 .pack_hydration_time_update(export_id, Some(nanos));
989 self.output.hydration_time.give((insertion, ts, Diff::ONE));
990 }
991
992 fn handle_operator_hydration(
993 &mut self,
994 OperatorHydrationReference {
995 export_id,
996 lir_id,
997 hydrated,
998 }: Ref<'_, OperatorHydration>,
999 ) {
1000 let ts = self.ts();
1001 let export_id = Columnar::into_owned(export_id);
1002 let lir_id = Columnar::into_owned(lir_id);
1003 let hydrated = Columnar::into_owned(hydrated);
1004
1005 let Some(export) = self.state.exports.get_mut(&export_id) else {
1006 return;
1009 };
1010
1011 let old_status = export.operator_hydration.get(&lir_id).copied();
1012 export.operator_hydration.insert(lir_id, hydrated);
1013
1014 if let Some(hydrated) = old_status {
1015 let retraction = self
1016 .state
1017 .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1018 self.output
1019 .operator_hydration_status
1020 .give((retraction, ts, Diff::MINUS_ONE));
1021 }
1022
1023 let insertion = self
1024 .state
1025 .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
1026 self.output
1027 .operator_hydration_status
1028 .give((insertion, ts, Diff::ONE));
1029 }
1030
1031 fn handle_peek_install(
1032 &mut self,
1033 PeekEventReference {
1034 id,
1035 time,
1036 uuid,
1037 peek_type,
1038 installed: _,
1039 }: Ref<'_, PeekEvent>,
1040 ) {
1041 let id = Columnar::into_owned(id);
1042 let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1043 let ts = self.ts();
1044 let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1045 self.output.peek.give((datum, ts, Diff::ONE));
1046
1047 let existing = self.state.peek_stash.insert(uuid, self.time);
1048 if existing.is_some() {
1049 error!(%uuid, "peek already registered");
1050 }
1051 }
1052
1053 fn handle_peek_retire(
1054 &mut self,
1055 PeekEventReference {
1056 id,
1057 time,
1058 uuid,
1059 peek_type,
1060 installed: _,
1061 }: Ref<'_, PeekEvent>,
1062 ) {
1063 let id = Columnar::into_owned(id);
1064 let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
1065 let ts = self.ts();
1066 let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
1067 self.output.peek.give((datum, ts, Diff::MINUS_ONE));
1068
1069 if let Some(start) = self.state.peek_stash.remove(&uuid) {
1070 let elapsed_ns = self.time.saturating_sub(start).as_nanos();
1071 let bucket = elapsed_ns.next_power_of_two();
1072 let datum = self.state.pack_peek_duration_update(peek_type, bucket);
1073 self.output.peek_duration.give((datum, ts, Diff::ONE));
1074 } else {
1075 error!(%uuid, "peek not yet registered");
1076 }
1077 }
1078
1079 fn handle_frontier(
1080 &mut self,
1081 FrontierReference {
1082 export_id,
1083 time,
1084 diff,
1085 }: Ref<'_, Frontier>,
1086 ) {
1087 let export_id = Columnar::into_owned(export_id);
1088 let diff = Diff::from(*diff);
1089 let ts = self.ts();
1090 let time = Columnar::into_owned(time);
1091 let datum = self.state.pack_frontier_update(export_id, time);
1092 self.output.frontier.give((datum, ts, diff));
1093 }
1094
1095 fn handle_import_frontier(
1096 &mut self,
1097 ImportFrontierReference {
1098 import_id,
1099 export_id,
1100 time,
1101 diff,
1102 }: Ref<'_, ImportFrontier>,
1103 ) {
1104 let import_id = Columnar::into_owned(import_id);
1105 let export_id = Columnar::into_owned(export_id);
1106 let diff = Diff::from(*diff);
1107 let ts = self.ts();
1108 let time = Columnar::into_owned(time);
1109 let datum = self
1110 .state
1111 .pack_import_frontier_update(export_id, import_id, time);
1112 self.output.import_frontier.give((datum, ts, diff));
1113 }
1114
1115 fn handle_arrangement_heap_size(
1117 &mut self,
1118 ArrangementHeapSizeReference {
1119 operator_id,
1120 delta_size,
1121 }: Ref<'_, ArrangementHeapSize>,
1122 ) {
1123 let ts = self.ts();
1124 let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1125 return;
1126 };
1127
1128 state.size += delta_size;
1129
1130 let datum = self.state.pack_arrangement_heap_size_update(operator_id);
1131 let diff = Diff::cast_from(delta_size);
1132 self.output.arrangement_heap_size.give((datum, ts, diff));
1133 }
1134
1135 fn handle_arrangement_heap_capacity(
1137 &mut self,
1138 ArrangementHeapCapacityReference {
1139 operator_id,
1140 delta_capacity,
1141 }: Ref<'_, ArrangementHeapCapacity>,
1142 ) {
1143 let ts = self.ts();
1144 let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1145 return;
1146 };
1147
1148 state.capacity += delta_capacity;
1149
1150 let datum = self
1151 .state
1152 .pack_arrangement_heap_capacity_update(operator_id);
1153 let diff = Diff::cast_from(delta_capacity);
1154 self.output
1155 .arrangement_heap_capacity
1156 .give((datum, ts, diff));
1157 }
1158
1159 fn handle_arrangement_heap_allocations(
1161 &mut self,
1162 ArrangementHeapAllocationsReference {
1163 operator_id,
1164 delta_allocations,
1165 }: Ref<'_, ArrangementHeapAllocations>,
1166 ) {
1167 let ts = self.ts();
1168 let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1169 return;
1170 };
1171
1172 state.count += delta_allocations;
1173
1174 let datum = self
1175 .state
1176 .pack_arrangement_heap_allocations_update(operator_id);
1177 let diff = Diff::cast_from(delta_allocations);
1178 self.output
1179 .arrangement_heap_allocations
1180 .give((datum, ts, diff));
1181 }
1182
1183 fn handle_arrangement_heap_size_operator(
1185 &mut self,
1186 ArrangementHeapSizeOperatorReference {
1187 operator_id,
1188 address,
1189 }: Ref<'_, ArrangementHeapSizeOperator>,
1190 ) {
1191 let activator = Activator::new(
1192 address.into_iter().collect(),
1193 Rc::clone(&self.state.activations),
1194 );
1195 let existing = self
1196 .state
1197 .arrangement_size
1198 .insert(operator_id, Default::default());
1199 if existing.is_some() {
1200 error!(%operator_id, "arrangement size operator already registered");
1201 }
1202 let existing = self
1203 .shared_state
1204 .arrangement_size_activators
1205 .insert(operator_id, activator);
1206 if existing.is_some() {
1207 error!(%operator_id, "arrangement size activator already registered");
1208 }
1209 }
1210
1211 fn handle_arrangement_heap_size_operator_dropped(
1213 &mut self,
1214 event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
1215 ) {
1216 let operator_id = event.operator_id;
1217 if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
1218 let ts = self.ts();
1219 let allocations = self
1220 .state
1221 .pack_arrangement_heap_allocations_update(operator_id);
1222 let diff = -Diff::cast_from(state.count);
1223 self.output
1224 .arrangement_heap_allocations
1225 .give((allocations, ts, diff));
1226
1227 let capacity = self
1228 .state
1229 .pack_arrangement_heap_capacity_update(operator_id);
1230 let diff = -Diff::cast_from(state.capacity);
1231 self.output
1232 .arrangement_heap_capacity
1233 .give((capacity, ts, diff));
1234
1235 let size = self.state.pack_arrangement_heap_size_update(operator_id);
1236 let diff = -Diff::cast_from(state.size);
1237 self.output.arrangement_heap_size.give((size, ts, diff));
1238 }
1239 self.shared_state
1240 .arrangement_size_activators
1241 .remove(&operator_id);
1242 }
1243
1244 fn handle_lir_mapping(
1246 &mut self,
1247 LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
1248 ) {
1249 let global_id = Columnar::into_owned(global_id);
1250 let mappings = || mapping.into_iter().map(Columnar::into_owned);
1252 self.state
1253 .lir_mapping
1254 .entry(global_id)
1255 .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
1256 .or_insert_with(|| mappings().collect());
1257
1258 let ts = self.ts();
1260 for (lir_id, meta) in mapping.into_iter() {
1261 let datum = self.state.pack_lir_mapping_update(
1262 global_id,
1263 Columnar::into_owned(lir_id),
1264 Columnar::into_owned(meta.operator),
1265 Columnar::into_owned(meta.parent_lir_id),
1266 Columnar::into_owned(meta.nesting),
1267 Columnar::into_owned(meta.operator_span),
1268 );
1269 self.output.lir_mapping.give((datum, ts, Diff::ONE));
1270 }
1271 }
1272
1273 fn handle_dataflow_global(
1274 &mut self,
1275 DataflowGlobalReference {
1276 dataflow_index,
1277 global_id,
1278 }: Ref<'_, DataflowGlobal>,
1279 ) {
1280 let global_id = Columnar::into_owned(global_id);
1281 self.state
1282 .dataflow_global_ids
1283 .entry(dataflow_index)
1284 .and_modify(|globals| {
1285 if !globals.insert(global_id) {
1287 error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
1288 }
1289 })
1290 .or_insert_with(|| BTreeSet::from([global_id]));
1291
1292 let ts = self.ts();
1293 let datum = self
1294 .state
1295 .pack_dataflow_global_update(dataflow_index, global_id);
1296 self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
1297 }
1298}
1299
/// Logging state kept for a single export.
///
/// Remembers which frontier values have been logged so that later changes can be logged as a
/// retraction of the old value followed by an insertion of the new one, and so that everything
/// still logged can be retracted when the value is dropped.
pub struct CollectionLogging {
    /// The ID of the export that all logged events refer to.
    export_id: GlobalId,
    /// The logger that receives the events.
    logger: Logger,

    /// The frontier most recently passed to `set_frontier`, or `None` if none is logged.
    logged_frontier: Option<Timestamp>,
    /// The currently logged frontier of each import, keyed by import ID.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
1311
1312impl CollectionLogging {
1313 pub fn new(
1315 export_id: GlobalId,
1316 logger: Logger,
1317 dataflow_index: usize,
1318 import_ids: impl Iterator<Item = GlobalId>,
1319 ) -> Self {
1320 logger.log(&ComputeEvent::Export(Export {
1321 export_id,
1322 dataflow_index,
1323 }));
1324
1325 let mut self_ = Self {
1326 export_id,
1327 logger,
1328 logged_frontier: None,
1329 logged_import_frontiers: Default::default(),
1330 };
1331
1332 let initial_frontier = Some(Timestamp::MIN);
1334 self_.set_frontier(initial_frontier);
1335 import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
1336
1337 self_
1338 }
1339
1340 pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
1342 let old_time = self.logged_frontier;
1343 self.logged_frontier = new_time;
1344
1345 if old_time != new_time {
1346 let export_id = self.export_id;
1347 let retraction = old_time.map(|time| {
1348 ComputeEvent::Frontier(Frontier {
1349 export_id,
1350 time,
1351 diff: -1,
1352 })
1353 });
1354 let insertion = new_time.map(|time| {
1355 ComputeEvent::Frontier(Frontier {
1356 export_id,
1357 time,
1358 diff: 1,
1359 })
1360 });
1361 let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1362 self.logger.log_many(events);
1363 }
1364 }
1365
1366 pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
1369 let old_time = self.logged_import_frontiers.remove(&import_id);
1370 if let Some(time) = new_time {
1371 self.logged_import_frontiers.insert(import_id, time);
1372 }
1373
1374 if old_time != new_time {
1375 let export_id = self.export_id;
1376 let retraction = old_time.map(|time| {
1377 ComputeEvent::ImportFrontier(ImportFrontier {
1378 import_id,
1379 export_id,
1380 time,
1381 diff: -1,
1382 })
1383 });
1384 let insertion = new_time.map(|time| {
1385 ComputeEvent::ImportFrontier(ImportFrontier {
1386 import_id,
1387 export_id,
1388 time,
1389 diff: 1,
1390 })
1391 });
1392 let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1393 self.logger.log_many(events);
1394 }
1395 }
1396
1397 pub fn set_hydrated(&self) {
1399 self.logger.log(&ComputeEvent::Hydration(Hydration {
1400 export_id: self.export_id,
1401 }));
1402 }
1403
1404 pub fn export_id(&self) -> GlobalId {
1406 self.export_id
1407 }
1408}
1409
1410impl Drop for CollectionLogging {
1411 fn drop(&mut self) {
1412 self.set_frontier(None);
1414
1415 let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1416 for import_id in import_ids {
1417 self.set_import_frontier(import_id, None);
1418 }
1419
1420 self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1421 export_id: self.export_id,
1422 }));
1423 }
1424}
1425
/// Extension trait for attaching error-count logging to a dataflow edge.
pub(crate) trait LogDataflowErrors {
    /// Consumes `self` and returns a value of the same type.
    ///
    /// The implementations in this file log a `ComputeEvent::ErrorCount` for `export_id` to
    /// `logger` for every container that passes through, and forward the data unchanged.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
1431
1432impl<'scope, T, D> LogDataflowErrors for VecCollection<'scope, T, D, Diff>
1433where
1434 T: timely::progress::Timestamp,
1435 D: Clone + 'static,
1436{
1437 fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1438 self.inner
1439 .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1440 move |input, output| {
1441 input.for_each(|cap, data| {
1442 let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1443 logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1444
1445 output.session(&cap).give_container(data);
1446 });
1447 }
1448 })
1449 .as_collection()
1450 }
1451}
1452
1453impl<'scope, T, B> LogDataflowErrors for StreamVec<'scope, T, B>
1454where
1455 T: timely::progress::Timestamp,
1456 for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1457{
1458 fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1459 self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1460 move |input, output| {
1461 input.for_each(|cap, data| {
1462 let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1463 logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1464
1465 output.session(&cap).give_container(data);
1466 });
1467 }
1468 })
1469 }
1470}
1471
1472fn sum_batch_diffs<B>(batch: &B) -> Diff
1479where
1480 for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1481{
1482 let mut sum = Diff::ZERO;
1483 let mut cursor = batch.cursor();
1484
1485 while cursor.key_valid(batch) {
1486 while cursor.val_valid(batch) {
1487 cursor.map_times(batch, |_t, r| sum += r);
1488 cursor.step_val(batch);
1489 }
1490 cursor.step_key(batch);
1491 }
1492
1493 sum
1494}
1495
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the in-memory size of `ComputeEvent` so that changes to it are noticed.
    #[mz_ore::test]
    fn test_compute_event_size() {
        let actual = std::mem::size_of::<ComputeEvent>();
        assert_eq!(56, actual)
    }
}