1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::dataflow::operators::generic::operator::empty;
31use timely::logging::{
32 ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
33 TimelyEvent,
34};
35use tracing::error;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::logging::compute::{ComputeEvent, DataflowShutdown};
39use crate::logging::{
40 EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
41 Update,
42};
43use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
46
47pub(super) struct Return {
49 pub collections: BTreeMap<LogVariant, LogCollection>,
51}
52
53pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
60 mut scope: G,
61 config: &LoggingConfig,
62 event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
63 shared_state: Rc<RefCell<SharedLoggingState>>,
64) -> Return {
65 scope.scoped("timely logging", move |scope| {
66 let enable_logging = config.enable_logging;
67 let (logs, token) = if enable_logging {
68 event_queue.links.mz_replay(
69 scope,
70 "timely logs",
71 config.interval,
72 event_queue.activator,
73 )
74 } else {
75 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
76 (empty(scope), token)
77 };
78
79 let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
82 let mut input = demux.new_input(logs, Pipeline);
83 let (operates_out, operates) = demux.new_output();
84 let mut operates_out = OutputBuilder::from(operates_out);
85 let (channels_out, channels) = demux.new_output();
86 let mut channels_out = OutputBuilder::from(channels_out);
87 let (addresses_out, addresses) = demux.new_output();
88 let mut addresses_out = OutputBuilder::from(addresses_out);
89 let (parks_out, parks) = demux.new_output();
90 let mut parks_out = OutputBuilder::from(parks_out);
91 let (messages_sent_out, messages_sent) = demux.new_output();
92 let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
93 let (messages_received_out, messages_received) = demux.new_output();
94 let mut messages_received_out = OutputBuilder::from(messages_received_out);
95 let (schedules_duration_out, schedules_duration) = demux.new_output();
96 let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
97 let (schedules_histogram_out, schedules_histogram) = demux.new_output();
98 let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
99 let (batches_sent_out, batches_sent) = demux.new_output();
100 let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
101 let (batches_received_out, batches_received) = demux.new_output();
102 let mut batches_received_out = OutputBuilder::from(batches_received_out);
103
104 let worker_id = scope.index();
105 let mut demux_state = DemuxState::default();
106 demux.build(|_capability| {
107 let peers = scope.peers();
108 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
109 move |_frontiers| {
110 let mut operates = operates_out.activate();
111 let mut channels = channels_out.activate();
112 let mut addresses = addresses_out.activate();
113 let mut parks = parks_out.activate();
114 let mut messages_sent = messages_sent_out.activate();
115 let mut messages_received = messages_received_out.activate();
116 let mut batches_sent = batches_sent_out.activate();
117 let mut batches_received = batches_received_out.activate();
118 let mut schedules_duration = schedules_duration_out.activate();
119 let mut schedules_histogram = schedules_histogram_out.activate();
120
121 input.for_each(|cap, data| {
122 let mut output_buffers = DemuxOutput {
123 operates: operates.session_with_builder(&cap),
124 channels: channels.session_with_builder(&cap),
125 addresses: addresses.session_with_builder(&cap),
126 parks: parks.session_with_builder(&cap),
127 messages_sent: messages_sent.session_with_builder(&cap),
128 messages_received: messages_received.session_with_builder(&cap),
129 schedules_duration: schedules_duration.session_with_builder(&cap),
130 schedules_histogram: schedules_histogram.session_with_builder(&cap),
131 batches_sent: batches_sent.session_with_builder(&cap),
132 batches_received: batches_received.session_with_builder(&cap),
133 };
134
135 for (time, event) in data.drain(..) {
136 if let TimelyEvent::Messages(msg) = &event {
137 match msg.is_send {
138 true => assert_eq!(msg.source, worker_id),
139 false => assert_eq!(msg.target, worker_id),
140 }
141 }
142
143 DemuxHandler {
144 state: &mut demux_state,
145 shared_state: &mut shared_state.borrow_mut(),
146 output: &mut output_buffers,
147 logging_interval_ms,
148 peers,
149 time,
150 }
151 .handle(event);
152 }
153 });
154 }
155 });
156
157 let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
162 operates,
163 TimelyLog::Operates,
164 move |data, packer, session| {
165 for ((id, name), time, diff) in data.iter() {
166 let data = packer.pack_slice(&[
167 Datum::UInt64(u64::cast_from(*id)),
168 Datum::UInt64(u64::cast_from(worker_id)),
169 Datum::String(name),
170 ]);
171 session.give((data, time, diff));
172 }
173 },
174 );
175
176 let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
179 Pipeline,
180 "ToRow Channels",
181 |_cap, _info| {
182 let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
183 move |input, output| {
184 input.for_each_time(|time, data| {
185 let mut session = output.session_with_builder(&time);
186 for d in data.flat_map(|c| c.borrow().into_index_iter()) {
187 let ((datum, ()), time, diff) = d;
188 let (source_node, source_port) = datum.source;
189 let (target_node, target_port) = datum.target;
190 let data = packer.pack_slice(&[
191 Datum::UInt64(u64::cast_from(datum.id)),
192 Datum::UInt64(u64::cast_from(worker_id)),
193 Datum::UInt64(u64::cast_from(source_node)),
194 Datum::UInt64(u64::cast_from(source_port)),
195 Datum::UInt64(u64::cast_from(target_node)),
196 Datum::UInt64(u64::cast_from(target_port)),
197 Datum::String(
198 std::str::from_utf8(datum.typ).expect("valid string"),
199 ),
200 ]);
201 session.give((data, time, diff));
202 }
203 });
204 }
205 },
206 );
207
208 type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
210 type KB<K, T, D> = KeyBatcher<K, T, D>;
211
212 let addresses = consolidate_and_pack::<_, KVB<_, _, _, _>, ColumnBuilder<_>, _, _>(
213 addresses,
214 TimelyLog::Addresses,
215 move |data, packer, session| {
216 for ((id, address), time, diff) in data.iter() {
217 let data = packer.pack_by_index(|packer, index| match index {
218 0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
219 1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
220 2 => {
221 let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
222 packer.push_list(list)
223 }
224 _ => unreachable!("Addresses relation has three columns"),
225 });
226 session.give((data, time, diff));
227 }
228 },
229 );
230
231 let parks = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
232 parks,
233 TimelyLog::Parks,
234 move |data, packer, session| {
235 for ((datum, ()), time, diff) in data.iter() {
236 let data = packer.pack_slice(&[
237 Datum::UInt64(u64::cast_from(worker_id)),
238 Datum::UInt64(datum.duration_pow),
239 datum
240 .requested_pow
241 .map(Datum::UInt64)
242 .unwrap_or(Datum::Null),
243 ]);
244 session.give((data, time, diff));
245 }
246 },
247 );
248
249 let batches_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
250 batches_sent,
251 TimelyLog::BatchesSent,
252 move |data, packer, session| {
253 for ((datum, ()), time, diff) in data.iter() {
254 let data = packer.pack_slice(&[
255 Datum::UInt64(u64::cast_from(datum.channel)),
256 Datum::UInt64(u64::cast_from(worker_id)),
257 Datum::UInt64(u64::cast_from(datum.worker)),
258 ]);
259 session.give((data, time, diff));
260 }
261 },
262 );
263
264 let batches_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
265 batches_received,
266 TimelyLog::BatchesReceived,
267 move |data, packer, session| {
268 for ((datum, ()), time, diff) in data.iter() {
269 let data = packer.pack_slice(&[
270 Datum::UInt64(u64::cast_from(datum.channel)),
271 Datum::UInt64(u64::cast_from(datum.worker)),
272 Datum::UInt64(u64::cast_from(worker_id)),
273 ]);
274 session.give((data, time, diff));
275 }
276 },
277 );
278
279 let messages_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
280 messages_sent,
281 TimelyLog::MessagesSent,
282 move |data, packer, session| {
283 for ((datum, ()), time, diff) in data.iter() {
284 let data = packer.pack_slice(&[
285 Datum::UInt64(u64::cast_from(datum.channel)),
286 Datum::UInt64(u64::cast_from(worker_id)),
287 Datum::UInt64(u64::cast_from(datum.worker)),
288 ]);
289 session.give((data, time, diff));
290 }
291 },
292 );
293
294 let messages_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
295 messages_received,
296 TimelyLog::MessagesReceived,
297 move |data, packer, session| {
298 for ((datum, ()), time, diff) in data.iter() {
299 let data = packer.pack_slice(&[
300 Datum::UInt64(u64::cast_from(datum.channel)),
301 Datum::UInt64(u64::cast_from(datum.worker)),
302 Datum::UInt64(u64::cast_from(worker_id)),
303 ]);
304 session.give((data, time, diff));
305 }
306 },
307 );
308
309 let elapsed = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
310 schedules_duration,
311 TimelyLog::Elapsed,
312 move |data, packer, session| {
313 for ((operator, ()), time, diff) in data.iter() {
314 let data = packer.pack_slice(&[
315 Datum::UInt64(u64::cast_from(*operator)),
316 Datum::UInt64(u64::cast_from(worker_id)),
317 ]);
318 session.give((data, time, diff));
319 }
320 },
321 );
322
323 let histogram = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
324 schedules_histogram,
325 TimelyLog::Histogram,
326 move |data, packer, session| {
327 for ((datum, ()), time, diff) in data.iter() {
328 let data = packer.pack_slice(&[
329 Datum::UInt64(u64::cast_from(datum.operator)),
330 Datum::UInt64(u64::cast_from(worker_id)),
331 Datum::UInt64(datum.duration_pow),
332 ]);
333 session.give((data, time, diff));
334 }
335 },
336 );
337
338 let logs = {
339 use TimelyLog::*;
340 [
341 (Operates, operates),
342 (Channels, channels),
343 (Elapsed, elapsed),
344 (Histogram, histogram),
345 (Addresses, addresses),
346 (Parks, parks),
347 (MessagesSent, messages_sent),
348 (MessagesReceived, messages_received),
349 (BatchesSent, batches_sent),
350 (BatchesReceived, batches_received),
351 ]
352 };
353
354 let mut collections = BTreeMap::new();
356 for (variant, collection) in logs {
357 let variant = LogVariant::Timely(variant);
358 if config.index_logs.contains_key(&variant) {
359 type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
361 type Builder<T, R> = RowRowBuilder<T, R>;
362 let trace = collection
363 .mz_arrange_core::<_, Batcher<_, _, _, _>, Builder<_, _>, RowRowSpine<_, _>>(
364 ExchangeCore::<ColumnBuilder<_>, _>::new_core(
365 columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
366 ),
367 &format!("Arrange {variant:?}"),
368 )
369 .trace;
370 let collection = LogCollection {
371 trace,
372 token: Rc::clone(&token),
373 };
374 collections.insert(variant, collection);
375 }
376 }
377
378 Return { collections }
379 })
380}
381
382#[derive(Default)]
384struct DemuxState {
385 operators: BTreeMap<usize, OperatesEvent>,
387 dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
389 last_park: Option<Park>,
391 messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
393 messages_received: BTreeMap<usize, Box<[MessageCount]>>,
395 schedule_starts: Vec<(usize, Duration)>,
397 schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
400}
401
402struct Park {
403 time: Duration,
405 requested: Option<Duration>,
407}
408
409#[derive(Default, Copy, Clone, Debug)]
411struct MessageCount {
412 batches: i64,
414 records: Diff,
416}
417
418struct DemuxOutput<'a, 'b> {
424 operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
425 channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
426 addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
427 parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
428 batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
429 batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
430 messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
431 messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
432 schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
433 schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
434}
435
436#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
437struct ChannelDatum {
438 id: usize,
439 source: (usize, usize),
440 target: (usize, usize),
441 typ: String,
442}
443
444#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
445struct ParkDatum {
446 duration_pow: u64,
447 requested_pow: Option<u64>,
448}
449
450impl Columnation for ParkDatum {
451 type InnerRegion = CopyRegion<Self>;
452}
453
454#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
455struct MessageDatum {
456 channel: usize,
457 worker: usize,
458}
459
460impl Columnation for MessageDatum {
461 type InnerRegion = CopyRegion<Self>;
462}
463
464#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
465struct ScheduleHistogramDatum {
466 operator: usize,
467 duration_pow: u64,
468}
469
470impl Columnation for ScheduleHistogramDatum {
471 type InnerRegion = CopyRegion<Self>;
472}
473
474struct DemuxHandler<'a, 'b, 'c> {
476 state: &'a mut DemuxState,
478 shared_state: &'a mut SharedLoggingState,
480 output: &'a mut DemuxOutput<'b, 'c>,
482 logging_interval_ms: u128,
484 peers: usize,
486 time: Duration,
488}
489
490impl DemuxHandler<'_, '_, '_> {
491 fn ts(&self) -> Timestamp {
494 let time_ms = self.time.as_millis();
495 let interval = self.logging_interval_ms;
496 let rounded = (time_ms / interval + 1) * interval;
497 rounded.try_into().expect("must fit")
498 }
499
500 fn handle(&mut self, event: TimelyEvent) {
502 use TimelyEvent::*;
503
504 match event {
505 Operates(e) => self.handle_operates(e),
506 Channels(e) => self.handle_channels(e),
507 Shutdown(e) => self.handle_shutdown(e),
508 Park(e) => self.handle_park(e),
509 Messages(e) => self.handle_messages(e),
510 Schedule(e) => self.handle_schedule(e),
511 _ => (),
512 }
513 }
514
515 fn handle_operates(&mut self, event: OperatesEvent) {
516 let ts = self.ts();
517 let datum = (event.id, event.name.clone());
518 self.output.operates.give((datum, ts, Diff::ONE));
519
520 let datum = (event.id, event.addr.clone());
521 self.output.addresses.give((datum, ts, Diff::ONE));
522
523 self.state.operators.insert(event.id, event);
524 }
525
526 fn handle_channels(&mut self, event: ChannelsEvent) {
527 let ts = self.ts();
528 let datum = ChannelDatumReference {
529 id: event.id,
530 source: event.source,
531 target: event.target,
532 typ: &event.typ,
533 };
534 self.output.channels.give(((datum, ()), ts, Diff::ONE));
535
536 let datum = (event.id, event.scope_addr.clone());
537 self.output.addresses.give((datum, ts, Diff::ONE));
538
539 let dataflow_index = event.scope_addr[0];
540 self.state
541 .dataflow_channels
542 .entry(dataflow_index)
543 .or_default()
544 .push(event);
545 }
546
547 fn handle_shutdown(&mut self, event: ShutdownEvent) {
548 let Some(operator) = self.state.operators.remove(&event.id) else {
554 error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
555 return;
556 };
557
558 let ts = self.ts();
560 let datum = (operator.id, operator.name);
561 self.output.operates.give((datum, ts, Diff::MINUS_ONE));
562
563 if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
565 for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
566 .enumerate()
567 .filter(|(_, (count, _))| *count != 0)
568 {
569 self.output
570 .schedules_duration
571 .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
572
573 let datum = ScheduleHistogramDatum {
574 operator: event.id,
575 duration_pow: 1 << bucket,
576 };
577 let diff = Diff::cast_from(-count);
578 self.output
579 .schedules_histogram
580 .give(((datum, ()), ts, diff));
581 }
582 }
583
584 if operator.addr.len() == 1 {
585 let dataflow_index = operator.addr[0];
586 self.handle_dataflow_shutdown(dataflow_index);
587 }
588
589 let datum = (operator.id, operator.addr);
590 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
591 }
592
593 fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
594 self.shared_state.compute_logger.as_ref().map(|logger| {
596 logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
597 });
598
599 let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
601 return;
602 };
603
604 let ts = self.ts();
605 for channel in channels {
606 let datum = ChannelDatumReference {
608 id: channel.id,
609 source: channel.source,
610 target: channel.target,
611 typ: &channel.typ,
612 };
613 self.output
614 .channels
615 .give(((datum, ()), ts, Diff::MINUS_ONE));
616
617 let datum = (channel.id, channel.scope_addr);
618 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
619
620 if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
622 for (target_worker, count) in sent.iter().enumerate() {
623 let datum = MessageDatum {
624 channel: channel.id,
625 worker: target_worker,
626 };
627 self.output
628 .messages_sent
629 .give(((datum, ()), ts, Diff::from(-count.records)));
630 self.output
631 .batches_sent
632 .give(((datum, ()), ts, Diff::from(-count.batches)));
633 }
634 }
635 if let Some(received) = self.state.messages_received.remove(&channel.id) {
636 for (source_worker, count) in received.iter().enumerate() {
637 let datum = MessageDatum {
638 channel: channel.id,
639 worker: source_worker,
640 };
641 self.output.messages_received.give((
642 (datum, ()),
643 ts,
644 Diff::from(-count.records),
645 ));
646 self.output.batches_received.give((
647 (datum, ()),
648 ts,
649 Diff::from(-count.batches),
650 ));
651 }
652 }
653 }
654 }
655
656 fn handle_park(&mut self, event: ParkEvent) {
657 match event {
658 ParkEvent::Park(requested) => {
659 let park = Park {
660 time: self.time,
661 requested,
662 };
663 let existing = self.state.last_park.replace(park);
664 if existing.is_some() {
665 error!("park without a succeeding unpark");
666 }
667 }
668 ParkEvent::Unpark => {
669 let Some(park) = self.state.last_park.take() else {
670 error!("unpark without a preceding park");
671 return;
672 };
673
674 let duration_ns = self.time.saturating_sub(park.time).as_nanos();
675 let duration_pow =
676 u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
677 let requested_pow = park
678 .requested
679 .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
680
681 let ts = self.ts();
682 let datum = ParkDatum {
683 duration_pow,
684 requested_pow,
685 };
686 self.output.parks.give(((datum, ()), ts, Diff::ONE));
687 }
688 }
689 }
690
691 fn handle_messages(&mut self, event: MessagesEvent) {
692 let ts = self.ts();
693 let count = Diff::from(event.record_count);
694
695 if event.is_send {
696 let datum = MessageDatum {
697 channel: event.channel,
698 worker: event.target,
699 };
700 self.output.messages_sent.give(((datum, ()), ts, count));
701 self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
702
703 let sent_counts = self
704 .state
705 .messages_sent
706 .entry(event.channel)
707 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
708 sent_counts[event.target].records += count;
709 sent_counts[event.target].batches += 1;
710 } else {
711 let datum = MessageDatum {
712 channel: event.channel,
713 worker: event.source,
714 };
715 self.output.messages_received.give(((datum, ()), ts, count));
716 self.output
717 .batches_received
718 .give(((datum, ()), ts, Diff::ONE));
719
720 let received_counts = self
721 .state
722 .messages_received
723 .entry(event.channel)
724 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
725 received_counts[event.source].records += count;
726 received_counts[event.source].batches += 1;
727 }
728 }
729
730 fn handle_schedule(&mut self, event: ScheduleEvent) {
731 match event.start_stop {
732 timely::logging::StartStop::Start => {
733 self.state.schedule_starts.push((event.id, self.time));
734 }
735 timely::logging::StartStop::Stop => {
736 let Some((old_id, start_time)) = self.state.schedule_starts.pop() else {
737 error!(operator_id = ?event.id, "schedule stop without preceding start");
738 return;
739 };
740
741 if old_id != event.id {
742 error!(start_id = ?old_id, stop_id = ?event.id, "schedule stop without preceding start");
743 return;
744 }
745
746 let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
747 let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
748 let elapsed_diff = Diff::from(elapsed_i64);
749 let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
750
751 let ts = self.ts();
752 let datum = event.id;
753 self.output
754 .schedules_duration
755 .give(((datum, ()), ts, elapsed_diff));
756
757 let datum = ScheduleHistogramDatum {
758 operator: event.id,
759 duration_pow: elapsed_pow,
760 };
761 self.output
762 .schedules_histogram
763 .give(((datum, ()), ts, Diff::ONE));
764
765 let index = usize::cast_from(elapsed_pow.trailing_zeros());
767 let data = self.state.schedules_data.entry(event.id).or_default();
768 grow_vec(data, index);
769 let (count, duration) = &mut data[index];
770 *count += 1;
771 *duration += elapsed_diff;
772 }
773 }
774 }
775}
776
777fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
781where
782 T: Clone + Default,
783{
784 if vec.len() <= index {
785 vec.resize(index + 1, Default::default());
786 }
787}