1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::Columnar;
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::containers::ProvidedBuilder;
25use mz_timely_util::replay::MzReplay;
26use timely::Container;
27use timely::dataflow::Scope;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::operators::Operator;
30use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
31use timely::logging::{
32 ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
33 TimelyEvent,
34};
35use tracing::error;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::logging::compute::{ComputeEvent, DataflowShutdown};
39use crate::logging::{
40 EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
41 Update,
42};
43use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
46
/// The return type of [`construct`].
pub(super) struct Return {
    /// Per-variant log collections that were requested to be indexed.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
52
/// Constructs the timely logging dataflow fragment.
///
/// Replays the events in `event_queue` into a child scope, demultiplexes them
/// into one stream per [`TimelyLog`] variant, packs each stream into `Row`
/// updates, and arranges those collections for which `config.index_logs`
/// requests an index.
///
/// # Parameters
/// - `scope`: the dataflow scope to install the logging fragment in.
/// - `config`: determines whether logging is enabled, the logging interval,
///   and which log variants to index.
/// - `event_queue`: the captured timely event stream to replay.
/// - `shared_state`: state shared with other logging dataflows; used here to
///   reach the compute logger on dataflow shutdown.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        // Replay the captured events into this scope. When logging is
        // disabled we still replay (to keep the dataflow and its frontier
        // advancing) but forward no data.
        let (logs, token) =
            event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
                move |mut session, mut data| {
                    if enable_logging {
                        session.give_container(data.to_mut())
                    }
                },
            );

        // Build a demux operator that splits the replayed event stream into
        // one output per log variant.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut operates_out, operates) = demux.new_output();
        let (mut channels_out, channels) = demux.new_output();
        let (mut addresses_out, addresses) = demux.new_output();
        let (mut parks_out, parks) = demux.new_output();
        let (mut messages_sent_out, messages_sent) = demux.new_output();
        let (mut messages_received_out, messages_received) = demux.new_output();
        let (mut schedules_duration_out, schedules_duration) = demux.new_output();
        let (mut schedules_histogram_out, schedules_histogram) = demux.new_output();
        let (mut batches_sent_out, batches_sent) = demux.new_output();
        let (mut batches_received_out, batches_received) = demux.new_output();

        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            // Guard against a zero interval; timestamps are rounded up to
            // multiples of this value in `DemuxHandler::ts`.
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    // One output session per log variant, all at the input
                    // capability's time.
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        // Sanity check: message events must be logged by the
                        // sending worker (send) or the receiving worker
                        // (receive), never by a third party.
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode each demuxed stream into its expected `Row` representation.

        // Operates: (operator id, worker, name).
        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &operates,
            TimelyLog::Operates,
            move |((id, name), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::String(name),
                ]);
                session.give((data, time, diff));
            },
        );

        // Channels: converted to rows directly, without a consolidation pass.
        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(Pipeline, "ToRow Channels", |_cap, _info| {
            let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
            move |input, output| {
                while let Some((time, data)) = input.next() {
                    let mut session = output.session_with_builder(&time);
                    for ((datum, ()), time, diff) in data.iter() {
                        let (source_node, source_port) = datum.source;
                        let (target_node, target_port) = datum.target;
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.id)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(u64::cast_from(source_node)),
                            Datum::UInt64(u64::cast_from(source_port)),
                            Datum::UInt64(u64::cast_from(target_node)),
                            Datum::UInt64(u64::cast_from(target_port)),
                            Datum::String(datum.typ),
                        ]);
                        session.give((data, time, diff));
                    }
                }
            }
        });

        // Addresses: (operator/channel id, worker, address list).
        let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &addresses,
            TimelyLog::Addresses,
            move |((id, address), time, diff), packer, session| {
                // The third column is a list, so pack by index rather than
                // from a flat datum slice.
                let data = packer.pack_by_index(|packer, index| match index {
                    0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                    1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                    2 => {
                        packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i))))
                    }
                    _ => unreachable!("Addresses relation has three columns"),
                });
                session.give((data, time, diff));
            },
        );

        // Parks: (worker, power-of-two duration, optional requested duration).
        let parks = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
            &parks,
            TimelyLog::Parks,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                    datum
                        .requested_pow
                        .map(Datum::UInt64)
                        .unwrap_or(Datum::Null),
                ]);
                session.give((data, time, diff));
            },
        );

        // Batches sent: (channel, this worker as sender, target worker).
        let batches_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_sent,
            TimelyLog::BatchesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        // Batches received: (channel, source worker, this worker as receiver).
        let batches_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_received,
            TimelyLog::BatchesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );


        // Messages sent: (channel, this worker as sender, target worker).
        let messages_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_sent,
            TimelyLog::MessagesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        // Messages received: (channel, source worker, this worker as receiver).
        let messages_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_received,
            TimelyLog::MessagesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        // Elapsed scheduling time per operator; the duration lives in `diff`.
        let elapsed = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_duration,
            TimelyLog::Elapsed,
            move |((operator, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[Datum::UInt64(u64::cast_from(*operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );


        // Scheduling histogram: (operator, worker, power-of-two bucket).
        let histogram = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_histogram,
            TimelyLog::Histogram,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                ]);
                session.give((data, time, diff));
            },
        );

        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Arrange the requested log collections; the replay `token` keeps the
        // source alive for as long as any arrangement is held.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
