use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::{Columnar, Index, Ref};
use differential_dataflow::VecCollection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
use mz_timely_util::replay::MzReplay;
use timely::Data;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
    SharedLoggingState, Update,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;

/// A logger for compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// A builder for columnar containers of logged compute events.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;

/// A logged export creation event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// The ID of the export.
    pub export_id: GlobalId,
    /// The index of the dataflow maintaining the export.
    pub dataflow_index: usize,
}

/// A logged export drop event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// The ID of the export.
    pub export_id: GlobalId,
}

/// A logged peek event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The ID of the collection targeted by the peek.
    pub id: GlobalId,
    /// The logical timestamp requested by the peek.
    pub time: Timestamp,
    /// The ID of the peek.
    pub uuid: uuid::Bytes,
    /// The type of the peek: against an index or against Persist.
    pub peek_type: PeekType,
    /// Whether the peek is being installed (`true`) or retired (`false`).
    pub installed: bool,
}

/// A logged frontier update event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    /// The ID of the export whose frontier changed.
    pub export_id: GlobalId,
    /// The frontier timestamp.
    pub time: Timestamp,
    /// Whether the frontier update is an insertion (+1) or a retraction (-1).
    pub diff: i8,
}

/// A logged import frontier update event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    /// The ID of the import whose frontier changed.
    pub import_id: GlobalId,
    /// The ID of the export depending on the import.
    pub export_id: GlobalId,
    /// The frontier timestamp.
    pub time: Timestamp,
    /// Whether the frontier update is an insertion (+1) or a retraction (-1).
    pub diff: i8,
}

/// A logged arrangement heap size update.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// The ID of the operator maintaining the arrangement.
    pub operator_id: usize,
    /// The change in heap size, in bytes.
    pub delta_size: isize,
}

/// A logged arrangement heap capacity update.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// The ID of the operator maintaining the arrangement.
    pub operator_id: usize,
    /// The change in heap capacity, in bytes.
    pub delta_capacity: isize,
}

/// A logged arrangement heap allocations update.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// The ID of the operator maintaining the arrangement.
    pub operator_id: usize,
    /// The change in the number of heap allocations.
    pub delta_allocations: isize,
}

/// A logged event recording the creation of an arrangement size operator.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// The ID of the operator.
    pub operator_id: usize,
    /// The operator's timely address.
    pub address: Vec<usize>,
}

/// A logged event recording the drop of an arrangement size operator.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// The ID of the operator.
    pub operator_id: usize,
}

/// A logged dataflow shutdown event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// The index of the dataflow.
    pub dataflow_index: usize,
}

/// A logged error count update.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// The ID of the export whose error count changed.
    pub export_id: GlobalId,
    /// The change in the error count.
    pub diff: Diff,
}

/// A logged hydration event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    /// The ID of the export that hydrated.
    pub export_id: GlobalId,
}

/// A logged operator hydration event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    /// The ID of the export the operator belongs to.
    pub export_id: GlobalId,
    /// The LIR ID of the operator.
    pub lir_id: LirId,
    /// The new hydration status.
    pub hydrated: bool,
}

/// A logged LIR mapping event, associating LIR metadata with a dataflow.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    /// The global ID of the dataflow the mapping belongs to.
    pub global_id: GlobalId,
    /// The metadata for each LIR node, batched as a `Vec` so a single event
    /// can convey the mapping for a whole dataflow.
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// A logged event associating a dataflow with a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The index of the dataflow.
    pub dataflow_index: usize,
    /// A global ID exported by the dataflow.
    pub global_id: GlobalId,
}

/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    /// A dataflow export was created.
    Export(Export),
    /// A dataflow export was dropped.
    ExportDropped(ExportDropped),
    /// A peek was installed or retired.
    Peek(PeekEvent),
    /// An export's output frontier changed.
    Frontier(Frontier),
    /// An export's import frontier changed.
    ImportFrontier(ImportFrontier),
    /// The heap size of an arrangement changed.
    ArrangementHeapSize(ArrangementHeapSize),
    /// The heap capacity of an arrangement changed.
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    /// The number of heap allocations of an arrangement changed.
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    /// An operator maintaining an arrangement was created.
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    /// An operator maintaining an arrangement was dropped.
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    /// A dataflow was shut down.
    DataflowShutdown(DataflowShutdown),
    /// The error count of an export changed.
    ErrorCount(ErrorCount),
    /// An export became hydrated.
    Hydration(Hydration),
    /// The hydration status of an operator changed.
    OperatorHydration(OperatorHydration),
    /// LIR metadata became available for a dataflow.
    LirMapping(LirMapping),
    /// An association between a dataflow and a global ID was established.
    DataflowGlobal(DataflowGlobal),
}

/// The type of a peek.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an index (arrangement).
    Index,
    /// A peek against Persist.
    Persist,
}

impl PeekType {
    /// A name for this peek type, for use in log rows.
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

/// Metadata for an LIR operator.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The rendered LIR operator, as a string.
    operator: String,
    /// The LIR ID of the parent operator, if any.
    parent_lir_id: Option<LirId>,
    /// The nesting level of the operator.
    nesting: u8,
    /// The range of dataflow operator IDs implementing this LIR operator,
    /// as a `(start, end)` pair.
    operator_span: (usize, usize),
}

impl LirMetadata {
    /// Creates a new `LirMetadata` from its parts.
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}

/// Bundled data returned by [`construct`].
pub(super) struct Return {
    /// Collections to export.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

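/// Constructs the logging dataflow for compute logs.
///
/// Replays the events in `event_queue` into a demux operator that splits them into one
/// stream of `Row` updates per compute log, then arranges each stream whose log variant
/// is requested in `config.index_logs`.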
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "compute logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Build a demux operator that splits the replayed event stream up into one output
        // per log variant.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (export_out, export) = demux.new_output();
        let mut export_out = OutputBuilder::from(export_out);
        let (frontier_out, frontier) = demux.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        let (import_frontier_out, import_frontier) = demux.new_output();
        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
        let (peek_out, peek) = demux.new_output();
        let mut peek_out = OutputBuilder::from(peek_out);
        let (peek_duration_out, peek_duration) = demux.new_output();
        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
        let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
        let mut arrangement_heap_allocations_out =
            OutputBuilder::from(arrangement_heap_allocations_out);
        let (error_count_out, error_count) = demux.new_output();
        let mut error_count_out = OutputBuilder::from(error_count_out);
        let (hydration_time_out, hydration_time) = demux.new_output();
        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
        let (lir_mapping_out, lir_mapping) = demux.new_output();
        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);

        let mut demux_state = DemuxState::new(scheduler, scope.index());
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut operator_hydration_status = operator_hydration_status_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session_with_builder(&cap),
                        frontier: frontier.session_with_builder(&cap),
                        import_frontier: import_frontier.session_with_builder(&cap),
                        peek: peek.session_with_builder(&cap),
                        peek_duration: peek_duration.session_with_builder(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations
                            .session_with_builder(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity
                            .session_with_builder(&cap),
                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
                        error_count: error_count.session_with_builder(&cap),
                        hydration_time: hydration_time.session_with_builder(&cap),
                        operator_hydration_status: operator_hydration_status
                            .session_with_builder(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.borrow().into_index_iter() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        use ComputeLog::*;
        let logs = [
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapSize, arrangement_heap_size),
            (DataflowCurrent, export),
            (DataflowGlobal, dataflow_global_ids),
            (ErrorCount, error_count),
            (FrontierCurrent, frontier),
            (HydrationTime, hydration_time),
            (ImportFrontierCurrent, import_frontier),
            (LirMapping, lir_mapping),
            (OperatorHydrationStatus, operator_hydration_status),
            (PeekCurrent, peek),
            (PeekDuration, peek_duration),
        ];

        // Arrange the requested log streams and bundle them into `LogCollection`s.
        let mut collections = BTreeMap::new();
        for (variant, stream) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = stream
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

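/// Packs a [`Display`]able value into a string [`Datum`], using `scratch` as a reusable
/// buffer to avoid allocating a fresh `String` per call.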
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}

/// State maintained by the demux operator.
struct DemuxState<A> {
    /// The timely scheduler of the local worker.
    scheduler: A,
    /// The index of the local worker.
    worker_id: usize,
    /// Scratch buffer for packing string datums.
    scratch_string_a: String,
    /// A second scratch buffer, for rows containing two string datums.
    scratch_string_b: String,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Installation times of pending peeks, by peek UUID.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size state, by operator ID.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR metadata, by global ID and LIR ID.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// The global IDs exported by each dataflow.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    // Row packers, one per log output.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    arrangement_heap_capacity_packer: PermutedRowPacker,
    arrangement_heap_size_packer: PermutedRowPacker,
    dataflow_global_packer: PermutedRowPacker,
    error_count_packer: PermutedRowPacker,
    export_packer: PermutedRowPacker,
    frontier_packer: PermutedRowPacker,
    import_frontier_packer: PermutedRowPacker,
    lir_mapping_packer: PermutedRowPacker,
    operator_hydration_status_packer: PermutedRowPacker,
    peek_duration_packer: PermutedRowPacker,
    peek_packer: PermutedRowPacker,
    hydration_time_packer: PermutedRowPacker,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
        }
    }

    /// Packs an update row for [`ComputeLog::ArrangementHeapAllocations`].
    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Packs an update row for [`ComputeLog::ArrangementHeapCapacity`].
    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Packs an update row for [`ComputeLog::ArrangementHeapSize`].
    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    /// Packs an update row for [`ComputeLog::DataflowGlobal`].
    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    /// Packs an update row for [`ComputeLog::ErrorCount`].
    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    /// Packs an update row for [`ComputeLog::DataflowCurrent`].
    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    /// Packs an update row for [`ComputeLog::HydrationTime`].
    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    /// Packs an update row for [`ComputeLog::ImportFrontierCurrent`].
    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    /// Packs an update row for [`ComputeLog::LirMapping`].
    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    /// Packs an update row for [`ComputeLog::OperatorHydrationStatus`].
    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    /// Packs an update row for [`ComputeLog::PeekDuration`].
    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    /// Packs an update row for [`ComputeLog::PeekCurrent`].
    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    /// Packs an update row for [`ComputeLog::FrontierCurrent`].
    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }
}

/// State tracked per dataflow export.
struct ExportState {
    /// The index of the dataflow maintaining the export.
    dataflow_index: usize,
    /// The current number of errors in the export's error stream.
    ///
    /// This is signed since a single worker can observe retractions before the
    /// corresponding insertions, making its local count transiently negative.
    error_count: Diff,
    /// When the export was created.
    created_at: Instant,
    /// How long the export took to hydrate, in nanoseconds, if it has hydrated.
    hydration_time_ns: Option<u64>,
    /// Hydration status of the export's LIR operators.
    operator_hydration: BTreeMap<LirId, bool>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
            operator_hydration: BTreeMap::new(),
        }
    }
}

/// State tracking the heap size, capacity, and allocation count of an arrangement.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared with the other logging operators.
    shared_state: &'a mut SharedLoggingState,
    /// Output sessions for the individual log streams.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval, in milliseconds.
    logging_interval_ms: u128,
    /// The time of the current event.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
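    /// Returns the timestamp to attach to log updates for the current event: the event
    /// time rounded up to the next multiple of the logging interval.
    ///
    /// For example, with a 1000 ms interval an event at 1500 ms is assigned timestamp
    /// 2000, and one at exactly 2000 ms is assigned 3000, since the rounding is strictly
    /// upward. This consolidates all updates within an interval onto a single timestamp.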
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given compute event by updating the demux state and emitting the
    /// corresponding log updates.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        // Log the new export as not yet hydrated.
        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        // Retract any standing error count for the export.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Retract the export's logged hydration time.
        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        // Retract the hydration status of the export's operators.
        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        // Retract the dataflow's global ID associations, along with any LIR mappings
        // logged for those global IDs.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case there is no
            // error count left to update.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

        // Retract the old count and log the new one, skipping zero counts.
        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // The export was already hydrated; nothing to update.
            return;
        }

        // Replace the `None` hydration time logged at export creation with the elapsed
        // time since then.
        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }

    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case there is no
            // hydration status left to update.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        // Retract the previously logged status, if any, then log the new one.
        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        // Stash the installation time so retirement can compute the peek's duration.
        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

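        // Bucket the peek duration into a power-of-two histogram bucket: for example, an
        // elapsed time of 300 ns falls into the 512 ns bucket. `next_power_of_two` returns
        // its argument unchanged when it is already a power of two, so bucket boundaries
        // are inclusive.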
        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self.state.pack_frontier_update(export_id, time);
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }

    /// Handle an arrangement heap size update.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }

    /// Handle an arrangement heap capacity update.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }

    /// Handle an arrangement heap allocations update.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.count += delta_allocations;

        let datum = self
            .state
            .pack_arrangement_heap_allocations_update(operator_id);
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));
    }

    /// Handle the creation of an arrangement size operator: register its size state and
    /// the activator used to wake it for size updates.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Handle the drop of an arrangement size operator, retracting all allocations,
    /// capacity, and size logged for it.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let allocations = self
                .state
                .pack_arrangement_heap_allocations_update(operator_id);
            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((allocations, ts, diff));

            let capacity = self
                .state
                .pack_arrangement_heap_capacity_update(operator_id);
            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((capacity, ts, diff));

            let size = self.state.pack_arrangement_heap_size_update(operator_id);
            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((size, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    /// Handle new LIR mapping information, stashing it for retraction on dataflow
    /// shutdown and logging the corresponding insertions.
    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // Stash the mapping so it can be retracted when the dataflow shuts down.
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // Log the insertions.
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: Ref<'_, DataflowGlobal>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = self
            .state
            .pack_dataflow_global_update(dataflow_index, global_id);
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

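/// Helper for logging the lifecycle of an exported collection: its creation, frontier and
/// import frontier updates, hydration, and (on drop) the retraction of all of these.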
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    /// The output frontier most recently logged for this collection, if any.
    logged_frontier: Option<Timestamp>,
    /// The import frontiers most recently logged for this collection.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
    /// Creates a new `CollectionLogging` and logs the export's creation.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize frontier logging for the export and all of its imports.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

    /// Logs the given frontier, retracting the previously logged one if it differs.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Logs the given import frontier, retracting the previously logged one if it
    /// differs.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Logs that the collection has become hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }

    /// Returns the ID of the export this logging helper belongs to.
    pub fn export_id(&self) -> GlobalId {
        self.export_id
    }
}

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Retract all logged state for this collection.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

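/// Extension trait for logging the number of errors flowing through a dataflow's error
/// stream.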
pub(crate) trait LogDataflowErrors {
    /// Log the error counts passing through `self` as `ErrorCount` events.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}

impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

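/// Sums all diffs in the given batch, visiting every (key, value, time) triple through
/// the batch's cursor.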
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // Ensure `ComputeEvent` does not accidentally grow, which would increase the
        // volume of logged event data.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
}