use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::{Columnar, Index, Ref};
use differential_dataflow::VecCollection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
use mz_timely_util::replay::MzReplay;
use timely::Data;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
    SharedLoggingState, Update,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;

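/// A logger for compute events.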
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
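/// An event builder that produces columnar batches of timestamped [`ComputeEvent`]s.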
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;

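/// A dataflow export was created.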
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    pub export_id: GlobalId,
    pub dataflow_index: usize,
}

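/// A dataflow export was dropped.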
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    pub export_id: GlobalId,
}

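/// A peek event, covering both installation and retirement of a peek.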
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The ID of the collection targeted by the peek.
    pub id: GlobalId,
    /// The timestamp of the peek.
    pub time: Timestamp,
    /// The ID of the peek.
    pub uuid: uuid::Bytes,
    /// The kind of the peek.
    pub peek_type: PeekType,
    /// True when the peek is installed, false when it is retired.
    pub installed: bool,
}

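/// A change to an export's reported output frontier: `diff` is +1 for an insertion and -1
/// for a retraction.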
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

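/// A change to an export's reported frontier for one of its imports.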
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    pub import_id: GlobalId,
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

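/// A change in the heap size of an arrangement.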
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    pub operator_id: usize,
    pub delta_size: isize,
}

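/// A change in the heap capacity of an arrangement.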
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    pub operator_id: usize,
    pub delta_capacity: isize,
}

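/// A change in the number of heap allocations of an arrangement.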
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    pub operator_id: usize,
    pub delta_allocations: isize,
}

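/// An operator that reports arrangement heap sizes was created.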
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    pub operator_id: usize,
    pub address: Vec<usize>,
}

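/// An operator that reports arrangement heap sizes was dropped.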
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    pub operator_id: usize,
}

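/// A dataflow was shut down.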
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    pub dataflow_index: usize,
}

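/// A change in the error count of an export.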
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    pub export_id: GlobalId,
    pub diff: Diff,
}

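/// An export has finished hydrating.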
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    pub export_id: GlobalId,
}

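/// A change in the hydration status of an LIR operator.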
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    pub export_id: GlobalId,
    pub lir_id: LirId,
    pub hydrated: bool,
}

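/// New LIR-to-metadata mappings for a global ID.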
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    pub global_id: GlobalId,
    pub mapping: Vec<(LirId, LirMetadata)>,
}

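/// An association of a dataflow index with a global ID.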
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    pub dataflow_index: usize,
    pub global_id: GlobalId,
}

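/// A logged compute event.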
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    Export(Export),
    ExportDropped(ExportDropped),
    Peek(PeekEvent),
    Frontier(Frontier),
    ImportFrontier(ImportFrontier),
    ArrangementHeapSize(ArrangementHeapSize),
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    DataflowShutdown(DataflowShutdown),
    ErrorCount(ErrorCount),
    Hydration(Hydration),
    OperatorHydration(OperatorHydration),
    LirMapping(LirMapping),
    DataflowGlobal(DataflowGlobal),
}

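/// The kind of a peek.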
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    Index,
    Persist,
}

impl PeekType {
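    /// A human-readable name for this peek type.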
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

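/// Metadata for an LIR operator, as reported by the LIR mapping log.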
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The rendered LIR operator, as a string.
    operator: String,
    /// The LIR ID of the parent operator, if any.
    parent_lir_id: Option<LirId>,
    /// The nesting level of the operator.
    nesting: u8,
    /// The span of the operator, as a (start, end) pair.
    operator_span: (usize, usize),
}

impl LirMetadata {
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}

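/// The return type of [`construct`].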
pub(super) struct Return {
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

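/// Constructs the logging dataflow for compute logs.
///
/// Replays the events in `event_queue` into a demux operator that splits them into one
/// stream per log variant, packs each stream into `Row` updates, and arranges the streams
/// requested in `config.index_logs`.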
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "compute logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Build a demux operator that splits the replayed event stream up into one
        // output stream per logged relation.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (export_out, export) = demux.new_output();
        let mut export_out = OutputBuilder::from(export_out);
        let (frontier_out, frontier) = demux.new_output();
        let mut frontier_out = OutputBuilder::from(frontier_out);
        let (import_frontier_out, import_frontier) = demux.new_output();
        let mut import_frontier_out = OutputBuilder::from(import_frontier_out);
        let (peek_out, peek) = demux.new_output();
        let mut peek_out = OutputBuilder::from(peek_out);
        let (peek_duration_out, peek_duration) = demux.new_output();
        let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
        let (shutdown_duration_out, shutdown_duration) = demux.new_output();
        let mut shutdown_duration_out = OutputBuilder::from(shutdown_duration_out);
        let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
        let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let mut arrangement_heap_capacity_out = OutputBuilder::from(arrangement_heap_capacity_out);
        let (arrangement_heap_allocations_out, arrangement_heap_allocations) = demux.new_output();
        let mut arrangement_heap_allocations_out =
            OutputBuilder::from(arrangement_heap_allocations_out);
        let (error_count_out, error_count) = demux.new_output();
        let mut error_count_out = OutputBuilder::from(error_count_out);
        let (hydration_time_out, hydration_time) = demux.new_output();
        let mut hydration_time_out = OutputBuilder::from(hydration_time_out);
        let (operator_hydration_status_out, operator_hydration_status) = demux.new_output();
        let mut operator_hydration_status_out = OutputBuilder::from(operator_hydration_status_out);
        let (lir_mapping_out, lir_mapping) = demux.new_output();
        let mut lir_mapping_out = OutputBuilder::from(lir_mapping_out);
        let (dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
        let mut dataflow_global_ids_out = OutputBuilder::from(dataflow_global_ids_out);

        let mut demux_state = DemuxState::new(scheduler, scope.index());
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut shutdown_duration = shutdown_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut operator_hydration_status = operator_hydration_status_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session_with_builder(&cap),
                        frontier: frontier.session_with_builder(&cap),
                        import_frontier: import_frontier.session_with_builder(&cap),
                        peek: peek.session_with_builder(&cap),
                        peek_duration: peek_duration.session_with_builder(&cap),
                        shutdown_duration: shutdown_duration.session_with_builder(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations
                            .session_with_builder(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity
                            .session_with_builder(&cap),
                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
                        error_count: error_count.session_with_builder(&cap),
                        hydration_time: hydration_time.session_with_builder(&cap),
                        operator_hydration_status: operator_hydration_status
                            .session_with_builder(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.borrow().into_index_iter() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        use ComputeLog::*;
        let logs = [
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapSize, arrangement_heap_size),
            (DataflowCurrent, export),
            (DataflowGlobal, dataflow_global_ids),
            (ErrorCount, error_count),
            (FrontierCurrent, frontier),
            (HydrationTime, hydration_time),
            (ImportFrontierCurrent, import_frontier),
            (LirMapping, lir_mapping),
            (OperatorHydrationStatus, operator_hydration_status),
            (PeekCurrent, peek),
            (PeekDuration, peek_duration),
            (ShutdownDuration, shutdown_duration),
        ];

        // Build the requested arrangements, keyed by log variant.
        let mut collections = BTreeMap::new();
        for (variant, stream) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = stream
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

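/// Pack a [`Display`] value into a [`Datum::String`], using `scratch` as the backing buffer
/// to avoid an allocation per call.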
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}

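/// State maintained by the demux operator.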
struct DemuxState<A> {
    /// The timely scheduler.
    scheduler: A,
    /// The index of this worker.
    worker_id: usize,
    /// Scratch buffers for packing strings.
    scratch_string_a: String,
    scratch_string_b: String,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps dataflow indexes to the number of exports installed on the dataflow.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Maps indexes of dropped dataflows to their drop times.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Indexes of dataflows that have shut down before their drop was observed.
    shutdown_dataflows: BTreeSet<usize>,
    /// Maps pending peeks to their installation times.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size state, keyed by operator ID.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR metadata, keyed by global ID and LIR ID.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Maps dataflow indexes to the global IDs installed on the dataflow.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    // Row packers, one per logged relation.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    arrangement_heap_capacity_packer: PermutedRowPacker,
    arrangement_heap_size_packer: PermutedRowPacker,
    dataflow_global_packer: PermutedRowPacker,
    error_count_packer: PermutedRowPacker,
    export_packer: PermutedRowPacker,
    frontier_packer: PermutedRowPacker,
    import_frontier_packer: PermutedRowPacker,
    lir_mapping_packer: PermutedRowPacker,
    operator_hydration_status_packer: PermutedRowPacker,
    peek_duration_packer: PermutedRowPacker,
    peek_packer: PermutedRowPacker,
    shutdown_duration_packer: PermutedRowPacker,
    hydration_time_packer: PermutedRowPacker,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            dataflow_export_counts: Default::default(),
            dataflow_drop_times: Default::default(),
            shutdown_dataflows: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
            shutdown_duration_packer: PermutedRowPacker::new(ComputeLog::ShutdownDuration),
        }
    }

    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_shutdown_duration_update(&mut self, bucket: u128) -> (&RowRef, &RowRef) {
        self.shutdown_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }
}

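/// State tracked for a single dataflow export.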
struct ExportState {
    /// The index of the dataflow maintaining this export.
    dataflow_index: usize,
    /// The current error count of the export's error stream.
    error_count: Diff,
    /// The time when the export was created.
    created_at: Instant,
    /// The time it took the export to hydrate, if it has hydrated already.
    hydration_time_ns: Option<u64>,
    /// Hydration status of the export's LIR operators.
    operator_hydration: BTreeMap<LirId, bool>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
            operator_hydration: BTreeMap::new(),
        }
    }
}

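/// State tracked for the size logging of a single arrangement.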
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

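/// Bundled output sessions used by the demux operator, one per logged relation.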
struct DemuxOutput<'a, 'b> {
    export: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    shutdown_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
}

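/// Event handler for a single [`ComputeEvent`], with access to the demux state and output
/// sessions.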
struct DemuxHandler<'a, 'b, 'c, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared with the rest of the logging infrastructure.
    shared_state: &'a mut SharedLoggingState,
    /// The output sessions to send updates to.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval, in milliseconds.
    logging_interval_ms: u128,
    /// The wall-clock time of the current event.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
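    /// Return the timestamp associated with the current event, based on the event time.
    ///
    /// Rounds up to the next multiple of the logging interval: with a 1000 ms interval, an
    /// event at 1500 ms is assigned timestamp 2000, and an event at exactly 1000 ms also
    /// maps to 2000, since rounding is strictly upward.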
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

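    /// Dispatch a compute event to the handler for its variant.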
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        *self
            .state
            .dataflow_export_counts
            .entry(dataflow_index)
            .or_default() += 1;

        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
            entry @ Some(0) | entry @ None => {
                error!(
                    %export_id,
                    %dataflow_index,
                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
                );
            }
            Some(1) => self.handle_dataflow_dropped(dataflow_index),
            Some(count) => *count -= 1,
        }

        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }

    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
        let ts = self.ts();
        self.state.dataflow_export_counts.remove(&dataflow_index);

        if self.state.shutdown_dataflows.remove(&dataflow_index) {
            // The dataflow has already shut down; log a zero shutdown duration.
            let datum = self.state.pack_shutdown_duration_update(0);
            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
        } else {
            // The dataflow has not yet shut down; remember the drop time, so we can log
            // the shutdown duration once it does.
            let existing = self
                .state
                .dataflow_drop_times
                .insert(dataflow_index, self.time);
            if existing.is_some() {
                error!(%dataflow_index, "dataflow already dropped");
            }
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
            // The dataflow was dropped before it shut down; log the elapsed time as its
            // shutdown duration, bucketed by powers of two.
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let elapsed_pow = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_shutdown_duration_update(elapsed_pow);
            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
        } else {
            // The dataflow shut down before it was dropped; remember it, so we can log a
            // shutdown duration when the drop arrives.
            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
            if !was_new {
                error!(%dataflow_index, "dataflow already shutdown");
            }
        }

        // Retract the dataflow's global ID associations and any LIR mappings.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export may have already been dropped; ignore the event.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // The export was already hydrated; nothing to do.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }

    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export may have already been dropped; ignore the event.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self.state.pack_frontier_update(export_id, time);
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }

    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }

    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }

    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.count += delta_allocations;

        let datum = self
            .state
            .pack_arrangement_heap_allocations_update(operator_id);
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));
    }

    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let allocations = self
                .state
                .pack_arrangement_heap_allocations_update(operator_id);
            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((allocations, ts, diff));

            let capacity = self
                .state
                .pack_arrangement_heap_capacity_update(operator_id);
            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((capacity, ts, diff));

            let size = self.state.pack_arrangement_heap_size_update(operator_id);
            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((size, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // Record the mapping, so we can retract it when the dataflow shuts down.
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: Ref<'_, DataflowGlobal>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = self
            .state
            .pack_dataflow_global_update(dataflow_index, global_id);
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

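/// Logging state for a compute collection.
///
/// Tracks the frontiers that have been logged for an export and its imports, and retracts
/// all logged state when dropped. A minimal usage sketch (identifiers are illustrative):
///
/// ```ignore
/// let mut logging = CollectionLogging::new(export_id, logger, dataflow_index, import_ids);
/// logging.set_frontier(Some(frontier_ts)); // logs a retraction and an insertion
/// logging.set_hydrated();
/// drop(logging); // retracts remaining logged state and logs the export drop
/// ```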
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    logged_frontier: Option<Timestamp>,
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
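    /// Create new logging state for the identified collection and log its creation.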
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize frontier logging for the export and all of its imports.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

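    /// Set the collection frontier to the given new time and log the change, if any.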
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

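    /// Set the frontier of the given import to the given new time and log the change, if
    /// any.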
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

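    /// Log the completion of the collection's hydration.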
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }
}

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Retract all logged state and report the export as dropped.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

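/// Extension trait to log the errors passing through a stream or collection, by summing the
/// diffs of error records and emitting [`ComputeEvent::ErrorCount`] updates.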
pub(crate) trait LogDataflowErrors {
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}

impl<G, D> LogDataflowErrors for VecCollection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

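/// Sum the diffs contained in the given batch.
///
/// Walks every key, value, and time in the batch, so the cost is linear in the batch size.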
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // Ensure that we don't accidentally grow the size of `ComputeEvent`.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
}