//! Logging dataflows for compute events.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};

use columnar::{Columnar, Index, Ref};
use differential_dataflow::Collection;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Row, RowRef, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
use mz_timely_util::containers::ProvidedBuilder;
use mz_timely_util::replay::MzReplay;
use timely::Data;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use tracing::error;
use uuid::Uuid;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::{
    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, PermutedRowPacker,
    SharedLoggingState, Update,
};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;

/// A logger of compute events.
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
/// A builder for containers of timestamped compute events.
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;

/// A dataflow export was created.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
    /// The ID of the exported collection.
    pub export_id: GlobalId,
    /// The index of the dataflow that produces the export.
    pub dataflow_index: usize,
}

/// A dataflow export was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
    /// The ID of the exported collection.
    pub export_id: GlobalId,
}

/// A peek was installed or retired.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
    /// The ID of the collection targeted by the peek.
    pub id: GlobalId,
    /// The timestamp of the peek.
    pub time: Timestamp,
    /// The ID of the peek.
    pub uuid: uuid::Bytes,
    /// The type of the peek.
    pub peek_type: PeekType,
    /// Whether the peek was installed (`true`) or retired (`false`).
    pub installed: bool,
}

/// An update to an export's output frontier.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

/// An update to the frontier of one of an export's imports.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
    pub import_id: GlobalId,
    pub export_id: GlobalId,
    pub time: Timestamp,
    pub diff: i8,
}

/// A change in an arrangement's heap size.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
    /// The ID of the arrangement's operator.
    pub operator_id: usize,
    /// The change in size, in bytes.
    pub delta_size: isize,
}

/// A change in an arrangement's heap capacity.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
    /// The ID of the arrangement's operator.
    pub operator_id: usize,
    /// The change in capacity, in bytes.
    pub delta_capacity: isize,
}

/// A change in an arrangement's heap allocation count.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
    /// The ID of the arrangement's operator.
    pub operator_id: usize,
    /// The change in the number of allocations.
    pub delta_allocations: isize,
}

/// An operator that reports the heap size of an arrangement was created.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
    /// The ID of the operator.
    pub operator_id: usize,
    /// The operator's address in the dataflow graph.
    pub address: Vec<usize>,
}

/// An operator that reports the heap size of an arrangement was dropped.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
    /// The ID of the operator.
    pub operator_id: usize,
}

/// A dataflow was shut down.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
    /// The index of the dataflow.
    pub dataflow_index: usize,
}

/// A change in the error count of an export.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
    /// The ID of the export.
    pub export_id: GlobalId,
    /// The change in the error count.
    pub diff: Diff,
}

/// An export became hydrated.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
    pub export_id: GlobalId,
}

/// A change in the hydration status of one of an export's operators.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct OperatorHydration {
    /// The ID of the export.
    pub export_id: GlobalId,
    /// The ID of the LIR operator.
    pub lir_id: LirId,
    /// The new hydration status.
    pub hydrated: bool,
}

/// A mapping from LIR operators to their metadata, for a given global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
    pub global_id: GlobalId,
    /// The mapping from LIR IDs to metadata.
    pub mapping: Vec<(LirId, LirMetadata)>,
}

/// An association between a dataflow and a global ID.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
    /// The index of the dataflow.
    pub dataflow_index: usize,
    /// A global ID exported by the dataflow.
    pub global_id: GlobalId,
}

/// A logged compute event.
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
    Export(Export),
    ExportDropped(ExportDropped),
    Peek(PeekEvent),
    Frontier(Frontier),
    ImportFrontier(ImportFrontier),
    ArrangementHeapSize(ArrangementHeapSize),
    ArrangementHeapCapacity(ArrangementHeapCapacity),
    ArrangementHeapAllocations(ArrangementHeapAllocations),
    ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
    ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
    DataflowShutdown(DataflowShutdown),
    ErrorCount(ErrorCount),
    Hydration(Hydration),
    OperatorHydration(OperatorHydration),
    LirMapping(LirMapping),
    DataflowGlobal(DataflowGlobal),
}

/// The type of a peek.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
    /// A peek against an indexed collection.
    Index,
    /// A peek against a persisted collection.
    Persist,
}

impl PeekType {
    /// Renders this peek type as a string, for use in log output.
    fn name(self) -> &'static str {
        match self {
            PeekType::Index => "index",
            PeekType::Persist => "persist",
        }
    }
}

/// Metadata for an LIR operator.
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
    /// The rendered LIR operator, as a string.
    operator: String,
    /// The ID of the operator's parent, if it has one.
    parent_lir_id: Option<LirId>,
    /// The nesting depth of the operator.
    nesting: u8,
    /// The range of dataflow operator IDs covered by this LIR operator.
    operator_span: (usize, usize),
}

impl LirMetadata {
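    /// Creates a new `LirMetadata`.
    ///
    /// Illustrative use (a sketch; the argument values are made up):
    ///
    /// ```ignore
    /// let meta = LirMetadata::new("Join".into(), None, 0, (10, 20));
    /// ```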
    pub fn new(
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        Self {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}

/// The return type of [`construct`].
pub(super) struct Return {
    /// The exported logging collections.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow for compute logs.
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    scheduler: S,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("compute logging", move |scope| {
        let enable_logging = config.enable_logging;
        // Replay the captured compute events into a stream.
        let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
            scope,
            "compute logs",
            config.interval,
            event_queue.activator,
            move |mut session, mut data| {
                // Only forward events when logging is enabled; the replay
                // operator still drains the event queue either way.
                if enable_logging {
                    session.give_container(data.to_mut())
                }
            },
        );

        // Build a demux operator that splits the replayed event stream up
        // into the separate logging streams.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut export_out, export) = demux.new_output();
        let (mut frontier_out, frontier) = demux.new_output();
        let (mut import_frontier_out, import_frontier) = demux.new_output();
        let (mut peek_out, peek) = demux.new_output();
        let (mut peek_duration_out, peek_duration) = demux.new_output();
        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
            demux.new_output();
        let (mut error_count_out, error_count) = demux.new_output();
        let (mut hydration_time_out, hydration_time) = demux.new_output();
        let (mut operator_hydration_status_out, operator_hydration_status) = demux.new_output();
        let (mut lir_mapping_out, lir_mapping) = demux.new_output();
        let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();

        let mut demux_state = DemuxState::new(scheduler, scope.index());
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut shutdown_duration = shutdown_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut operator_hydration_status = operator_hydration_status_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session_with_builder(&cap),
                        frontier: frontier.session_with_builder(&cap),
                        import_frontier: import_frontier.session_with_builder(&cap),
                        peek: peek.session_with_builder(&cap),
                        peek_duration: peek_duration.session_with_builder(&cap),
                        shutdown_duration: shutdown_duration.session_with_builder(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations
                            .session_with_builder(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity
                            .session_with_builder(&cap),
                        arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
                        error_count: error_count.session_with_builder(&cap),
                        hydration_time: hydration_time.session_with_builder(&cap),
                        operator_hydration_status: operator_hydration_status
                            .session_with_builder(&cap),
                        lir_mapping: lir_mapping.session_with_builder(&cap),
                        dataflow_global_ids: dataflow_global_ids.session_with_builder(&cap),
                    };

                    let shared_state = &mut shared_state.borrow_mut();
                    for (time, event) in data.borrow().into_index_iter() {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state,
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        use ComputeLog::*;
        let logs = [
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapSize, arrangement_heap_size),
            (DataflowCurrent, export),
            (DataflowGlobal, dataflow_global_ids),
            (ErrorCount, error_count),
            (FrontierCurrent, frontier),
            (HydrationTime, hydration_time),
            (ImportFrontierCurrent, import_frontier),
            (LirMapping, lir_mapping),
            (OperatorHydrationStatus, operator_hydration_status),
            (PeekCurrent, peek),
            (PeekDuration, peek_duration),
            (ShutdownDuration, shutdown_duration),
        ];

        // Arrange the logging streams for which the config requests indexes.
        let mut collections = BTreeMap::new();
        for (variant, stream) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = stream
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

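/// Renders `value` into the given scratch buffer and returns it as a borrowed
/// string datum, avoiding a fresh allocation per call.
///
/// Illustrative use (a sketch; `GlobalId::User(1)` renders as `u1` via its
/// `Display` impl):
///
/// ```ignore
/// let mut scratch = String::new();
/// let datum = make_string_datum(GlobalId::User(1), &mut scratch);
/// assert_eq!(datum, Datum::String("u1"));
/// ```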
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
    V: Display,
{
    scratch.clear();
    write!(scratch, "{}", value).expect("writing to a `String` can't fail");
    Datum::String(scratch)
}

/// State maintained by the demux operator.
struct DemuxState<A> {
    /// A scheduler for dataflow operators.
    scheduler: A,
    /// The ID of the worker hosting this operator.
    worker_id: usize,
    /// Scratch buffers for rendering strings into.
    scratch_string_a: String,
    scratch_string_b: String,
    /// State tracked per dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Maps dataflow indexes to the number of exports installed on them.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Maps indexes of dropped dataflows to their drop time.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Indexes of dataflows that have shut down before their drop was observed.
    shutdown_dataflows: BTreeSet<usize>,
    /// Maps pending peeks to their installation time.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Arrangement size state, by operator ID.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR metadata, by global ID and LIR ID.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// Global IDs exported by each dataflow.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
    // Packers for the rows produced by each output.
    arrangement_heap_allocations_packer: PermutedRowPacker,
    arrangement_heap_capacity_packer: PermutedRowPacker,
    arrangement_heap_size_packer: PermutedRowPacker,
    dataflow_global_packer: PermutedRowPacker,
    error_count_packer: PermutedRowPacker,
    export_packer: PermutedRowPacker,
    frontier_packer: PermutedRowPacker,
    import_frontier_packer: PermutedRowPacker,
    lir_mapping_packer: PermutedRowPacker,
    operator_hydration_status_packer: PermutedRowPacker,
    peek_duration_packer: PermutedRowPacker,
    peek_packer: PermutedRowPacker,
    shutdown_duration_packer: PermutedRowPacker,
    hydration_time_packer: PermutedRowPacker,
}

impl<A: Scheduler> DemuxState<A> {
    fn new(scheduler: A, worker_id: usize) -> Self {
        Self {
            scheduler,
            worker_id,
            scratch_string_a: String::new(),
            scratch_string_b: String::new(),
            exports: Default::default(),
            dataflow_export_counts: Default::default(),
            dataflow_drop_times: Default::default(),
            shutdown_dataflows: Default::default(),
            peek_stash: Default::default(),
            arrangement_size: Default::default(),
            lir_mapping: Default::default(),
            dataflow_global_ids: Default::default(),
            arrangement_heap_allocations_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapAllocations,
            ),
            arrangement_heap_capacity_packer: PermutedRowPacker::new(
                ComputeLog::ArrangementHeapCapacity,
            ),
            arrangement_heap_size_packer: PermutedRowPacker::new(ComputeLog::ArrangementHeapSize),
            dataflow_global_packer: PermutedRowPacker::new(ComputeLog::DataflowGlobal),
            error_count_packer: PermutedRowPacker::new(ComputeLog::ErrorCount),
            export_packer: PermutedRowPacker::new(ComputeLog::DataflowCurrent),
            frontier_packer: PermutedRowPacker::new(ComputeLog::FrontierCurrent),
            hydration_time_packer: PermutedRowPacker::new(ComputeLog::HydrationTime),
            import_frontier_packer: PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent),
            lir_mapping_packer: PermutedRowPacker::new(ComputeLog::LirMapping),
            operator_hydration_status_packer: PermutedRowPacker::new(
                ComputeLog::OperatorHydrationStatus,
            ),
            peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
            peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
            shutdown_duration_packer: PermutedRowPacker::new(ComputeLog::ShutdownDuration),
        }
    }

    fn pack_arrangement_heap_allocations_update(
        &mut self,
        operator_id: usize,
    ) -> (&RowRef, &RowRef) {
        self.arrangement_heap_allocations_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_arrangement_heap_capacity_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_capacity_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_arrangement_heap_size_update(&mut self, operator_id: usize) -> (&RowRef, &RowRef) {
        self.arrangement_heap_size_packer.pack_slice(&[
            Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
            Datum::UInt64(u64::cast_from(self.worker_id)),
        ])
    }

    fn pack_dataflow_global_update(
        &mut self,
        dataflow_index: usize,
        global_id: GlobalId,
    ) -> (&RowRef, &RowRef) {
        self.dataflow_global_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(dataflow_index)),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(global_id, &mut self.scratch_string_a),
        ])
    }

    fn pack_error_count_update(&mut self, export_id: GlobalId, count: Diff) -> (&RowRef, &RowRef) {
        self.error_count_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::Int64(count.into_inner()),
        ])
    }

    fn pack_export_update(
        &mut self,
        export_id: GlobalId,
        dataflow_index: usize,
    ) -> (&RowRef, &RowRef) {
        self.export_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(u64::cast_from(dataflow_index)),
        ])
    }

    fn pack_hydration_time_update(
        &mut self,
        export_id: GlobalId,
        time_ns: Option<u64>,
    ) -> (&RowRef, &RowRef) {
        self.hydration_time_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(time_ns),
        ])
    }

    fn pack_import_frontier_update(
        &mut self,
        export_id: GlobalId,
        import_id: GlobalId,
        time: Timestamp,
    ) -> (&RowRef, &RowRef) {
        self.import_frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            make_string_datum(import_id, &mut self.scratch_string_b),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_lir_mapping_update(
        &mut self,
        global_id: GlobalId,
        lir_id: LirId,
        operator: String,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> (&RowRef, &RowRef) {
        self.lir_mapping_packer.pack_slice(&[
            make_string_datum(global_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(operator, &mut self.scratch_string_b),
            parent_lir_id.map_or(Datum::Null, |lir_id| Datum::UInt64(lir_id.into())),
            Datum::UInt16(u16::cast_from(nesting)),
            Datum::UInt64(u64::cast_from(operator_span.0)),
            Datum::UInt64(u64::cast_from(operator_span.1)),
        ])
    }

    fn pack_operator_hydration_status_update(
        &mut self,
        export_id: GlobalId,
        lir_id: LirId,
        hydrated: bool,
    ) -> (&RowRef, &RowRef) {
        self.operator_hydration_status_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(lir_id.into()),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::from(hydrated),
        ])
    }

    fn pack_peek_duration_update(
        &mut self,
        peek_type: PeekType,
        bucket: u128,
    ) -> (&RowRef, &RowRef) {
        self.peek_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::String(peek_type.name()),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }

    fn pack_peek_update(
        &mut self,
        id: GlobalId,
        time: Timestamp,
        uuid: Uuid,
        peek_type: PeekType,
    ) -> (&RowRef, &RowRef) {
        self.peek_packer.pack_slice(&[
            Datum::Uuid(uuid),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            make_string_datum(id, &mut self.scratch_string_a),
            Datum::String(peek_type.name()),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_frontier_update(&mut self, export_id: GlobalId, time: Timestamp) -> (&RowRef, &RowRef) {
        self.frontier_packer.pack_slice(&[
            make_string_datum(export_id, &mut self.scratch_string_a),
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::MzTimestamp(time),
        ])
    }

    fn pack_shutdown_duration_update(&mut self, bucket: u128) -> (&RowRef, &RowRef) {
        self.shutdown_duration_packer.pack_slice(&[
            Datum::UInt64(u64::cast_from(self.worker_id)),
            Datum::UInt64(bucket.try_into().expect("bucket too big")),
        ])
    }
}

/// State tracked for each dataflow export.
struct ExportState {
    /// The index of the dataflow maintaining the export.
    dataflow_index: usize,
    /// The current count of errors in the export.
    ///
    /// Kept up-to-date so we can produce correct retractions when the export
    /// is dropped.
    error_count: Diff,
    /// The time when this export was created.
    created_at: Instant,
    /// The time it took for this export to hydrate, if it has hydrated.
    hydration_time_ns: Option<u64>,
    /// Hydration status of the export's LIR operators.
    operator_hydration: BTreeMap<LirId, bool>,
}

impl ExportState {
    fn new(dataflow_index: usize) -> Self {
        Self {
            dataflow_index,
            error_count: Diff::ZERO,
            created_at: Instant::now(),
            hydration_time_ns: None,
            operator_hydration: BTreeMap::new(),
        }
    }
}

/// State tracked for the heap size of an arrangement.
#[derive(Default, Debug)]
struct ArrangementSizeState {
    size: isize,
    capacity: isize,
    count: isize,
}

/// Bundled output sessions used by the demux operator.
struct DemuxOutput<'a> {
    export: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    frontier: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    import_frontier: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    peek: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    peek_duration: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    shutdown_duration: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    arrangement_heap_allocations: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    arrangement_heap_capacity: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    arrangement_heap_size: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    hydration_time: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    operator_hydration_status: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    error_count: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    lir_mapping: OutputSessionColumnar<'a, Update<(Row, Row)>>,
    dataflow_global_ids: OutputSessionColumnar<'a, Update<(Row, Row)>>,
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b, A: Scheduler> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState<A>,
    /// State shared with other logging components.
    shared_state: &'a mut SharedLoggingState,
    /// Output sessions to the individual logging streams.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
}

impl<A: Scheduler> DemuxHandler<'_, '_, A> {
    /// Returns the timestamp associated with the current event, rounded up to
    /// the end of its logging interval so that updates within one interval
    /// consolidate.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handles the given event by dispatching to the matching handler method.
    fn handle(&mut self, event: Ref<'_, ComputeEvent>) {
        use ComputeEventReference::*;
        match event {
            Export(export) => self.handle_export(export),
            ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
            Peek(peek) if peek.installed => self.handle_peek_install(peek),
            Peek(peek) => self.handle_peek_retire(peek),
            Frontier(frontier) => self.handle_frontier(frontier),
            ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
            ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
            ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
            ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
            ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
            ArrangementHeapSizeOperatorDrop(inner) => {
                self.handle_arrangement_heap_size_operator_dropped(inner)
            }
            DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
            ErrorCount(error_count) => self.handle_error_count(error_count),
            Hydration(hydration) => self.handle_hydration(hydration),
            OperatorHydration(hydration) => self.handle_operator_hydration(hydration),
            LirMapping(mapping) => self.handle_lir_mapping(mapping),
            DataflowGlobal(global) => self.handle_dataflow_global(global),
        }
    }

    fn handle_export(
        &mut self,
        ExportReference {
            export_id,
            dataflow_index,
        }: Ref<'_, Export>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let ts = self.ts();
        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::ONE));

        let existing = self
            .state
            .exports
            .insert(export_id, ExportState::new(dataflow_index));
        if existing.is_some() {
            error!(%export_id, "export already registered");
        }

        *self
            .state
            .dataflow_export_counts
            .entry(dataflow_index)
            .or_default() += 1;

        // Insert a placeholder hydration time for the new export.
        let datum = self.state.pack_hydration_time_update(export_id, None);
        self.output.hydration_time.give((datum, ts, Diff::ONE));
    }

    fn handle_export_dropped(
        &mut self,
        ExportDroppedReference { export_id }: Ref<'_, ExportDropped>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let Some(export) = self.state.exports.remove(&export_id) else {
            error!(%export_id, "missing exports entry at time of export drop");
            return;
        };

        let ts = self.ts();
        let dataflow_index = export.dataflow_index;

        let datum = self.state.pack_export_update(export_id, dataflow_index);
        self.output.export.give((datum, ts, Diff::MINUS_ONE));

        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
            entry @ Some(0) | entry @ None => {
                error!(
                    %export_id,
                    %dataflow_index,
                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
                );
            }
            Some(1) => self.handle_dataflow_dropped(dataflow_index),
            Some(count) => *count -= 1,
        }

        // Retract any logged error count.
        if export.error_count != Diff::ZERO {
            let datum = self
                .state
                .pack_error_count_update(export_id, export.error_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }

        // Retract the export's hydration time.
        let datum = self
            .state
            .pack_hydration_time_update(export_id, export.hydration_time_ns);
        self.output
            .hydration_time
            .give((datum, ts, Diff::MINUS_ONE));

        // Retract the hydration status of the export's operators.
        for (lir_id, hydrated) in export.operator_hydration {
            let datum = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((datum, ts, Diff::MINUS_ONE));
        }
    }

    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
        let ts = self.ts();
        self.state.dataflow_export_counts.remove(&dataflow_index);

        if self.state.shutdown_dataflows.remove(&dataflow_index) {
            // Dataflow shutdown was already observed; report a shutdown
            // duration of zero.
            let datum = self.state.pack_shutdown_duration_update(0);
            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
        } else {
            // Remember the drop time, so we can report a duration once the
            // shutdown is observed.
            let existing = self
                .state
                .dataflow_drop_times
                .insert(dataflow_index, self.time);
            if existing.is_some() {
                error!(%dataflow_index, "dataflow already dropped");
            }
        }
    }

    fn handle_dataflow_shutdown(
        &mut self,
        DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
    ) {
        let ts = self.ts();

        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
            // Dataflow drop was already observed; report the shutdown
            // duration, bucketed into powers of two.
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let elapsed_pow = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_shutdown_duration_update(elapsed_pow);
            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
        } else {
            // Remember that the dataflow has shut down, so we can report a
            // duration once the drop is observed.
            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
            if !was_new {
                error!(%dataflow_index, "dataflow already shutdown");
            }
        }

        // Retract the dataflow's global IDs and any associated LIR mappings.
        if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
            for global_id in global_ids {
                let datum = self
                    .state
                    .pack_dataflow_global_update(dataflow_index, global_id);
                self.output
                    .dataflow_global_ids
                    .give((datum, ts, Diff::MINUS_ONE));

                if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
                    for (
                        lir_id,
                        LirMetadata {
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        },
                    ) in mappings
                    {
                        let datum = self.state.pack_lir_mapping_update(
                            global_id,
                            lir_id,
                            operator,
                            parent_lir_id,
                            nesting,
                            operator_span,
                        );
                        self.output.lir_mapping.give((datum, ts, Diff::MINUS_ONE));
                    }
                }
            }
        }
    }

    fn handle_error_count(&mut self, ErrorCountReference { export_id, diff }: Ref<'_, ErrorCount>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we
            // don't need to track its error counts anymore.
            return;
        };

        let old_count = export.error_count;
        let new_count = old_count + diff;
        export.error_count = new_count;

        if old_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, old_count);
            self.output.error_count.give((datum, ts, Diff::MINUS_ONE));
        }
        if new_count != Diff::ZERO {
            let datum = self.state.pack_error_count_update(export_id, new_count);
            self.output.error_count.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_hydration(&mut self, HydrationReference { export_id }: Ref<'_, Hydration>) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            error!(%export_id, "hydration event for unknown export");
            return;
        };
        if export.hydration_time_ns.is_some() {
            // The export was already marked as hydrated; nothing to do.
            return;
        }

        let duration = export.created_at.elapsed();
        let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
        export.hydration_time_ns = Some(nanos);

        // Swap the export's hydration time from `None` to the measured value.
        let retraction = self.state.pack_hydration_time_update(export_id, None);
        self.output
            .hydration_time
            .give((retraction, ts, Diff::MINUS_ONE));
        let insertion = self
            .state
            .pack_hydration_time_update(export_id, Some(nanos));
        self.output.hydration_time.give((insertion, ts, Diff::ONE));
    }

    fn handle_operator_hydration(
        &mut self,
        OperatorHydrationReference {
            export_id,
            lir_id,
            hydrated,
        }: Ref<'_, OperatorHydration>,
    ) {
        let ts = self.ts();
        let export_id = Columnar::into_owned(export_id);
        let lir_id = Columnar::into_owned(lir_id);
        let hydrated = Columnar::into_owned(hydrated);

        let Some(export) = self.state.exports.get_mut(&export_id) else {
            // The export might have already been dropped, in which case we
            // don't need to track its hydration status anymore.
            return;
        };

        let old_status = export.operator_hydration.get(&lir_id).copied();
        export.operator_hydration.insert(lir_id, hydrated);

        if let Some(hydrated) = old_status {
            let retraction = self
                .state
                .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
            self.output
                .operator_hydration_status
                .give((retraction, ts, Diff::MINUS_ONE));
        }

        let insertion = self
            .state
            .pack_operator_hydration_status_update(export_id, lir_id, hydrated);
        self.output
            .operator_hydration_status
            .give((insertion, ts, Diff::ONE));
    }

    fn handle_peek_install(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::ONE));

        let existing = self.state.peek_stash.insert(uuid, self.time);
        if existing.is_some() {
            error!(%uuid, "peek already registered");
        }
    }

    fn handle_peek_retire(
        &mut self,
        PeekEventReference {
            id,
            time,
            uuid,
            peek_type,
            installed: _,
        }: Ref<'_, PeekEvent>,
    ) {
        let id = Columnar::into_owned(id);
        let uuid = Uuid::from_bytes(uuid::Bytes::into_owned(uuid));
        let ts = self.ts();
        let datum = self.state.pack_peek_update(id, time, uuid, peek_type);
        self.output.peek.give((datum, ts, Diff::MINUS_ONE));

        if let Some(start) = self.state.peek_stash.remove(&uuid) {
            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            let datum = self.state.pack_peek_duration_update(peek_type, bucket);
            self.output.peek_duration.give((datum, ts, Diff::ONE));
        } else {
            error!(%uuid, "peek not yet registered");
        }
    }

    fn handle_frontier(
        &mut self,
        FrontierReference {
            export_id,
            time,
            diff,
        }: Ref<'_, Frontier>,
    ) {
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self.state.pack_frontier_update(export_id, time);
        self.output.frontier.give((datum, ts, diff));
    }

    fn handle_import_frontier(
        &mut self,
        ImportFrontierReference {
            import_id,
            export_id,
            time,
            diff,
        }: Ref<'_, ImportFrontier>,
    ) {
        let import_id = Columnar::into_owned(import_id);
        let export_id = Columnar::into_owned(export_id);
        let diff = Diff::from(*diff);
        let ts = self.ts();
        let time = Columnar::into_owned(time);
        let datum = self
            .state
            .pack_import_frontier_update(export_id, import_id, time);
        self.output.import_frontier.give((datum, ts, diff));
    }

    /// Updates the tracked heap size of an arrangement and logs the change.
    fn handle_arrangement_heap_size(
        &mut self,
        ArrangementHeapSizeReference {
            operator_id,
            delta_size,
        }: Ref<'_, ArrangementHeapSize>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.size += delta_size;

        let datum = self.state.pack_arrangement_heap_size_update(operator_id);
        let diff = Diff::cast_from(delta_size);
        self.output.arrangement_heap_size.give((datum, ts, diff));
    }

    /// Updates the tracked heap capacity of an arrangement and logs the change.
    fn handle_arrangement_heap_capacity(
        &mut self,
        ArrangementHeapCapacityReference {
            operator_id,
            delta_capacity,
        }: Ref<'_, ArrangementHeapCapacity>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.capacity += delta_capacity;

        let datum = self
            .state
            .pack_arrangement_heap_capacity_update(operator_id);
        let diff = Diff::cast_from(delta_capacity);
        self.output
            .arrangement_heap_capacity
            .give((datum, ts, diff));
    }

    /// Updates the tracked allocation count of an arrangement and logs the change.
    fn handle_arrangement_heap_allocations(
        &mut self,
        ArrangementHeapAllocationsReference {
            operator_id,
            delta_allocations,
        }: Ref<'_, ArrangementHeapAllocations>,
    ) {
        let ts = self.ts();
        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
            return;
        };

        state.count += delta_allocations;

        let datum = self
            .state
            .pack_arrangement_heap_allocations_update(operator_id);
        let diff = Diff::cast_from(delta_allocations);
        self.output
            .arrangement_heap_allocations
            .give((datum, ts, diff));
    }

    /// Registers an operator that reports the heap size of an arrangement.
    fn handle_arrangement_heap_size_operator(
        &mut self,
        ArrangementHeapSizeOperatorReference {
            operator_id,
            address,
        }: Ref<'_, ArrangementHeapSizeOperator>,
    ) {
        let activator = self
            .state
            .scheduler
            .activator_for(address.into_iter().collect());
        let existing = self
            .state
            .arrangement_size
            .insert(operator_id, Default::default());
        if existing.is_some() {
            error!(%operator_id, "arrangement size operator already registered");
        }
        let existing = self
            .shared_state
            .arrangement_size_activators
            .insert(operator_id, activator);
        if existing.is_some() {
            error!(%operator_id, "arrangement size activator already registered");
        }
    }

    /// Retracts all heap size logging for a dropped arrangement size operator.
    fn handle_arrangement_heap_size_operator_dropped(
        &mut self,
        event: Ref<'_, ArrangementHeapSizeOperatorDrop>,
    ) {
        let operator_id = event.operator_id;
        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
            let ts = self.ts();
            let allocations = self
                .state
                .pack_arrangement_heap_allocations_update(operator_id);
            let diff = -Diff::cast_from(state.count);
            self.output
                .arrangement_heap_allocations
                .give((allocations, ts, diff));

            let capacity = self
                .state
                .pack_arrangement_heap_capacity_update(operator_id);
            let diff = -Diff::cast_from(state.capacity);
            self.output
                .arrangement_heap_capacity
                .give((capacity, ts, diff));

            let size = self.state.pack_arrangement_heap_size_update(operator_id);
            let diff = -Diff::cast_from(state.size);
            self.output.arrangement_heap_size.give((size, ts, diff));
        }
        self.shared_state
            .arrangement_size_activators
            .remove(&operator_id);
    }

    fn handle_lir_mapping(
        &mut self,
        LirMappingReference { global_id, mapping }: Ref<'_, LirMapping>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // Record the mapping, so we can retract it on dataflow shutdown.
        let mappings = || mapping.into_iter().map(Columnar::into_owned);
        self.state
            .lir_mapping
            .entry(global_id)
            .and_modify(|existing_mapping| existing_mapping.extend(mappings()))
            .or_insert_with(|| mappings().collect());

        // Log an insertion for each mapping entry.
        let ts = self.ts();
        for (lir_id, meta) in mapping.into_iter() {
            let datum = self.state.pack_lir_mapping_update(
                global_id,
                Columnar::into_owned(lir_id),
                Columnar::into_owned(meta.operator),
                Columnar::into_owned(meta.parent_lir_id),
                Columnar::into_owned(meta.nesting),
                Columnar::into_owned(meta.operator_span),
            );
            self.output.lir_mapping.give((datum, ts, Diff::ONE));
        }
    }

    fn handle_dataflow_global(
        &mut self,
        DataflowGlobalReference {
            dataflow_index,
            global_id,
        }: Ref<'_, DataflowGlobal>,
    ) {
        let global_id = Columnar::into_owned(global_id);
        // Record the association, so we can retract it on dataflow shutdown.
        self.state
            .dataflow_global_ids
            .entry(dataflow_index)
            .and_modify(|globals| {
                if !globals.insert(global_id) {
                    error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
                }
            })
            .or_insert_with(|| BTreeSet::from([global_id]));

        let ts = self.ts();
        let datum = self
            .state
            .pack_dataflow_global_update(dataflow_index, global_id);
        self.output.dataflow_global_ids.give((datum, ts, Diff::ONE));
    }
}

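/// Helper for logging the lifecycle of a collection export: its creation, its
/// frontier advancements, and finally its drop.
///
/// Illustrative use (a sketch, assuming `logger`, `export_id`, `import_id`,
/// and `dataflow_index` are in scope):
///
/// ```ignore
/// let mut logging = CollectionLogging::new(
///     export_id,
///     logger,
///     dataflow_index,
///     std::iter::once(import_id),
/// );
/// // As the dataflow makes progress, report new frontiers.
/// logging.set_frontier(Some(Timestamp::from(100u64)));
/// logging.set_import_frontier(import_id, Some(Timestamp::from(90u64)));
/// // Dropping the logger retracts all previously logged state.
/// drop(logging);
/// ```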
pub struct CollectionLogging {
    export_id: GlobalId,
    logger: Logger,

    /// The export frontier we last logged, if any.
    logged_frontier: Option<Timestamp>,
    /// The import frontiers we last logged, by import ID.
    logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}

impl CollectionLogging {
    /// Creates a new `CollectionLogging` and logs the export's creation.
    pub fn new(
        export_id: GlobalId,
        logger: Logger,
        dataflow_index: usize,
        import_ids: impl Iterator<Item = GlobalId>,
    ) -> Self {
        logger.log(&ComputeEvent::Export(Export {
            export_id,
            dataflow_index,
        }));

        let mut self_ = Self {
            export_id,
            logger,
            logged_frontier: None,
            logged_import_frontiers: Default::default(),
        };

        // Initialize all frontiers at the minimum timestamp.
        let initial_frontier = Some(Timestamp::MIN);
        self_.set_frontier(initial_frontier);
        import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));

        self_
    }

    /// Sets the collection's output frontier, retracting the previously
    /// logged value. `None` denotes the empty frontier.
    pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
        let old_time = self.logged_frontier;
        self.logged_frontier = new_time;

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::Frontier(Frontier {
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Sets the frontier of one of the collection's imports, retracting the
    /// previously logged value. `None` denotes the empty frontier.
    pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
        let old_time = self.logged_import_frontiers.remove(&import_id);
        if let Some(time) = new_time {
            self.logged_import_frontiers.insert(import_id, time);
        }

        if old_time != new_time {
            let export_id = self.export_id;
            let retraction = old_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: -1,
                })
            });
            let insertion = new_time.map(|time| {
                ComputeEvent::ImportFrontier(ImportFrontier {
                    import_id,
                    export_id,
                    time,
                    diff: 1,
                })
            });
            let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
            self.logger.log_many(events);
        }
    }

    /// Logs that the collection has become hydrated.
    pub fn set_hydrated(&self) {
        self.logger.log(&ComputeEvent::Hydration(Hydration {
            export_id: self.export_id,
        }));
    }
}

impl Drop for CollectionLogging {
    fn drop(&mut self) {
        // Retract all logged state for this collection.
        self.set_frontier(None);

        let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
        for import_id in import_ids {
            self.set_import_frontier(import_id, None);
        }

        self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
            export_id: self.export_id,
        }));
    }
}

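/// Extension trait for logging the error counts of collections and batch
/// streams in a dataflow.
///
/// Illustrative use (a sketch, assuming `errors` is a `Collection` of error
/// data and `logger` and `export_id` are in scope):
///
/// ```ignore
/// let errors = errors.log_dataflow_errors(logger, export_id);
/// ```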
pub(crate) trait LogDataflowErrors {
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}

impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        // Sum the diffs within the container and log the change.
                        let diff = data.iter().map(|(_d, _t, r)| *r).sum::<Diff>();
                        logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                        output.session(&cap).give_container(data);
                    });
                }
            })
            .as_collection()
    }
}

impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, data| {
                    let diff = data.iter().map(sum_batch_diffs).sum::<Diff>();
                    logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));

                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Sums the diffs of all updates contained in the given batch.
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut sum = Diff::ZERO;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_t, r| sum += r);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    sum
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_compute_event_size() {
        // Compute events are logged in high volume, so it pays to keep them
        // small.
        assert_eq!(56, std::mem::size_of::<ComputeEvent>())
    }
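
    /// A sketch of the duration bucketing and interval arithmetic used by the
    /// demux operator (see `DemuxHandler::ts` and `handle_peek_retire`); the
    /// private methods themselves are exercised indirectly through the logging
    /// dataflow.
    #[mz_ore::test]
    fn test_demux_arithmetic() {
        // Peek and shutdown durations are bucketed into powers of two.
        assert_eq!(1_000u128.next_power_of_two(), 1_024);
        // Event times are advanced to the end of their logging interval
        // (here assuming a 1_000 ms interval).
        let (interval_ms, time_ms) = (1_000u128, 1_500u128);
        assert_eq!((time_ms / interval_ms + 1) * interval_ms, 2_000);
    }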
}