1use std::cell::RefCell;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::{Display, Write};
15use std::rc::Rc;
16use std::time::{Duration, Instant};
17
18use columnar::{Columnar, Index, Ref};
19use differential_dataflow::VecCollection;
20use differential_dataflow::collection::AsCollection;
21use differential_dataflow::trace::{BatchReader, Cursor};
22use mz_compute_types::plan::LirId;
23use mz_ore::cast::CastFrom;
24use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
25use mz_timely_util::columnar::builder::ColumnBuilder;
26use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
27use mz_timely_util::replay::MzReplay;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::operators::Operator;
30use timely::dataflow::operators::generic::OutputBuilder;
31use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
32use timely::dataflow::operators::generic::operator::empty;
33use timely::dataflow::{Scope, StreamVec};
34use timely::scheduling::Scheduler;
35use tracing::error;
36use uuid::Uuid;
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::logging::{
40 ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
41 SharedLoggingState, Update,
42};
43use crate::row_spine::RowRowBuilder;
44use crate::typedefs::RowRowSpine;
45
/// A logger for compute events, batching records with the columnar builder below.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// The container builder used to batch `(event time, event)` compute log records.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
49
/// An export was created in a dataflow.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// The id of the export.
    pub export_id: GlobalId,
    /// The index of the dataflow hosting the export.
    pub dataflow_index: usize,
}
58
/// An export was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// The id of the dropped export.
    pub export_id: GlobalId,
}
65
/// A peek was installed or retired.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The id of the collection targeted by the peek.
    pub id: GlobalId,
    /// The timestamp the peek is querying at.
    pub time: Timestamp,
    /// The unique identifier of the peek, as raw UUID bytes.
    pub uuid: uuid::Bytes,
    /// Whether the peek targets an index or persist.
    pub peek_type: PeekType,
    /// `true` when the peek is being installed, `false` when it is retired.
    pub installed: bool,
}
81
/// A change to an export's output frontier.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// The id of the export whose frontier changed.
    pub export_id: GlobalId,
    /// The frontier time affected by the change.
    pub time: Timestamp,
    /// +1 when `time` enters the frontier, -1 when it leaves.
    pub diff: i8,
}
89
/// A change to the frontier of a dataflow import, tracked per export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// The id of the imported collection.
    pub import_id: GlobalId,
    /// The id of the export on whose behalf the import is tracked.
    pub export_id: GlobalId,
    /// The frontier time affected by the change.
    pub time: Timestamp,
    /// +1 when `time` enters the frontier, -1 when it leaves.
    pub diff: i8,
}
98
/// A change in the heap size of an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// The id of the operator owning the arrangement.
    pub operator_id: usize,
    /// The change in size, in bytes.
    pub delta_size: isize,
}
107
/// A change in the heap capacity of an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// The id of the operator owning the arrangement.
    pub operator_id: usize,
    /// The change in capacity.
    pub delta_capacity: isize,
}
116
/// A change in the number of heap allocations of an arrangement.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// The id of the operator owning the arrangement.
    pub operator_id: usize,
    /// The change in the number of allocations.
    pub delta_allocations: isize,
}
125
/// An operator that reports arrangement heap sizes was created.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// The id of the reporting operator.
    pub operator_id: usize,
    /// The operator's dataflow address, used to construct an activator for it.
    pub address: Vec<usize>,
}
134
/// An operator that reports arrangement heap sizes was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// The id of the dropped reporting operator.
    pub operator_id: usize,
}
141
/// A dataflow was shut down.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// The index of the dataflow that shut down.
    pub dataflow_index: usize,
}
148
/// A change in the error count of an export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// The id of the affected export.
    pub export_id: GlobalId,
    /// The change to apply to the export's error count.
    pub diff: Diff,
}
157
/// An export completed hydration.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// The id of the hydrated export.
    pub export_id: GlobalId,
}
164
/// A change in the hydration status of a single LIR operator within an export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    /// The id of the export containing the operator.
    pub export_id: GlobalId,
    /// The LIR id of the operator.
    pub lir_id: LirId,
    /// The new hydration status of the operator.
    pub hydrated: bool,
}
175
/// Metadata about the LIR operators making up a dataflow-exported collection.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The global id the mapping applies to.
    pub global_id: GlobalId,
    /// Per-LIR-node metadata for this global id.
    pub mapping: Vec<(LirId, LirMetadata)>,
}
190
/// An association between a dataflow index and a global id it computes.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The index of the dataflow.
    pub dataflow_index: usize,
    /// A global id associated with the dataflow.
    pub global_id: GlobalId,
}
199
/// A logged compute event, demultiplexed into one log stream per variant.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// An export was created.
    Export(Export),
    /// An export was dropped.
    ExportDropped(ExportDropped),
    /// A peek was installed or retired (see [`PeekEvent::installed`]).
    Peek(PeekEvent),
    /// An export's output frontier changed.
    Frontier(Frontier),
    /// An import frontier changed.
    ImportFrontier(ImportFrontier),
    /// An arrangement's heap size changed.
    ArrangementHeapSize(ArrangementHeapSize),
    /// An arrangement's heap capacity changed.
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// An arrangement's allocation count changed.
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// An arrangement size reporting operator was created.
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// An arrangement size reporting operator was dropped.
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// A dataflow was shut down.
    DataflowShutdown(DataflowShutdown),
    /// An export's error count changed.
    ErrorCount(ErrorCount),
    /// An export completed hydration.
    Hydration(Hydration),
    /// An LIR operator's hydration status changed.
    OperatorHydration(OperatorHydration),
    /// LIR metadata was reported for a global id.
    LirMapping(LirMapping),
    /// A dataflow/global-id association was reported.
    DataflowGlobal(DataflowGlobal),
}
237
/// The kind of collection a peek reads from.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// The peek reads from an index.
    Index,
    /// The peek reads directly from persist.
    Persist,
}
246
247impl PeekType {
248 fn name(self) -> &'static str {
250 match self {
251 PeekType::Index => "index",
252 PeekType::Persist => "persist",
253 }
254 }
255}
256
/// Metadata for a single LIR operator, as reported in the LIR mapping log.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The rendered name of the operator.
    operator: String,
    /// The LIR id of the parent operator, if any.
    parent_lir_id: Option<LirId>,
    /// The nesting depth of the operator.
    nesting: u8,
    /// A `(start, end)` pair logged as two uint64 columns; presumably the
    /// range of dataflow operator ids covered by this LIR node — TODO confirm.
    operator_span: (usize, usize),
}
270
271impl LirMetadata {
272 pub fn new(
274 operator: String,
275 parent_lir_id: Option<LirId>,
276 nesting: u8,
277 operator_span: (usize, usize),
278 ) -> Self {
279 Self {
280 operator,
281 parent_lir_id,
282 nesting,
283 operator_span,
284 }
285 }
286}
287
/// The return type of [`construct`].
pub(super) struct Return {
    /// The log collections that were built, keyed by log variant.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
293
294pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
304 mut scope: G,
305 scheduler: S,
306 config: &mz_compute_client::logging::LoggingConfig,
307 event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
308 shared_state: Rc<RefCell<SharedLoggingState>>,
309) -> Return {
310 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
311
312 scope.scoped("compute logging", move |scope| {
313 let enable_logging = config.enable_logging;
314 let (logs, token) = if enable_logging {
315 event_queue.links.mz_replay(
316 scope,
317 "compute logs",
318 config.interval,
319 event_queue.activator,
320 )
321 } else {
322 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
323 (empty(scope), token)
324 };
325
326 let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
329 let mut input = demux.new_input(logs, Pipeline);
330 let (export_out, export) = demux.new_output();
331 let mut export_out = OutputBuilder::from(export_out);
332 let (frontier_out, frontier) = demux.new_output();
333 let mut frontier_out = OutputBuilder::from(frontier_out);
334 let (import_frontier_out, import_frontier) = demux.new_output();
335 let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
336 let (peek_out, peek) = demux.new_output();
337 let mut peek_out = OutputBuilder::from(peek_out);
338 let (peek_duration_out, peek_duration) = demux.new_output();
339 let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
340 let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
341 let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
342 let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
343 let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
344 let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
345 let mut arrangement_heap_allocations_out =
346 OutputBuilder::from(arrangement_heap_allocations_out);
347 let (error_count_out, error_count) = demux.new_output();
348 let mut error_count_out = OutputBuilder::from(error_count_out);
349 let (hydration_time_out, hydration_time) = demux.new_output();
350 let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
351 let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
352 let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
353 let (lir_mapping_out, lir_mapping) = demux.new_output();
354 let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
355 let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
356 let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);
357
358 let mut demux_state = DemuxState::new(scheduler, scope.index());
359 demux.build(move |_capability| {
360 move |_frontiers| {
361 let mut export = export_out.activate();
362 let mut frontier = frontier_out.activate();
363 let mut import_frontier = import_frontier_out.activate();
364 let mut peek = peek_out.activate();
365 let mut peek_duration = peek_duration_out.activate();
366 let mut arrangement_heap_size = arrangement_heap_size_out.activate();
367 let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
368 let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
369 let mut error_count = error_count_out.activate();
370 let mut hydration_time = hydration_time_out.activate();
371 let mut operator_hydration_status = operator_hydration_status_out.activate();
372 let mut lir_mapping = lir_mapping_out.activate();
373 let mut dataflow_global_ids = dataflow_global_ids_out.activate();
374
375 input.for_each(|cap, data| {
376 let mut output_sessions = DemuxOutput {
377 export: export.session_with_builder(&cap),
378 frontier: frontier.session_with_builder(&cap),
379 import_frontier: import_frontier.session_with_builder(&cap),
380 peek: peek.session_with_builder(&cap),
381 peek_duration: peek_duration.session_with_builder(&cap),
382 arrangement_heap_allocations: arrangement_heap_allocations
383 .session_with_builder(&cap),
384 arrangement_heap_capacity: arrangement_heap_capacity
385 .session_with_builder(&cap),
386 arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
387 error_count: error_count.session_with_builder(&cap),
388 hydration_time: hydration_time.session_with_builder(&cap),
389 operator_hydration_status: operator_hydration_status
390 .session_with_builder(&cap),
391 lir_mapping: lir_mapping.session_with_builder(&cap),
392 dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
393 };
394
395 let shared_state = &mut shared_state.borrow_mut();
396 for (time, event) in data.borrow().into_index_iter() {
397 DemuxHandler {
398 state: &mut demux_state,
399 shared_state,
400 output: &mut output_sessions,
401 logging_interval_ms,
402 time,
403 }
404 .handle(event);
405 }
406 });
407 }
408 });
409
410 use ComputeLog::*;
411 let logs = [
412 (ArrangementHeapAllocations, arrangement_heap_allocations),
413 (ArrangementHeapCapacity, arrangement_heap_capacity),
414 (ArrangementHeapSize, arrangement_heap_size),
415 (DataflowCurrent, export),
416 (DataflowGlobal, dataflow_global_ids),
417 (ErrorCount, error_count),
418 (FrontierCurrent, frontier),
419 (HydrationTime, hydration_time),
420 (ImportFrontierCurrent, import_frontier),
421 (LirMapping, lir_mapping),
422 (OperatorHydrationStatus, operator_hydration_status),
423 (PeekCurrent, peek),
424 (PeekDuration, peek_duration),
425 ];
426
427 let mut collections = BTreeMap::new();
429 for (variant, stream) in logs {
430 let variant = LogVariant::Compute(variant);
431 if config.index_logs.contains_key(&variant) {
432 let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
433 columnar_exchange::<Row, Row, Timestamp, Diff>,
434 );
435 let trace = stream
436 .mz_arrange_core::<
437 _,
438 Col2ValBatcher<_, _, _, _>,
439 RowRowBuilder<_, _>,
440 RowRowSpine<_, _>,
441 >(exchange, &format!("Arrange {variant:?}"))
442 .trace;
443 let collection = LogCollection {
444 trace,
445 token: Rc::clone(&token),
446 };
447 collections.insert(variant, collection);
448 }
449 }
450
451 Return { collections }
452 })
453}
454
455fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
461where
462 V: Display,
463{
464 scratch.clear();
465 write!(scratch, "{}", value).expect("writing to a `String` can't fail");
466 Datum::String(scratch)
467}
468
/// State maintained by the demux operator across invocations.
struct DemuxState<A> {
    /// The scheduler, used to obtain activators for arrangement size operators.
    scheduler: A,
    /// The index of the Timely worker running this operator.
    worker_id: usize,
    /// Scratch buffer for rendering string datums without reallocating.
    scratch_string_a: String,
    /// Second scratch buffer, for rows that contain two string datums.
    scratch_string_b: String,
    /// Tracking state for each installed export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// The event time at which each pending peek was installed, by peek UUID.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Accumulated arrangement size statistics, by operator id.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// Logged LIR metadata, by global id and LIR id; retracted on dataflow
    /// shutdown.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// The global ids associated with each dataflow index.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    // Row packers, one per log, reused across events to amortize allocations.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    arrangement_heap_capacity_packer: PermutedRowPacker,
    arrangement_heap_size_packer: PermutedRowPacker,
    dataflow_global_packer: PermutedRowPacker,
    error_count_packer: PermutedRowPacker,
    export_packer: PermutedRowPacker,
    frontier_packer: PermutedRowPacker,
    import_frontier_packer: PermutedRowPacker,
    lir_mapping_packer: PermutedRowPacker,
    operator_hydration_status_packer: PermutedRowPacker,
    peek_duration_packer: PermutedRowPacker,
    peek_packer: PermutedRowPacker,
    hydration_time_packer: PermutedRowPacker,
}
516
impl<A: Scheduler> DemuxState<A> {
    /// Creates fresh demux state for the given scheduler and worker.
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
        }
    }

    /// Packs a key/value row pair for the arrangement heap allocations log.
    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Packs a key/value row pair for the arrangement heap capacity log.
    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Packs a key/value row pair for the arrangement heap size log.
    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Packs a key/value row pair for the dataflow/global-id association log.
    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    /// Packs a key/value row pair for the error count log.
    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    /// Packs a key/value row pair for the dataflow export log.
    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    /// Packs a key/value row pair for the hydration time log. `time_ns` is
    /// `None` while the export has not hydrated yet.
    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    /// Packs a key/value row pair for the import frontier log.
    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    /// Packs a key/value row pair for the LIR mapping log.
    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    /// Packs a key/value row pair for the operator hydration status log.
    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    /// Packs a key/value row pair for the peek duration histogram log.
    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    /// Packs a key/value row pair for the active peeks log.
    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    /// Packs a key/value row pair for the export frontier log.
    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }
}
721
/// State tracked per installed export.
struct ExportState {
    /// The index of the dataflow hosting the export.
    dataflow_index: usize,
    /// The current accumulated error count, maintained to allow retraction
    /// when the export is dropped.
    error_count: Diff,
    /// When the export was created; used to measure time-to-hydration.
    created_at: Instant,
    /// Nanoseconds from creation to hydration; `None` until hydrated.
    hydration_time_ns: Option<u64>,
    /// Last reported hydration status per LIR operator, retracted on drop.
    operator_hydration: BTreeMap<LirId, bool>,
}
738
739impl ExportState {
740 fn new(dataflow_index: usize) -> Self {
741 Self {
742 dataflow_index,
743 error_count: Diff::ZERO,
744 created_at: Instant::now(),
745 hydration_time_ns: None,
746 operator_hydration: BTreeMap::new(),
747 }
748 }
749}
750
/// Accumulated heap statistics for one arrangement, maintained so totals can
/// be retracted when the reporting operator is dropped.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    // Accumulated size delta, in bytes.
    size: isize,
    // Accumulated capacity delta.
    capacity: isize,
    // Accumulated allocation-count delta.
    count: isize,
}
758
/// Bundled output sessions of the demux operator, one per compute log.
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}
775
/// Event handler of the demux operator, constructed per event.
struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
    /// State kept across events.
    state: &'a mut DemuxState<A>,
    /// State shared with other logging infrastructure.
    shared_state: &'a mut SharedLoggingState,
    /// Output sessions to write log updates into.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval, in milliseconds; update times round up to it.
    logging_interval_ms: u128,
    /// The event time of the event being handled.
    time: Duration,
}
789
790impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
791 fn ts(&self) -> Timestamp {
794 let time_ms = self.time.as_millis();
795 let interval = self.logging_interval_ms;
796 let rounded = (time_ms / interval + 1) * interval;
797 rounded.try_into().expect("must fit")
798 }
799
    /// Dispatches a logged compute event to the handler for its variant.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            // A peek event is an installation or a retirement, depending on
            // its `installed` flag; the guard must come first.
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }
825
    /// Registers a new export: emits its dataflow-export log row, records
    /// tracking state, and emits an initial "not yet hydrated" row.
    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        // Hydration starts as `None` and is replaced once the export hydrates.
        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }
850
    /// Unregisters a dropped export, retracting every log row previously
    /// emitted for it: export, outstanding error count, hydration time, and
    /// per-operator hydration statuses.
    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        // Only a nonzero error count ever had a row emitted for it.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }
893
    /// Handles the shutdown of a dataflow: retracts its dataflow/global-id
    /// association rows and all LIR mapping rows for those global ids.
    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                // Retract the dataflow <-> global id association.
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                // Retract any LIR metadata logged for this global id.
                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }
937
    /// Applies a change to an export's error count, retracting the previous
    /// count row (if nonzero) and emitting the new one (if nonzero).
    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // Silently ignored; presumably updates can arrive for exports
            // already dropped — TODO confirm.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }
961
    /// Records an export's hydration: replaces its `None` hydration-time row
    /// with the measured duration since creation. Only the first hydration
    /// event has an effect.
    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // Already hydrated; nothing to update.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }
989
    /// Updates the hydration status of a single LIR operator: retracts the
    /// previously logged status (if any) and emits the new one.
    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // Silently ignored; presumably events can arrive for exports
            // already dropped — TODO confirm.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }
1028
    /// Registers a newly installed peek: emits its active-peek row and stashes
    /// its install time for duration measurement at retirement.
    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }
1050
    /// Retires a peek: retracts its active-peek row and records its duration
    /// in the peek-duration histogram.
    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            // Histogram buckets are powers of two of the elapsed nanoseconds.
            let bucket = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }
