1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::dataflow::operators::generic::operator::empty;
31use timely::logging::{
32 ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
33 TimelyEvent,
34};
35use tracing::error;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::logging::compute::{ComputeEvent, DataflowShutdown};
39use crate::logging::{
40 EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
41 Update,
42};
43use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
46
/// The return type of [`construct`].
pub(super) struct Return {
    /// The arranged log collections, keyed by the log variant they expose.
    /// Only variants listed in the logging config's `index_logs` are present.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
52
53pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
60 mut scope: G,
61 config: &LoggingConfig,
62 event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
63 shared_state: Rc<RefCell<SharedLoggingState>>,
64) -> Return {
65 scope.scoped("timely logging", move |scope| {
66 let enable_logging = config.enable_logging;
67 let (logs, token) = if enable_logging {
68 event_queue.links.mz_replay(
69 scope,
70 "timely logs",
71 config.interval,
72 event_queue.activator,
73 )
74 } else {
75 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
76 (empty(scope), token)
77 };
78
79 let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
82 let mut input = demux.new_input(&logs, Pipeline);
83 let (operates_out, operates) = demux.new_output();
84 let mut operates_out = OutputBuilder::from(operates_out);
85 let (channels_out, channels) = demux.new_output();
86 let mut channels_out = OutputBuilder::from(channels_out);
87 let (addresses_out, addresses) = demux.new_output();
88 let mut addresses_out = OutputBuilder::from(addresses_out);
89 let (parks_out, parks) = demux.new_output();
90 let mut parks_out = OutputBuilder::from(parks_out);
91 let (messages_sent_out, messages_sent) = demux.new_output();
92 let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
93 let (messages_received_out, messages_received) = demux.new_output();
94 let mut messages_received_out = OutputBuilder::from(messages_received_out);
95 let (schedules_duration_out, schedules_duration) = demux.new_output();
96 let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
97 let (schedules_histogram_out, schedules_histogram) = demux.new_output();
98 let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
99 let (batches_sent_out, batches_sent) = demux.new_output();
100 let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
101 let (batches_received_out, batches_received) = demux.new_output();
102 let mut batches_received_out = OutputBuilder::from(batches_received_out);
103
104 let worker_id = scope.index();
105 let mut demux_state = DemuxState::default();
106 demux.build(|_capability| {
107 let peers = scope.peers();
108 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
109 move |_frontiers| {
110 let mut operates = operates_out.activate();
111 let mut channels = channels_out.activate();
112 let mut addresses = addresses_out.activate();
113 let mut parks = parks_out.activate();
114 let mut messages_sent = messages_sent_out.activate();
115 let mut messages_received = messages_received_out.activate();
116 let mut batches_sent = batches_sent_out.activate();
117 let mut batches_received = batches_received_out.activate();
118 let mut schedules_duration = schedules_duration_out.activate();
119 let mut schedules_histogram = schedules_histogram_out.activate();
120
121 input.for_each(|cap, data| {
122 let mut output_buffers = DemuxOutput {
123 operates: operates.session_with_builder(&cap),
124 channels: channels.session_with_builder(&cap),
125 addresses: addresses.session_with_builder(&cap),
126 parks: parks.session_with_builder(&cap),
127 messages_sent: messages_sent.session_with_builder(&cap),
128 messages_received: messages_received.session_with_builder(&cap),
129 schedules_duration: schedules_duration.session_with_builder(&cap),
130 schedules_histogram: schedules_histogram.session_with_builder(&cap),
131 batches_sent: batches_sent.session_with_builder(&cap),
132 batches_received: batches_received.session_with_builder(&cap),
133 };
134
135 for (time, event) in data.drain(..) {
136 if let TimelyEvent::Messages(msg) = &event {
137 match msg.is_send {
138 true => assert_eq!(msg.source, worker_id),
139 false => assert_eq!(msg.target, worker_id),
140 }
141 }
142
143 DemuxHandler {
144 state: &mut demux_state,
145 shared_state: &mut shared_state.borrow_mut(),
146 output: &mut output_buffers,
147 logging_interval_ms,
148 peers,
149 time,
150 }
151 .handle(event);
152 }
153 });
154 }
155 });
156
157 let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
162 &operates,
163 TimelyLog::Operates,
164 move |data, packer, session| {
165 for ((id, name), time, diff) in data.iter() {
166 let data = packer.pack_slice(&[
167 Datum::UInt64(u64::cast_from(*id)),
168 Datum::UInt64(u64::cast_from(worker_id)),
169 Datum::String(name),
170 ]);
171 session.give((data, time, diff));
172 }
173 },
174 );
175
176 let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
179 Pipeline,
180 "ToRow Channels",
181 |_cap, _info| {
182 let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
183 move |input, output| {
184 input.for_each_time(|time, data| {
185 let mut session = output.session_with_builder(&time);
186 for d in data.flat_map(|c| c.borrow().into_index_iter()) {
187 let ((datum, ()), time, diff) = d;
188 let (source_node, source_port) = datum.source;
189 let (target_node, target_port) = datum.target;
190 let data = packer.pack_slice(&[
191 Datum::UInt64(u64::cast_from(datum.id)),
192 Datum::UInt64(u64::cast_from(worker_id)),
193 Datum::UInt64(u64::cast_from(source_node)),
194 Datum::UInt64(u64::cast_from(source_port)),
195 Datum::UInt64(u64::cast_from(target_node)),
196 Datum::UInt64(u64::cast_from(target_port)),
197 Datum::String(datum.typ),
198 ]);
199 session.give((data, time, diff));
200 }
201 });
202 }
203 },
204 );
205
206 type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
208 type KB<K, T, D> = KeyBatcher<K, T, D>;
209
210 let addresses = consolidate_and_pack::<_, KVB<_, _, _, _>, ColumnBuilder<_>, _, _>(
211 &addresses,
212 TimelyLog::Addresses,
213 move |data, packer, session| {
214 for ((id, address), time, diff) in data.iter() {
215 let data = packer.pack_by_index(|packer, index| match index {
216 0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
217 1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
218 2 => {
219 let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
220 packer.push_list(list)
221 }
222 _ => unreachable!("Addresses relation has three columns"),
223 });
224 session.give((data, time, diff));
225 }
226 },
227 );
228
229 let parks = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
230 &parks,
231 TimelyLog::Parks,
232 move |data, packer, session| {
233 for ((datum, ()), time, diff) in data.iter() {
234 let data = packer.pack_slice(&[
235 Datum::UInt64(u64::cast_from(worker_id)),
236 Datum::UInt64(datum.duration_pow),
237 datum
238 .requested_pow
239 .map(Datum::UInt64)
240 .unwrap_or(Datum::Null),
241 ]);
242 session.give((data, time, diff));
243 }
244 },
245 );
246
247 let batches_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
248 &batches_sent,
249 TimelyLog::BatchesSent,
250 move |data, packer, session| {
251 for ((datum, ()), time, diff) in data.iter() {
252 let data = packer.pack_slice(&[
253 Datum::UInt64(u64::cast_from(datum.channel)),
254 Datum::UInt64(u64::cast_from(worker_id)),
255 Datum::UInt64(u64::cast_from(datum.worker)),
256 ]);
257 session.give((data, time, diff));
258 }
259 },
260 );
261
262 let batches_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
263 &batches_received,
264 TimelyLog::BatchesReceived,
265 move |data, packer, session| {
266 for ((datum, ()), time, diff) in data.iter() {
267 let data = packer.pack_slice(&[
268 Datum::UInt64(u64::cast_from(datum.channel)),
269 Datum::UInt64(u64::cast_from(datum.worker)),
270 Datum::UInt64(u64::cast_from(worker_id)),
271 ]);
272 session.give((data, time, diff));
273 }
274 },
275 );
276
277 let messages_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
278 &messages_sent,
279 TimelyLog::MessagesSent,
280 move |data, packer, session| {
281 for ((datum, ()), time, diff) in data.iter() {
282 let data = packer.pack_slice(&[
283 Datum::UInt64(u64::cast_from(datum.channel)),
284 Datum::UInt64(u64::cast_from(worker_id)),
285 Datum::UInt64(u64::cast_from(datum.worker)),
286 ]);
287 session.give((data, time, diff));
288 }
289 },
290 );
291
292 let messages_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
293 &messages_received,
294 TimelyLog::MessagesReceived,
295 move |data, packer, session| {
296 for ((datum, ()), time, diff) in data.iter() {
297 let data = packer.pack_slice(&[
298 Datum::UInt64(u64::cast_from(datum.channel)),
299 Datum::UInt64(u64::cast_from(datum.worker)),
300 Datum::UInt64(u64::cast_from(worker_id)),
301 ]);
302 session.give((data, time, diff));
303 }
304 },
305 );
306
307 let elapsed = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
308 &schedules_duration,
309 TimelyLog::Elapsed,
310 move |data, packer, session| {
311 for ((operator, ()), time, diff) in data.iter() {
312 let data = packer.pack_slice(&[
313 Datum::UInt64(u64::cast_from(*operator)),
314 Datum::UInt64(u64::cast_from(worker_id)),
315 ]);
316 session.give((data, time, diff));
317 }
318 },
319 );
320
321 let histogram = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
322 &schedules_histogram,
323 TimelyLog::Histogram,
324 move |data, packer, session| {
325 for ((datum, ()), time, diff) in data.iter() {
326 let data = packer.pack_slice(&[
327 Datum::UInt64(u64::cast_from(datum.operator)),
328 Datum::UInt64(u64::cast_from(worker_id)),
329 Datum::UInt64(datum.duration_pow),
330 ]);
331 session.give((data, time, diff));
332 }
333 },
334 );
335
336 let logs = {
337 use TimelyLog::*;
338 [
339 (Operates, operates),
340 (Channels, channels),
341 (Elapsed, elapsed),
342 (Histogram, histogram),
343 (Addresses, addresses),
344 (Parks, parks),
345 (MessagesSent, messages_sent),
346 (MessagesReceived, messages_received),
347 (BatchesSent, batches_sent),
348 (BatchesReceived, batches_received),
349 ]
350 };
351
352 let mut collections = BTreeMap::new();
354 for (variant, collection) in logs {
355 let variant = LogVariant::Timely(variant);
356 if config.index_logs.contains_key(&variant) {
357 type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
359 type Builder<T, R> = RowRowBuilder<T, R>;
360 let trace = collection
361 .mz_arrange_core::<_, Batcher<_, _, _, _>, Builder<_, _>, RowRowSpine<_, _>>(
362 ExchangeCore::<ColumnBuilder<_>, _>::new_core(
363 columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
364 ),
365 &format!("Arrange {variant:?}"),
366 )
367 .trace;
368 let collection = LogCollection {
369 trace,
370 token: Rc::clone(&token),
371 };
372 collections.insert(variant, collection);
373 }
374 }
375
376 Return { collections }
377 })
378}
379
/// State maintained by the demux operator across invocations.
#[derive(Default)]
struct DemuxState {
    /// Currently known operators, keyed by operator ID. Entries are added on `Operates`
    /// events and removed on `Shutdown` events.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Channel events, keyed by the first element of the channel's scope address
    /// (the handler calls this the dataflow index).
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// The most recent `Park` event that has not yet been matched by an `Unpark`.
    last_park: Option<Park>,
    /// Per-channel counts of sent messages; the slice is indexed by target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Per-channel counts of received messages; the slice is indexed by source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Start times of schedule events that have not yet seen a stop, keyed by operator ID.
    schedule_starts: BTreeMap<usize, Duration>,
    /// Per-operator schedule totals. The vector index is the log2 of the power-of-two
    /// duration bucket; each entry holds `(number of schedules, summed elapsed time)`.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}
399
/// A recorded `Park` event that is waiting for its matching `Unpark`.
struct Park {
    /// Event time at which the park was logged.
    time: Duration,
    /// The payload of the `ParkEvent::Park` event: the requested park duration, if any.
    requested: Option<Duration>,
}
406
/// Running totals of message traffic for one (channel, worker) pair.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// Number of message events (batches) seen; incremented by one per event.
    batches: i64,
    /// Number of records seen; incremented by each event's record count.
    records: Diff,
}
415
/// Bundled output sessions used by the demux operator, one per logging stream.
///
/// All sessions are opened with the capability of the input batch currently being
/// processed. Only `channels` uses a columnar session; the rest use vector sessions.
struct DemuxOutput<'a, 'b> {
    /// Updates of `(operator id, operator name)`.
    operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
    /// Updates describing channels.
    channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
    /// Updates of `(operator or channel id, address)`.
    addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
    /// Updates describing completed park/unpark pairs.
    parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
    /// Updates counting sent batches; the diff carries the count.
    batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Updates counting received batches; the diff carries the count.
    batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Updates counting sent records; the diff carries the count.
    messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Updates counting received records; the diff carries the count.
    messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Updates keyed by operator id; the diff carries the elapsed nanoseconds.
    schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
    /// Updates keyed by operator id and duration bucket; the diff carries the count.
    schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
}
433
/// Description of a channel, as emitted on the channels logging stream.
///
/// NOTE(review): the `Columnar` derive presumably generates the `ChannelDatumReference`
/// type the handler constructs — confirm against the `columnar` crate.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
struct ChannelDatum {
    /// Channel ID.
    id: usize,
    /// Source of the channel, packed downstream as `(node, port)`.
    source: (usize, usize),
    /// Target of the channel, packed downstream as `(node, port)`.
    target: (usize, usize),
    /// The channel's `typ` string, copied from the channel event.
    typ: String,
}
441
/// Key of a park update: the bucketed actual and requested park durations.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    /// Nanoseconds between park and unpark, rounded up to the next power of two.
    duration_pow: u64,
    /// Requested park duration in nanoseconds, rounded up to the next power of two;
    /// `None` when the park event carried no requested duration.
    requested_pow: Option<u64>,
}

