mz_compute/logging/
timely.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by timely dataflow.
11
12use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use differential_dataflow::containers::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::containers::ProvidedBuilder;
25use mz_timely_util::replay::MzReplay;
26use timely::dataflow::Scope;
27use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
28use timely::dataflow::operators::Operator;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::logging::{
31    ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
32    TimelyEvent,
33};
34use tracing::error;
35
36use crate::extensions::arrange::MzArrangeCore;
37use crate::logging::compute::{ComputeEvent, DataflowShutdown};
38use crate::logging::{
39    EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
40    Update,
41};
42use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
43use crate::row_spine::RowRowBuilder;
44use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
45
46/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export, keyed by the log variant they implement.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
51
52/// Constructs the logging dataflow fragment for timely logs.
53///
54/// Params
55/// * `scope`: The Timely scope hosting the log analysis dataflow.
56/// * `config`: Logging configuration
57/// * `event_queue`: The source to read log events from.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        // Replay the queued log events into this scope at the configured interval.
        let (logs, token) =
            event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
                move |mut session, mut data| {
                    // If logging is disabled, we still need to install the indexes, but we can leave them
                    // empty. We do so by immediately filtering all logs events.
                    if enable_logging {
                        session.give_container(data.to_mut())
                    }
                },
            );

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut operates_out, operates) = demux.new_output();
        let (mut channels_out, channels) = demux.new_output();
        let (mut addresses_out, addresses) = demux.new_output();
        let (mut parks_out, parks) = demux.new_output();
        let (mut messages_sent_out, messages_sent) = demux.new_output();
        let (mut messages_received_out, messages_received) = demux.new_output();
        let (mut schedules_duration_out, schedules_duration) = demux.new_output();
        let (mut schedules_histogram_out, schedules_histogram) = demux.new_output();
        let (mut batches_sent_out, batches_sent) = demux.new_output();
        let (mut batches_received_out, batches_received) = demux.new_output();

        // This worker's index; it is packed into every output row below.
        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            // Clamp to at least 1 ms so `DemuxHandler::ts` never divides by zero.
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        // Sanity check: message events must be logged by the worker that owns
                        // the respective endpoint (sender for sends, receiver for receives).
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row` format.
        // We pre-arrange the logging streams to force a consolidation and reduce the amount of
        // updates that reach `Row` encoding.

        let operates = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &operates,
            TimelyLog::Operates,
            move |((id, name), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::String(name),
                ]);
                session.give((data, time, diff));
            },
        );

        // TODO: `consolidate_and_pack` requires columnation, which `ChannelDatum` does not
        // implement. Consider consolidating here once we support columnar.
        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(Pipeline, "ToRow Channels", |_cap, _info| {
            let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
            move |input, output| {
                while let Some((time, data)) = input.next() {
                    let mut session = output.session_with_builder(&time);
                    for ((datum, ()), time, diff) in data.borrow().into_index_iter() {
                        let (source_node, source_port) = datum.source;
                        let (target_node, target_port) = datum.target;
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.id)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(u64::cast_from(source_node)),
                            Datum::UInt64(u64::cast_from(source_port)),
                            Datum::UInt64(u64::cast_from(target_node)),
                            Datum::UInt64(u64::cast_from(target_port)),
                            Datum::String(datum.typ),
                        ]);
                        session.give((data, time, diff));
                    }
                }
            }
        });

        let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &addresses,
            TimelyLog::Addresses,
            move |((id, address), time, diff), packer, session| {
                // The address column is a list, so pack by index rather than from a slice.
                let data = packer.pack_by_index(|packer, index| match index {
                    0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                    1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                    2 => {
                        packer.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i))))
                    }
                    _ => unreachable!("Addresses relation has three columns"),
                });
                session.give((data, time, diff));
            },
        );

        let parks = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
            &parks,
            TimelyLog::Parks,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                    datum
                        .requested_pow
                        .map(Datum::UInt64)
                        .unwrap_or(Datum::Null),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_sent,
            TimelyLog::BatchesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let batches_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &batches_received,
            TimelyLog::BatchesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );


        let messages_sent = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_sent,
            TimelyLog::MessagesSent,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                ]);
                session.give((data, time, diff));
            },
        );

        let messages_received = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &messages_received,
            TimelyLog::MessagesReceived,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.channel)),
                    Datum::UInt64(u64::cast_from(datum.worker)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );

        let elapsed = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_duration,
            TimelyLog::Elapsed,
            move |((operator, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[Datum::UInt64(u64::cast_from(*operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ]);
                session.give((data, time, diff));
            },
        );


        let histogram = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            &schedules_histogram,
            TimelyLog::Histogram,
            move |((datum, ()), time, diff), packer, session| {
                let data = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.operator)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(datum.duration_pow),
                ]);
                session.give((data, time, diff));
            },
        );

        // Pair each packed collection with the log variant it implements.
        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            // Only arrange the logs for which an index was requested.
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
339
340/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Information about live operators, indexed by operator ID.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Maps dataflow IDs to channels in the dataflow.
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// Information about the last requested park.
    last_park: Option<Park>,
    /// Maps channel IDs to boxed slices counting the messages sent to each target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Maps channel IDs to boxed slices counting the messages received from each source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Stores for scheduled operators the time when they were scheduled.
    schedule_starts: BTreeMap<usize, Duration>,
    /// Maps operator IDs to a vector recording the (count, elapsed_ns) values in each histogram
    /// bucket. The vector index is the power-of-two bucket of the schedule duration.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}
359
/// Information about an outstanding park event, kept until the matching unpark arrives so the
/// park duration can be computed.
struct Park {
    /// Time when the park occurred.
    time: Duration,
    /// Requested park time.
    requested: Option<Duration>,
}
366
367/// Organize message counts into number of batches and records.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// The number of batches sent across a channel.
    batches: i64,
    /// The number of records sent across a channel.
    records: Diff,
}
375
376/// Bundled output buffers used by the demux operator.
377//
378// We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid
379// having to manually implement `Columnation`. If `Columnation` could be `#[derive]`ed, that
380// wouldn't be an issue.
struct DemuxOutput<'a> {
    /// Operator installations, as `(operator_id, name)`.
    operates: OutputSessionVec<'a, Update<(usize, String)>>,
    /// Channel descriptions.
    channels: OutputSessionColumnar<'a, Update<(ChannelDatum, ())>>,
    /// Operator/channel addresses, as `(id, address path)`.
    addresses: OutputSessionVec<'a, Update<(usize, Vec<usize>)>>,
    /// Completed park/unpark cycles.
    parks: OutputSessionVec<'a, Update<(ParkDatum, ())>>,
    /// Batch counts per (channel, target worker).
    batches_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Batch counts per (channel, source worker).
    batches_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Record counts per (channel, target worker).
    messages_sent: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Record counts per (channel, source worker).
    messages_received: OutputSessionVec<'a, Update<(MessageDatum, ())>>,
    /// Elapsed scheduling time, keyed by operator ID.
    schedules_duration: OutputSessionVec<'a, Update<(usize, ())>>,
    /// Scheduling duration histogram updates.
    schedules_histogram: OutputSessionVec<'a, Update<(ScheduleHistogramDatum, ())>>,
}
393
/// Description of a timely channel, as logged to the `channels` output.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
struct ChannelDatum {
    /// The channel ID.
    id: usize,
    /// Source as `(node, port)`.
    source: (usize, usize),
    /// Target as `(node, port)`.
    target: (usize, usize),
    /// The channel's data type, rendered as a string.
    typ: String,
}
401
/// Datum for a completed park, with durations bucketed to the next power of two (in ns).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    /// Actual park duration, rounded up to the next power of two.
    duration_pow: u64,
    /// Requested park duration, rounded up to the next power of two, if one was requested.
    requested_pow: Option<u64>,
}