1076
1077 fn handle_frontier(
1078 &mut self,
1079 FrontierReference {
1080 export_id,
1081 time,
1082 diff,
1083 }: Ref<'_, Frontier>,
1084 ) {
1085 let export_id = Columnar::into_owned(export_id);
1086 let diff = Diff::from(*diff);
1087 let ts = self.ts();
1088 let time = Columnar::into_owned(time);
1089 let datum = self.state.pack_frontier_update(export_id, time);
1090 self.output.frontier.give((datum, ts, diff));
1091 }
1092
    /// Emits an update to the import frontier log, reflecting a single time
    /// entering (+1) or leaving (-1) an import's frontier for some export.
    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }
1112
    /// Applies a heap-size delta for an arrangement, accumulating it in local
    /// state (for retraction on drop) and emitting it as a log update.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        // Ignore updates for operators that were never (or are no longer)
        // registered.
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }
1132
    /// Applies a heap-capacity delta for an arrangement, accumulating it in
    /// local state (for retraction on drop) and emitting it as a log update.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        // Ignore updates for operators that were never (or are no longer)
        // registered.
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }
1156
1157 fn handle_arrangement_heap_allocations(
1159 &mut self,
1160 ArrangementHeapAllocationsReference {
1161 operator_id,
1162 delta_allocations,
1163 }: Ref<'_, ArrangementHeapAllocations>,
1164 ) {
1165 let ts = self.ts();
1166 let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
1167 return;
1168 };
1169
1170 state.count += delta_allocations;
1171
1172 let datum = self
1173 .state
1174 .pack_arrangement_heap_allocations_update(operator_id);
1175 let diff = Diff::cast_from(delta_allocations);
1176 self.output
1177 .arrangement_heap_allocations
1178 .give((datum, ts, diff));
1179 }
1180
1181 fn handle_arrangement_heap_size_operator(
1183 &mut self,
1184 ArrangementHeapSizeOperatorReference {
1185 operator_id,
1186 address,
1187 }: Ref<'_, ArrangementHeapSizeOperator>,
1188 ) {
1189 let activator = self
1190 .state
1191 .scheduler
1192 .activator_for(address.into_iter().collect());
1193 let existing = self
1194 .state
1195 .arrangement_size
1196 .insert(operator_id, Default::default());
1197 if existing.is_some() {
1198 error!(%operator_id, "arrangement size operator already registered");
1199 }
1200 let existing = self
1201 .shared_state
1202 .arrangement_size_activators
1203 .insert(operator_id, activator);
1204 if existing.is_some() {
1205 error!(%operator_id, "arrangement size activator already registered");
1206 }
1207 }
1208
1209 fn handle_arrangement_heap_size_operator_dropped(
1211 &mut self,
1212 event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
1213 ) {
1214 let operator_id = event.operator_id;
1215 if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
1216 let ts = self.ts();
1217 let allocations = self
1218 .state
1219 .pack_arrangement_heap_allocations_update(operator_id);
1220 let diff = -Diff::cast_from(state.count);
1221 self.output
1222 .arrangement_heap_allocations
1223 .give((allocations, ts, diff));
1224
1225 let capacity = self
1226 .state
1227 .pack_arrangement_heap_capacity_update(operator_id);
1228 let diff = -Diff::cast_from(state.capacity);
1229 self.output
1230 .arrangement_heap_capacity
1231 .give((capacity, ts, diff));
1232
1233 let size = self.state.pack_arrangement_heap_size_update(operator_id);
1234 let diff = -Diff::cast_from(state.size);
1235 self.output.arrangement_heap_size.give((size, ts, diff));
1236 }
1237 self.shared_state
1238 .arrangement_size_activators
1239 .remove(&operator_id);
1240 }
1241
1242 fn handle_lir_mapping(
1244 &mut self,
1245 LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
1246 ) {
1247 let global_id = Columnar::into_owned(global_id);
1248 let mappings = || mapping.into_iter().map(Columnar::into_owned);
1250 self.state
1251 .lir_mapping
1252 .entry(global_id)
1253 .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
1254 .or_insert_with(|| mappings().collect());
1255
1256 let ts = self.ts();
1258 for (lir_id, meta) in mapping.into_iter() {
1259 let datum = self.state.pack_lir_mapping_update(
1260 global_id,
1261 Columnar::into_owned(lir_id),
1262 Columnar::into_owned(meta.operator),
1263 Columnar::into_owned(meta.parent_lir_id),
1264 Columnar::into_owned(meta.nesting),
1265 Columnar::into_owned(meta.operator_span),
1266 );
1267 self.output.lir_mapping.give((datum, ts, Diff::ONE));
1268 }
1269 }
1270
1271 fn handle_dataflow_global(
1272 &mut self,
1273 DataflowGlobalReference {
1274 dataflow_index,
1275 global_id,
1276 }: Ref<'_, DataflowGlobal>,
1277 ) {
1278 let global_id = Columnar::into_owned(global_id);
1279 self.state
1280 .dataflow_global_ids
1281 .entry(dataflow_index)
1282 .and_modify(|globals| {
1283 if !globals.insert(global_id) {
1285 error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
1286 }
1287 })
1288 .or_insert_with(|| BTreeSet::from([global_id]));
1289
1290 let ts = self.ts();
1291 let datum = self
1292 .state
1293 .pack_dataflow_global_update(dataflow_index, global_id);
1294 self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
1295 }
1296}
1297
/// Helper for logging the lifecycle of an exported collection: its export
/// event on creation, frontier and import-frontier changes while alive, and
/// the corresponding retractions on drop.
pub struct CollectionLogging {
    // ID of the export whose lifecycle is being logged.
    export_id: GlobalId,
    // Logger through which `ComputeEvent`s are emitted.
    logger: Logger,