// `ParkDatum` is `Copy`, so it can be stored in a `CopyRegion`.
impl Columnation for ParkDatum {
    type InnerRegion = CopyRegion<Self>;
}
451
/// Key of a message or batch count update.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    /// ID of the channel the messages travelled on.
    channel: usize,
    /// The remote worker: the target worker for sends, the source worker for receives.
    worker: usize,
}

// `MessageDatum` is `Copy`, so it can be stored in a `CopyRegion`.
impl Columnation for MessageDatum {
    type InnerRegion = CopyRegion<Self>;
}
461
/// Key of a schedule histogram update.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    /// ID of the scheduled operator.
    operator: usize,
    /// Elapsed schedule time in nanoseconds, rounded up to the next power of two.
    duration_pow: u64,
}

// `ScheduleHistogramDatum` is `Copy`, so it can be stored in a `CopyRegion`.
impl Columnation for ScheduleHistogramDatum {
    type InnerRegion = CopyRegion<Self>;
}
471
/// Event handler of the demux operator.
///
/// A fresh handler is constructed for every event; it bundles the state, the outputs, and
/// the event time needed to process that single event.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept by the demux operator across events.
    state: &'a mut DemuxState,
    /// State shared across log receivers; used here to reach the compute logger.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output sessions to write updates into.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval in milliseconds; event times are rounded to multiples of it.
    logging_interval_ms: u128,
    /// The number of timely workers; sizes the per-channel message count slices.
    peers: usize,
    /// The time of the event being handled.
    time: Duration,
}
487
488impl DemuxHandler<'_, '_, '_> {
489 fn ts(&self) -> Timestamp {
492 let time_ms = self.time.as_millis();
493 let interval = self.logging_interval_ms;
494 let rounded = (time_ms / interval + 1) * interval;
495 rounded.try_into().expect("must fit")
496 }
497
498 fn handle(&mut self, event: TimelyEvent) {
500 use TimelyEvent::*;
501
502 match event {
503 Operates(e) => self.handle_operates(e),
504 Channels(e) => self.handle_channels(e),
505 Shutdown(e) => self.handle_shutdown(e),
506 Park(e) => self.handle_park(e),
507 Messages(e) => self.handle_messages(e),
508 Schedule(e) => self.handle_schedule(e),
509 _ => (),
510 }
511 }
512
513 fn handle_operates(&mut self, event: OperatesEvent) {
514 let ts = self.ts();
515 let datum = (event.id, event.name.clone());
516 self.output.operates.give((datum, ts, Diff::ONE));
517
518 let datum = (event.id, event.addr.clone());
519 self.output.addresses.give((datum, ts, Diff::ONE));
520
521 self.state.operators.insert(event.id, event);
522 }
523
524 fn handle_channels(&mut self, event: ChannelsEvent) {
525 let ts = self.ts();
526 let datum = ChannelDatumReference {
527 id: event.id,
528 source: event.source,
529 target: event.target,
530 typ: &event.typ,
531 };
532 self.output.channels.give(((datum, ()), ts, Diff::ONE));
533
534 let datum = (event.id, event.scope_addr.clone());
535 self.output.addresses.give((datum, ts, Diff::ONE));
536
537 let dataflow_index = event.scope_addr[0];
538 self.state
539 .dataflow_channels
540 .entry(dataflow_index)
541 .or_default()
542 .push(event);
543 }
544
545 fn handle_shutdown(&mut self, event: ShutdownEvent) {
546 let Some(operator) = self.state.operators.remove(&event.id) else {
552 error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
553 return;
554 };
555
556 let ts = self.ts();
558 let datum = (operator.id, operator.name);
559 self.output.operates.give((datum, ts, Diff::MINUS_ONE));
560
561 if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
563 for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
564 .enumerate()
565 .filter(|(_, (count, _))| *count != 0)
566 {
567 self.output
568 .schedules_duration
569 .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
570
571 let datum = ScheduleHistogramDatum {
572 operator: event.id,
573 duration_pow: 1 << bucket,
574 };
575 let diff = Diff::cast_from(-count);
576 self.output
577 .schedules_histogram
578 .give(((datum, ()), ts, diff));
579 }
580 }
581
582 if operator.addr.len() == 1 {
583 let dataflow_index = operator.addr[0];
584 self.handle_dataflow_shutdown(dataflow_index);
585 }
586
587 let datum = (operator.id, operator.addr);
588 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
589 }
590
591 fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
592 self.shared_state.compute_logger.as_ref().map(|logger| {
594 logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
595 });
596
597 let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
599 return;
600 };
601
602 let ts = self.ts();
603 for channel in channels {
604 let datum = ChannelDatumReference {
606 id: channel.id,
607 source: channel.source,
608 target: channel.target,
609 typ: &channel.typ,
610 };
611 self.output
612 .channels
613 .give(((datum, ()), ts, Diff::MINUS_ONE));
614
615 let datum = (channel.id, channel.scope_addr);
616 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
617
618 if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
620 for (target_worker, count) in sent.iter().enumerate() {
621 let datum = MessageDatum {
622 channel: channel.id,
623 worker: target_worker,
624 };
625 self.output
626 .messages_sent
627 .give(((datum, ()), ts, Diff::from(-count.records)));
628 self.output
629 .batches_sent
630 .give(((datum, ()), ts, Diff::from(-count.batches)));
631 }
632 }
633 if let Some(received) = self.state.messages_received.remove(&channel.id) {
634 for (source_worker, count) in received.iter().enumerate() {
635 let datum = MessageDatum {
636 channel: channel.id,
637 worker: source_worker,
638 };
639 self.output.messages_received.give((
640 (datum, ()),
641 ts,
642 Diff::from(-count.records),
643 ));
644 self.output.batches_received.give((
645 (datum, ()),
646 ts,
647 Diff::from(-count.batches),
648 ));
649 }
650 }
651 }
652 }
653
654 fn handle_park(&mut self, event: ParkEvent) {
655 match event {
656 ParkEvent::Park(requested) => {
657 let park = Park {
658 time: self.time,
659 requested,
660 };
661 let existing = self.state.last_park.replace(park);
662 if existing.is_some() {
663 error!("park without a succeeding unpark");
664 }
665 }
666 ParkEvent::Unpark => {
667 let Some(park) = self.state.last_park.take() else {
668 error!("unpark without a preceding park");
669 return;
670 };
671
672 let duration_ns = self.time.saturating_sub(park.time).as_nanos();
673 let duration_pow =
674 u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
675 let requested_pow = park
676 .requested
677 .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
678
679 let ts = self.ts();
680 let datum = ParkDatum {
681 duration_pow,
682 requested_pow,
683 };
684 self.output.parks.give(((datum, ()), ts, Diff::ONE));
685 }
686 }
687 }
688
689 fn handle_messages(&mut self, event: MessagesEvent) {
690 let ts = self.ts();
691 let count = Diff::from(event.record_count);
692
693 if event.is_send {
694 let datum = MessageDatum {
695 channel: event.channel,
696 worker: event.target,
697 };
698 self.output.messages_sent.give(((datum, ()), ts, count));
699 self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
700
701 let sent_counts = self
702 .state
703 .messages_sent
704 .entry(event.channel)
705 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
706 sent_counts[event.target].records += count;
707 sent_counts[event.target].batches += 1;
708 } else {
709 let datum = MessageDatum {
710 channel: event.channel,
711 worker: event.source,
712 };
713 self.output.messages_received.give(((datum, ()), ts, count));
714 self.output
715 .batches_received
716 .give(((datum, ()), ts, Diff::ONE));
717
718 let received_counts = self
719 .state
720 .messages_received
721 .entry(event.channel)
722 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
723 received_counts[event.source].records += count;
724 received_counts[event.source].batches += 1;
725 }
726 }
727
728 fn handle_schedule(&mut self, event: ScheduleEvent) {
729 match event.start_stop {
730 timely::logging::StartStop::Start => {
731 let existing = self.state.schedule_starts.insert(event.id, self.time);
732 if existing.is_some() {
733 error!(operator_id = ?event.id, "schedule start without succeeding stop");
734 }
735 }
736 timely::logging::StartStop::Stop => {
737 let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
738 error!(operator_id = ?event.id, "schedule stop without preceeding start");
739 return;
740 };
741
742 let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
743 let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
744 let elapsed_diff = Diff::from(elapsed_i64);
745 let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
746
747 let ts = self.ts();
748 let datum = event.id;
749 self.output
750 .schedules_duration
751 .give(((datum, ()), ts, elapsed_diff));
752
753 let datum = ScheduleHistogramDatum {
754 operator: event.id,
755 duration_pow: elapsed_pow,
756 };
757 self.output
758 .schedules_histogram
759 .give(((datum, ()), ts, Diff::ONE));
760
761 let index = usize::cast_from(elapsed_pow.trailing_zeros());
763 let data = self.state.schedules_data.entry(event.id).or_default();
764 grow_vec(data, index);
765 let (count, duration) = &mut data[index];
766 *count += 1;
767 *duration += elapsed_diff;
768 }
769 }
770 }
771}
772
/// Ensure that `index` is a valid position in `vec`.
///
/// If `vec` is too short, it is extended with default values until it holds exactly
/// `index + 1` elements. A vector that is already long enough is left untouched.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    let required_len = index + 1;
    if required_len > vec.len() {
        vec.resize(required_len, T::default());
    }
}
783}