// mz_compute/logging/timely.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by timely dataflow.
11
12use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::dataflow::operators::generic::operator::empty;
31use timely::logging::{
32    ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
33    TimelyEvent,
34};
35use tracing::error;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::logging::compute::{ComputeEvent, DataflowShutdown};
39use crate::logging::{
40    EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
41    Update,
42};
43use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
46
/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export, keyed by the log variant they record.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
52
53/// Constructs the logging dataflow fragment for timely logs.
54///
55/// Params
56/// * `scope`: The Timely scope hosting the log analysis dataflow.
57/// * `config`: Logging configuration
58/// * `event_queue`: The source to read log events from.
59pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
60    mut scope: G,
61    config: &LoggingConfig,
62    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
63    shared_state: Rc<RefCell<SharedLoggingState>>,
64) -> Return {
65    scope.scoped("timely logging", move |scope| {
66        let enable_logging = config.enable_logging;
67        let (logs, token) = if enable_logging {
68            event_queue.links.mz_replay(
69                scope,
70                "timely logs",
71                config.interval,
72                event_queue.activator,
73            )
74        } else {
75            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
76            (empty(scope), token)
77        };
78
79        // Build a demux operator that splits the replayed event stream up into the separate
80        // logging streams.
81        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
82        let mut input = demux.new_input(logs, Pipeline);
83        let (operates_out, operates) = demux.new_output();
84        let mut operates_out = OutputBuilder::from(operates_out);
85        let (channels_out, channels) = demux.new_output();
86        let mut channels_out = OutputBuilder::from(channels_out);
87        let (addresses_out, addresses) = demux.new_output();
88        let mut addresses_out = OutputBuilder::from(addresses_out);
89        let (parks_out, parks) = demux.new_output();
90        let mut parks_out = OutputBuilder::from(parks_out);
91        let (messages_sent_out, messages_sent) = demux.new_output();
92        let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
93        let (messages_received_out, messages_received) = demux.new_output();
94        let mut messages_received_out = OutputBuilder::from(messages_received_out);
95        let (schedules_duration_out, schedules_duration) = demux.new_output();
96        let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
97        let (schedules_histogram_out, schedules_histogram) = demux.new_output();
98        let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
99        let (batches_sent_out, batches_sent) = demux.new_output();
100        let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
101        let (batches_received_out, batches_received) = demux.new_output();
102        let mut batches_received_out = OutputBuilder::from(batches_received_out);
103
104        let worker_id = scope.index();
105        let mut demux_state = DemuxState::default();
106        demux.build(|_capability| {
107            let peers = scope.peers();
108            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
109            move |_frontiers| {
110                let mut operates = operates_out.activate();
111                let mut channels = channels_out.activate();
112                let mut addresses = addresses_out.activate();
113                let mut parks = parks_out.activate();
114                let mut messages_sent = messages_sent_out.activate();
115                let mut messages_received = messages_received_out.activate();
116                let mut batches_sent = batches_sent_out.activate();
117                let mut batches_received = batches_received_out.activate();
118                let mut schedules_duration = schedules_duration_out.activate();
119                let mut schedules_histogram = schedules_histogram_out.activate();
120
121                input.for_each(|cap, data| {
122                    let mut output_buffers = DemuxOutput {
123                        operates: operates.session_with_builder(&cap),
124                        channels: channels.session_with_builder(&cap),
125                        addresses: addresses.session_with_builder(&cap),
126                        parks: parks.session_with_builder(&cap),
127                        messages_sent: messages_sent.session_with_builder(&cap),
128                        messages_received: messages_received.session_with_builder(&cap),
129                        schedules_duration: schedules_duration.session_with_builder(&cap),
130                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
131                        batches_sent: batches_sent.session_with_builder(&cap),
132                        batches_received: batches_received.session_with_builder(&cap),
133                    };
134
135                    for (time, event) in data.drain(..) {
136                        if let TimelyEvent::Messages(msg) = &event {
137                            match msg.is_send {
138                                true => assert_eq!(msg.source, worker_id),
139                                false => assert_eq!(msg.target, worker_id),
140                            }
141                        }
142
143                        DemuxHandler {
144                            state: &mut demux_state,
145                            shared_state: &mut shared_state.borrow_mut(),
146                            output: &mut output_buffers,
147                            logging_interval_ms,
148                            peers,
149                            time,
150                        }
151                        .handle(event);
152                    }
153                });
154            }
155        });
156
157        // Encode the contents of each logging stream into its expected `Row` format.
158        // We pre-arrange the logging streams to force a consolidation and reduce the amount of
159        // updates that reach `Row` encoding.
160
161        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
162            operates,
163            TimelyLog::Operates,
164            move |data, packer, session| {
165                for ((id, name), time, diff) in data.iter() {
166                    let data = packer.pack_slice(&[
167                        Datum::UInt64(u64::cast_from(*id)),
168                        Datum::UInt64(u64::cast_from(worker_id)),
169                        Datum::String(name),
170                    ]);
171                    session.give((data, time, diff));
172                }
173            },
174        );
175
176        // TODO: `consolidate_and_pack` requires columnation, which `ChannelDatum` does not
177        // implement. Consider consolidating here once we support columnar.
178        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
179            Pipeline,
180            "ToRow Channels",
181            |_cap, _info| {
182                let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
183                move |input, output| {
184                    input.for_each_time(|time, data| {
185                        let mut session = output.session_with_builder(&time);
186                        for d in data.flat_map(|c| c.borrow().into_index_iter()) {
187                            let ((datum, ()), time, diff) = d;
188                            let (source_node, source_port) = datum.source;
189                            let (target_node, target_port) = datum.target;
190                            let data = packer.pack_slice(&[
191                                Datum::UInt64(u64::cast_from(datum.id)),
192                                Datum::UInt64(u64::cast_from(worker_id)),
193                                Datum::UInt64(u64::cast_from(source_node)),
194                                Datum::UInt64(u64::cast_from(source_port)),
195                                Datum::UInt64(u64::cast_from(target_node)),
196                                Datum::UInt64(u64::cast_from(target_port)),
197                                Datum::String(
198                                    std::str::from_utf8(datum.typ).expect("valid string"),
199                                ),
200                            ]);
201                            session.give((data, time, diff));
202                        }
203                    });
204                }
205            },
206        );
207
208        // Types to make rustfmt happy.
209        type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
210        type KB<K, T, D> = KeyBatcher<K, T, D>;
211
212        let addresses = consolidate_and_pack::<_, KVB<_, _, _, _>, ColumnBuilder<_>, _, _>(
213            addresses,
214            TimelyLog::Addresses,
215            move |data, packer, session| {
216                for ((id, address), time, diff) in data.iter() {
217                    let data = packer.pack_by_index(|packer, index| match index {
218                        0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
219                        1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
220                        2 => {
221                            let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
222                            packer.push_list(list)
223                        }
224                        _ => unreachable!("Addresses relation has three columns"),
225                    });
226                    session.give((data, time, diff));
227                }
228            },
229        );
230
231        let parks = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
232            parks,
233            TimelyLog::Parks,
234            move |data, packer, session| {
235                for ((datum, ()), time, diff) in data.iter() {
236                    let data = packer.pack_slice(&[
237                        Datum::UInt64(u64::cast_from(worker_id)),
238                        Datum::UInt64(datum.duration_pow),
239                        datum
240                            .requested_pow
241                            .map(Datum::UInt64)
242                            .unwrap_or(Datum::Null),
243                    ]);
244                    session.give((data, time, diff));
245                }
246            },
247        );
248
249        let batches_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
250            batches_sent,
251            TimelyLog::BatchesSent,
252            move |data, packer, session| {
253                for ((datum, ()), time, diff) in data.iter() {
254                    let data = packer.pack_slice(&[
255                        Datum::UInt64(u64::cast_from(datum.channel)),
256                        Datum::UInt64(u64::cast_from(worker_id)),
257                        Datum::UInt64(u64::cast_from(datum.worker)),
258                    ]);
259                    session.give((data, time, diff));
260                }
261            },
262        );
263
264        let batches_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
265            batches_received,
266            TimelyLog::BatchesReceived,
267            move |data, packer, session| {
268                for ((datum, ()), time, diff) in data.iter() {
269                    let data = packer.pack_slice(&[
270                        Datum::UInt64(u64::cast_from(datum.channel)),
271                        Datum::UInt64(u64::cast_from(datum.worker)),
272                        Datum::UInt64(u64::cast_from(worker_id)),
273                    ]);
274                    session.give((data, time, diff));
275                }
276            },
277        );
278
279        let messages_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
280            messages_sent,
281            TimelyLog::MessagesSent,
282            move |data, packer, session| {
283                for ((datum, ()), time, diff) in data.iter() {
284                    let data = packer.pack_slice(&[
285                        Datum::UInt64(u64::cast_from(datum.channel)),
286                        Datum::UInt64(u64::cast_from(worker_id)),
287                        Datum::UInt64(u64::cast_from(datum.worker)),
288                    ]);
289                    session.give((data, time, diff));
290                }
291            },
292        );
293
294        let messages_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
295            messages_received,
296            TimelyLog::MessagesReceived,
297            move |data, packer, session| {
298                for ((datum, ()), time, diff) in data.iter() {
299                    let data = packer.pack_slice(&[
300                        Datum::UInt64(u64::cast_from(datum.channel)),
301                        Datum::UInt64(u64::cast_from(datum.worker)),
302                        Datum::UInt64(u64::cast_from(worker_id)),
303                    ]);
304                    session.give((data, time, diff));
305                }
306            },
307        );
308
309        let elapsed = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
310            schedules_duration,
311            TimelyLog::Elapsed,
312            move |data, packer, session| {
313                for ((operator, ()), time, diff) in data.iter() {
314                    let data = packer.pack_slice(&[
315                        Datum::UInt64(u64::cast_from(*operator)),
316                        Datum::UInt64(u64::cast_from(worker_id)),
317                    ]);
318                    session.give((data, time, diff));
319                }
320            },
321        );
322
323        let histogram = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
324            schedules_histogram,
325            TimelyLog::Histogram,
326            move |data, packer, session| {
327                for ((datum, ()), time, diff) in data.iter() {
328                    let data = packer.pack_slice(&[
329                        Datum::UInt64(u64::cast_from(datum.operator)),
330                        Datum::UInt64(u64::cast_from(worker_id)),
331                        Datum::UInt64(datum.duration_pow),
332                    ]);
333                    session.give((data, time, diff));
334                }
335            },
336        );
337
338        let logs = {
339            use TimelyLog::*;
340            [
341                (Operates, operates),
342                (Channels, channels),
343                (Elapsed, elapsed),
344                (Histogram, histogram),
345                (Addresses, addresses),
346                (Parks, parks),
347                (MessagesSent, messages_sent),
348                (MessagesReceived, messages_received),
349                (BatchesSent, batches_sent),
350                (BatchesReceived, batches_received),
351            ]
352        };
353
354        // Build the output arrangements.
355        let mut collections = BTreeMap::new();
356        for (variant, collection) in logs {
357            let variant = LogVariant::Timely(variant);
358            if config.index_logs.contains_key(&variant) {
359                // Extract types to make rustfmt happy.
360                type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
361                type Builder<T, R> = RowRowBuilder<T, R>;
362                let trace = collection
363                    .mz_arrange_core::<_, Batcher<_, _, _, _>, Builder<_, _>, RowRowSpine<_, _>>(
364                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(
365                            columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
366                        ),
367                        &format!("Arrange {variant:?}"),
368                    )
369                    .trace;
370                let collection = LogCollection {
371                    trace,
372                    token: Rc::clone(&token),
373                };
374                collections.insert(variant, collection);
375            }
376        }
377
378        Return { collections }
379    })
380}
381
/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Information about live operators, indexed by operator ID.
    ///
    /// Entries are inserted on `Operates` events and removed (and retracted) on `Shutdown`.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Maps dataflow IDs to channels in the dataflow.
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// Information about the last requested park, or `None` while the worker is unparked.
    last_park: Option<Park>,
    /// Maps channel IDs to boxed slices counting the messages sent to each target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Maps channel IDs to boxed slices counting the messages received from each source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Stores for scheduled operators the time when they were scheduled.
    ///
    /// Used as a stack: `Schedule` start events push, stop events pop.
    schedule_starts: Vec<(usize, Duration)>,
    /// Maps operator IDs to a vector recording the (count, elapsed_ns) values in each histogram
    /// bucket.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}