    // The export frontier time most recently logged, if any; `None` once the
    // frontier has been retracted.
    logged_frontier: Option<Timestamp>,
    // Most recently logged frontier time for each import of this export.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
1309
1310impl CollectionLogging {
1311 pub fn new(
1313 export_id: GlobalId,
1314 logger: Logger,
1315 dataflow_index: usize,
1316 import_ids: impl Iterator<Item = GlobalId>,
1317 ) -> Self {
1318 logger.log(&ComputeEvent::Export(Export {
1319 export_id,
1320 dataflow_index,
1321 }));
1322
1323 let mut self_ = Self {
1324 export_id,
1325 logger,
1326 logged_frontier: None,
1327 logged_import_frontiers: Default::default(),
1328 };
1329
1330 let initial_frontier = Some(Timestamp::MIN);
1332 self_.set_frontier(initial_frontier);
1333 import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
1334
1335 self_
1336 }
1337
1338 pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
1340 let old_time = self.logged_frontier;
1341 self.logged_frontier = new_time;
1342
1343 if old_time != new_time {
1344 let export_id = self.export_id;
1345 let retraction = old_time.map(|time| {
1346 ComputeEvent::Frontier(Frontier {
1347 export_id,
1348 time,
1349 diff: -1,
1350 })
1351 });
1352 let insertion = new_time.map(|time| {
1353 ComputeEvent::Frontier(Frontier {
1354 export_id,
1355 time,
1356 diff: 1,
1357 })
1358 });
1359 let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1360 self.logger.log_many(events);
1361 }
1362 }
1363
1364 pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
1367 let old_time = self.logged_import_frontiers.remove(&import_id);
1368 if let Some(time) = new_time {
1369 self.logged_import_frontiers.insert(import_id, time);
1370 }
1371
1372 if old_time != new_time {
1373 let export_id = self.export_id;
1374 let retraction = old_time.map(|time| {
1375 ComputeEvent::ImportFrontier(ImportFrontier {
1376 import_id,
1377 export_id,
1378 time,
1379 diff: -1,
1380 })
1381 });
1382 let insertion = new_time.map(|time| {
1383 ComputeEvent::ImportFrontier(ImportFrontier {
1384 import_id,
1385 export_id,
1386 time,
1387 diff: 1,
1388 })
1389 });
1390 let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
1391 self.logger.log_many(events);
1392 }
1393 }
1394
1395 pub fn set_hydrated(&self) {
1397 self.logger.log(&ComputeEvent::Hydration(Hydration {
1398 export_id: self.export_id,
1399 }));
1400 }
1401
1402 pub fn export_id(&self) -> GlobalId {
1404 self.export_id
1405 }
1406}
1407
1408impl Drop for CollectionLogging {
1409 fn drop(&mut self) {
1410 self.set_frontier(None);
1412
1413 let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
1414 for import_id in import_ids {
1415 self.set_import_frontier(import_id, None);
1416 }
1417
1418 self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
1419 export_id: self.export_id,
1420 }));
1421 }
1422}
1423
/// Extension trait for logging the error counts flowing through a dataflow's
/// error output.
pub(crate) trait LogDataflowErrors {
    /// Logs the summed diffs of each batch passing through `self` as
    /// `ErrorCount` events attributed to `export_id`, passing the data
    /// through unchanged.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
1429
1430impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
1431where
1432 G: Scope,
1433 D: Clone + 'static,
1434{
1435 fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1436 self.inner
1437 .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
1438 move |input, output| {
1439 input.for_each(|cap, data| {
1440 let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
1441 logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1442
1443 output.session(&cap).give_container(data);
1444 });
1445 }
1446 })
1447 .as_collection()
1448 }
1449}
1450
1451impl<G, B> LogDataflowErrors for StreamVec<G, B>
1452where
1453 G: Scope,
1454 for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
1455{
1456 fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
1457 self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
1458 move |input, output| {
1459 input.for_each(|cap, data| {
1460 let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
1461 logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
1462
1463 output.session(&cap).give_container(data);
1464 });
1465 }
1466 })
1467 }
1468}
1469
1470fn sum_batch_diffs<B>(batch: &B) -> Diff
1477where
1478 for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
1479{
1480 let mut sum = Diff::ZERO;
1481 let mut cursor = batch.cursor();
1482
1483 while cursor.key_valid(batch) {
1484 while cursor.val_valid(batch) {
1485 cursor.map_times(batch, |_t, r| sum += r);
1486 cursor.step_val(batch);
1487 }
1488 cursor.step_key(batch);
1489 }
1490
1491 sum
1492}
1493
#[cfg(test)]
mod tests {
    use super::*;

    // Guard against accidental growth of `ComputeEvent`. Events are logged at
    // high volume, so their in-memory size directly affects logging overhead;
    // a failure here means a variant grew and should either be shrunk (e.g.
    // by boxing a large payload) or the expected size deliberately bumped.
    #[mz_ore::test]
    fn test_compute_event_size() {
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
}