1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::containers::{
23 Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange,
24};
25use mz_timely_util::replay::MzReplay;
26use timely::Container;
27use timely::dataflow::Scope;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::channels::pushers::buffer::Session;
30use timely::dataflow::channels::pushers::{Counter, Tee};
31use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
32use timely::logging::{
33 ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
34 TimelyEvent,
35};
36use tracing::error;
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::logging::compute::{ComputeEvent, DataflowShutdown};
40use crate::logging::{EventQueue, LogVariant, TimelyLog};
41use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
42use crate::row_spine::RowRowBuilder;
43use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
44
/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections of the built log variants, by variant.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
50
/// Constructs the timely logging dataflow fragment and returns its collections.
///
/// Replays the raw `TimelyEvent`s from `event_queue`, demultiplexes them into
/// one update stream per timely log variant, packs each stream into `Row`
/// updates, and arranges the variants requested in `config.index_logs`.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        // Replay the captured events into this scope. When logging is
        // disabled, events are still consumed but nothing is forwarded.
        let (logs, token) =
            event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
                move |mut session, mut data| {
                    if enable_logging {
                        session.give_container(data.to_mut())
                    }
                },
            );

        // Build a demux operator that splits the replayed event stream up into
        // one output stream per log variant.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut operates_out, operates) = demux.new_output();
        let (mut channels_out, channels) = demux.new_output();
        let (mut addresses_out, addresses) = demux.new_output();
        let (mut parks_out, parks) = demux.new_output();
        let (mut messages_sent_out, messages_sent) = demux.new_output();
        let (mut messages_received_out, messages_received) = demux.new_output();
        let (mut schedules_duration_out, schedules_duration) = demux.new_output();
        let (mut schedules_histogram_out, schedules_histogram) = demux.new_output();
        let (mut batches_sent_out, batches_sent) = demux.new_output();
        let (mut batches_received_out, batches_received) = demux.new_output();

        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        // Message events must only arrive at the worker that
                        // sent (resp. received) the message.
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Pack the demuxed streams into `Row` updates, one packed stream per
        // log variant. Each row also carries the logging worker's ID.
        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &operates,
            TimelyLog::Operates,
            move |((id, name), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::String(name),
                ]);
                session.give((data, time, diff));
            },
        );

        let channels = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &channels,
            TimelyLog::Channels,
            move |((datum, ()), time, diff), packer, session| {
                let (source_node, source_port) = datum.source;
                let (target_node, target_port) = datum.target;
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(source_node)),
                    Datum::UInt64(u64::cast_from(source_port)),
                    Datum::UInt64(u64::cast_from(target_node)),
                    Datum::UInt64(u64::cast_from(target_port)),
                ]);
                session.give((data, time, diff));
            },
        );

        let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &addresses,
            TimelyLog::Addresses,
            move |((id, address), time, diff), packer, session| {
                // The address column is a list, so this row is packed by
                // index rather than from a flat datum slice.
                let data = packer.pack_by_index(|packer, index| match index {
                    0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                    1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                    2 => {
                        packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i))))
                    }
                    _ => unreachable!("Addresses relation has three columns"),
                });
                session.give((data, time, diff));
            },
        );

        let parks = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
            &parks,
            TimelyLog::Parks,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                    datum
                        .requested_pow
                        .map(Datum::UInt64)
                        .unwrap_or(Datum::Null),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_sent,
            TimelyLog::BatchesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_received,
            TimelyLog::BatchesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_sent,
            TimelyLog::MessagesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_received,
            TimelyLog::MessagesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let elapsed = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_duration,
            TimelyLog::Elapsed,
            move |((operator, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let histogram = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_histogram,
            TimelyLog::Histogram,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                ]);
                session.give((data, time, diff));
            },
        );

        // Bundle the packed streams with their log variants.
        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Arrange the requested log variants and collect their traces.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    // Keep the replay token alive for as long as the
                    // collection is in use.
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
331
/// State maintained by the demux operator across invocations.
#[derive(Default)]
struct DemuxState {
    /// Operators that are currently alive, by operator ID.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Channels that are currently alive, keyed by dataflow index
    /// (the first element of the channel's scope address).
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// The last park event, if the worker is currently parked.
    last_park: Option<Park>,
    /// Accumulated counts of sent messages, by channel ID;
    /// one slot per target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Accumulated counts of received messages, by channel ID;
    /// one slot per source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Start times of operator schedulings that have not stopped yet,
    /// by operator ID.
    schedule_starts: BTreeMap<usize, Duration>,
    /// Accumulated `(count, elapsed)` scheduling data per operator, indexed by
    /// power-of-two duration bucket; used for retractions on shutdown.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}
351
/// Information about an ongoing park event.
struct Park {
    /// The time at which the park started.
    time: Duration,
    /// The requested park duration, if any was given.
    requested: Option<Duration>,
}
358
/// Accumulated message counts for one channel endpoint.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// The number of batches sent or received.
    batches: i64,
    /// The number of records sent or received.
    records: Diff,
}
367
/// The pusher backing a demux output of update triples.
type Pusher<D> =
    Counter<Timestamp, Vec<(D, Timestamp, Diff)>, Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>;