401
/// Information about an in-progress worker park.
struct Park {
    /// Time when the park occurred.
    time: Duration,
    /// Requested park time, if any.
    requested: Option<Duration>,
}
408
/// Organize message counts into number of batches and records.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// The number of batches sent across a channel.
    batches: i64,
    /// The number of records sent across a channel.
    records: Diff,
}
417
/// Bundled output buffers used by the demux operator.
//
// We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid
// having to manually implement `Columnation`. If `Columnation` could be `#[derive]`ed, that
// wouldn't be an issue.
struct DemuxOutput<'a, 'b> {
    /// Operator creations/drops: `(operator ID, name)`.
    operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
    /// Channel creations/drops.
    channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
    /// Operator/channel addresses: `(ID, address path)`.
    addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
    /// Worker park events.
    parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
    /// Batch counts sent over channels.
    batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Batch counts received over channels.
    batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Record counts sent over channels.
    messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Record counts received over channels.
    messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Elapsed scheduling time, keyed by operator ID.
    schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
    /// Histogram of per-operator scheduling durations.
    schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
}
435
/// Datum describing a single timely channel.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
struct ChannelDatum {
    /// The channel ID.
    id: usize,
    /// The source as a `(node, port)` pair.
    source: (usize, usize),
    /// The target as a `(node, port)` pair.
    target: (usize, usize),
    /// Description of the type of data flowing over the channel.
    typ: String,
}
443
/// Datum describing a completed worker park.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    /// Actual park duration in ns, rounded up to the next power of two.
    duration_pow: u64,
    /// Requested park duration in ns, rounded up to the next power of two, if one was requested.
    requested_pow: Option<u64>,
}