// `ParkDatum` is `Copy`, so the by-copy region suffices.
impl Columnation for ParkDatum {
    type InnerRegion = CopyRegion<Self>;
}
411
/// Key for message/batch count updates.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    /// The channel the messages traveled on.
    channel: usize,
    /// The remote worker: the target for sends, the source for receives.
    worker: usize,
}

// `MessageDatum` is `Copy`, so the by-copy region suffices.
impl Columnation for MessageDatum {
    type InnerRegion = CopyRegion<Self>;
}
421
/// Key for scheduling-duration histogram updates.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    /// The scheduled operator.
    operator: usize,
    /// Histogram bucket: schedule duration rounded up to the next power of two (in ns).
    duration_pow: u64,
}

// `ScheduleHistogramDatum` is `Copy`, so the by-copy region suffices.
impl Columnation for ScheduleHistogramDatum {
    type InnerRegion = CopyRegion<Self>;
}
431
432/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    /// Must be non-zero, as `ts` divides by it.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The current event time.
    time: Duration,
}
447
448impl DemuxHandler<'_, '_> {
449    /// Return the timestamp associated with the current event, based on the event time and the
450    /// logging interval.
451    fn ts(&self) -> Timestamp {
452        let time_ms = self.time.as_millis();
453        let interval = self.logging_interval_ms;
454        let rounded = (time_ms / interval + 1) * interval;
455        rounded.try_into().expect("must fit")
456    }
457
458    /// Handle the given timely event.
459    fn handle(&mut self, event: TimelyEvent) {
460        use TimelyEvent::*;
461
462        match event {
463            Operates(e) => self.handle_operates(e),
464            Channels(e) => self.handle_channels(e),
465            Shutdown(e) => self.handle_shutdown(e),
466            Park(e) => self.handle_park(e),
467            Messages(e) => self.handle_messages(e),
468            Schedule(e) => self.handle_schedule(e),
469            _ => (),
470        }
471    }
472
473    fn handle_operates(&mut self, event: OperatesEvent) {
474        let ts = self.ts();
475        let datum = (event.id, event.name.clone());
476        self.output.operates.give((datum, ts, Diff::ONE));
477
478        let datum = (event.id, event.addr.clone());
479        self.output.addresses.give((datum, ts, Diff::ONE));
480
481        self.state.operators.insert(event.id, event);
482    }
483
484    fn handle_channels(&mut self, event: ChannelsEvent) {
485        let ts = self.ts();
486        let datum = ChannelDatumReference {
487            id: event.id,
488            source: event.source,
489            target: event.target,
490            typ: &event.typ,
491        };
492        self.output.channels.give(((datum, ()), ts, Diff::ONE));
493
494        let datum = (event.id, event.scope_addr.clone());
495        self.output.addresses.give((datum, ts, Diff::ONE));
496
497        let dataflow_index = event.scope_addr[0];
498        self.state
499            .dataflow_channels
500            .entry(dataflow_index)
501            .or_default()
502            .push(event);
503    }
504
505    fn handle_shutdown(&mut self, event: ShutdownEvent) {
506        // Dropped operators should result in a negative record for
507        // the `operates` collection, cancelling out the initial
508        // operator announcement.
509        // Remove logging for this operator.
510
511        let Some(operator) = self.state.operators.remove(&event.id) else {
512            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
513            return;
514        };
515
516        // Retract operator information.
517        let ts = self.ts();
518        let datum = (operator.id, operator.name);
519        self.output.operates.give((datum, ts, Diff::MINUS_ONE));
520
521        // Retract schedules information for the operator
522        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
523            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
524                .enumerate()
525                .filter(|(_, (count, _))| *count != 0)
526            {
527                self.output
528                    .schedules_duration
529                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
530
531                let datum = ScheduleHistogramDatum {
532                    operator: event.id,
533                    duration_pow: 1 << bucket,
534                };
535                let diff = Diff::cast_from(-count);
536                self.output
537                    .schedules_histogram
538                    .give(((datum, ()), ts, diff));
539            }
540        }
541
542        if operator.addr.len() == 1 {
543            let dataflow_index = operator.addr[0];
544            self.handle_dataflow_shutdown(dataflow_index);
545        }
546
547        let datum = (operator.id, operator.addr);
548        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
549    }
550
551    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
552        // Notify compute logging about the shutdown.
553        self.shared_state.compute_logger.as_ref().map(|logger| {
554            logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
555        });
556
557        // When a dataflow shuts down, we need to retract all its channels.
558        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
559            return;
560        };
561
562        let ts = self.ts();
563        for channel in channels {
564            // Retract channel description.
565            let datum = ChannelDatumReference {
566                id: channel.id,
567                source: channel.source,
568                target: channel.target,
569                typ: &channel.typ,
570            };
571            self.output
572                .channels
573                .give(((datum, ()), ts, Diff::MINUS_ONE));
574
575            let datum = (channel.id, channel.scope_addr);
576            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
577
578            // Retract messages logged for this channel.
579            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
580                for (target_worker, count) in sent.iter().enumerate() {
581                    let datum = MessageDatum {
582                        channel: channel.id,
583                        worker: target_worker,
584                    };
585                    self.output
586                        .messages_sent
587                        .give(((datum, ()), ts, Diff::from(-count.records)));
588                    self.output
589                        .batches_sent
590                        .give(((datum, ()), ts, Diff::from(-count.batches)));
591                }
592            }
593            if let Some(received) = self.state.messages_received.remove(&channel.id) {
594                for (source_worker, count) in received.iter().enumerate() {
595                    let datum = MessageDatum {
596                        channel: channel.id,
597                        worker: source_worker,
598                    };
599                    self.output.messages_received.give((
600                        (datum, ()),
601                        ts,
602                        Diff::from(-count.records),
603                    ));
604                    self.output.batches_received.give((
605                        (datum, ()),
606                        ts,
607                        Diff::from(-count.batches),
608                    ));
609                }
610            }
611        }
612    }
613
614    fn handle_park(&mut self, event: ParkEvent) {
615        match event {
616            ParkEvent::Park(requested) => {
617                let park = Park {
618                    time: self.time,
619                    requested,
620                };
621                let existing = self.state.last_park.replace(park);
622                if existing.is_some() {
623                    error!("park without a succeeding unpark");
624                }
625            }
626            ParkEvent::Unpark => {
627                let Some(park) = self.state.last_park.take() else {
628                    error!("unpark without a preceding park");
629                    return;
630                };
631
632                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
633                let duration_pow =
634                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
635                let requested_pow = park
636                    .requested
637                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
638
639                let ts = self.ts();
640                let datum = ParkDatum {
641                    duration_pow,
642                    requested_pow,
643                };
644                self.output.parks.give(((datum, ()), ts, Diff::ONE));
645            }
646        }
647    }
648
649    fn handle_messages(&mut self, event: MessagesEvent) {
650        let ts = self.ts();
651        let count = Diff::from(event.record_count);
652
653        if event.is_send {
654            let datum = MessageDatum {
655                channel: event.channel,
656                worker: event.target,
657            };
658            self.output.messages_sent.give(((datum, ()), ts, count));
659            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
660
661            let sent_counts = self
662                .state
663                .messages_sent
664                .entry(event.channel)
665                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
666            sent_counts[event.target].records += count;
667            sent_counts[event.target].batches += 1;
668        } else {
669            let datum = MessageDatum {
670                channel: event.channel,
671                worker: event.source,
672            };
673            self.output.messages_received.give(((datum, ()), ts, count));
674            self.output
675                .batches_received
676                .give(((datum, ()), ts, Diff::ONE));
677
678            let received_counts = self
679                .state
680                .messages_received
681                .entry(event.channel)
682                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
683            received_counts[event.source].records += count;
684            received_counts[event.source].batches += 1;
685        }
686    }
687
688    fn handle_schedule(&mut self, event: ScheduleEvent) {
689        match event.start_stop {
690            timely::logging::StartStop::Start => {
691                let existing = self.state.schedule_starts.insert(event.id, self.time);
692                if existing.is_some() {
693                    error!(operator_id = ?event.id, "schedule start without succeeding stop");
694                }
695            }
696            timely::logging::StartStop::Stop => {
697                let Some(start_time) = self.state.schedule_starts.remove(&event.id) else {
698                    error!(operator_id = ?event.id, "schedule stop without preceeding start");
699                    return;
700                };
701
702                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
703                let elapsed_diff = Diff::from(i64::try_from(elapsed_ns).expect("must fit"));
704                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
705
706                let ts = self.ts();
707                let datum = event.id;
708                self.output
709                    .schedules_duration
710                    .give(((datum, ()), ts, elapsed_diff));
711
712                let datum = ScheduleHistogramDatum {
713                    operator: event.id,
714                    duration_pow: elapsed_pow,
715                };
716                self.output
717                    .schedules_histogram
718                    .give(((datum, ()), ts, Diff::ONE));
719
720                // Record count and elapsed time for later retraction.
721                let index = usize::cast_from(elapsed_pow.trailing_zeros());
722                let data = self.state.schedules_data.entry(event.id).or_default();
723                grow_vec(data, index);
724                let (count, duration) = &mut data[index];
725                *count += 1;
726                *duration += elapsed_diff;
727            }
728        }
729    }
730}
731
/// Ensure the given vector is long enough to be indexable at `index`.
///
/// Missing slots are filled with `T::default()`; a vector that already covers
/// `index` is left untouched.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    while vec.len() <= index {
        vec.push(T::default());
    }
}