/// A demux output session that consolidates updates before emitting them.
type OutputSession<'a, D> =
    Session<'a, Timestamp, ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>, Pusher<D>>;
372
/// Bundled output sessions of the demux operator, one per log variant stream.
struct DemuxOutput<'a> {
    operates: OutputSession<'a, (usize, String)>,
    channels: OutputSession<'a, (ChannelDatum, ())>,
    addresses: OutputSession<'a, (usize, Vec<usize>)>,
    parks: OutputSession<'a, (ParkDatum, ())>,
    batches_sent: OutputSession<'a, (MessageDatum, ())>,
    batches_received: OutputSession<'a, (MessageDatum, ())>,
    messages_sent: OutputSession<'a, (MessageDatum, ())>,
    messages_received: OutputSession<'a, (MessageDatum, ())>,
    schedules_duration: OutputSession<'a, (usize, ())>,
    schedules_histogram: OutputSession<'a, (ScheduleHistogramDatum, ())>,
}
390
/// Bundled data for a channel log update.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ChannelDatum {
    /// The channel ID.
    id: usize,
    /// The source's `(node, port)` pair.
    source: (usize, usize),
    /// The target's `(node, port)` pair.
    target: (usize, usize),
}

impl Columnation for ChannelDatum {
    type InnerRegion = CopyRegion<Self>;
}
401
/// Bundled data for a park log update.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    /// The park duration, rounded up to the next power of two (in ns).
    duration_pow: u64,
    /// The requested park duration, rounded up to the next power of two
    /// (in ns), if any was requested.
    requested_pow: Option<u64>,
}

impl Columnation for ParkDatum {
    type InnerRegion = CopyRegion<Self>;
}
411
/// Bundled data for a message or batch log update.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    /// The channel the message was transmitted over.
    channel: usize,
    /// The remote worker: the target for sends, the source for receives.
    worker: usize,
}

impl Columnation for MessageDatum {
    type InnerRegion = CopyRegion<Self>;
}
421
/// Bundled data for a schedule-histogram log update.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    /// The scheduled operator's ID.
    operator: usize,
    /// The scheduling duration, rounded up to the next power of two (in ns).
    duration_pow: u64,
}