340
/// Bookkeeping state for the demux operator, accumulated across events so
/// that retractions can be emitted on shutdown.
#[derive(Default)]
struct DemuxState {
    /// Live operators, keyed by operator ID; consumed on shutdown to retract.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Channels per dataflow, keyed by the dataflow index (first element of
    /// the channel's scope address).
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// The park currently in progress, if any; set on `Park`, taken on `Unpark`.
    last_park: Option<Park>,
    /// Per channel, per target worker: counts of messages/batches sent.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Per channel, per source worker: counts of messages/batches received.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// For operators currently scheduled, the time the schedule started.
    schedule_starts: BTreeMap<usize, Duration>,
    /// Per operator, per histogram bucket: (schedule count, elapsed time) —
    /// retracted when the operator shuts down.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}
360
/// A park event in progress.
struct Park {
    /// The time at which the worker parked.
    time: Duration,
    /// The park duration the worker requested, if any.
    requested: Option<Duration>,
}
367
/// Accumulated message traffic on one channel for one peer worker.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// Number of batches (one per `MessagesEvent`).
    batches: i64,
    /// Number of records across all batches.
    records: Diff,
}
376
/// Bundled output sessions used by the demux operator, one per log variant.
///
/// `channels` uses a columnar session (its datum derives `Columnar`); the
/// remaining outputs use plain `Vec`-backed sessions.
struct DemuxOutput<'a> {
    operates: OutputSessionVec<'a, Update<(usize, String)>>,
    channels: OutputSessionColumnar<'a, Update<(ChannelDatum, ())>>,
    addresses: OutputSessionVec<'a, Update<(usize, Vec<usize>)>>,
    parks: OutputSessionVec<'a, Update<(ParkDatum, ())>>,
    batches_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    batches_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    messages_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    messages_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    schedules_duration: OutputSessionVec<'a, Update<(usize, ())>>,
    schedules_histogram: OutputSessionVec<'a, Update<(ScheduleHistogramDatum, ())>>,
}
394
/// A datum for the channels log, mirroring the fields of a `ChannelsEvent`.
///
/// NOTE(review): the `Columnar` derive appears to also generate the
/// `ChannelDatumReference` type used by the demux handler — confirm with the
/// `columnar` crate's derive documentation.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
struct ChannelDatum {
    /// The channel ID.
    id: usize,
    /// Source as a (node, port) pair.
    source: (usize, usize),
    /// Target as a (node, port) pair.
    target: (usize, usize),
    /// The channel's type description, copied from the event.
    typ: String,
}
402
/// A datum for the parks log; durations are bucketed to powers of two.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    /// Actual park duration in nanoseconds, rounded up to a power of two.
    duration_pow: u64,
    /// Requested park duration in nanoseconds, rounded up to a power of two,
    /// if a duration was requested.
    requested_pow: Option<u64>,
}

impl Columnation for ParkDatum {
    // All fields are `Copy`, so the copy region suffices.
    type InnerRegion = CopyRegion<Self>;
}
412
/// A datum for the message logs: a channel and the remote worker involved
/// (target worker for sends, source worker for receives).
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    /// The channel ID.
    channel: usize,
    /// The remote worker index.
    worker: usize,
}

impl Columnation for MessageDatum {
    // All fields are `Copy`, so the copy region suffices.
    type InnerRegion = CopyRegion<Self>;
}
422
/// A datum for the scheduling histogram: an operator and a duration bucket.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    /// The operator ID.
    operator: usize,
    /// Schedule duration in nanoseconds, rounded up to a power of two.
    duration_pow: u64,
}

