1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use columnation::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::batcher;
23use mz_timely_util::columnar::builder::ColumnBuilder;
24use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
25use mz_timely_util::columnation::ColumnationChunker;
26use mz_timely_util::replay::MzReplay;
27use timely::dataflow::Scope;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::operators::Operator;
30use timely::dataflow::operators::generic::OutputBuilder;
31use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
32use timely::dataflow::operators::generic::operator::empty;
33use timely::logging::{
34 ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
35 TimelyEvent,
36};
37use tracing::error;
38
39use crate::extensions::arrange::MzArrangeCore;
40use crate::logging::compute::{ComputeEvent, DataflowShutdown};
41use crate::logging::{
42 EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
43 Update,
44};
45use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
46use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
47use mz_row_spine::RowRowBuilder;
48
49pub(super) struct Return {
51 pub collections: BTreeMap<LogVariant, LogCollection>,
53}
54
55pub(super) fn construct(
62 scope: Scope<'_, Timestamp>,
63 config: &LoggingConfig,
64 event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
65 shared_state: Rc<RefCell<SharedLoggingState>>,
66 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
67) -> Return {
68 scope.scoped("timely logging", move |scope| {
69 let enable_logging = config.enable_logging;
70 let (logs, token) = if enable_logging {
71 event_queue.links.mz_replay(
72 scope,
73 "timely logs",
74 config.interval,
75 event_queue.activator,
76 )
77 } else {
78 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
79 (empty(scope), token)
80 };
81
82 let (logs, storage_token) = if let Some(reader) = storage_log_reader {
84 use mz_timely_util::activator::RcActivator;
85 use timely::dataflow::operators::Concatenate;
86
87 let activator = RcActivator::new("storage_timely_activator".to_string(), 128);
88 let (storage_logs, s_token) =
89 [reader].mz_replay(scope, "storage timely logs", config.interval, activator);
90 let merged = scope.concatenate([logs, storage_logs]);
91 (merged, Some(s_token))
92 } else {
93 (logs, None)
94 };
95 let storage_token: Rc<dyn std::any::Any> = match storage_token {
97 Some(t) => t,
98 None => Rc::new(()),
99 };
100
101 let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
104 let mut input = demux.new_input(logs, Pipeline);
105 let (operates_out, operates) = demux.new_output();
106 let mut operates_out = OutputBuilder::from(operates_out);
107 let (channels_out, channels) = demux.new_output();
108 let mut channels_out = OutputBuilder::from(channels_out);
109 let (addresses_out, addresses) = demux.new_output();
110 let mut addresses_out = OutputBuilder::from(addresses_out);
111 let (parks_out, parks) = demux.new_output();
112 let mut parks_out = OutputBuilder::from(parks_out);
113 let (messages_sent_out, messages_sent) = demux.new_output();
114 let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
115 let (messages_received_out, messages_received) = demux.new_output();
116 let mut messages_received_out = OutputBuilder::from(messages_received_out);
117 let (schedules_duration_out, schedules_duration) = demux.new_output();
118 let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
119 let (schedules_histogram_out, schedules_histogram) = demux.new_output();
120 let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
121 let (batches_sent_out, batches_sent) = demux.new_output();
122 let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
123 let (batches_received_out, batches_received) = demux.new_output();
124 let mut batches_received_out = OutputBuilder::from(batches_received_out);
125
126 let worker_id = scope.index();
127 let mut demux_state = DemuxState::default();
128 demux.build(|_capability| {
129 let peers = scope.peers();
130 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
131 move |_frontiers| {
132 let mut operates = operates_out.activate();
133 let mut channels = channels_out.activate();
134 let mut addresses = addresses_out.activate();
135 let mut parks = parks_out.activate();
136 let mut messages_sent = messages_sent_out.activate();
137 let mut messages_received = messages_received_out.activate();
138 let mut batches_sent = batches_sent_out.activate();
139 let mut batches_received = batches_received_out.activate();
140 let mut schedules_duration = schedules_duration_out.activate();
141 let mut schedules_histogram = schedules_histogram_out.activate();
142
143 input.for_each(|cap, data| {
144 let mut output_buffers = DemuxOutput {
145 operates: operates.session_with_builder(&cap),
146 channels: channels.session_with_builder(&cap),
147 addresses: addresses.session_with_builder(&cap),
148 parks: parks.session_with_builder(&cap),
149 messages_sent: messages_sent.session_with_builder(&cap),
150 messages_received: messages_received.session_with_builder(&cap),
151 schedules_duration: schedules_duration.session_with_builder(&cap),
152 schedules_histogram: schedules_histogram.session_with_builder(&cap),
153 batches_sent: batches_sent.session_with_builder(&cap),
154 batches_received: batches_received.session_with_builder(&cap),
155 };
156
157 for (time, event) in data.drain(..) {
158 if let TimelyEvent::Messages(msg) = &event {
159 match msg.is_send {
160 true => assert_eq!(msg.source, worker_id),
161 false => assert_eq!(msg.target, worker_id),
162 }
163 }
164
165 DemuxHandler {
166 state: &mut demux_state,
167 shared_state: &mut shared_state.borrow_mut(),
168 output: &mut output_buffers,
169 logging_interval_ms,
170 peers,
171 time,
172 }
173 .handle(event);
174 }
175 });
176 }
177 });
178
179 let operates = consolidate_and_pack::<
184 ColumnationChunker<_>,
185 KeyValBatcher<_, _, _, _>,
186 ColumnBuilder<_>,
187 _,
188 _,
189 _,
190 >(
191 operates,
192 TimelyLog::Operates,
193 move |data, packer, session| {
194 for ((id, name), time, diff) in data.iter() {
195 let data = packer.pack_slice(&[
196 Datum::UInt64(u64::cast_from(*id)),
197 Datum::UInt64(u64::cast_from(worker_id)),
198 Datum::String(name),
199 ]);
200 session.give((data, time, diff));
201 }
202 },
203 );
204
205 let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
208 Pipeline,
209 "ToRow Channels",
210 |_cap, _info| {
211 let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
212 move |input, output| {
213 input.for_each_time(|time, data| {
214 let mut session = output.session_with_builder(&time);
215 for d in data.flat_map(|c| c.borrow().into_index_iter()) {
216 let ((datum, ()), time, diff) = d;
217 let (source_node, source_port) = datum.source;
218 let (target_node, target_port) = datum.target;
219 let data = packer.pack_slice(&[
220 Datum::UInt64(u64::cast_from(datum.id)),
221 Datum::UInt64(u64::cast_from(worker_id)),
222 Datum::UInt64(u64::cast_from(source_node)),
223 Datum::UInt64(u64::cast_from(source_port)),
224 Datum::UInt64(u64::cast_from(target_node)),
225 Datum::UInt64(u64::cast_from(target_port)),
226 Datum::String(
227 std::str::from_utf8(datum.typ).expect("valid string"),
228 ),
229 ]);
230 session.give((data, time, diff));
231 }
232 });
233 }
234 },
235 );
236
237 type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
239 type KB<K, T, D> = KeyBatcher<K, T, D>;
240
241 let addresses = consolidate_and_pack::<
242 ColumnationChunker<_>,
243 KVB<_, _, _, _>,
244 ColumnBuilder<_>,
245 _,
246 _,
247 _,
248 >(
249 addresses,
250 TimelyLog::Addresses,
251 move |data, packer, session| {
252 for ((id, address), time, diff) in data.iter() {
253 let data = packer.pack_by_index(|packer, index| match index {
254 0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
255 1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
256 2 => {
257 let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
258 packer.push_list(list)
259 }
260 _ => unreachable!("Addresses relation has three columns"),
261 });
262 session.give((data, time, diff));
263 }
264 },
265 );
266
267 let parks =
268 consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
269 parks,
270 TimelyLog::Parks,
271 move |data, packer, session| {
272 for ((datum, ()), time, diff) in data.iter() {
273 let data = packer.pack_slice(&[
274 Datum::UInt64(u64::cast_from(worker_id)),
275 Datum::UInt64(datum.duration_pow),
276 datum
277 .requested_pow
278 .map(Datum::UInt64)
279 .unwrap_or(Datum::Null),
280 ]);
281 session.give((data, time, diff));
282 }
283 },
284 );
285
286 let batches_sent =
287 consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
288 batches_sent,
289 TimelyLog::BatchesSent,
290 move |data, packer, session| {
291 for ((datum, ()), time, diff) in data.iter() {
292 let data = packer.pack_slice(&[
293 Datum::UInt64(u64::cast_from(datum.channel)),
294 Datum::UInt64(u64::cast_from(worker_id)),
295 Datum::UInt64(u64::cast_from(datum.worker)),
296 ]);
297 session.give((data, time, diff));
298 }
299 },
300 );
301
302 let batches_received =
303 consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
304 batches_received,
305 TimelyLog::BatchesReceived,
306 move |data, packer, session| {
307 for ((datum, ()), time, diff) in data.iter() {
308 let data = packer.pack_slice(&[
309 Datum::UInt64(u64::cast_from(datum.channel)),
310 Datum::UInt64(u64::cast_from(datum.worker)),
311 Datum::UInt64(u64::cast_from(worker_id)),
312 ]);
313 session.give((data, time, diff));
314 }
315 },
316 );
317
318 let messages_sent =
319 consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
320 messages_sent,
321 TimelyLog::MessagesSent,
322 move |data, packer, session| {
323 for ((datum, ()), time, diff) in data.iter() {
324 let data = packer.pack_slice(&[
325 Datum::UInt64(u64::cast_from(datum.channel)),
326 Datum::UInt64(u64::cast_from(worker_id)),
327 Datum::UInt64(u64::cast_from(datum.worker)),
328 ]);
329 session.give((data, time, diff));
330 }
331 },
332 );
333
334 let messages_received =
335 consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
336 messages_received,
337 TimelyLog::MessagesReceived,
338 move |data, packer, session| {
339 for ((datum, ()), time, diff) in data.iter() {
340 let data = packer.pack_slice(&[
341 Datum::UInt64(u64::cast_from(datum.channel)),
342 Datum::UInt64(u64::cast_from(datum.worker)),
343 Datum::UInt64(u64::cast_from(worker_id)),
344 ]);
345 session.give((data, time, diff));
346 }
347 },
348 );
349
350 let elapsed =
351 consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
352 schedules_duration,
353 TimelyLog::Elapsed,
354 move |data, packer, session| {
355 for ((operator, ()), time, diff) in data.iter() {
356 let data = packer.pack_slice(&[
357 Datum::UInt64(u64::cast_from(*operator)),
358 Datum::UInt64(u64::cast_from(worker_id)),
359 ]);
360 session.give((data, time, diff));
361 }
362 },
363 );
364
365 let histogram =
366 consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
367 schedules_histogram,
368 TimelyLog::Histogram,
369 move |data, packer, session| {
370 for ((datum, ()), time, diff) in data.iter() {
371 let data = packer.pack_slice(&[
372 Datum::UInt64(u64::cast_from(datum.operator)),
373 Datum::UInt64(u64::cast_from(worker_id)),
374 Datum::UInt64(datum.duration_pow),
375 ]);
376 session.give((data, time, diff));
377 }
378 },
379 );
380
381 let logs = {
382 use TimelyLog::*;
383 [
384 (Operates, operates),
385 (Channels, channels),
386 (Elapsed, elapsed),
387 (Histogram, histogram),
388 (Addresses, addresses),
389 (Parks, parks),
390 (MessagesSent, messages_sent),
391 (MessagesReceived, messages_received),
392 (BatchesSent, batches_sent),
393 (BatchesReceived, batches_received),
394 ]
395 };
396
397 let mut collections = BTreeMap::new();
399 for (variant, collection) in logs {
400 let variant = LogVariant::Timely(variant);
401 if config.index_logs.contains_key(&variant) {
402 type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
404 type Builder<T, R> = RowRowBuilder<T, R>;
405 let trace = collection
406 .mz_arrange_core::<
407 _,
408 batcher::Chunker<_>,
409 Batcher<_, _, _, _>,
410 Builder<_, _>,
411 RowRowSpine<_, _>,
412 >(
413 ExchangeCore::<ColumnBuilder<_>, _>::new_core(
414 columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
415 ),
416 &format!("Arrange {variant:?}"),
417 )
418 .trace;
419 let combined_token: Rc<dyn std::any::Any> =
420 Rc::new((Rc::clone(&token), Rc::clone(&storage_token)));
421 let collection = LogCollection {
422 trace,
423 token: combined_token,
424 };
425 collections.insert(variant, collection);
426 }
427 }
428
429 Return { collections }
430 })
431}
432
433#[derive(Default)]
435struct DemuxState {
436 operators: BTreeMap<usize, OperatesEvent>,
438 dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
440 last_park: Option<Park>,
442 messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
444 messages_received: BTreeMap<usize, Box<[MessageCount]>>,
446 schedule_starts: Vec<(usize, Duration)>,
448 schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
451}
452
453struct Park {
454 time: Duration,
456 requested: Option<Duration>,
458}
459
460#[derive(Default, Copy, Clone, Debug)]
462struct MessageCount {
463 batches: i64,
465 records: Diff,
467}
468
469struct DemuxOutput<'a, 'b> {
475 operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
476 channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
477 addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
478 parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
479 batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
480 batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
481 messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
482 messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
483 schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
484 schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
485}
486
487#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
488struct ChannelDatum {
489 id: usize,
490 source: (usize, usize),
491 target: (usize, usize),
492 typ: String,
493}
494
495#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
496struct ParkDatum {
497 duration_pow: u64,
498 requested_pow: Option<u64>,
499}
500
501impl Columnation for ParkDatum {
502 type InnerRegion = CopyRegion<Self>;
503}
504
505#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
506struct MessageDatum {
507 channel: usize,
508 worker: usize,
509}
510
511impl Columnation for MessageDatum {
512 type InnerRegion = CopyRegion<Self>;
513}
514
515#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
516struct ScheduleHistogramDatum {
517 operator: usize,
518 duration_pow: u64,
519}
520
521impl Columnation for ScheduleHistogramDatum {
522 type InnerRegion = CopyRegion<Self>;
523}
524
525struct DemuxHandler<'a, 'b, 'c> {
527 state: &'a mut DemuxState,
529 shared_state: &'a mut SharedLoggingState,
531 output: &'a mut DemuxOutput<'b, 'c>,
533 logging_interval_ms: u128,
535 peers: usize,
537 time: Duration,
539}
540
541impl DemuxHandler<'_, '_, '_> {
542 fn ts(&self) -> Timestamp {
545 let time_ms = self.time.as_millis();
546 let interval = self.logging_interval_ms;
547 let rounded = (time_ms / interval + 1) * interval;
548 rounded.try_into().expect("must fit")
549 }
550
551 fn handle(&mut self, event: TimelyEvent) {
553 use TimelyEvent::*;
554
555 match event {
556 Operates(e) => self.handle_operates(e),
557 Channels(e) => self.handle_channels(e),
558 Shutdown(e) => self.handle_shutdown(e),
559 Park(e) => self.handle_park(e),
560 Messages(e) => self.handle_messages(e),
561 Schedule(e) => self.handle_schedule(e),
562 _ => (),
563 }
564 }
565
566 fn handle_operates(&mut self, event: OperatesEvent) {
567 let ts = self.ts();
568 let datum = (event.id, event.name.clone());
569 self.output.operates.give((datum, ts, Diff::ONE));
570
571 let datum = (event.id, event.addr.clone());
572 self.output.addresses.give((datum, ts, Diff::ONE));
573
574 self.state.operators.insert(event.id, event);
575 }
576
577 fn handle_channels(&mut self, event: ChannelsEvent) {
578 let ts = self.ts();
579 let datum = ChannelDatumReference {
580 id: event.id,
581 source: event.source,
582 target: event.target,
583 typ: &event.typ,
584 };
585 self.output.channels.give(((datum, ()), ts, Diff::ONE));
586
587 let datum = (event.id, event.scope_addr.clone());
588 self.output.addresses.give((datum, ts, Diff::ONE));
589
590 let dataflow_index = event.scope_addr[0];
591 self.state
592 .dataflow_channels
593 .entry(dataflow_index)
594 .or_default()
595 .push(event);
596 }
597
598 fn handle_shutdown(&mut self, event: ShutdownEvent) {
599 let Some(operator) = self.state.operators.remove(&event.id) else {
605 error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
606 return;
607 };
608
609 let ts = self.ts();
611 let datum = (operator.id, operator.name);
612 self.output.operates.give((datum, ts, Diff::MINUS_ONE));
613
614 if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
616 for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
617 .enumerate()
618 .filter(|(_, (count, _))| *count != 0)
619 {
620 self.output
621 .schedules_duration
622 .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
623
624 let datum = ScheduleHistogramDatum {
625 operator: event.id,
626 duration_pow: 1 << bucket,
627 };
628 let diff = Diff::cast_from(-count);
629 self.output
630 .schedules_histogram
631 .give(((datum, ()), ts, diff));
632 }
633 }
634
635 if operator.addr.len() == 1 {
636 let dataflow_index = operator.addr[0];
637 self.handle_dataflow_shutdown(dataflow_index);
638 }
639
640 let datum = (operator.id, operator.addr);
641 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
642 }
643
644 fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
645 self.shared_state.compute_logger.as_ref().map(|logger| {
647 logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
648 });
649
650 let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
652 return;
653 };
654
655 let ts = self.ts();
656 for channel in channels {
657 let datum = ChannelDatumReference {
659 id: channel.id,
660 source: channel.source,
661 target: channel.target,
662 typ: &channel.typ,
663 };
664 self.output
665 .channels
666 .give(((datum, ()), ts, Diff::MINUS_ONE));
667
668 let datum = (channel.id, channel.scope_addr);
669 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
670
671 if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
673 for (target_worker, count) in sent.iter().enumerate() {
674 let datum = MessageDatum {
675 channel: channel.id,
676 worker: target_worker,
677 };
678 self.output
679 .messages_sent
680 .give(((datum, ()), ts, Diff::from(-count.records)));
681 self.output
682 .batches_sent
683 .give(((datum, ()), ts, Diff::from(-count.batches)));
684 }
685 }
686 if let Some(received) = self.state.messages_received.remove(&channel.id) {
687 for (source_worker, count) in received.iter().enumerate() {
688 let datum = MessageDatum {
689 channel: channel.id,
690 worker: source_worker,
691 };
692 self.output.messages_received.give((
693 (datum, ()),
694 ts,
695 Diff::from(-count.records),
696 ));
697 self.output.batches_received.give((
698 (datum, ()),
699 ts,
700 Diff::from(-count.batches),
701 ));
702 }
703 }
704 }
705 }
706
707 fn handle_park(&mut self, event: ParkEvent) {
708 match event {
709 ParkEvent::Park(requested) => {
710 let park = Park {
711 time: self.time,
712 requested,
713 };
714 let existing = self.state.last_park.replace(park);
715 if existing.is_some() {
716 error!("park without a succeeding unpark");
717 }
718 }
719 ParkEvent::Unpark => {
720 let Some(park) = self.state.last_park.take() else {
721 error!("unpark without a preceding park");
722 return;
723 };
724
725 let duration_ns = self.time.saturating_sub(park.time).as_nanos();
726 let duration_pow =
727 u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
728 let requested_pow = park
729 .requested
730 .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
731
732 let ts = self.ts();
733 let datum = ParkDatum {
734 duration_pow,
735 requested_pow,
736 };
737 self.output.parks.give(((datum, ()), ts, Diff::ONE));
738 }
739 }
740 }
741
742 fn handle_messages(&mut self, event: MessagesEvent) {
743 let ts = self.ts();
744 let count = Diff::from(event.record_count);
745
746 if event.is_send {
747 let datum = MessageDatum {
748 channel: event.channel,
749 worker: event.target,
750 };
751 self.output.messages_sent.give(((datum, ()), ts, count));
752 self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
753
754 let sent_counts = self
755 .state
756 .messages_sent
757 .entry(event.channel)
758 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
759 sent_counts[event.target].records += count;
760 sent_counts[event.target].batches += 1;
761 } else {
762 let datum = MessageDatum {
763 channel: event.channel,
764 worker: event.source,
765 };
766 self.output.messages_received.give(((datum, ()), ts, count));
767 self.output
768 .batches_received
769 .give(((datum, ()), ts, Diff::ONE));
770
771 let received_counts = self
772 .state
773 .messages_received
774 .entry(event.channel)
775 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
776 received_counts[event.source].records += count;
777 received_counts[event.source].batches += 1;
778 }
779 }
780
781 fn handle_schedule(&mut self, event: ScheduleEvent) {
782 match event.start_stop {
783 timely::logging::StartStop::Start => {
784 self.state.schedule_starts.push((event.id, self.time));
785 }
786 timely::logging::StartStop::Stop => {
787 let Some((old_id, start_time)) = self.state.schedule_starts.pop() else {
788 error!(operator_id = ?event.id, "schedule stop without preceding start");
789 return;
790 };
791
792 if old_id != event.id {
793 error!(start_id = ?old_id, stop_id = ?event.id, "schedule stop without preceding start");
794 return;
795 }
796
797 let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
798 let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
799 let elapsed_diff = Diff::from(elapsed_i64);
800 let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
801
802 let ts = self.ts();
803 let datum = event.id;
804 self.output
805 .schedules_duration
806 .give(((datum, ()), ts, elapsed_diff));
807
808 let datum = ScheduleHistogramDatum {
809 operator: event.id,
810 duration_pow: elapsed_pow,
811 };
812 self.output
813 .schedules_histogram
814 .give(((datum, ()), ts, Diff::ONE));
815
816 let index = usize::cast_from(elapsed_pow.trailing_zeros());
818 let data = self.state.schedules_data.entry(event.id).or_default();
819 grow_vec(data, index);
820 let (count, duration) = &mut data[index];
821 *count += 1;
822 *duration += elapsed_diff;
823 }
824 }
825 }
826}
827
828fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
832where
833 T: Clone + Default,
834{
835 if vec.len() <= index {
836 vec.resize(index + 1, Default::default());
837 }
838}