mz_compute/logging/timely.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by timely dataflow.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::containers::{Columnation, CopyRegion};
use mz_compute_client::logging::LoggingConfig;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::containers::{
    Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange,
};
use mz_timely_util::replay::MzReplay;
use timely::Container;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::logging::{
    ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
    TimelyEvent,
};
use tracing::error;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::compute::{ComputeEvent, DataflowShutdown};
use crate::logging::{EventQueue, LogVariant, TimelyLog};
use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};

/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow fragment for timely logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: Logging configuration
/// * `event_queue`: The source to read log events from.
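/// * `shared_state`: State shared across log receivers.
///
/// A sketch of a call site (illustrative only; the surrounding variable names are assumptions,
/// not taken from the actual caller):
///
/// ```ignore
/// let Return { collections } = construct(scope.clone(), &config, event_queue, shared_state);
/// ```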
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) =
            event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
                move |mut session, mut data| {
                    // If logging is disabled, we still need to install the indexes, but we can
                    // leave them empty. We do so by immediately filtering out all log events.
                    if enable_logging {
                        session.give_container(data.to_mut())
                    }
                },
            );

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut operates_out, operates) = demux.new_output();
        let (mut channels_out, channels) = demux.new_output();
        let (mut addresses_out, addresses) = demux.new_output();
        let (mut parks_out, parks) = demux.new_output();
        let (mut messages_sent_out, messages_sent) = demux.new_output();
        let (mut messages_received_out, messages_received) = demux.new_output();
        let (mut schedules_duration_out, schedules_duration) = demux.new_output();
        let (mut schedules_histogram_out, schedules_histogram) = demux.new_output();
        let (mut batches_sent_out, batches_sent) = demux.new_output();
        let (mut batches_received_out, batches_received) = demux.new_output();

        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row` format.
        // We pre-arrange the logging streams to force a consolidation and reduce the number of
        // updates that reach `Row` encoding.
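        //
        // For example (illustrative): if an operator is scheduled many times within one logging
        // interval, consolidation sums those updates into a single (operator, worker) change per
        // interval, so only one `Row` is produced for it instead of one per event.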

        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &operates,
            TimelyLog::Operates,
            move |((id, name), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::String(name),
                ]);
                session.give((data, time, diff));
            },
        );

        let channels = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &channels,
            TimelyLog::Channels,
            move |((datum, ()), time, diff), packer, session| {
                let (source_node, source_port) = datum.source;
                let (target_node, target_port) = datum.target;
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(source_node)),
                    Datum::UInt64(u64::cast_from(source_port)),
                    Datum::UInt64(u64::cast_from(target_node)),
                    Datum::UInt64(u64::cast_from(target_port)),
                ]);
                session.give((data, time, diff));
            },
        );

        let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &addresses,
            TimelyLog::Addresses,
            move |((id, address), time, diff), packer, session| {
                let data = packer.pack_by_index(|packer, index| match index {
                    0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                    1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                    2 => {
                        packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i))))
                    }
                    _ => unreachable!("Addresses relation has three columns"),
                });
                session.give((data, time, diff));
            },
        );

        let parks = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
            &parks,
            TimelyLog::Parks,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                    datum
                        .requested_pow
                        .map(Datum::UInt64)
                        .unwrap_or(Datum::Null),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_sent,
            TimelyLog::BatchesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_received,
            TimelyLog::BatchesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_sent,
            TimelyLog::MessagesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_received,
            TimelyLog::MessagesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let elapsed = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_duration,
            TimelyLog::Elapsed,
            move |((operator, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let histogram = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_histogram,
            TimelyLog::Histogram,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                ]);
                session.give((data, time, diff));
            },
        );

        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Information about live operators, indexed by operator ID.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Maps dataflow IDs to channels in the dataflow.
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// Information about the last requested park.
    last_park: Option<Park>,
    /// Maps channel IDs to boxed slices counting the messages sent to each target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Maps channel IDs to boxed slices counting the messages received from each source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Maps currently scheduled operators to the time at which they were scheduled.
    schedule_starts: BTreeMap<usize, Duration>,
    /// Maps operator IDs to a vector recording the (count, elapsed_ns) values in each histogram
    /// bucket.
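    ///
    /// Bucket `i` covers durations whose next power of two is `2^i` nanoseconds; see
    /// `handle_schedule` for how entries are recorded and `handle_shutdown` for how they are
    /// retracted.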
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}

struct Park {
    /// Time when the park occurred.
    time: Duration,
    /// Requested park time.
    requested: Option<Duration>,
}

/// Organize message counts into number of batches and records.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// The number of batches sent across a channel.
    batches: i64,
    /// The number of records sent across a channel.
    records: Diff,
}

type Pusher<D> =
    Counter<Timestamp, Vec<(D, Timestamp, Diff)>, Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>;
type OutputSession<'a, D> =
    Session<'a, Timestamp, ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>, Pusher<D>>;

/// Bundled output buffers used by the demux operator.
//
// We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid
// having to manually implement `Columnation`. If `Columnation` could be `#[derive]`ed, that
// wouldn't be an issue.
struct DemuxOutput<'a> {
    operates: OutputSession<'a, (usize, String)>,
    channels: OutputSession<'a, (ChannelDatum, ())>,
    addresses: OutputSession<'a, (usize, Vec<usize>)>,
    parks: OutputSession<'a, (ParkDatum, ())>,
    batches_sent: OutputSession<'a, (MessageDatum, ())>,
    batches_received: OutputSession<'a, (MessageDatum, ())>,
    messages_sent: OutputSession<'a, (MessageDatum, ())>,
    messages_received: OutputSession<'a, (MessageDatum, ())>,
    schedules_duration: OutputSession<'a, (usize, ())>,
    schedules_histogram: OutputSession<'a, (ScheduleHistogramDatum, ())>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ChannelDatum {
    id: usize,
    source: (usize, usize),
    target: (usize, usize),
}

impl Columnation for ChannelDatum {
    type InnerRegion = CopyRegion<Self>;
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    duration_pow: u64,
    requested_pow: Option<u64>,
}

impl Columnation for ParkDatum {
    type InnerRegion = CopyRegion<Self>;
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    channel: usize,
    worker: usize,
}

impl Columnation for MessageDatum {
    type InnerRegion = CopyRegion<Self>;
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    operator: usize,
    duration_pow: u64,
}

impl Columnation for ScheduleHistogramDatum {
    type InnerRegion = CopyRegion<Self>;
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The current event time.
    time: Duration,
}

impl DemuxHandler<'_, '_> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
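    ///
    /// The result always advances to the next interval boundary. For example (illustrative): with
    /// a 1000 ms logging interval, an event at 1500 ms is assigned timestamp 2000, and an event at
    /// exactly 2000 ms is assigned 3000.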
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given timely event.
    fn handle(&mut self, event: TimelyEvent) {
        use TimelyEvent::*;

        match event {
            Operates(e) => self.handle_operates(e),
            Channels(e) => self.handle_channels(e),
            Shutdown(e) => self.handle_shutdown(e),
            Park(e) => self.handle_park(e),
            Messages(e) => self.handle_messages(e),
            Schedule(e) => self.handle_schedule(e),
            _ => (),
        }
    }

    fn handle_operates(&mut self, event: OperatesEvent) {
        let ts = self.ts();
        let datum = (event.id, event.name.clone());
        self.output.operates.give((datum, ts, Diff::ONE));

        let datum = (event.id, event.addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        self.state.operators.insert(event.id, event);
    }

    fn handle_channels(&mut self, event: ChannelsEvent) {
        let ts = self.ts();
        let datum = ChannelDatum {
            id: event.id,
            source: event.source,
            target: event.target,
        };
        self.output.channels.give(((datum, ()), ts, Diff::ONE));

        let datum = (event.id, event.scope_addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        let dataflow_index = event.scope_addr[0];
        self.state
            .dataflow_channels
            .entry(dataflow_index)
            .or_default()
            .push(event);
    }

    fn handle_shutdown(&mut self, event: ShutdownEvent) {
        // Dropped operators should result in a negative record for
        // the `operates` collection, cancelling out the initial
        // operator announcement.
        // Remove logging for this operator.

        let Some(operator) = self.state.operators.remove(&event.id) else {
            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
            return;
        };

        // Retract operator information.
        let ts = self.ts();
        let datum = (operator.id, operator.name);
        self.output.operates.give((datum, ts, Diff::MINUS_ONE));

        // Retract schedules information for the operator
        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
                .enumerate()
                .filter(|(_, (count, _))| *count != 0)
            {
                self.output
                    .schedules_duration
                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));

                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    duration_pow: 1 << bucket,
                };
                let diff = Diff::cast_from(-count);
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, diff));
            }
        }

        if operator.addr.len() == 1 {
            let dataflow_index = operator.addr[0];
            self.handle_dataflow_shutdown(dataflow_index);
        }

        let datum = (operator.id, operator.addr);
        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
    }

    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
        // Notify compute logging about the shutdown.
        self.shared_state.compute_logger.as_ref().map(|logger| {
            logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
        });

        // When a dataflow shuts down, we need to retract all its channels.
        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
            return;
        };

        let ts = self.ts();
        for channel in channels {
            // Retract channel description.
            let datum = ChannelDatum {
                id: channel.id,
                source: channel.source,
                target: channel.target,
            };
            self.output
                .channels
                .give(((datum, ()), ts, Diff::MINUS_ONE));

            let datum = (channel.id, channel.scope_addr);
            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));

            // Retract messages logged for this channel.
            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
                for (target_worker, count) in sent.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: target_worker,
                    };
                    self.output
                        .messages_sent
                        .give(((datum, ()), ts, Diff::from(-count.records)));
                    self.output
                        .batches_sent
                        .give(((datum, ()), ts, Diff::from(-count.batches)));
                }
            }
            if let Some(received) = self.state.messages_received.remove(&channel.id) {
                for (source_worker, count) in received.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: source_worker,
                    };
                    self.output.messages_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.records),
                    ));
                    self.output.batches_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.batches),
                    ));
                }
            }
        }
    }

    fn handle_park(&mut self, event: ParkEvent) {
        match event {
            ParkEvent::Park(requested) => {
                let park = Park {
                    time: self.time,
                    requested,
                };
                let existing = self.state.last_park.replace(park);
                if existing.is_some() {
                    error!("park without a succeeding unpark");
                }
            }
            ParkEvent::Unpark => {
                let Some(park) = self.state.last_park.take() else {
                    error!("unpark without a preceding park");
                    return;
                };

                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
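                // Park durations are bucketed by rounding up to the next power of two; e.g.
                // (illustrative) a 700 ns park is recorded with a `duration_pow` of 1024.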
                let duration_pow =
                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
                let requested_pow = park
                    .requested
                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));

                let ts = self.ts();
                let datum = ParkDatum {
                    duration_pow,
                    requested_pow,
                };
                self.output.parks.give(((datum, ()), ts, Diff::ONE));
            }
        }
    }

    fn handle_messages(&mut self, event: MessagesEvent) {
        let ts = self.ts();
        let count = Diff::try_from(event.length).expect("must fit");

        if event.is_send {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.target,
            };
            self.output.messages_sent.give(((datum, ()), ts, count));
            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));

            let sent_counts = self
                .state
                .messages_sent
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            sent_counts[event.target].records += count;
            sent_counts[event.target].batches += 1;
        } else {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.source,
            };
            self.output.messages_received.give(((datum, ()), ts, count));
            self.output
                .batches_received
                .give(((datum, ()), ts, Diff::ONE));

            let received_counts = self
                .state
                .messages_received
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            received_counts[event.source].records += count;
            received_counts[event.source].batches += 1;
        }
    }

    fn handle_schedule(&mut self, event: ScheduleEvent) {
        match event.start_stop {
            timely::logging::StartStop::Start => {
                let existing = self.state.schedule_starts.insert(event.id, self.time);
                if existing.is_some() {
                    error!(operator_id = ?event.id, "schedule start without succeeding stop");
                }
            }
            timely::logging::StartStop::Stop => {
                let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
                    error!(operator_id = ?event.id, "schedule stop without preceding start");
                    return;
                };

                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
                let elapsed_diff = Diff::from(i64::try_from(elapsed_ns).expect("must fit"));
                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");

                let ts = self.ts();
                let datum = event.id;
                self.output
                    .schedules_duration
                    .give(((datum, ()), ts, elapsed_diff));

                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    duration_pow: elapsed_pow,
                };
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, Diff::ONE));

                // Record count and elapsed time for later retraction.
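                // `elapsed_pow` is a power of two, so `trailing_zeros` yields the bucket index;
                // e.g. (illustrative) an elapsed time of 900 ns has `elapsed_pow == 1024 == 2^10`
                // and is recorded in bucket 10.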
                let index = usize::cast_from(elapsed_pow.trailing_zeros());
                let data = self.state.schedules_data.entry(event.id).or_default();
                grow_vec(data, index);
                let (count, duration) = &mut data[index];
                *count += 1;
                *duration += elapsed_diff;
            }
        }
    }
}

/// Grow the given vector so it fits the given index.
///
/// This does nothing if the vector is already large enough.
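///
/// For example (illustrative): `grow_vec(&mut v, 4)` on a vector of length 2 resizes it to
/// length 5 by appending default values, while a call with `index < v.len()` leaves `v`
/// unchanged.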
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    if vec.len() <= index {
        vec.resize(index + 1, Default::default());
    }
}