impl Columnation for ScheduleHistogramDatum {
    // All fields are `Copy`, so the copy region suffices.
    type InnerRegion = CopyRegion<Self>;
}
432
/// Event handler for a single timely event, borrowing the demux operator's
/// state and output sessions.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator across events.
    state: &'a mut DemuxState,
    /// State shared with other logging dataflows.
    shared_state: &'a mut SharedLoggingState,
    /// Output sessions to write updates to.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval, in milliseconds; timestamps are rounded up to
    /// multiples of this.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The event's wall-clock time.
    time: Duration,
}
448
impl DemuxHandler<'_, '_> {
    /// Return the timestamp to associate with the current event: the event
    /// time rounded up to the next multiple of the logging interval.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        // `+ 1` rounds up, so updates within one interval collapse onto the
        // same timestamp.
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Dispatch the given event to the matching handler; unhandled event
    /// kinds are ignored.
    fn handle(&mut self, event: TimelyEvent) {
        use TimelyEvent::*;

        match event {
            Operates(e) => self.handle_operates(e),
            Channels(e) => self.handle_channels(e),
            Shutdown(e) => self.handle_shutdown(e),
            Park(e) => self.handle_park(e),
            Messages(e) => self.handle_messages(e),
            Schedule(e) => self.handle_schedule(e),
            _ => (),
        }
    }

