use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::{Columnar, Index, Ref};
use differential_dataflow::VecCollection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
use mz_timely_util::replay::MzReplay;
use timely::Data;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
    SharedLoggingState, Update,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;

/// A logger for compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// An event builder producing columnar batches of timestamped compute events.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;

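/// A dataflow export was created.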
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    pub export_id: GlobalId,
    pub dataflow_index: usize,
}

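/// A dataflow export was dropped.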
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    pub export_id: GlobalId,
}

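/// A peek event; `installed` distinguishes peek installation from retirement.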
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    pub id: GlobalId,
    pub time: Timestamp,
    pub uuid: uuid::Bytes,
    pub peek_type: PeekType,
    pub installed: bool,
}

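/// A change to an export's output frontier; `diff` is +1 for a newly reported
/// frontier time and -1 for a retracted one.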
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

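/// A change to the frontier of one of an export's imports.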
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    pub import_id: GlobalId,
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

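/// A change in the reported heap size of an arrangement.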
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    pub operator_id: usize,
    pub delta_size: isize,
}

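/// A change in the reported heap capacity of an arrangement.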
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    pub operator_id: usize,
    pub delta_capacity: isize,
}

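/// A change in the reported number of heap allocations of an arrangement.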
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    pub operator_id: usize,
    pub delta_allocations: isize,
}

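/// An operator reporting arrangement heap sizes was created; carries the
/// operator's address, used to construct an activator for it.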
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    pub operator_id: usize,
    pub address: Vec<usize>,
}

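/// An operator reporting arrangement heap sizes was dropped.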
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    pub operator_id: usize,
}

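/// A dataflow was shut down.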
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    pub dataflow_index: usize,
}

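/// A change in the error count of an export.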
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    pub export_id: GlobalId,
    pub diff: Diff,
}

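/// An export has hydrated.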
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    pub export_id: GlobalId,
}

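/// A change in the hydration status of an operator in an export's dataflow.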
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    pub export_id: GlobalId,
    pub lir_id: LirId,
    pub hydrated: bool,
}

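/// A mapping from LIR operators to their metadata, for the dataflow
/// identified by `global_id`.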
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    pub global_id: GlobalId,
    pub mapping: Vec<(LirId, LirMetadata)>,
}

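/// An association between a dataflow index and one of its `GlobalId`s.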
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    pub dataflow_index: usize,
    pub global_id: GlobalId,
}

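/// A logged compute event.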
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    Export(Export),
    ExportDropped(ExportDropped),
    Peek(PeekEvent),
    Frontier(Frontier),
    ImportFrontier(ImportFrontier),
    ArrangementHeapSize(ArrangementHeapSize),
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    DataflowShutdown(DataflowShutdown),
    ErrorCount(ErrorCount),
    Hydration(Hydration),
    OperatorHydration(OperatorHydration),
    LirMapping(LirMapping),
    DataflowGlobal(DataflowGlobal),
}

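/// The type of a peek: served from an index or from Persist.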
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    Index,
    Persist,
}

impl PeekType {
    /// A human-readable name for this peek type.
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

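/// Metadata for an LIR operator: its rendered name, its parent, its nesting
/// depth, and its operator span.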
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    operator: String,
    parent_lir_id: Option<LirId>,
    nesting: u8,
    operator_span: (usize, usize),
}

impl LirMetadata {
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}

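/// Bundled return type of [`construct`].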
pub(super) struct Return {
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

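/// Constructs the logging dataflow for compute logs.
///
/// Replays the given event queue into a demux operator that splits the events
/// into one update stream per log variant, then arranges each stream that the
/// logging configuration requests as an indexed log.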
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "compute logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (export_out, export) = demux.new_output();
        let mut export_out = OutputBuilder::from(export_out);
        let (frontier_out, frontier) = demux.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        let (import_frontier_out, import_frontier) = demux.new_output();
        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
        let (peek_out, peek) = demux.new_output();
        let mut peek_out = OutputBuilder::from(peek_out);
        let (peek_duration_out, peek_duration) = demux.new_output();
        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
        let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
        let mut arrangement_heap_allocations_out =
            OutputBuilder::from(arrangement_heap_allocations_out);
        let (error_count_out, error_count) = demux.new_output();
        let mut error_count_out = OutputBuilder::from(error_count_out);
        let (hydration_time_out, hydration_time) = demux.new_output();
        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
        let (lir_mapping_out, lir_mapping) = demux.new_output();
        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);

        let mut demux_state = DemuxState::new(scheduler, scope.index());
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut operator_hydration_status = operator_hydration_status_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session_with_builder(&cap),
                        frontier: frontier.session_with_builder(&cap),
                        import_frontier: import_frontier.session_with_builder(&cap),
                        peek: peek.session_with_builder(&cap),
                        peek_duration: peek_duration.session_with_builder(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations
                            .session_with_builder(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity
                            .session_with_builder(&cap),
                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
                        error_count: error_count.session_with_builder(&cap),
                        hydration_time: hydration_time.session_with_builder(&cap),
                        operator_hydration_status: operator_hydration_status
                            .session_with_builder(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.borrow().into_index_iter() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        use ComputeLog::*;
        let logs = [
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapSize, arrangement_heap_size),
            (DataflowCurrent, export),
            (DataflowGlobal, dataflow_global_ids),
            (ErrorCount, error_count),
            (FrontierCurrent, frontier),
            (HydrationTime, hydration_time),
            (ImportFrontierCurrent, import_frontier),
            (LirMapping, lir_mapping),
            (OperatorHydrationStatus, operator_hydration_status),
            (PeekCurrent, peek),
            (PeekDuration, peek_duration),
        ];

        let mut collections = BTreeMap::new();
        for (variant, stream) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, Timestamp, Diff>,
                );
                let trace = stream
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        exchange,
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

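/// Pack a displayable value into a string datum, reusing `scratch` to avoid a
/// fresh allocation per call.
///
/// For example (illustrative), `make_string_datum(GlobalId::User(1), &mut s)`
/// yields `Datum::String("u1")`.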
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}

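/// State maintained by the demux operator.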
struct DemuxState<A> {
    scheduler: A,
    worker_id: usize,
    scratch_string_a: String,
    scratch_string_b: String,
    exports: BTreeMap<GlobalId, ExportState>,
    peek_stash: BTreeMap<Uuid, Duration>,
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    arrangement_heap_allocations_packer: PermutedRowPacker,
    arrangement_heap_capacity_packer: PermutedRowPacker,
    arrangement_heap_size_packer: PermutedRowPacker,
    dataflow_global_packer: PermutedRowPacker,
    error_count_packer: PermutedRowPacker,
    export_packer: PermutedRowPacker,
    frontier_packer: PermutedRowPacker,
    import_frontier_packer: PermutedRowPacker,
    lir_mapping_packer: PermutedRowPacker,
    operator_hydration_status_packer: PermutedRowPacker,
    peek_duration_packer: PermutedRowPacker,
    peek_packer: PermutedRowPacker,
    hydration_time_packer: PermutedRowPacker,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
        }
    }

    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }
}

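/// State tracked for each dataflow export.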
struct ExportState {
    /// The index of the dataflow maintaining this export.
    dataflow_index: usize,
    /// The current error count of the export, kept so the logged count can be
    /// retracted when the export is dropped.
    error_count: Diff,
    /// When the export was created, used to measure hydration time.
    created_at: Instant,
    /// The export's hydration time, in nanoseconds, if it has hydrated.
    hydration_time_ns: Option<u64>,
    /// Per-operator hydration status, keyed by LIR ID.
    operator_hydration: BTreeMap<LirId, bool>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
            operator_hydration: BTreeMap::new(),
        }
    }
}

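/// The current size, capacity, and allocation count tracked for an
/// arrangement, used to retract logged values when the arrangement is dropped.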
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

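/// Bundled output sessions used by the demux operator.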
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}

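/// Event handler that translates a single `ComputeEvent` into updates on the
/// appropriate demux outputs.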
struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// Shared logging state.
    shared_state: &'a mut SharedLoggingState,
    /// Output sessions for the demux operator's outputs.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval, in milliseconds.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
    /// Return the timestamp to associate with the current event, rounded up
    /// to the next multiple of the logging interval so that updates arriving
    /// within one interval consolidate onto a single timestamp.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given event by dispatching to the matching event handler.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        // Log the export with a `NULL` hydration time, to be replaced once
        // the export hydrates.
        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        // Retract any remaining logged state for the dropped export.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        // Retract the dataflow's global IDs and, with them, any LIR mappings
        // logged for those IDs.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export may have already been dropped, in which case we are
            // no longer interested in its errors.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Hydration was already logged for this export; nothing to do.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        // Replace the previously logged `NULL` hydration time with the
        // measured duration.
        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }

    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export may have already been dropped, in which case we are
            // no longer interested in its hydration status.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            // Bucket the peek duration, in nanoseconds, into a power-of-two
            // histogram bucket.
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self.state.pack_frontier_update(export_id, time);
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }

    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }

    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }

    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.count += delta_allocations;

        let datum = self
            .state
            .pack_arrangement_heap_allocations_update(operator_id);
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));
    }

    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let allocations = self
                .state
                .pack_arrangement_heap_allocations_update(operator_id);
            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((allocations, ts, diff));

            let capacity = self
                .state
                .pack_arrangement_heap_capacity_update(operator_id);
            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((capacity, ts, diff));

            let size = self.state.pack_arrangement_heap_size_update(operator_id);
            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((size, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // Remember the mapping so it can be retracted on dataflow shutdown.
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // Log the mapping as output updates.
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: Ref<'_, DataflowGlobal>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = self
            .state
            .pack_dataflow_global_update(dataflow_index, global_id);
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

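/// Helper for logging the frontiers of a compute collection and retracting
/// all logged state when dropped.
///
/// A minimal usage sketch (the surrounding identifiers are illustrative, not
/// part of this module's API):
///
/// ```ignore
/// let mut logging = CollectionLogging::new(export_id, logger, dataflow_index, import_ids);
/// // Report progress as the collection advances:
/// logging.set_frontier(Some(new_time));
/// // Dropping the value retracts all previously logged frontiers and the export itself.
/// drop(logging);
/// ```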
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    logged_frontier: Option<Timestamp>,
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
    /// Create new logging state for the identified collection and emit the
    /// initial logging events.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize frontier logging for the collection and its imports.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

    /// Set the collection frontier to the given new time and emit any
    /// corresponding logging events.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Set the frontier of the given import to the given new time and emit
    /// any corresponding logging events.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Log that the collection has hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }

    /// Return the ID of the logged export.
    pub fn export_id(&self) -> GlobalId {
        self.export_id
    }
}

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Retract all logged frontiers for the collection and its imports.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

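/// Extension trait for attaching error-count logging to error collections and
/// streams of error batches.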
pub(crate) trait LogDataflowErrors {
    /// Log the errors passing through `self` under the given export ID.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}

impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

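/// Sum the diffs of all updates contained in the given batch, by walking the
/// batch's keys, values, and times with a cursor.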
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
}