impl Columnation for ParkDatum {
    // `ParkDatum` is plain `Copy` data, so a `CopyRegion` suffices.
    type InnerRegion = CopyRegion<Self>;
}
453
/// Datum identifying a channel and a remote worker for message/batch counts.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    /// The channel the messages flow over.
    channel: usize,
    /// The counterpart worker: the target worker for sends, the source worker for receives.
    worker: usize,
}

impl Columnation for MessageDatum {
    // `MessageDatum` is plain `Copy` data, so a `CopyRegion` suffices.
    type InnerRegion = CopyRegion<Self>;
}
463
/// Datum identifying one histogram bucket of an operator's scheduling durations.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    /// The scheduled operator's ID.
    operator: usize,
    /// Scheduling duration in ns, rounded up to the next power of two.
    duration_pow: u64,
}

impl Columnation for ScheduleHistogramDatum {
    // `ScheduleHistogramDatum` is plain `Copy` data, so a `CopyRegion` suffices.
    type InnerRegion = CopyRegion<Self>;
}
473
/// Event handler of the demux operator.
///
/// Created fresh for each event; all durable state lives in `state` and `shared_state`.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The current event time.
    time: Duration,
}
489
490impl DemuxHandler<'_, '_, '_> {
491    /// Return the timestamp associated with the current event, based on the event time and the
492    /// logging interval.
493    fn ts(&self) -> Timestamp {
494        let time_ms = self.time.as_millis();
495        let interval = self.logging_interval_ms;
496        let rounded = (time_ms / interval + 1) * interval;
497        rounded.try_into().expect("must fit")
498    }
499
500    /// Handle the given timely event.
501    fn handle(&mut self, event: TimelyEvent) {
502        use TimelyEvent::*;
503
504        match event {
505            Operates(e) => self.handle_operates(e),
506            Channels(e) => self.handle_channels(e),
507            Shutdown(e) => self.handle_shutdown(e),
508            Park(e) => self.handle_park(e),
509            Messages(e) => self.handle_messages(e),
510            Schedule(e) => self.handle_schedule(e),
511            _ => (),
512        }
513    }
514
515    fn handle_operates(&mut self, event: OperatesEvent) {
516        let ts = self.ts();
517        let datum = (event.id, event.name.clone());
518        self.output.operates.give((datum, ts, Diff::ONE));
519
520        let datum = (event.id, event.addr.clone());
521        self.output.addresses.give((datum, ts, Diff::ONE));
522
523        self.state.operators.insert(event.id, event);
524    }
525
526    fn handle_channels(&mut self, event: ChannelsEvent) {
527        let ts = self.ts();
528        let datum = ChannelDatumReference {
529            id: event.id,
530            source: event.source,
531            target: event.target,
532            typ: &event.typ,
533        };
534        self.output.channels.give(((datum, ()), ts, Diff::ONE));
535
536        let datum = (event.id, event.scope_addr.clone());
537        self.output.addresses.give((datum, ts, Diff::ONE));
538
539        let dataflow_index = event.scope_addr[0];
540        self.state
541            .dataflow_channels
542            .entry(dataflow_index)
543            .or_default()
544            .push(event);
545    }
546
547    fn handle_shutdown(&mut self, event: ShutdownEvent) {
548        // Dropped operators should result in a negative record for
549        // the `operates` collection, cancelling out the initial
550        // operator announcement.
551        // Remove logging for this operator.
552
553        let Some(operator) = self.state.operators.remove(&event.id) else {
554            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
555            return;
556        };
557
558        // Retract operator information.
559        let ts = self.ts();
560        let datum = (operator.id, operator.name);
561        self.output.operates.give((datum, ts, Diff::MINUS_ONE));
562
563        // Retract schedules information for the operator
564        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
565            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
566                .enumerate()
567                .filter(|(_, (count, _))| *count != 0)
568            {
569                self.output
570                    .schedules_duration
571                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
572
573                let datum = ScheduleHistogramDatum {
574                    operator: event.id,
575                    duration_pow: 1 << bucket,
576                };
577                let diff = Diff::cast_from(-count);
578                self.output
579                    .schedules_histogram
580                    .give(((datum, ()), ts, diff));
581            }
582        }
583
584        if operator.addr.len() == 1 {
585            let dataflow_index = operator.addr[0];
586            self.handle_dataflow_shutdown(dataflow_index);
587        }
588
589        let datum = (operator.id, operator.addr);
590        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
591    }
592
593    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
594        // Notify compute logging about the shutdown.
595        self.shared_state.compute_logger.as_ref().map(|logger| {
596            logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
597        });
598
599        // When a dataflow shuts down, we need to retract all its channels.
600        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
601            return;
602        };
603
604        let ts = self.ts();
605        for channel in channels {
606            // Retract channel description.
607            let datum = ChannelDatumReference {
608                id: channel.id,
609                source: channel.source,
610                target: channel.target,
611                typ: &channel.typ,
612            };
613            self.output
614                .channels
615                .give(((datum, ()), ts, Diff::MINUS_ONE));
616
617            let datum = (channel.id, channel.scope_addr);
618            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
619
620            // Retract messages logged for this channel.
621            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
622                for (target_worker, count) in sent.iter().enumerate() {
623                    let datum = MessageDatum {
624                        channel: channel.id,
625                        worker: target_worker,
626                    };
627                    self.output
628                        .messages_sent
629                        .give(((datum, ()), ts, Diff::from(-count.records)));
630                    self.output
631                        .batches_sent
632                        .give(((datum, ()), ts, Diff::from(-count.batches)));
633                }
634            }
635            if let Some(received) = self.state.messages_received.remove(&channel.id) {
636                for (source_worker, count) in received.iter().enumerate() {
637                    let datum = MessageDatum {
638                        channel: channel.id,
639                        worker: source_worker,
640                    };
641                    self.output.messages_received.give((
642                        (datum, ()),
643                        ts,
644                        Diff::from(-count.records),
645                    ));
646                    self.output.batches_received.give((
647                        (datum, ()),
648                        ts,
649                        Diff::from(-count.batches),
650                    ));
651                }
652            }
653        }
654    }
655
656    fn handle_park(&mut self, event: ParkEvent) {
657        match event {
658            ParkEvent::Park(requested) => {
659                let park = Park {
660                    time: self.time,
661                    requested,
662                };
663                let existing = self.state.last_park.replace(park);
664                if existing.is_some() {
665                    error!("park without a succeeding unpark");
666                }
667            }
668            ParkEvent::Unpark => {
669                let Some(park) = self.state.last_park.take() else {
670                    error!("unpark without a preceding park");
671                    return;
672                };
673
674                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
675                let duration_pow =
676                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
677                let requested_pow = park
678                    .requested
679                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
680
681                let ts = self.ts();
682                let datum = ParkDatum {
683                    duration_pow,
684                    requested_pow,
685                };
686                self.output.parks.give(((datum, ()), ts, Diff::ONE));
687            }
688        }
689    }
690
691    fn handle_messages(&mut self, event: MessagesEvent) {
692        let ts = self.ts();
693        let count = Diff::from(event.record_count);
694
695        if event.is_send {
696            let datum = MessageDatum {
697                channel: event.channel,
698                worker: event.target,
699            };
700            self.output.messages_sent.give(((datum, ()), ts, count));
701            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
702
703            let sent_counts = self
704                .state
705                .messages_sent
706                .entry(event.channel)
707                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
708            sent_counts[event.target].records += count;
709            sent_counts[event.target].batches += 1;
710        } else {
711            let datum = MessageDatum {
712                channel: event.channel,
713                worker: event.source,
714            };
715            self.output.messages_received.give(((datum, ()), ts, count));
716            self.output
717                .batches_received
718                .give(((datum, ()), ts, Diff::ONE));
719
720            let received_counts = self
721                .state
722                .messages_received
723                .entry(event.channel)
724                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
725            received_counts[event.source].records += count;
726            received_counts[event.source].batches += 1;
727        }
728    }
729
730    fn handle_schedule(&mut self, event: ScheduleEvent) {
731        match event.start_stop {
732            timely::logging::StartStop::Start => {
733                self.state.schedule_starts.push((event.id, self.time));
734            }
735            timely::logging::StartStop::Stop => {
736                let Some((old_id, start_time)) = self.state.schedule_starts.pop() else {
737                    error!(operator_id = ?event.id, "schedule stop without preceding start");
738                    return;
739                };
740
741                if old_id != event.id {
742                    error!(start_id = ?old_id, stop_id = ?event.id, "schedule stop without preceding start");
743                    return;
744                }
745
746                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
747                let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
748                let elapsed_diff = Diff::from(elapsed_i64);
749                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
750
751                let ts = self.ts();
752                let datum = event.id;
753                self.output
754                    .schedules_duration
755                    .give(((datum, ()), ts, elapsed_diff));
756
757                let datum = ScheduleHistogramDatum {
758                    operator: event.id,
759                    duration_pow: elapsed_pow,
760                };
761                self.output
762                    .schedules_histogram
763                    .give(((datum, ()), ts, Diff::ONE));
764
765                // Record count and elapsed time for later retraction.
766                let index = usize::cast_from(elapsed_pow.trailing_zeros());
767                let data = self.state.schedules_data.entry(event.id).or_default();
768                grow_vec(data, index);
769                let (count, duration) = &mut data[index];
770                *count += 1;
771                *duration += elapsed_diff;
772            }
773        }
774    }
775}
776
/// Grow the given vector so it fits the given index.
///
/// This does nothing if the vector is already large enough.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    // Ensure `vec[index]` exists by resizing with default values when needed.
    let required_len = index + 1;
    if vec.len() < required_len {
        vec.resize(required_len, T::default());
    }
}
787}