1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use columnation::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::dataflow::operators::generic::operator::empty;
31use timely::logging::{
32 ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
33 TimelyEvent,
34};
35use tracing::error;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::logging::compute::{ComputeEvent, DataflowShutdown};
39use crate::logging::{
40 EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
41 Update,
42};
43use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
46
47pub(super) struct Return {
49 pub collections: BTreeMap<LogVariant, LogCollection>,
51}
52
53pub(super) fn construct(
60 scope: Scope<'_, Timestamp>,
61 config: &LoggingConfig,
62 event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
63 shared_state: Rc<RefCell<SharedLoggingState>>,
64 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
65) -> Return {
66 scope.scoped("timely logging", move |scope| {
67 let enable_logging = config.enable_logging;
68 let (logs, token) = if enable_logging {
69 event_queue.links.mz_replay(
70 scope,
71 "timely logs",
72 config.interval,
73 event_queue.activator,
74 )
75 } else {
76 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
77 (empty(scope), token)
78 };
79
80 let (logs, storage_token) = if let Some(reader) = storage_log_reader {
82 use mz_timely_util::activator::RcActivator;
83 use timely::dataflow::operators::Concatenate;
84
85 let activator = RcActivator::new("storage_timely_activator".to_string(), 128);
86 let (storage_logs, s_token) =
87 [reader].mz_replay(scope, "storage timely logs", config.interval, activator);
88 let merged = scope.concatenate([logs, storage_logs]);
89 (merged, Some(s_token))
90 } else {
91 (logs, None)
92 };
93 let storage_token: Rc<dyn std::any::Any> = match storage_token {
95 Some(t) => t,
96 None => Rc::new(()),
97 };
98
99 let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
102 let mut input = demux.new_input(logs, Pipeline);
103 let (operates_out, operates) = demux.new_output();
104 let mut operates_out = OutputBuilder::from(operates_out);
105 let (channels_out, channels) = demux.new_output();
106 let mut channels_out = OutputBuilder::from(channels_out);
107 let (addresses_out, addresses) = demux.new_output();
108 let mut addresses_out = OutputBuilder::from(addresses_out);
109 let (parks_out, parks) = demux.new_output();
110 let mut parks_out = OutputBuilder::from(parks_out);
111 let (messages_sent_out, messages_sent) = demux.new_output();
112 let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
113 let (messages_received_out, messages_received) = demux.new_output();
114 let mut messages_received_out = OutputBuilder::from(messages_received_out);
115 let (schedules_duration_out, schedules_duration) = demux.new_output();
116 let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
117 let (schedules_histogram_out, schedules_histogram) = demux.new_output();
118 let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
119 let (batches_sent_out, batches_sent) = demux.new_output();
120 let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
121 let (batches_received_out, batches_received) = demux.new_output();
122 let mut batches_received_out = OutputBuilder::from(batches_received_out);
123
124 let worker_id = scope.index();
125 let mut demux_state = DemuxState::default();
126 demux.build(|_capability| {
127 let peers = scope.peers();
128 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
129 move |_frontiers| {
130 let mut operates = operates_out.activate();
131 let mut channels = channels_out.activate();
132 let mut addresses = addresses_out.activate();
133 let mut parks = parks_out.activate();
134 let mut messages_sent = messages_sent_out.activate();
135 let mut messages_received = messages_received_out.activate();
136 let mut batches_sent = batches_sent_out.activate();
137 let mut batches_received = batches_received_out.activate();
138 let mut schedules_duration = schedules_duration_out.activate();
139 let mut schedules_histogram = schedules_histogram_out.activate();
140
141 input.for_each(|cap, data| {
142 let mut output_buffers = DemuxOutput {
143 operates: operates.session_with_builder(&cap),
144 channels: channels.session_with_builder(&cap),
145 addresses: addresses.session_with_builder(&cap),
146 parks: parks.session_with_builder(&cap),
147 messages_sent: messages_sent.session_with_builder(&cap),
148 messages_received: messages_received.session_with_builder(&cap),
149 schedules_duration: schedules_duration.session_with_builder(&cap),
150 schedules_histogram: schedules_histogram.session_with_builder(&cap),
151 batches_sent: batches_sent.session_with_builder(&cap),
152 batches_received: batches_received.session_with_builder(&cap),
153 };
154
155 for (time, event) in data.drain(..) {
156 if let TimelyEvent::Messages(msg) = &event {
157 match msg.is_send {
158 true => assert_eq!(msg.source, worker_id),
159 false => assert_eq!(msg.target, worker_id),
160 }
161 }
162
163 DemuxHandler {
164 state: &mut demux_state,
165 shared_state: &mut shared_state.borrow_mut(),
166 output: &mut output_buffers,
167 logging_interval_ms,
168 peers,
169 time,
170 }
171 .handle(event);
172 }
173 });
174 }
175 });
176
177 let operates = consolidate_and_pack::<KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
182 operates,
183 TimelyLog::Operates,
184 move |data, packer, session| {
185 for ((id, name), time, diff) in data.iter() {
186 let data = packer.pack_slice(&[
187 Datum::UInt64(u64::cast_from(*id)),
188 Datum::UInt64(u64::cast_from(worker_id)),
189 Datum::String(name),
190 ]);
191 session.give((data, time, diff));
192 }
193 },
194 );
195
196 let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
199 Pipeline,
200 "ToRow Channels",
201 |_cap, _info| {
202 let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
203 move |input, output| {
204 input.for_each_time(|time, data| {
205 let mut session = output.session_with_builder(&time);
206 for d in data.flat_map(|c| c.borrow().into_index_iter()) {
207 let ((datum, ()), time, diff) = d;
208 let (source_node, source_port) = datum.source;
209 let (target_node, target_port) = datum.target;
210 let data = packer.pack_slice(&[
211 Datum::UInt64(u64::cast_from(datum.id)),
212 Datum::UInt64(u64::cast_from(worker_id)),
213 Datum::UInt64(u64::cast_from(source_node)),
214 Datum::UInt64(u64::cast_from(source_port)),
215 Datum::UInt64(u64::cast_from(target_node)),
216 Datum::UInt64(u64::cast_from(target_port)),
217 Datum::String(
218 std::str::from_utf8(datum.typ).expect("valid string"),
219 ),
220 ]);
221 session.give((data, time, diff));
222 }
223 });
224 }
225 },
226 );
227
228 type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
230 type KB<K, T, D> = KeyBatcher<K, T, D>;
231
232 let addresses = consolidate_and_pack::<KVB<_, _, _, _>, ColumnBuilder<_>, _, _>(
233 addresses,
234 TimelyLog::Addresses,
235 move |data, packer, session| {
236 for ((id, address), time, diff) in data.iter() {
237 let data = packer.pack_by_index(|packer, index| match index {
238 0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
239 1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
240 2 => {
241 let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
242 packer.push_list(list)
243 }
244 _ => unreachable!("Addresses relation has three columns"),
245 });
246 session.give((data, time, diff));
247 }
248 },
249 );
250
251 let parks = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
252 parks,
253 TimelyLog::Parks,
254 move |data, packer, session| {
255 for ((datum, ()), time, diff) in data.iter() {
256 let data = packer.pack_slice(&[
257 Datum::UInt64(u64::cast_from(worker_id)),
258 Datum::UInt64(datum.duration_pow),
259 datum
260 .requested_pow
261 .map(Datum::UInt64)
262 .unwrap_or(Datum::Null),
263 ]);
264 session.give((data, time, diff));
265 }
266 },
267 );
268
269 let batches_sent = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
270 batches_sent,
271 TimelyLog::BatchesSent,
272 move |data, packer, session| {
273 for ((datum, ()), time, diff) in data.iter() {
274 let data = packer.pack_slice(&[
275 Datum::UInt64(u64::cast_from(datum.channel)),
276 Datum::UInt64(u64::cast_from(worker_id)),
277 Datum::UInt64(u64::cast_from(datum.worker)),
278 ]);
279 session.give((data, time, diff));
280 }
281 },
282 );
283
284 let batches_received = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
285 batches_received,
286 TimelyLog::BatchesReceived,
287 move |data, packer, session| {
288 for ((datum, ()), time, diff) in data.iter() {
289 let data = packer.pack_slice(&[
290 Datum::UInt64(u64::cast_from(datum.channel)),
291 Datum::UInt64(u64::cast_from(datum.worker)),
292 Datum::UInt64(u64::cast_from(worker_id)),
293 ]);
294 session.give((data, time, diff));
295 }
296 },
297 );
298
299 let messages_sent = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
300 messages_sent,
301 TimelyLog::MessagesSent,
302 move |data, packer, session| {
303 for ((datum, ()), time, diff) in data.iter() {
304 let data = packer.pack_slice(&[
305 Datum::UInt64(u64::cast_from(datum.channel)),
306 Datum::UInt64(u64::cast_from(worker_id)),
307 Datum::UInt64(u64::cast_from(datum.worker)),
308 ]);
309 session.give((data, time, diff));
310 }
311 },
312 );
313
314 let messages_received = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
315 messages_received,
316 TimelyLog::MessagesReceived,
317 move |data, packer, session| {
318 for ((datum, ()), time, diff) in data.iter() {
319 let data = packer.pack_slice(&[
320 Datum::UInt64(u64::cast_from(datum.channel)),
321 Datum::UInt64(u64::cast_from(datum.worker)),
322 Datum::UInt64(u64::cast_from(worker_id)),
323 ]);
324 session.give((data, time, diff));
325 }
326 },
327 );
328
329 let elapsed = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
330 schedules_duration,
331 TimelyLog::Elapsed,
332 move |data, packer, session| {
333 for ((operator, ()), time, diff) in data.iter() {
334 let data = packer.pack_slice(&[
335 Datum::UInt64(u64::cast_from(*operator)),
336 Datum::UInt64(u64::cast_from(worker_id)),
337 ]);
338 session.give((data, time, diff));
339 }
340 },
341 );
342
343 let histogram = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
344 schedules_histogram,
345 TimelyLog::Histogram,
346 move |data, packer, session| {
347 for ((datum, ()), time, diff) in data.iter() {
348 let data = packer.pack_slice(&[
349 Datum::UInt64(u64::cast_from(datum.operator)),
350 Datum::UInt64(u64::cast_from(worker_id)),
351 Datum::UInt64(datum.duration_pow),
352 ]);
353 session.give((data, time, diff));
354 }
355 },
356 );
357
358 let logs = {
359 use TimelyLog::*;
360 [
361 (Operates, operates),
362 (Channels, channels),
363 (Elapsed, elapsed),
364 (Histogram, histogram),
365 (Addresses, addresses),
366 (Parks, parks),
367 (MessagesSent, messages_sent),
368 (MessagesReceived, messages_received),
369 (BatchesSent, batches_sent),
370 (BatchesReceived, batches_received),
371 ]
372 };
373
374 let mut collections = BTreeMap::new();
376 for (variant, collection) in logs {
377 let variant = LogVariant::Timely(variant);
378 if config.index_logs.contains_key(&variant) {
379 type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
381 type Builder<T, R> = RowRowBuilder<T, R>;
382 let trace = collection
383 .mz_arrange_core::<_, Batcher<_, _, _, _>, Builder<_, _>, RowRowSpine<_, _>>(
384 ExchangeCore::<ColumnBuilder<_>, _>::new_core(
385 columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
386 ),
387 &format!("Arrange {variant:?}"),
388 )
389 .trace;
390 let combined_token: Rc<dyn std::any::Any> =
391 Rc::new((Rc::clone(&token), Rc::clone(&storage_token)));
392 let collection = LogCollection {
393 trace,
394 token: combined_token,
395 };
396 collections.insert(variant, collection);
397 }
398 }
399
400 Return { collections }
401 })
402}
403
404#[derive(Default)]
406struct DemuxState {
407 operators: BTreeMap<usize, OperatesEvent>,
409 dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
411 last_park: Option<Park>,
413 messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
415 messages_received: BTreeMap<usize, Box<[MessageCount]>>,
417 schedule_starts: Vec<(usize, Duration)>,
419 schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
422}
423
424struct Park {
425 time: Duration,
427 requested: Option<Duration>,
429}
430
431#[derive(Default, Copy, Clone, Debug)]
433struct MessageCount {
434 batches: i64,
436 records: Diff,
438}
439
440struct DemuxOutput<'a, 'b> {
446 operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
447 channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
448 addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
449 parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
450 batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
451 batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
452 messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
453 messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
454 schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
455 schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
456}
457
458#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
459struct ChannelDatum {
460 id: usize,
461 source: (usize, usize),
462 target: (usize, usize),
463 typ: String,
464}
465
466#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
467struct ParkDatum {
468 duration_pow: u64,
469 requested_pow: Option<u64>,
470}
471
472impl Columnation for ParkDatum {
473 type InnerRegion = CopyRegion<Self>;
474}
475
476#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
477struct MessageDatum {
478 channel: usize,
479 worker: usize,
480}
481
482impl Columnation for MessageDatum {
483 type InnerRegion = CopyRegion<Self>;
484}
485
486#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
487struct ScheduleHistogramDatum {
488 operator: usize,
489 duration_pow: u64,
490}
491
492impl Columnation for ScheduleHistogramDatum {
493 type InnerRegion = CopyRegion<Self>;
494}
495
496struct DemuxHandler<'a, 'b, 'c> {
498 state: &'a mut DemuxState,
500 shared_state: &'a mut SharedLoggingState,
502 output: &'a mut DemuxOutput<'b, 'c>,
504 logging_interval_ms: u128,
506 peers: usize,
508 time: Duration,
510}
511
512impl DemuxHandler<'_, '_, '_> {
513 fn ts(&self) -> Timestamp {
516 let time_ms = self.time.as_millis();
517 let interval = self.logging_interval_ms;
518 let rounded = (time_ms / interval + 1) * interval;
519 rounded.try_into().expect("must fit")
520 }
521
522 fn handle(&mut self, event: TimelyEvent) {
524 use TimelyEvent::*;
525
526 match event {
527 Operates(e) => self.handle_operates(e),
528 Channels(e) => self.handle_channels(e),
529 Shutdown(e) => self.handle_shutdown(e),
530 Park(e) => self.handle_park(e),
531 Messages(e) => self.handle_messages(e),
532 Schedule(e) => self.handle_schedule(e),
533 _ => (),
534 }
535 }
536
537 fn handle_operates(&mut self, event: OperatesEvent) {
538 let ts = self.ts();
539 let datum = (event.id, event.name.clone());
540 self.output.operates.give((datum, ts, Diff::ONE));
541
542 let datum = (event.id, event.addr.clone());
543 self.output.addresses.give((datum, ts, Diff::ONE));
544
545 self.state.operators.insert(event.id, event);
546 }
547
548 fn handle_channels(&mut self, event: ChannelsEvent) {
549 let ts = self.ts();
550 let datum = ChannelDatumReference {
551 id: event.id,
552 source: event.source,
553 target: event.target,
554 typ: &event.typ,
555 };
556 self.output.channels.give(((datum, ()), ts, Diff::ONE));
557
558 let datum = (event.id, event.scope_addr.clone());
559 self.output.addresses.give((datum, ts, Diff::ONE));
560
561 let dataflow_index = event.scope_addr[0];
562 self.state
563 .dataflow_channels
564 .entry(dataflow_index)
565 .or_default()
566 .push(event);
567 }
568
569 fn handle_shutdown(&mut self, event: ShutdownEvent) {
570 let Some(operator) = self.state.operators.remove(&event.id) else {
576 error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
577 return;
578 };
579
580 let ts = self.ts();
582 let datum = (operator.id, operator.name);
583 self.output.operates.give((datum, ts, Diff::MINUS_ONE));
584
585 if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
587 for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
588 .enumerate()
589 .filter(|(_, (count, _))| *count != 0)
590 {
591 self.output
592 .schedules_duration
593 .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
594
595 let datum = ScheduleHistogramDatum {
596 operator: event.id,
597 duration_pow: 1 << bucket,
598 };
599 let diff = Diff::cast_from(-count);
600 self.output
601 .schedules_histogram
602 .give(((datum, ()), ts, diff));
603 }
604 }
605
606 if operator.addr.len() == 1 {
607 let dataflow_index = operator.addr[0];
608 self.handle_dataflow_shutdown(dataflow_index);
609 }
610
611 let datum = (operator.id, operator.addr);
612 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
613 }
614
615 fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
616 self.shared_state.compute_logger.as_ref().map(|logger| {
618 logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
619 });
620
621 let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
623 return;
624 };
625
626 let ts = self.ts();
627 for channel in channels {
628 let datum = ChannelDatumReference {
630 id: channel.id,
631 source: channel.source,
632 target: channel.target,
633 typ: &channel.typ,
634 };
635 self.output
636 .channels
637 .give(((datum, ()), ts, Diff::MINUS_ONE));
638
639 let datum = (channel.id, channel.scope_addr);
640 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
641
642 if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
644 for (target_worker, count) in sent.iter().enumerate() {
645 let datum = MessageDatum {
646 channel: channel.id,
647 worker: target_worker,
648 };
649 self.output
650 .messages_sent
651 .give(((datum, ()), ts, Diff::from(-count.records)));
652 self.output
653 .batches_sent
654 .give(((datum, ()), ts, Diff::from(-count.batches)));
655 }
656 }
657 if let Some(received) = self.state.messages_received.remove(&channel.id) {
658 for (source_worker, count) in received.iter().enumerate() {
659 let datum = MessageDatum {
660 channel: channel.id,
661 worker: source_worker,
662 };
663 self.output.messages_received.give((
664 (datum, ()),
665 ts,
666 Diff::from(-count.records),
667 ));
668 self.output.batches_received.give((
669 (datum, ()),
670 ts,
671 Diff::from(-count.batches),
672 ));
673 }
674 }
675 }
676 }
677
678 fn handle_park(&mut self, event: ParkEvent) {
679 match event {
680 ParkEvent::Park(requested) => {
681 let park = Park {
682 time: self.time,
683 requested,
684 };
685 let existing = self.state.last_park.replace(park);
686 if existing.is_some() {
687 error!("park without a succeeding unpark");
688 }
689 }
690 ParkEvent::Unpark => {
691 let Some(park) = self.state.last_park.take() else {
692 error!("unpark without a preceding park");
693 return;
694 };
695
696 let duration_ns = self.time.saturating_sub(park.time).as_nanos();
697 let duration_pow =
698 u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
699 let requested_pow = park
700 .requested
701 .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
702
703 let ts = self.ts();
704 let datum = ParkDatum {
705 duration_pow,
706 requested_pow,
707 };
708 self.output.parks.give(((datum, ()), ts, Diff::ONE));
709 }
710 }
711 }
712
713 fn handle_messages(&mut self, event: MessagesEvent) {
714 let ts = self.ts();
715 let count = Diff::from(event.record_count);
716
717 if event.is_send {
718 let datum = MessageDatum {
719 channel: event.channel,
720 worker: event.target,
721 };
722 self.output.messages_sent.give(((datum, ()), ts, count));
723 self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
724
725 let sent_counts = self
726 .state
727 .messages_sent
728 .entry(event.channel)
729 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
730 sent_counts[event.target].records += count;
731 sent_counts[event.target].batches += 1;
732 } else {
733 let datum = MessageDatum {
734 channel: event.channel,
735 worker: event.source,
736 };
737 self.output.messages_received.give(((datum, ()), ts, count));
738 self.output
739 .batches_received
740 .give(((datum, ()), ts, Diff::ONE));
741
742 let received_counts = self
743 .state
744 .messages_received
745 .entry(event.channel)
746 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
747 received_counts[event.source].records += count;
748 received_counts[event.source].batches += 1;
749 }
750 }
751
752 fn handle_schedule(&mut self, event: ScheduleEvent) {
753 match event.start_stop {
754 timely::logging::StartStop::Start => {
755 self.state.schedule_starts.push((event.id, self.time));
756 }
757 timely::logging::StartStop::Stop => {
758 let Some((old_id, start_time)) = self.state.schedule_starts.pop() else {
759 error!(operator_id = ?event.id, "schedule stop without preceding start");
760 return;
761 };
762
763 if old_id != event.id {
764 error!(start_id = ?old_id, stop_id = ?event.id, "schedule stop without preceding start");
765 return;
766 }
767
768 let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
769 let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
770 let elapsed_diff = Diff::from(elapsed_i64);
771 let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
772
773 let ts = self.ts();
774 let datum = event.id;
775 self.output
776 .schedules_duration
777 .give(((datum, ()), ts, elapsed_diff));
778
779 let datum = ScheduleHistogramDatum {
780 operator: event.id,
781 duration_pow: elapsed_pow,
782 };
783 self.output
784 .schedules_histogram
785 .give(((datum, ()), ts, Diff::ONE));
786
787 let index = usize::cast_from(elapsed_pow.trailing_zeros());
789 let data = self.state.schedules_data.entry(event.id).or_default();
790 grow_vec(data, index);
791 let (count, duration) = &mut data[index];
792 *count += 1;
793 *duration += elapsed_diff;
794 }
795 }
796 }
797}
798
799fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
803where
804 T: Clone + Default,
805{
806 if vec.len() <= index {
807 vec.resize(index + 1, Default::default());
808 }
809}