mz_compute/logging/
timely.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by timely dataflow.
11
12use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::dataflow::operators::generic::operator::empty;
31use timely::logging::{
32    ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
33    TimelyEvent,
34};
35use tracing::error;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::logging::compute::{ComputeEvent, DataflowShutdown};
39use crate::logging::{
40    EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
41    Update,
42};
43use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
46
47/// The return type of [`construct`].
48pub(super) struct Return {
49    /// Collections to export.
50    pub collections: BTreeMap<LogVariant, LogCollection>,
51}
52
/// Constructs the logging dataflow fragment for timely logs.
///
/// Replays the timely event stream (or uses an empty stream when logging is disabled), splits it
/// into one stream per [`TimelyLog`] variant with a demux operator, encodes each stream into its
/// `Row` format, and arranges the variants that have an entry in `config.index_logs`.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: Logging configuration
/// * `event_queue`: The source to read log events from.
/// * `shared_state`: Logging state shared with other log receivers; the demux uses its compute
///   logger to report dataflow shutdowns.
///
/// Returns the arranged collections, each holding a clone of the replay token.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            // Logging is disabled: substitute an empty stream and a placeholder token.
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (operates_out, operates) = demux.new_output();
        let mut operates_out = OutputBuilder::from(operates_out);
        let (channels_out, channels) = demux.new_output();
        let mut channels_out = OutputBuilder::from(channels_out);
        let (addresses_out, addresses) = demux.new_output();
        let mut addresses_out = OutputBuilder::from(addresses_out);
        let (parks_out, parks) = demux.new_output();
        let mut parks_out = OutputBuilder::from(parks_out);
        let (messages_sent_out, messages_sent) = demux.new_output();
        let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
        let (messages_received_out, messages_received) = demux.new_output();
        let mut messages_received_out = OutputBuilder::from(messages_received_out);
        let (schedules_duration_out, schedules_duration) = demux.new_output();
        let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
        let (schedules_histogram_out, schedules_histogram) = demux.new_output();
        let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
        let (batches_sent_out, batches_sent) = demux.new_output();
        let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
        let (batches_received_out, batches_received) = demux.new_output();
        let mut batches_received_out = OutputBuilder::from(batches_received_out);

        // The local worker index; it is used for the sanity checks below and written into
        // every produced row.
        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            // Clamp to at least 1ms: `DemuxHandler::ts` divides by this interval.
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    // Open one session per log stream, all at the capability of this input batch.
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        // Sanity check: sent messages must originate at this worker and received
                        // messages must target it, since rows are attributed to `worker_id`.
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        // A short-lived handler borrows the persistent state and the output
                        // sessions for the duration of a single event.
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row` format.
        // We pre-arrange the logging streams to force a consolidation and reduce the amount of
        // updates that reach `Row` encoding.

        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &operates,
            TimelyLog::Operates,
            move |data, packer, session| {
                for ((id, name), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*id)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::String(name),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        // TODO: `consolidate_and_pack` requires columnation, which `ChannelDatum` does not
        // implement. Consider consolidating here once we support columnar.
        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
            Pipeline,
            "ToRow Channels",
            |_cap, _info| {
                let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
                move |input, output| {
                    input.for_each_time(|time, data| {
                        let mut session = output.session_with_builder(&time);
                        for d in data.flat_map(|c| c.borrow().into_index_iter()) {
                            let ((datum, ()), time, diff) = d;
                            let (source_node, source_port) = datum.source;
                            let (target_node, target_port) = datum.target;
                            let data = packer.pack_slice(&[
                                Datum::UInt64(u64::cast_from(datum.id)),
                                Datum::UInt64(u64::cast_from(worker_id)),
                                Datum::UInt64(u64::cast_from(source_node)),
                                Datum::UInt64(u64::cast_from(source_port)),
                                Datum::UInt64(u64::cast_from(target_node)),
                                Datum::UInt64(u64::cast_from(target_port)),
                                Datum::String(datum.typ),
                            ]);
                            session.give((data, time, diff));
                        }
                    });
                }
            },
        );

        // Types to make rustfmt happy.
        type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
        type KB<K, T, D> = KeyBatcher<K, T, D>;

        let addresses = consolidate_and_pack::<_, KVB<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &addresses,
            TimelyLog::Addresses,
            move |data, packer, session| {
                for ((id, address), time, diff) in data.iter() {
                    let data = packer.pack_by_index(|packer, index| match index {
                        0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                        1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                        2 => {
                            // The address is encoded as a list of unsigned integers.
                            let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
                            packer.push_list(list)
                        }
                        _ => unreachable!("Addresses relation has three columns"),
                    });
                    session.give((data, time, diff));
                }
            },
        );

        let parks = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
            &parks,
            TimelyLog::Parks,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(datum.duration_pow),
                        // A park without a requested duration is encoded as NULL.
                        datum
                            .requested_pow
                            .map(Datum::UInt64)
                            .unwrap_or(Datum::Null),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        // Sent rows are (channel, from = this worker, to = peer).
        let batches_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
            &batches_sent,
            TimelyLog::BatchesSent,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        // Received rows are (channel, from = peer, to = this worker).
        let batches_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
            &batches_received,
            TimelyLog::BatchesReceived,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let messages_sent = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
            &messages_sent,
            TimelyLog::MessagesSent,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let messages_received = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
            &messages_received,
            TimelyLog::MessagesReceived,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        // The elapsed time itself travels in the diff; rows only carry (operator, worker).
        let elapsed = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_duration,
            TimelyLog::Elapsed,
            move |data, packer, session| {
                for ((operator, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*operator)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let histogram = consolidate_and_pack::<_, KB<_, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_histogram,
            TimelyLog::Histogram,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.operator)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(datum.duration_pow),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Build the output arrangements.
        // Only variants with an entry in `config.index_logs` are arranged and exported; the
        // other streams are built but not returned.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            if config.index_logs.contains_key(&variant) {
                // Extract types to make rustfmt happy.
                type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
                type Builder<T, R> = RowRowBuilder<T, R>;
                let trace = collection
                    .mz_arrange_core::<_, Batcher<_, _, _, _>, Builder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                            columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
                        ),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
379
380/// State maintained by the demux operator.
381#[derive(Default)]
382struct DemuxState {
383    /// Information about live operators, indexed by operator ID.
384    operators: BTreeMap<usize, OperatesEvent>,
385    /// Maps dataflow IDs to channels in the dataflow.
386    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
387    /// Information about the last requested park.
388    last_park: Option<Park>,
389    /// Maps channel IDs to boxed slices counting the messages sent to each target worker.
390    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
391    /// Maps channel IDs to boxed slices counting the messages received from each source worker.
392    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
393    /// Stores for scheduled operators the time when they were scheduled.
394    schedule_starts: BTreeMap<usize, Duration>,
395    /// Maps operator IDs to a vector recording the (count, elapsed_ns) values in each histogram
396    /// bucket.
397    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
398}
399
/// A park event that is still waiting for its matching unpark.
struct Park {
    /// Park duration that was requested, if any.
    requested: Option<Duration>,
    /// Event time at which the park was logged.
    time: Duration,
}
406
407/// Organize message counts into number of batches and records.
408#[derive(Default, Copy, Clone, Debug)]
409struct MessageCount {
410    /// The number of batches sent across a channel.
411    batches: i64,
412    /// The number of records sent across a channel.
413    records: Diff,
414}
415
/// Bundled output buffers used by the demux operator.
//
// We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid
// having to manually implement `Columnation`. If `Columnation` could be `#[derive]`ed, that
// wouldn't be an issue.
//
// Field order is also drop order, so reordering fields changes the order in which sessions are
// dropped.
struct DemuxOutput<'a, 'b> {
    /// Operator announcements and retractions as `(operator id, name)`.
    operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
    /// Channel descriptions; this is the only columnar session.
    channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
    /// Addresses of operators and channels as `(id, address)`.
    addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
    /// Park observations.
    parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
    /// Batch counts per (channel, target worker); the diff carries the count.
    batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Batch counts per (channel, source worker); the diff carries the count.
    batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Record counts per (channel, target worker); the diff carries the count.
    messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Record counts per (channel, source worker); the diff carries the count.
    messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Elapsed scheduling time per operator ID, in nanoseconds, carried in the diff.
    schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
    /// Scheduling counts per (operator, power-of-two duration bucket).
    schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
}
433
434#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
435struct ChannelDatum {
436    id: usize,
437    source: (usize, usize),
438    target: (usize, usize),
439    typ: String,
440}
441
442#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
443struct ParkDatum {
444    duration_pow: u64,
445    requested_pow: Option<u64>,
446}
447
448impl Columnation for ParkDatum {
449    type InnerRegion = CopyRegion<Self>;
450}
451
452#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
453struct MessageDatum {
454    channel: usize,
455    worker: usize,
456}
457
458impl Columnation for MessageDatum {
459    type InnerRegion = CopyRegion<Self>;
460}
461
462#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
463struct ScheduleHistogramDatum {
464    operator: usize,
465    duration_pow: u64,
466}
467
468impl Columnation for ScheduleHistogramDatum {
469    type InnerRegion = CopyRegion<Self>;
470}
471
472/// Event handler of the demux operator.
473struct DemuxHandler<'a, 'b, 'c> {
474    /// State kept by the demux operator.
475    state: &'a mut DemuxState,
476    /// State shared across log receivers.
477    shared_state: &'a mut SharedLoggingState,
478    /// Demux output buffers.
479    output: &'a mut DemuxOutput<'b, 'c>,
480    /// The logging interval specifying the time granularity for the updates.
481    logging_interval_ms: u128,
482    /// The number of timely workers.
483    peers: usize,
484    /// The current event time.
485    time: Duration,
486}
487
488impl DemuxHandler<'_, '_, '_> {
489    /// Return the timestamp associated with the current event, based on the event time and the
490    /// logging interval.
491    fn ts(&self) -> Timestamp {
492        let time_ms = self.time.as_millis();
493        let interval = self.logging_interval_ms;
494        let rounded = (time_ms / interval + 1) * interval;
495        rounded.try_into().expect("must fit")
496    }
497
498    /// Handle the given timely event.
499    fn handle(&mut self, event: TimelyEvent) {
500        use TimelyEvent::*;
501
502        match event {
503            Operates(e) => self.handle_operates(e),
504            Channels(e) => self.handle_channels(e),
505            Shutdown(e) => self.handle_shutdown(e),
506            Park(e) => self.handle_park(e),
507            Messages(e) => self.handle_messages(e),
508            Schedule(e) => self.handle_schedule(e),
509            _ => (),
510        }
511    }
512
513    fn handle_operates(&mut self, event: OperatesEvent) {
514        let ts = self.ts();
515        let datum = (event.id, event.name.clone());
516        self.output.operates.give((datum, ts, Diff::ONE));
517
518        let datum = (event.id, event.addr.clone());
519        self.output.addresses.give((datum, ts, Diff::ONE));
520
521        self.state.operators.insert(event.id, event);
522    }
523
524    fn handle_channels(&mut self, event: ChannelsEvent) {
525        let ts = self.ts();
526        let datum = ChannelDatumReference {
527            id: event.id,
528            source: event.source,
529            target: event.target,
530            typ: &event.typ,
531        };
532        self.output.channels.give(((datum, ()), ts, Diff::ONE));
533
534        let datum = (event.id, event.scope_addr.clone());
535        self.output.addresses.give((datum, ts, Diff::ONE));
536
537        let dataflow_index = event.scope_addr[0];
538        self.state
539            .dataflow_channels
540            .entry(dataflow_index)
541            .or_default()
542            .push(event);
543    }
544
545    fn handle_shutdown(&mut self, event: ShutdownEvent) {
546        // Dropped operators should result in a negative record for
547        // the `operates` collection, cancelling out the initial
548        // operator announcement.
549        // Remove logging for this operator.
550
551        let Some(operator) = self.state.operators.remove(&event.id) else {
552            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
553            return;
554        };
555
556        // Retract operator information.
557        let ts = self.ts();
558        let datum = (operator.id, operator.name);
559        self.output.operates.give((datum, ts, Diff::MINUS_ONE));
560
561        // Retract schedules information for the operator
562        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
563            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
564                .enumerate()
565                .filter(|(_, (count, _))| *count != 0)
566            {
567                self.output
568                    .schedules_duration
569                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
570
571                let datum = ScheduleHistogramDatum {
572                    operator: event.id,
573                    duration_pow: 1 << bucket,
574                };
575                let diff = Diff::cast_from(-count);
576                self.output
577                    .schedules_histogram
578                    .give(((datum, ()), ts, diff));
579            }
580        }
581
582        if operator.addr.len() == 1 {
583            let dataflow_index = operator.addr[0];
584            self.handle_dataflow_shutdown(dataflow_index);
585        }
586
587        let datum = (operator.id, operator.addr);
588        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
589    }
590
591    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
592        // Notify compute logging about the shutdown.
593        self.shared_state.compute_logger.as_ref().map(|logger| {
594            logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
595        });
596
597        // When a dataflow shuts down, we need to retract all its channels.
598        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
599            return;
600        };
601
602        let ts = self.ts();
603        for channel in channels {
604            // Retract channel description.
605            let datum = ChannelDatumReference {
606                id: channel.id,
607                source: channel.source,
608                target: channel.target,
609                typ: &channel.typ,
610            };
611            self.output
612                .channels
613                .give(((datum, ()), ts, Diff::MINUS_ONE));
614
615            let datum = (channel.id, channel.scope_addr);
616            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
617
618            // Retract messages logged for this channel.
619            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
620                for (target_worker, count) in sent.iter().enumerate() {
621                    let datum = MessageDatum {
622                        channel: channel.id,
623                        worker: target_worker,
624                    };
625                    self.output
626                        .messages_sent
627                        .give(((datum, ()), ts, Diff::from(-count.records)));
628                    self.output
629                        .batches_sent
630                        .give(((datum, ()), ts, Diff::from(-count.batches)));
631                }
632            }
633            if let Some(received) = self.state.messages_received.remove(&channel.id) {
634                for (source_worker, count) in received.iter().enumerate() {
635                    let datum = MessageDatum {
636                        channel: channel.id,
637                        worker: source_worker,
638                    };
639                    self.output.messages_received.give((
640                        (datum, ()),
641                        ts,
642                        Diff::from(-count.records),
643                    ));
644                    self.output.batches_received.give((
645                        (datum, ()),
646                        ts,
647                        Diff::from(-count.batches),
648                    ));
649                }
650            }
651        }
652    }
653
654    fn handle_park(&mut self, event: ParkEvent) {
655        match event {
656            ParkEvent::Park(requested) => {
657                let park = Park {
658                    time: self.time,
659                    requested,
660                };
661                let existing = self.state.last_park.replace(park);
662                if existing.is_some() {
663                    error!("park without a succeeding unpark");
664                }
665            }
666            ParkEvent::Unpark => {
667                let Some(park) = self.state.last_park.take() else {
668                    error!("unpark without a preceding park");
669                    return;
670                };
671
672                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
673                let duration_pow =
674                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
675                let requested_pow = park
676                    .requested
677                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
678
679                let ts = self.ts();
680                let datum = ParkDatum {
681                    duration_pow,
682                    requested_pow,
683                };
684                self.output.parks.give(((datum, ()), ts, Diff::ONE));
685            }
686        }
687    }
688
689    fn handle_messages(&mut self, event: MessagesEvent) {
690        let ts = self.ts();
691        let count = Diff::from(event.record_count);
692
693        if event.is_send {
694            let datum = MessageDatum {
695                channel: event.channel,
696                worker: event.target,
697            };
698            self.output.messages_sent.give(((datum, ()), ts, count));
699            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
700
701            let sent_counts = self
702                .state
703                .messages_sent
704                .entry(event.channel)
705                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
706            sent_counts[event.target].records += count;
707            sent_counts[event.target].batches += 1;
708        } else {
709            let datum = MessageDatum {
710                channel: event.channel,
711                worker: event.source,
712            };
713            self.output.messages_received.give(((datum, ()), ts, count));
714            self.output
715                .batches_received
716                .give(((datum, ()), ts, Diff::ONE));
717
718            let received_counts = self
719                .state
720                .messages_received
721                .entry(event.channel)
722                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
723            received_counts[event.source].records += count;
724            received_counts[event.source].batches += 1;
725        }
726    }
727
728    fn handle_schedule(&mut self, event: ScheduleEvent) {
729        match event.start_stop {
730            timely::logging::StartStop::Start => {
731                let existing = self.state.schedule_starts.insert(event.id, self.time);
732                if existing.is_some() {
733                    error!(operator_id = ?event.id, "schedule start without succeeding stop");
734                }
735            }
736            timely::logging::StartStop::Stop => {
737                let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
738                    error!(operator_id = ?event.id, "schedule stop without preceeding start");
739                    return;
740                };
741
742                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
743                let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
744                let elapsed_diff = Diff::from(elapsed_i64);
745                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
746
747                let ts = self.ts();
748                let datum = event.id;
749                self.output
750                    .schedules_duration
751                    .give(((datum, ()), ts, elapsed_diff));
752
753                let datum = ScheduleHistogramDatum {
754                    operator: event.id,
755                    duration_pow: elapsed_pow,
756                };
757                self.output
758                    .schedules_histogram
759                    .give(((datum, ()), ts, Diff::ONE));
760
761                // Record count and elapsed time for later retraction.
762                let index = usize::cast_from(elapsed_pow.trailing_zeros());
763                let data = self.state.schedules_data.entry(event.id).or_default();
764                grow_vec(data, index);
765                let (count, duration) = &mut data[index];
766                *count += 1;
767                *duration += elapsed_diff;
768            }
769        }
770    }
771}
772
/// Make sure `index` is a valid position in `vec`.
///
/// Any missing slots up to and including `index` are filled with `T::default()`. A vector that
/// already covers `index` is left unchanged.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    if index < vec.len() {
        return;
    }
    vec.resize(index + 1, T::default());
}