mz_compute/logging/timely.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by timely dataflow.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;

use columnar::Columnar;
use differential_dataflow::containers::{Columnation, CopyRegion};
use mz_compute_client::logging::LoggingConfig;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
use mz_timely_util::containers::ProvidedBuilder;
use mz_timely_util::replay::MzReplay;
use timely::Container;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::logging::{
    ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
    TimelyEvent,
};
use tracing::error;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::compute::{ComputeEvent, DataflowShutdown};
use crate::logging::{
    EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
    Update,
};
use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};

/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Constructs the logging dataflow fragment for timely logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: The logging configuration.
/// * `event_queue`: The source to read log events from.
/// * `shared_state`: State shared across log receivers.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) =
            event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
                move |mut session, mut data| {
                    // If logging is disabled, we still need to install the indexes, but we can
                    // leave them empty. We do so by immediately discarding all log events.
                    if enable_logging {
                        session.give_container(data.to_mut())
                    }
                },
            );

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut operates_out, operates) = demux.new_output();
        let (mut channels_out, channels) = demux.new_output();
        let (mut addresses_out, addresses) = demux.new_output();
        let (mut parks_out, parks) = demux.new_output();
        let (mut messages_sent_out, messages_sent) = demux.new_output();
        let (mut messages_received_out, messages_received) = demux.new_output();
        let (mut schedules_duration_out, schedules_duration) = demux.new_output();
        let (mut schedules_histogram_out, schedules_histogram) = demux.new_output();
        let (mut batches_sent_out, batches_sent) = demux.new_output();
        let (mut batches_received_out, batches_received) = demux.new_output();

        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
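            // Clamp the logging interval to at least 1 ms; `DemuxHandler::ts` divides by it.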
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
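                        // Sanity check: sends are logged by the sending worker, receives by the
                        // receiving worker.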
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row` format.
        // We pre-arrange the logging streams to force a consolidation and reduce the number of
        // updates that reach `Row` encoding.

        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &operates,
            TimelyLog::Operates,
            move |((id, name), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::String(name),
                ]);
                session.give((data, time, diff));
            },
        );

        // TODO: `consolidate_and_pack` requires columnation, which `ChannelDatum` does not
        // implement. Consider consolidating here once we support columnar.
        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(Pipeline, "ToRow Channels", |_cap, _info| {
            let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
            move |input, output| {
                while let Some((time, data)) = input.next() {
                    let mut session = output.session_with_builder(&time);
                    for ((datum, ()), time, diff) in data.iter() {
                        let (source_node, source_port) = datum.source;
                        let (target_node, target_port) = datum.target;
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.id)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(u64::cast_from(source_node)),
                            Datum::UInt64(u64::cast_from(source_port)),
                            Datum::UInt64(u64::cast_from(target_node)),
                            Datum::UInt64(u64::cast_from(target_port)),
                            Datum::String(datum.typ),
                        ]);
                        session.give((data, time, diff));
                    }
                }
            }
        });

        let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &addresses,
            TimelyLog::Addresses,
            move |((id, address), time, diff), packer, session| {
                let data = packer.pack_by_index(|packer, index| match index {
                    0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                    1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                    2 => {
                        packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i))))
                    }
                    _ => unreachable!("Addresses relation has three columns"),
                });
                session.give((data, time, diff));
            },
        );

        let parks = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
            &parks,
            TimelyLog::Parks,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                    datum
                        .requested_pow
                        .map(Datum::UInt64)
                        .unwrap_or(Datum::Null),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_sent,
            TimelyLog::BatchesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_received,
            TimelyLog::BatchesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_sent,
            TimelyLog::MessagesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_received,
            TimelyLog::MessagesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let elapsed = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_duration,
            TimelyLog::Elapsed,
            move |((operator, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let histogram = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_histogram,
            TimelyLog::Histogram,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                ]);
                session.give((data, time, diff));
            },
        );

        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}

/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Information about live operators, indexed by operator ID.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Maps dataflow IDs to channels in the dataflow.
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// Information about the last requested park.
    last_park: Option<Park>,
    /// Maps channel IDs to boxed slices counting the messages sent to each target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Maps channel IDs to boxed slices counting the messages received from each source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Stores for scheduled operators the time when they were scheduled.
    schedule_starts: BTreeMap<usize, Duration>,
    /// Maps operator IDs to a vector recording the (count, elapsed_ns) values in each histogram
    /// bucket.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}

struct Park {
    /// Time when the park occurred.
    time: Duration,
    /// Requested park time.
    requested: Option<Duration>,
}

/// Organize message counts into number of batches and records.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// The number of batches sent across a channel.
    batches: i64,
    /// The number of records sent across a channel.
    records: Diff,
}

/// Bundled output buffers used by the demux operator.
//
// We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid
// having to manually implement `Columnation`. If `Columnation` could be `#[derive]`ed, that
// wouldn't be an issue.
struct DemuxOutput<'a> {
    operates: OutputSessionVec<'a, Update<(usize, String)>>,
    channels: OutputSessionColumnar<'a, Update<(ChannelDatum, ())>>,
    addresses: OutputSessionVec<'a, Update<(usize, Vec<usize>)>>,
    parks: OutputSessionVec<'a, Update<(ParkDatum, ())>>,
    batches_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    batches_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    messages_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    messages_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    schedules_duration: OutputSessionVec<'a, Update<(usize, ())>>,
    schedules_histogram: OutputSessionVec<'a, Update<(ScheduleHistogramDatum, ())>>,
}

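/// Per-update data for the `channels` log: the channel ID, its source and target
/// (operator, port) endpoints, and the type of data the channel carries, as a string.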
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
struct ChannelDatum {
    id: usize,
    source: (usize, usize),
    target: (usize, usize),
    typ: String,
}

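/// Per-update data for the `parks` log: the park duration and the requested duration, if any,
/// both rounded up to the next power of two (in nanoseconds).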
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    duration_pow: u64,
    requested_pow: Option<u64>,
}

impl Columnation for ParkDatum {
    type InnerRegion = CopyRegion<Self>;
}

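/// Per-update data for the message and batch logs: the channel ID and the remote worker
/// (the target for sends, the source for receives).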
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    channel: usize,
    worker: usize,
}

impl Columnation for MessageDatum {
    type InnerRegion = CopyRegion<Self>;
}

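/// Per-update data for the schedules histogram: the operator ID and the power-of-two duration
/// bucket, in nanoseconds.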
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    operator: usize,
    duration_pow: u64,
}

impl Columnation for ScheduleHistogramDatum {
    type InnerRegion = CopyRegion<Self>;
}

/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The current event time.
    time: Duration,
}

impl DemuxHandler<'_, '_> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
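    ///
    /// The event time is advanced to the next interval boundary: with a 1000 ms interval, for
    /// example, an event at 1500 ms is assigned timestamp 2000.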
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given timely event.
    fn handle(&mut self, event: TimelyEvent) {
        use TimelyEvent::*;

        match event {
            Operates(e) => self.handle_operates(e),
            Channels(e) => self.handle_channels(e),
            Shutdown(e) => self.handle_shutdown(e),
            Park(e) => self.handle_park(e),
            Messages(e) => self.handle_messages(e),
            Schedule(e) => self.handle_schedule(e),
            _ => (),
        }
    }

    fn handle_operates(&mut self, event: OperatesEvent) {
        let ts = self.ts();
        let datum = (event.id, event.name.clone());
        self.output.operates.give((datum, ts, Diff::ONE));

        let datum = (event.id, event.addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        self.state.operators.insert(event.id, event);
    }

    fn handle_channels(&mut self, event: ChannelsEvent) {
        let ts = self.ts();
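        // `ChannelDatumReference` is the borrowed representation that `#[derive(Columnar)]`
        // generates for `ChannelDatum`; it lets us log the channel without cloning its type
        // string.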
        let datum = ChannelDatumReference {
            id: event.id,
            source: event.source,
            target: event.target,
            typ: &event.typ,
        };
        self.output.channels.give(((datum, ()), ts, Diff::ONE));

        let datum = (event.id, event.scope_addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        let dataflow_index = event.scope_addr[0];
        self.state
            .dataflow_channels
            .entry(dataflow_index)
            .or_default()
            .push(event);
    }

    fn handle_shutdown(&mut self, event: ShutdownEvent) {
        // A dropped operator results in a negative record for the `operates`
        // collection, cancelling out the initial operator announcement, and
        // we stop tracking the operator in our local state.

        let Some(operator) = self.state.operators.remove(&event.id) else {
            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
            return;
        };

        // Retract operator information.
        let ts = self.ts();
        let datum = (operator.id, operator.name);
        self.output.operates.give((datum, ts, Diff::MINUS_ONE));

        // Retract schedules information for the operator.
        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
                .enumerate()
                .filter(|(_, (count, _))| *count != 0)
            {
                self.output
                    .schedules_duration
                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));

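                // Reconstruct the bucket's power-of-two duration from its index.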
                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    duration_pow: 1 << bucket,
                };
                let diff = Diff::cast_from(-count);
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, diff));
            }
        }

        if operator.addr.len() == 1 {
            let dataflow_index = operator.addr[0];
            self.handle_dataflow_shutdown(dataflow_index);
        }

        let datum = (operator.id, operator.addr);
        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
    }

    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
        // Notify compute logging about the shutdown.
        if let Some(logger) = &self.shared_state.compute_logger {
            logger.log(&ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index }));
        }

        // When a dataflow shuts down, we need to retract all its channels.
        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
            return;
        };

        let ts = self.ts();
        for channel in channels {
            // Retract channel description.
            let datum = ChannelDatumReference {
                id: channel.id,
                source: channel.source,
                target: channel.target,
                typ: &channel.typ,
            };
            self.output
                .channels
                .give(((datum, ()), ts, Diff::MINUS_ONE));

            let datum = (channel.id, channel.scope_addr);
            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));

            // Retract messages logged for this channel.
            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
                for (target_worker, count) in sent.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: target_worker,
                    };
                    self.output
                        .messages_sent
                        .give(((datum, ()), ts, Diff::from(-count.records)));
                    self.output
                        .batches_sent
                        .give(((datum, ()), ts, Diff::from(-count.batches)));
                }
            }
            if let Some(received) = self.state.messages_received.remove(&channel.id) {
                for (source_worker, count) in received.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: source_worker,
                    };
                    self.output.messages_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.records),
                    ));
                    self.output.batches_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.batches),
                    ));
                }
            }
        }
    }

    fn handle_park(&mut self, event: ParkEvent) {
        match event {
            ParkEvent::Park(requested) => {
                let park = Park {
                    time: self.time,
                    requested,
                };
                let existing = self.state.last_park.replace(park);
                if existing.is_some() {
                    error!("park without a succeeding unpark");
                }
            }
            ParkEvent::Unpark => {
                let Some(park) = self.state.last_park.take() else {
                    error!("unpark without a preceding park");
                    return;
                };

                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
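                // Bucket the durations by rounding up to the next power of two, matching the
                // bucketing used for the schedules histogram.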
                let duration_pow =
                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
                let requested_pow = park
                    .requested
                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));

                let ts = self.ts();
                let datum = ParkDatum {
                    duration_pow,
                    requested_pow,
                };
                self.output.parks.give(((datum, ()), ts, Diff::ONE));
            }
        }
    }

    fn handle_messages(&mut self, event: MessagesEvent) {
        let ts = self.ts();
        let count = Diff::try_from(event.length).expect("must fit");

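        // Besides emitting updates, remember the per-worker counts so they can be retracted
        // when the channel's dataflow shuts down.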
        if event.is_send {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.target,
            };
            self.output.messages_sent.give(((datum, ()), ts, count));
            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));

            let sent_counts = self
                .state
                .messages_sent
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            sent_counts[event.target].records += count;
            sent_counts[event.target].batches += 1;
        } else {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.source,
            };
            self.output.messages_received.give(((datum, ()), ts, count));
            self.output
                .batches_received
                .give(((datum, ()), ts, Diff::ONE));

            let received_counts = self
                .state
                .messages_received
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            received_counts[event.source].records += count;
            received_counts[event.source].batches += 1;
        }
    }

    fn handle_schedule(&mut self, event: ScheduleEvent) {
        match event.start_stop {
            timely::logging::StartStop::Start => {
                let existing = self.state.schedule_starts.insert(event.id, self.time);
                if existing.is_some() {
                    error!(operator_id = ?event.id, "schedule start without succeeding stop");
                }
            }
            timely::logging::StartStop::Stop => {
                let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
                    error!(operator_id = ?event.id, "schedule stop without preceding start");
                    return;
                };

                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
                let elapsed_diff = Diff::from(i64::try_from(elapsed_ns).expect("must fit"));
                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");

                let ts = self.ts();
                let datum = event.id;
                self.output
                    .schedules_duration
                    .give(((datum, ()), ts, elapsed_diff));

                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    duration_pow: elapsed_pow,
                };
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, Diff::ONE));

                // Record count and elapsed time for later retraction.
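                // `elapsed_pow` is a power of two, so its number of trailing zeros is the
                // histogram bucket index (log2 of the bucketed duration).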
                let index = usize::cast_from(elapsed_pow.trailing_zeros());
                let data = self.state.schedules_data.entry(event.id).or_default();
                grow_vec(data, index);
                let (count, duration) = &mut data[index];
                *count += 1;
                *duration += elapsed_diff;
            }
        }
    }
}

/// Grow the given vector so it fits the given index.
///
/// This does nothing if the vector is already large enough.
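///
/// For example, growing a one-element vector to fit index 2 resizes it to three elements,
/// filling the new slots with `T::default()`.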
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    if vec.len() <= index {
        vec.resize(index + 1, Default::default());
    }
}