impl Columnation for ScheduleHistogramDatum {
    type InnerRegion = CopyRegion<Self>;
}
431
/// Event handler of the demux operator, constructed per event.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator across invocations.
    state: &'a mut DemuxState,
    /// State shared with other logging components.
    shared_state: &'a mut SharedLoggingState,
    /// The output sessions this event's updates are written to.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval, in milliseconds; used to round event times.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The time at which the current event was logged.
    time: Duration,
}
447
448impl DemuxHandler<'_, '_> {
449 fn ts(&self) -> Timestamp {
452 let time_ms = self.time.as_millis();
453 let interval = self.logging_interval_ms;
454 let rounded = (time_ms / interval + 1) * interval;
455 rounded.try_into().expect("must fit")
456 }
457
458 fn handle(&mut self, event: TimelyEvent) {
460 use TimelyEvent::*;
461
462 match event {
463 Operates(e) => self.handle_operates(e),
464 Channels(e) => self.handle_channels(e),
465 Shutdown(e) => self.handle_shutdown(e),
466 Park(e) => self.handle_park(e),
467 Messages(e) => self.handle_messages(e),
468 Schedule(e) => self.handle_schedule(e),
469 _ => (),
470 }
471 }
472
473 fn handle_operates(&mut self, event: OperatesEvent) {
474 let ts = self.ts();
475 let datum = (event.id, event.name.clone());
476 self.output.operates.give((datum, ts, Diff::ONE));
477
478 let datum = (event.id, event.addr.clone());
479 self.output.addresses.give((datum, ts, Diff::ONE));
480
481 self.state.operators.insert(event.id, event);
482 }
483
484 fn handle_channels(&mut self, event: ChannelsEvent) {
485 let ts = self.ts();
486 let datum = ChannelDatum {
487 id: event.id,
488 source: event.source,
489 target: event.target,
490 };
491 self.output.channels.give(((datum, ()), ts, Diff::ONE));
492
493 let datum = (event.id, event.scope_addr.clone());
494 self.output.addresses.give((datum, ts, Diff::ONE));
495
496 let dataflow_index = event.scope_addr[0];
497 self.state
498 .dataflow_channels
499 .entry(dataflow_index)
500 .or_default()
501 .push(event);
502 }
503
504 fn handle_shutdown(&mut self, event: ShutdownEvent) {
505 let Some(operator) = self.state.operators.remove(&event.id) else {
511 error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
512 return;
513 };
514
515 let ts = self.ts();
517 let datum = (operator.id, operator.name);
518 self.output.operates.give((datum, ts, Diff::MINUS_ONE));
519
520 if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
522 for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
523 .enumerate()
524 .filter(|(_, (count, _))| *count != 0)
525 {
526 self.output
527 .schedules_duration
528 .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
529
530 let datum = ScheduleHistogramDatum {
531 operator: event.id,
532 duration_pow: 1 << bucket,
533 };
534 let diff = Diff::cast_from(-count);
535 self.output
536 .schedules_histogram
537 .give(((datum, ()), ts, diff));
538 }
539 }
540
541 if operator.addr.len() == 1 {
542 let dataflow_index = operator.addr[0];
543 self.handle_dataflow_shutdown(dataflow_index);
544 }
545
546 let datum = (operator.id, operator.addr);
547 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
548 }
549
550 fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
551 self.shared_state.compute_logger.as_ref().map(|logger| {
553 logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
554 });
555
556 let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
558 return;
559 };
560
561 let ts = self.ts();
562 for channel in channels {
563 let datum = ChannelDatum {
565 id: channel.id,
566 source: channel.source,
567 target: channel.target,
568 };
569 self.output
570 .channels
571 .give(((datum, ()), ts, Diff::MINUS_ONE));
572
573 let datum = (channel.id, channel.scope_addr);
574 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
575
576 if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
578 for (target_worker, count) in sent.iter().enumerate() {
579 let datum = MessageDatum {
580 channel: channel.id,
581 worker: target_worker,
582 };
583 self.output
584 .messages_sent
585 .give(((datum, ()), ts, Diff::from(-count.records)));
586 self.output
587 .batches_sent
588 .give(((datum, ()), ts, Diff::from(-count.batches)));
589 }
590 }
591 if let Some(received) = self.state.messages_received.remove(&channel.id) {
592 for (source_worker, count) in received.iter().enumerate() {
593 let datum = MessageDatum {
594 channel: channel.id,
595 worker: source_worker,
596 };
597 self.output.messages_received.give((
598 (datum, ()),
599 ts,
600 Diff::from(-count.records),
601 ));
602 self.output.batches_received.give((
603 (datum, ()),
604 ts,
605 Diff::from(-count.batches),
606 ));
607 }
608 }
609 }
610 }
611
612 fn handle_park(&mut self, event: ParkEvent) {
613 match event {
614 ParkEvent::Park(requested) => {
615 let park = Park {
616 time: self.time,
617 requested,
618 };
619 let existing = self.state.last_park.replace(park);
620 if existing.is_some() {
621 error!("park without a succeeding unpark");
622 }
623 }
624 ParkEvent::Unpark => {
625 let Some(park) = self.state.last_park.take() else {
626 error!("unpark without a preceding park");
627 return;
628 };
629
630 let duration_ns = self.time.saturating_sub(park.time).as_nanos();
631 let duration_pow =
632 u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
633 let requested_pow = park
634 .requested
635 .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
636
637 let ts = self.ts();
638 let datum = ParkDatum {
639 duration_pow,
640 requested_pow,
641 };
642 self.output.parks.give(((datum, ()), ts, Diff::ONE));
643 }
644 }
645 }
646
647 fn handle_messages(&mut self, event: MessagesEvent) {
648 let ts = self.ts();
649 let count = Diff::try_from(event.length).expect("must fit");
650
651 if event.is_send {
652 let datum = MessageDatum {
653 channel: event.channel,
654 worker: event.target,
655 };
656 self.output.messages_sent.give(((datum, ()), ts, count));
657 self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
658
659 let sent_counts = self
660 .state
661 .messages_sent
662 .entry(event.channel)
663 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
664 sent_counts[event.target].records += count;
665 sent_counts[event.target].batches += 1;
666 } else {
667 let datum = MessageDatum {
668 channel: event.channel,
669 worker: event.source,
670 };
671 self.output.messages_received.give(((datum, ()), ts, count));
672 self.output
673 .batches_received
674 .give(((datum, ()), ts, Diff::ONE));
675
676 let received_counts = self
677 .state
678 .messages_received
679 .entry(event.channel)
680 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
681 received_counts[event.source].records += count;
682 received_counts[event.source].batches += 1;
683 }
684 }
685
686 fn handle_schedule(&mut self, event: ScheduleEvent) {
687 match event.start_stop {
688 timely::logging::StartStop::Start => {
689 let existing = self.state.schedule_starts.insert(event.id, self.time);
690 if existing.is_some() {
691 error!(operator_id = ?event.id, "schedule start without succeeding stop");
692 }
693 }
694 timely::logging::StartStop::Stop => {
695 let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
696 error!(operator_id = ?event.id, "schedule stop without preceeding start");
697 return;
698 };
699
700 let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
701 let elapsed_diff = Diff::from(i64::try_from(elapsed_ns).expect("must fit"));
702 let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
703
704 let ts = self.ts();
705 let datum = event.id;
706 self.output
707 .schedules_duration
708 .give(((datum, ()), ts, elapsed_diff));
709
710 let datum = ScheduleHistogramDatum {
711 operator: event.id,
712 duration_pow: elapsed_pow,
713 };
714 self.output
715 .schedules_histogram
716 .give(((datum, ()), ts, Diff::ONE));
717
718 let index = usize::cast_from(elapsed_pow.trailing_zeros());
720 let data = self.state.schedules_data.entry(event.id).or_default();
721 grow_vec(data, index);
722 let (count, duration) = &mut data[index];
723 *count += 1;
724 *duration += elapsed_diff;
725 }
726 }
727 }
728}
729
/// Ensures that `vec` is long enough for `index` to be a valid position,
/// filling any newly created slots with `T::default()`. Vectors that are
/// already long enough are left untouched.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    let required = index + 1;
    if vec.len() < required {
        vec.resize(required, T::default());
    }
}