1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::containers::ProvidedBuilder;
25use mz_timely_util::replay::MzReplay;
26use timely::dataflow::Scope;
27use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
28use timely::dataflow::operators::Operator;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::logging::{
31 ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
32 TimelyEvent,
33};
34use tracing::error;
35
36use crate::extensions::arrange::MzArrangeCore;
37use crate::logging::compute::{ComputeEvent, DataflowShutdown};
38use crate::logging::{
39 EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
40 Update,
41};
42use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
43use crate::row_spine::RowRowBuilder;
44use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
45
/// Constructed collections and other return values of [`construct`].
pub(super) struct Return {
    /// The installed logging collections, keyed by log variant.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
51
/// Constructs the network of dataflows required for timely logging.
///
/// Replays the raw timely event stream from `event_queue`, demultiplexes it
/// into one stream per [`TimelyLog`] variant, renders each stream as `Row`
/// updates, and arranges the variants requested in `config.index_logs`.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        // Replay the batched timely events into this scope. Data is only
        // forwarded when logging is enabled.
        let (logs, token) =
            event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
                move |mut session, mut data| {
                    if enable_logging {
                        session.give_container(data.to_mut())
                    }
                },
            );

        // Build a demux operator that splits the replayed event stream up
        // into one output stream per log.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut operates_out, operates) = demux.new_output();
        let (mut channels_out, channels) = demux.new_output();
        let (mut addresses_out, addresses) = demux.new_output();
        let (mut parks_out, parks) = demux.new_output();
        let (mut messages_sent_out, messages_sent) = demux.new_output();
        let (mut messages_received_out, messages_received) = demux.new_output();
        let (mut schedules_duration_out, schedules_duration) = demux.new_output();
        let (mut schedules_histogram_out, schedules_histogram) = demux.new_output();
        let (mut batches_sent_out, batches_sent) = demux.new_output();
        let (mut batches_received_out, batches_received) = demux.new_output();

        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                // Activate all outputs; sessions on them are bundled into a
                // `DemuxOutput` for the event handler below.
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        // Message events are expected to involve the local
                        // worker as sender or receiver, respectively.
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row`
        // format, consolidating updates along the way.
        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &operates,
            TimelyLog::Operates,
            move |((id, name), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::String(name),
                ]);
                session.give((data, time, diff));
            },
        );

        // Channels are packed without consolidation, since their updates are
        // produced directly from installation/retraction events.
        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(Pipeline, "ToRow Channels", |_cap, _info| {
            let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
            move |input, output| {
                while let Some((time, data)) = input.next() {
                    let mut session = output.session_with_builder(&time);
                    for ((datum, ()), time, diff) in data.borrow().into_index_iter() {
                        let (source_node, source_port) = datum.source;
                        let (target_node, target_port) = datum.target;
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.id)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(u64::cast_from(source_node)),
                            Datum::UInt64(u64::cast_from(source_port)),
                            Datum::UInt64(u64::cast_from(target_node)),
                            Datum::UInt64(u64::cast_from(target_port)),
                            Datum::String(datum.typ),
                        ]);
                        session.give((data, time, diff));
                    }
                }
            }
        });

        let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &addresses,
            TimelyLog::Addresses,
            move |((id, address), time, diff), packer, session| {
                // The address column is a list, so it is packed by index
                // rather than from a flat datum slice.
                let data = packer.pack_by_index(|packer, index| match index {
                    0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                    1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                    2 => {
                        packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i))))
                    }
                    _ => unreachable!("Addresses relation has three columns"),
                });
                session.give((data, time, diff));
            },
        );

        let parks = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
            &parks,
            TimelyLog::Parks,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                    datum
                        .requested_pow
                        .map(Datum::UInt64)
                        .unwrap_or(Datum::Null),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_sent,
            TimelyLog::BatchesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_received,
            TimelyLog::BatchesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );


        let messages_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_sent,
            TimelyLog::MessagesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_received,
            TimelyLog::MessagesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let elapsed = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_duration,
            TimelyLog::Elapsed,
            move |((operator, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[Datum::UInt64(u64::cast_from(*operator)),
                Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );


        let histogram = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_histogram,
            TimelyLog::Histogram,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                ]);
                session.give((data, time, diff));
            },
        );

        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Build arrangements for the requested log variants. Each collection
        // keeps a clone of the replay token so the replay stays alive as long
        // as any arrangement does.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
339
/// State maintained by the demux operator, used to produce retractions when
/// operators, channels, and dataflows shut down.
#[derive(Default)]
struct DemuxState {
    /// Live operators, keyed by operator ID.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Channels installed in each dataflow, keyed by dataflow index.
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// The last park event that has not yet been matched by an unpark.
    last_park: Option<Park>,
    /// Per-channel counts of messages sent, indexed by target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Per-channel counts of messages received, indexed by source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Start times of operator schedulings that have not yet stopped.
    schedule_starts: BTreeMap<usize, Duration>,
    /// Per-operator scheduling histogram data: `(count, elapsed_ns)` per
    /// power-of-two duration bucket, indexed by bucket exponent.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}
359
/// Information about an outstanding park event.
struct Park {
    /// The time at which the worker parked.
    time: Duration,
    /// The park duration the worker requested, if any.
    requested: Option<Duration>,
}
366
/// Accumulated message counts for a single channel/worker pair.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// Number of batches sent or received.
    batches: i64,
    /// Number of records sent or received.
    records: Diff,
}
375
/// Bundled output sessions used by the demux operator, one per log.
struct DemuxOutput<'a> {
    /// Operator installations: `(operator ID, name)`.
    operates: OutputSessionVec<'a, Update<(usize, String)>>,
    /// Channel installations.
    channels: OutputSessionColumnar<'a, Update<(ChannelDatum, ())>>,
    /// Operator/channel addresses: `(ID, address path)`.
    addresses: OutputSessionVec<'a, Update<(usize, Vec<usize>)>>,
    /// Worker park/unpark events.
    parks: OutputSessionVec<'a, Update<(ParkDatum, ())>>,
    /// Batch counts sent over channels.
    batches_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Batch counts received over channels.
    batches_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Record counts sent over channels.
    messages_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Record counts received over channels.
    messages_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Aggregate operator scheduling durations, keyed by operator ID.
    schedules_duration: OutputSessionVec<'a, Update<(usize, ())>>,
    /// Histogram of operator scheduling durations.
    schedules_histogram: OutputSessionVec<'a, Update<(ScheduleHistogramDatum, ())>>,
}
393
/// Layout of data for a single channel, as logged in the `channels`
/// collection: ID, source/target `(node, port)` addresses, and the channel's
/// type description.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
struct ChannelDatum {
    id: usize,
    source: (usize, usize),
    target: (usize, usize),
    typ: String,
}
401
/// Layout of a park record; both durations are rounded up to the next power
/// of two (see the unpark handling in `DemuxHandler`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    /// Actual park duration, rounded to a power of two, in nanoseconds.
    duration_pow: u64,
    /// Requested park duration, rounded to a power of two, if any.
    requested_pow: Option<u64>,
}
407
// `ParkDatum` is `Copy`, so a simple copy region suffices.
impl Columnation for ParkDatum {
    type InnerRegion = CopyRegion<Self>;
}
411
/// Layout of a message/batch count record: the channel the data traveled
/// over and the remote worker involved.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    /// Channel ID.
    channel: usize,
    /// The remote worker (target for sends, source for receives).
    worker: usize,
}
417
// `MessageDatum` is `Copy`, so a simple copy region suffices.
impl Columnation for MessageDatum {
    type InnerRegion = CopyRegion<Self>;
}
421
/// Layout of a schedule-histogram record: the operator and the power-of-two
/// duration bucket its scheduling fell into.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    /// Operator ID.
    operator: usize,
    /// Scheduling duration, rounded to a power of two, in nanoseconds.
    duration_pow: u64,
}
427
// `ScheduleHistogramDatum` is `Copy`, so a simple copy region suffices.
impl Columnation for ScheduleHistogramDatum {
    type InnerRegion = CopyRegion<Self>;
}
431
/// Event handler of the demux operator: processes one timely event and emits
/// the resulting logging updates.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Output sessions for the demuxed logging streams.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval, in milliseconds.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The time of the current event.
    time: Duration,
}
447
448impl DemuxHandler<'_, '_> {
449 fn ts(&self) -> Timestamp {
452 let time_ms = self.time.as_millis();
453 let interval = self.logging_interval_ms;
454 let rounded = (time_ms / interval + 1) * interval;
455 rounded.try_into().expect("must fit")
456 }
457
458 fn handle(&mut self, event: TimelyEvent) {
460 use TimelyEvent::*;
461
462 match event {
463 Operates(e) => self.handle_operates(e),
464 Channels(e) => self.handle_channels(e),
465 Shutdown(e) => self.handle_shutdown(e),
466 Park(e) => self.handle_park(e),
467 Messages(e) => self.handle_messages(e),
468 Schedule(e) => self.handle_schedule(e),
469 _ => (),
470 }
471 }
472
473 fn handle_operates(&mut self, event: OperatesEvent) {
474 let ts = self.ts();
475 let datum = (event.id, event.name.clone());
476 self.output.operates.give((datum, ts, Diff::ONE));
477
478 let datum = (event.id, event.addr.clone());
479 self.output.addresses.give((datum, ts, Diff::ONE));
480
481 self.state.operators.insert(event.id, event);
482 }
483
484 fn handle_channels(&mut self, event: ChannelsEvent) {
485 let ts = self.ts();
486 let datum = ChannelDatumReference {
487 id: event.id,
488 source: event.source,
489 target: event.target,
490 typ: &event.typ,
491 };
492 self.output.channels.give(((datum, ()), ts, Diff::ONE));
493
494 let datum = (event.id, event.scope_addr.clone());
495 self.output.addresses.give((datum, ts, Diff::ONE));
496
497 let dataflow_index = event.scope_addr[0];
498 self.state
499 .dataflow_channels
500 .entry(dataflow_index)
501 .or_default()
502 .push(event);
503 }
504
505 fn handle_shutdown(&mut self, event: ShutdownEvent) {
506 let Some(operator) = self.state.operators.remove(&event.id) else {
512 error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
513 return;
514 };
515
516 let ts = self.ts();
518 let datum = (operator.id, operator.name);
519 self.output.operates.give((datum, ts, Diff::MINUS_ONE));
520
521 if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
523 for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
524 .enumerate()
525 .filter(|(_, (count, _))| *count != 0)
526 {
527 self.output
528 .schedules_duration
529 .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
530
531 let datum = ScheduleHistogramDatum {
532 operator: event.id,
533 duration_pow: 1 << bucket,
534 };
535 let diff = Diff::cast_from(-count);
536 self.output
537 .schedules_histogram
538 .give(((datum, ()), ts, diff));
539 }
540 }
541
542 if operator.addr.len() == 1 {
543 let dataflow_index = operator.addr[0];
544 self.handle_dataflow_shutdown(dataflow_index);
545 }
546
547 let datum = (operator.id, operator.addr);
548 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
549 }
550
551 fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
552 self.shared_state.compute_logger.as_ref().map(|logger| {
554 logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
555 });
556
557 let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
559 return;
560 };
561
562 let ts = self.ts();
563 for channel in channels {
564 let datum = ChannelDatumReference {
566 id: channel.id,
567 source: channel.source,
568 target: channel.target,
569 typ: &channel.typ,
570 };
571 self.output
572 .channels
573 .give(((datum, ()), ts, Diff::MINUS_ONE));
574
575 let datum = (channel.id, channel.scope_addr);
576 self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
577
578 if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
580 for (target_worker, count) in sent.iter().enumerate() {
581 let datum = MessageDatum {
582 channel: channel.id,
583 worker: target_worker,
584 };
585 self.output
586 .messages_sent
587 .give(((datum, ()), ts, Diff::from(-count.records)));
588 self.output
589 .batches_sent
590 .give(((datum, ()), ts, Diff::from(-count.batches)));
591 }
592 }
593 if let Some(received) = self.state.messages_received.remove(&channel.id) {
594 for (source_worker, count) in received.iter().enumerate() {
595 let datum = MessageDatum {
596 channel: channel.id,
597 worker: source_worker,
598 };
599 self.output.messages_received.give((
600 (datum, ()),
601 ts,
602 Diff::from(-count.records),
603 ));
604 self.output.batches_received.give((
605 (datum, ()),
606 ts,
607 Diff::from(-count.batches),
608 ));
609 }
610 }
611 }
612 }
613
614 fn handle_park(&mut self, event: ParkEvent) {
615 match event {
616 ParkEvent::Park(requested) => {
617 let park = Park {
618 time: self.time,
619 requested,
620 };
621 let existing = self.state.last_park.replace(park);
622 if existing.is_some() {
623 error!("park without a succeeding unpark");
624 }
625 }
626 ParkEvent::Unpark => {
627 let Some(park) = self.state.last_park.take() else {
628 error!("unpark without a preceding park");
629 return;
630 };
631
632 let duration_ns = self.time.saturating_sub(park.time).as_nanos();
633 let duration_pow =
634 u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
635 let requested_pow = park
636 .requested
637 .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
638
639 let ts = self.ts();
640 let datum = ParkDatum {
641 duration_pow,
642 requested_pow,
643 };
644 self.output.parks.give(((datum, ()), ts, Diff::ONE));
645 }
646 }
647 }
648
649 fn handle_messages(&mut self, event: MessagesEvent) {
650 let ts = self.ts();
651 let count = Diff::from(event.record_count);
652
653 if event.is_send {
654 let datum = MessageDatum {
655 channel: event.channel,
656 worker: event.target,
657 };
658 self.output.messages_sent.give(((datum, ()), ts, count));
659 self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
660
661 let sent_counts = self
662 .state
663 .messages_sent
664 .entry(event.channel)
665 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
666 sent_counts[event.target].records += count;
667 sent_counts[event.target].batches += 1;
668 } else {
669 let datum = MessageDatum {
670 channel: event.channel,
671 worker: event.source,
672 };
673 self.output.messages_received.give(((datum, ()), ts, count));
674 self.output
675 .batches_received
676 .give(((datum, ()), ts, Diff::ONE));
677
678 let received_counts = self
679 .state
680 .messages_received
681 .entry(event.channel)
682 .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
683 received_counts[event.source].records += count;
684 received_counts[event.source].batches += 1;
685 }
686 }
687
688 fn handle_schedule(&mut self, event: ScheduleEvent) {
689 match event.start_stop {
690 timely::logging::StartStop::Start => {
691 let existing = self.state.schedule_starts.insert(event.id, self.time);
692 if existing.is_some() {
693 error!(operator_id = ?event.id, "schedule start without succeeding stop");
694 }
695 }
696 timely::logging::StartStop::Stop => {
697 let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
698 error!(operator_id = ?event.id, "schedule stop without preceeding start");
699 return;
700 };
701
702 let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
703 let elapsed_diff = Diff::from(i64::try_from(elapsed_ns).expect("must fit"));
704 let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
705
706 let ts = self.ts();
707 let datum = event.id;
708 self.output
709 .schedules_duration
710 .give(((datum, ()), ts, elapsed_diff));
711
712 let datum = ScheduleHistogramDatum {
713 operator: event.id,
714 duration_pow: elapsed_pow,
715 };
716 self.output
717 .schedules_histogram
718 .give(((datum, ()), ts, Diff::ONE));
719
720 let index = usize::cast_from(elapsed_pow.trailing_zeros());
722 let data = self.state.schedules_data.entry(event.id).or_default();
723 grow_vec(data, index);
724 let (count, duration) = &mut data[index];
725 *count += 1;
726 *duration += elapsed_diff;
727 }
728 }
729 }
730}
731
/// Ensures `vec` is long enough to hold an element at `index`, extending it
/// with default values if necessary. Vectors that are already long enough are
/// left untouched.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    // Nothing to do if `index` is already in bounds.
    if index < vec.len() {
        return;
    }
    vec.resize(index + 1, T::default());
}