    /// Handle operator creation: emit additions to the operates and addresses
    /// logs, and remember the event so it can be retracted on shutdown.
    fn handle_operates(&mut self, event: OperatesEvent) {
        let ts = self.ts();
        let datum = (event.id, event.name.clone());
        self.output.operates.give((datum, ts, Diff::ONE));

        let datum = (event.id, event.addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        self.state.operators.insert(event.id, event);
    }

    /// Handle channel creation: emit additions to the channels and addresses
    /// logs, and remember the event keyed by its dataflow so it can be
    /// retracted when that dataflow shuts down.
    fn handle_channels(&mut self, event: ChannelsEvent) {
        let ts = self.ts();
        let datum = ChannelDatumReference {
            id: event.id,
            source: event.source,
            target: event.target,
            typ: &event.typ,
        };
        self.output.channels.give(((datum, ()), ts, Diff::ONE));

        let datum = (event.id, event.scope_addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        // The first element of the scope address identifies the dataflow.
        let dataflow_index = event.scope_addr[0];
        self.state
            .dataflow_channels
            .entry(dataflow_index)
            .or_default()
            .push(event);
    }

    /// Handle operator shutdown: retract the operator's log entries and its
    /// accumulated scheduling data, and — if the operator is a top-level
    /// scope — treat this as the shutdown of the whole dataflow.
    fn handle_shutdown(&mut self, event: ShutdownEvent) {
        let Some(operator) = self.state.operators.remove(&event.id) else {
            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
            return;
        };

        let ts = self.ts();
        let datum = (operator.id, operator.name);
        self.output.operates.give((datum, ts, Diff::MINUS_ONE));

        // Retract schedules information for the operator, skipping empty
        // histogram buckets.
        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
                .enumerate()
                .filter(|(_, (count, _))| *count != 0)
            {
                self.output
                    .schedules_duration
                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));

                // The bucket index encodes the power-of-two duration.
                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    duration_pow: 1 << bucket,
                };
                let diff = Diff::cast_from(-count);
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, diff));
            }
        }

        // An address of length 1 means this operator is a top-level dataflow
        // scope, so its shutdown is the dataflow's shutdown.
        if operator.addr.len() == 1 {
            let dataflow_index = operator.addr[0];
            self.handle_dataflow_shutdown(dataflow_index);
        }

        let datum = (operator.id, operator.addr);
        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
    }

    /// Handle the shutdown of a dataflow: notify the compute logger and
    /// retract the dataflow's channels along with their accumulated message
    /// and batch counts.
    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
        // Notify compute logging about the shutdown, if a compute logger is
        // installed.
        self.shared_state.compute_logger.as_ref().map(|logger| {
            logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
        });

        // Retract logged information about the dataflow's channels, if any.
        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
            return;
        };

        let ts = self.ts();
        for channel in channels {
            // Retract channel description and address.
            let datum = ChannelDatumReference {
                id: channel.id,
                source: channel.source,
                target: channel.target,
                typ: &channel.typ,
            };
            self.output
                .channels
                .give(((datum, ()), ts, Diff::MINUS_ONE));

            let datum = (channel.id, channel.scope_addr);
            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));

            // Retract the message and batch counts accumulated per peer.
            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
                for (target_worker, count) in sent.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: target_worker,
                    };
                    self.output
                        .messages_sent
                        .give(((datum, ()), ts, Diff::from(-count.records)));
                    self.output
                        .batches_sent
                        .give(((datum, ()), ts, Diff::from(-count.batches)));
                }
            }
            if let Some(received) = self.state.messages_received.remove(&channel.id) {
                for (source_worker, count) in received.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: source_worker,
                    };
                    self.output.messages_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.records),
                    ));
                    self.output.batches_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.batches),
                    ));
                }
            }
        }
    }

    /// Handle park/unpark events: record the park start, and on unpark emit a
    /// parks log update with power-of-two bucketed durations.
    fn handle_park(&mut self, event: ParkEvent) {
        match event {
            ParkEvent::Park(requested) => {
                let park = Park {
                    time: self.time,
                    requested,
                };
                let existing = self.state.last_park.replace(park);
                if existing.is_some() {
                    error!("park without a succeeding unpark");
                }
            }
            ParkEvent::Unpark => {
                let Some(park) = self.state.last_park.take() else {
                    error!("unpark without a preceding park");
                    return;
                };

                // Bucket durations to powers of two to bound histogram size.
                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
                let duration_pow =
                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
                let requested_pow = park
                    .requested
                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));

                let ts = self.ts();
                let datum = ParkDatum {
                    duration_pow,
                    requested_pow,
                };
                self.output.parks.give(((datum, ()), ts, Diff::ONE));
            }
        }
    }

    /// Handle a message event: emit message/batch log updates and accumulate
    /// per-channel, per-worker counts for later retraction.
    fn handle_messages(&mut self, event: MessagesEvent) {
        let ts = self.ts();
        let count = Diff::try_from(event.length).expect("must fit");

        if event.is_send {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.target,
            };
            self.output.messages_sent.give(((datum, ()), ts, count));
            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));

            // One counter slot per peer worker.
            let sent_counts = self
                .state
                .messages_sent
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            sent_counts[event.target].records += count;
            sent_counts[event.target].batches += 1;
        } else {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.source,
            };
            self.output.messages_received.give(((datum, ()), ts, count));
            self.output
                .batches_received
                .give(((datum, ()), ts, Diff::ONE));

            // One counter slot per peer worker.
            let received_counts = self
                .state
                .messages_received
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            received_counts[event.source].records += count;
            received_counts[event.source].batches += 1;
        }
    }

    /// Handle a schedule event: remember start times, and on stop emit
    /// elapsed-time and histogram updates while accumulating per-bucket data
    /// for later retraction.
    fn handle_schedule(&mut self, event: ScheduleEvent) {
        match event.start_stop {
            timely::logging::StartStop::Start => {
                let existing = self.state.schedule_starts.insert(event.id, self.time);
                if existing.is_some() {
                    error!(operator_id = ?event.id, "schedule start without succeeding stop");
                }
            }
            timely::logging::StartStop::Stop => {
                let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
                    error!(operator_id = ?event.id, "schedule stop without preceeding start");
                    return;
                };

                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
                let elapsed_diff = Diff::from(i64::try_from(elapsed_ns).expect("must fit"));
                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");

                let ts = self.ts();
                let datum = event.id;
                self.output
                    .schedules_duration
                    .give(((datum, ()), ts, elapsed_diff));

                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    duration_pow: elapsed_pow,
                };
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, Diff::ONE));

                // Record the data for retraction on operator shutdown; the
                // bucket index is the exponent of the power-of-two duration.
                let index = usize::cast_from(elapsed_pow.trailing_zeros());
                let data = self.state.schedules_data.entry(event.id).or_default();
                grow_vec(data, index);
                let (count, duration) = &mut data[index];
                *count += 1;
                *duration += elapsed_diff;
            }
        }
    }
}
732
/// Grow `vec` so that `index` is a valid position, filling any newly created
/// slots with the default value. A vector that is already long enough is left
/// untouched.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    // Guard clause: nothing to do when `index` is already in bounds.
    if vec.len() > index {
        return;
    }
    vec.resize(index + 1, T::default());
}