Skip to main content

mz_compute/logging/
timely.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logging dataflows for events generated by timely dataflow.
11
12use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use columnation::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::batcher;
23use mz_timely_util::columnar::builder::ColumnBuilder;
24use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
25use mz_timely_util::columnation::ColumnationChunker;
26use mz_timely_util::replay::MzReplay;
27use timely::dataflow::Scope;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::operators::Operator;
30use timely::dataflow::operators::generic::OutputBuilder;
31use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
32use timely::dataflow::operators::generic::operator::empty;
33use timely::logging::{
34    ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
35    TimelyEvent,
36};
37use tracing::error;
38
39use crate::extensions::arrange::MzArrangeCore;
40use crate::logging::compute::{ComputeEvent, DataflowShutdown};
41use crate::logging::{
42    EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
43    Update,
44};
45use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
46use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
47use mz_row_spine::RowRowBuilder;
48
49/// The return type of [`construct`].
50pub(super) struct Return {
51    /// Collections to export.
52    pub collections: BTreeMap<LogVariant, LogCollection>,
53}
54
/// Constructs the logging dataflow fragment for timely logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: Logging configuration
/// * `event_queue`: The source to read log events from.
/// * `shared_state`: Logging state shared across log receivers; the demux operator borrows it
///   mutably for every event it handles.
/// * `storage_log_reader`: Optional reader of storage-side timely events. When present, its
///   events are replayed and concatenated with the compute timely events before demuxing.
///
/// Returns the arranged collection for every timely log variant that appears in
/// `config.index_logs`; variants not listed there are not arranged or exported.
pub(super) fn construct(
    scope: Scope<'_, Timestamp>,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        let enable_logging = config.enable_logging;
        // Replay the compute timely events; with logging disabled, use an empty stream and a
        // placeholder token instead.
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // If we have a storage log reader, replay it and concatenate with compute logs.
        // NOTE(review): the storage activator is created here and no other handle to it is kept
        // in this function; confirm the storage replay is driven by `config.interval`
        // rescheduling rather than explicit activation.
        let (logs, storage_token) = if let Some(reader) = storage_log_reader {
            use mz_timely_util::activator::RcActivator;
            use timely::dataflow::operators::Concatenate;

            let activator = RcActivator::new("storage_timely_activator".to_string(), 128);
            let (storage_logs, s_token) =
                [reader].mz_replay(scope, "storage timely logs", config.interval, activator);
            let merged = scope.concatenate([logs, storage_logs]);
            (merged, Some(s_token))
        } else {
            (logs, None)
        };
        // Convert storage token to Rc<dyn Any> so we can stash it alongside the compute token.
        let storage_token: Rc<dyn std::any::Any> = match storage_token {
            Some(t) => t,
            None => Rc::new(()),
        };

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(logs, Pipeline);
        let (operates_out, operates) = demux.new_output();
        let mut operates_out = OutputBuilder::from(operates_out);
        let (channels_out, channels) = demux.new_output();
        let mut channels_out = OutputBuilder::from(channels_out);
        let (addresses_out, addresses) = demux.new_output();
        let mut addresses_out = OutputBuilder::from(addresses_out);
        let (parks_out, parks) = demux.new_output();
        let mut parks_out = OutputBuilder::from(parks_out);
        let (messages_sent_out, messages_sent) = demux.new_output();
        let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
        let (messages_received_out, messages_received) = demux.new_output();
        let mut messages_received_out = OutputBuilder::from(messages_received_out);
        let (schedules_duration_out, schedules_duration) = demux.new_output();
        let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
        let (schedules_histogram_out, schedules_histogram) = demux.new_output();
        let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
        let (batches_sent_out, batches_sent) = demux.new_output();
        let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
        let (batches_received_out, batches_received) = demux.new_output();
        let mut batches_received_out = OutputBuilder::from(batches_received_out);

        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            // Clamp to at least 1ms: `DemuxHandler::ts` divides by this interval.
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    // One output session per logging stream, all at the input batch's capability.
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        // Message events must describe this worker's side of the exchange:
                        // sends originate here, receives target here.
                        // NOTE(review): storage timely events are concatenated into this stream;
                        // confirm their worker indices (and the operator/channel IDs that key
                        // `DemuxState`) cannot disagree or collide with the compute ones.
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        // The shared state is re-borrowed mutably for each individual event.
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row` format.
        // We pre-arrange the logging streams to force a consolidation and reduce the amount of
        // updates that reach `Row` encoding.

        let operates = consolidate_and_pack::<
            ColumnationChunker<_>,
            KeyValBatcher<_, _, _, _>,
            ColumnBuilder<_>,
            _,
            _,
            _,
        >(
            operates,
            TimelyLog::Operates,
            move |data, packer, session| {
                for ((id, name), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*id)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::String(name),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        // TODO: `consolidate_and_pack` requires columnation, which `ChannelDatum` does not
        // implement. Consider consolidating here once we support columnar.
        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
            Pipeline,
            "ToRow Channels",
            |_cap, _info| {
                let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
                move |input, output| {
                    input.for_each_time(|time, data| {
                        let mut session = output.session_with_builder(&time);
                        for d in data.flat_map(|c| c.borrow().into_index_iter()) {
                            let ((datum, ()), time, diff) = d;
                            let (source_node, source_port) = datum.source;
                            let (target_node, target_port) = datum.target;
                            // The channel type name is read back as raw bytes and decoded here.
                            let data = packer.pack_slice(&[
                                Datum::UInt64(u64::cast_from(datum.id)),
                                Datum::UInt64(u64::cast_from(worker_id)),
                                Datum::UInt64(u64::cast_from(source_node)),
                                Datum::UInt64(u64::cast_from(source_port)),
                                Datum::UInt64(u64::cast_from(target_node)),
                                Datum::UInt64(u64::cast_from(target_port)),
                                Datum::String(
                                    std::str::from_utf8(datum.typ).expect("valid string"),
                                ),
                            ]);
                            session.give((data, time, diff));
                        }
                    });
                }
            },
        );

        // Types to make rustfmt happy.
        type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
        type KB<K, T, D> = KeyBatcher<K, T, D>;

        let addresses = consolidate_and_pack::<
            ColumnationChunker<_>,
            KVB<_, _, _, _>,
            ColumnBuilder<_>,
            _,
            _,
            _,
        >(
            addresses,
            TimelyLog::Addresses,
            move |data, packer, session| {
                for ((id, address), time, diff) in data.iter() {
                    let data = packer.pack_by_index(|packer, index| match index {
                        0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                        1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                        2 => {
                            let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
                            packer.push_list(list)
                        }
                        _ => unreachable!("Addresses relation has three columns"),
                    });
                    session.give((data, time, diff));
                }
            },
        );

        let parks =
            consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
                parks,
                TimelyLog::Parks,
                move |data, packer, session| {
                    for ((datum, ()), time, diff) in data.iter() {
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(datum.duration_pow),
                            datum
                                .requested_pow
                                .map(Datum::UInt64)
                                .unwrap_or(Datum::Null),
                        ]);
                        session.give((data, time, diff));
                    }
                },
            );

        let batches_sent =
            consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
                batches_sent,
                TimelyLog::BatchesSent,
                move |data, packer, session| {
                    for ((datum, ()), time, diff) in data.iter() {
                        // Sent: this worker is the source, `datum.worker` the target.
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.channel)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(u64::cast_from(datum.worker)),
                        ]);
                        session.give((data, time, diff));
                    }
                },
            );

        let batches_received =
            consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
                batches_received,
                TimelyLog::BatchesReceived,
                move |data, packer, session| {
                    for ((datum, ()), time, diff) in data.iter() {
                        // Received: `datum.worker` is the source, this worker the target.
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.channel)),
                            Datum::UInt64(u64::cast_from(datum.worker)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                        ]);
                        session.give((data, time, diff));
                    }
                },
            );

        let messages_sent =
            consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
                messages_sent,
                TimelyLog::MessagesSent,
                move |data, packer, session| {
                    for ((datum, ()), time, diff) in data.iter() {
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.channel)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(u64::cast_from(datum.worker)),
                        ]);
                        session.give((data, time, diff));
                    }
                },
            );

        let messages_received =
            consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
                messages_received,
                TimelyLog::MessagesReceived,
                move |data, packer, session| {
                    for ((datum, ()), time, diff) in data.iter() {
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.channel)),
                            Datum::UInt64(u64::cast_from(datum.worker)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                        ]);
                        session.give((data, time, diff));
                    }
                },
            );

        let elapsed =
            consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
                schedules_duration,
                TimelyLog::Elapsed,
                move |data, packer, session| {
                    for ((operator, ()), time, diff) in data.iter() {
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(*operator)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                        ]);
                        session.give((data, time, diff));
                    }
                },
            );

        let histogram =
            consolidate_and_pack::<ColumnationChunker<_>, KB<_, _, _>, ColumnBuilder<_>, _, _, _>(
                schedules_histogram,
                TimelyLog::Histogram,
                move |data, packer, session| {
                    for ((datum, ()), time, diff) in data.iter() {
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(datum.operator)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                            Datum::UInt64(datum.duration_pow),
                        ]);
                        session.give((data, time, diff));
                    }
                },
            );

        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            // Only variants requested in `config.index_logs` are arranged and exported.
            if config.index_logs.contains_key(&variant) {
                // Extract types to make rustfmt happy.
                type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
                type Builder<T, R> = RowRowBuilder<T, R>;
                let trace = collection
                    .mz_arrange_core::<
                        _,
                        batcher::Chunker<_>,
                        Batcher<_, _, _, _>,
                        Builder<_, _>,
                        RowRowSpine<_, _>,
                    >(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                            columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
                        ),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                // Bundle the compute and storage replay tokens into this collection's token.
                let combined_token: Rc<dyn std::any::Any> =
                    Rc::new((Rc::clone(&token), Rc::clone(&storage_token)));
                let collection = LogCollection {
                    trace,
                    token: combined_token,
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
432
433/// State maintained by the demux operator.
434#[derive(Default)]
435struct DemuxState {
436    /// Information about live operators, indexed by operator ID.
437    operators: BTreeMap<usize, OperatesEvent>,
438    /// Maps dataflow IDs to channels in the dataflow.
439    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
440    /// Information about the last requested park.
441    last_park: Option<Park>,
442    /// Maps channel IDs to boxed slices counting the messages sent to each target worker.
443    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
444    /// Maps channel IDs to boxed slices counting the messages received from each source worker.
445    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
446    /// Stores for scheduled operators the time when they were scheduled.
447    schedule_starts: Vec<(usize, Duration)>,
448    /// Maps operator IDs to a vector recording the (count, elapsed_ns) values in each histogram
449    /// bucket.
450    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
451}
452
/// A park that has been observed but not yet matched by an unpark.
struct Park {
    /// Event time at which the worker parked.
    time: Duration,
    /// How long the park asked to last, if a limit was given.
    requested: Option<Duration>,
}
459
460/// Organize message counts into number of batches and records.
461#[derive(Default, Copy, Clone, Debug)]
462struct MessageCount {
463    /// The number of batches sent across a channel.
464    batches: i64,
465    /// The number of records sent across a channel.
466    records: Diff,
467}
468
469/// Bundled output buffers used by the demux operator.
470//
471// We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid
472// having to manually implement `Columnation`. If `Columnation` could be `#[derive]`ed, that
473// wouldn't be an issue.
474struct DemuxOutput<'a, 'b> {
475    operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
476    channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
477    addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
478    parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
479    batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
480    batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
481    messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
482    messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
483    schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
484    schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
485}
486
487#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
488struct ChannelDatum {
489    id: usize,
490    source: (usize, usize),
491    target: (usize, usize),
492    typ: String,
493}
494
/// One completed park, with actual and requested durations rounded to powers of two (ns).
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct ParkDatum {
    /// Actual park duration, rounded up to the next power of two.
    duration_pow: u64,
    /// Requested park duration, rounded the same way; `None` when no limit was requested.
    requested_pow: Option<u64>,
}
500
501impl Columnation for ParkDatum {
502    type InnerRegion = CopyRegion<Self>;
503}
504
/// Key for message and batch counts: a channel together with the peer worker on the other end.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct MessageDatum {
    /// Channel the messages travelled over.
    channel: usize,
    /// Peer worker: the target for sends, the source for receives.
    worker: usize,
}
510
511impl Columnation for MessageDatum {
512    type InnerRegion = CopyRegion<Self>;
513}
514
/// Key of the schedule histogram: an operator and one of its power-of-two duration buckets.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct ScheduleHistogramDatum {
    /// Operator the schedule durations belong to.
    operator: usize,
    /// Power-of-two upper bound of the duration bucket.
    duration_pow: u64,
}
520
521impl Columnation for ScheduleHistogramDatum {
522    type InnerRegion = CopyRegion<Self>;
523}
524
525/// Event handler of the demux operator.
526struct DemuxHandler<'a, 'b, 'c> {
527    /// State kept by the demux operator.
528    state: &'a mut DemuxState,
529    /// State shared across log receivers.
530    shared_state: &'a mut SharedLoggingState,
531    /// Demux output buffers.
532    output: &'a mut DemuxOutput<'b, 'c>,
533    /// The logging interval specifying the time granularity for the updates.
534    logging_interval_ms: u128,
535    /// The number of timely workers.
536    peers: usize,
537    /// The current event time.
538    time: Duration,
539}
540
541impl DemuxHandler<'_, '_, '_> {
542    /// Return the timestamp associated with the current event, based on the event time and the
543    /// logging interval.
544    fn ts(&self) -> Timestamp {
545        let time_ms = self.time.as_millis();
546        let interval = self.logging_interval_ms;
547        let rounded = (time_ms / interval + 1) * interval;
548        rounded.try_into().expect("must fit")
549    }
550
551    /// Handle the given timely event.
552    fn handle(&mut self, event: TimelyEvent) {
553        use TimelyEvent::*;
554
555        match event {
556            Operates(e) => self.handle_operates(e),
557            Channels(e) => self.handle_channels(e),
558            Shutdown(e) => self.handle_shutdown(e),
559            Park(e) => self.handle_park(e),
560            Messages(e) => self.handle_messages(e),
561            Schedule(e) => self.handle_schedule(e),
562            _ => (),
563        }
564    }
565
566    fn handle_operates(&mut self, event: OperatesEvent) {
567        let ts = self.ts();
568        let datum = (event.id, event.name.clone());
569        self.output.operates.give((datum, ts, Diff::ONE));
570
571        let datum = (event.id, event.addr.clone());
572        self.output.addresses.give((datum, ts, Diff::ONE));
573
574        self.state.operators.insert(event.id, event);
575    }
576
577    fn handle_channels(&mut self, event: ChannelsEvent) {
578        let ts = self.ts();
579        let datum = ChannelDatumReference {
580            id: event.id,
581            source: event.source,
582            target: event.target,
583            typ: &event.typ,
584        };
585        self.output.channels.give(((datum, ()), ts, Diff::ONE));
586
587        let datum = (event.id, event.scope_addr.clone());
588        self.output.addresses.give((datum, ts, Diff::ONE));
589
590        let dataflow_index = event.scope_addr[0];
591        self.state
592            .dataflow_channels
593            .entry(dataflow_index)
594            .or_default()
595            .push(event);
596    }
597
598    fn handle_shutdown(&mut self, event: ShutdownEvent) {
599        // Dropped operators should result in a negative record for
600        // the `operates` collection, cancelling out the initial
601        // operator announcement.
602        // Remove logging for this operator.
603
604        let Some(operator) = self.state.operators.remove(&event.id) else {
605            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
606            return;
607        };
608
609        // Retract operator information.
610        let ts = self.ts();
611        let datum = (operator.id, operator.name);
612        self.output.operates.give((datum, ts, Diff::MINUS_ONE));
613
614        // Retract schedules information for the operator
615        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
616            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
617                .enumerate()
618                .filter(|(_, (count, _))| *count != 0)
619            {
620                self.output
621                    .schedules_duration
622                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));
623
624                let datum = ScheduleHistogramDatum {
625                    operator: event.id,
626                    duration_pow: 1 << bucket,
627                };
628                let diff = Diff::cast_from(-count);
629                self.output
630                    .schedules_histogram
631                    .give(((datum, ()), ts, diff));
632            }
633        }
634
635        if operator.addr.len() == 1 {
636            let dataflow_index = operator.addr[0];
637            self.handle_dataflow_shutdown(dataflow_index);
638        }
639
640        let datum = (operator.id, operator.addr);
641        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
642    }
643
644    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
645        // Notify compute logging about the shutdown.
646        self.shared_state.compute_logger.as_ref().map(|logger| {
647            logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
648        });
649
650        // When a dataflow shuts down, we need to retract all its channels.
651        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
652            return;
653        };
654
655        let ts = self.ts();
656        for channel in channels {
657            // Retract channel description.
658            let datum = ChannelDatumReference {
659                id: channel.id,
660                source: channel.source,
661                target: channel.target,
662                typ: &channel.typ,
663            };
664            self.output
665                .channels
666                .give(((datum, ()), ts, Diff::MINUS_ONE));
667
668            let datum = (channel.id, channel.scope_addr);
669            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
670
671            // Retract messages logged for this channel.
672            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
673                for (target_worker, count) in sent.iter().enumerate() {
674                    let datum = MessageDatum {
675                        channel: channel.id,
676                        worker: target_worker,
677                    };
678                    self.output
679                        .messages_sent
680                        .give(((datum, ()), ts, Diff::from(-count.records)));
681                    self.output
682                        .batches_sent
683                        .give(((datum, ()), ts, Diff::from(-count.batches)));
684                }
685            }
686            if let Some(received) = self.state.messages_received.remove(&channel.id) {
687                for (source_worker, count) in received.iter().enumerate() {
688                    let datum = MessageDatum {
689                        channel: channel.id,
690                        worker: source_worker,
691                    };
692                    self.output.messages_received.give((
693                        (datum, ()),
694                        ts,
695                        Diff::from(-count.records),
696                    ));
697                    self.output.batches_received.give((
698                        (datum, ()),
699                        ts,
700                        Diff::from(-count.batches),
701                    ));
702                }
703            }
704        }
705    }
706
707    fn handle_park(&mut self, event: ParkEvent) {
708        match event {
709            ParkEvent::Park(requested) => {
710                let park = Park {
711                    time: self.time,
712                    requested,
713                };
714                let existing = self.state.last_park.replace(park);
715                if existing.is_some() {
716                    error!("park without a succeeding unpark");
717                }
718            }
719            ParkEvent::Unpark => {
720                let Some(park) = self.state.last_park.take() else {
721                    error!("unpark without a preceding park");
722                    return;
723                };
724
725                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
726                let duration_pow =
727                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
728                let requested_pow = park
729                    .requested
730                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));
731
732                let ts = self.ts();
733                let datum = ParkDatum {
734                    duration_pow,
735                    requested_pow,
736                };
737                self.output.parks.give(((datum, ()), ts, Diff::ONE));
738            }
739        }
740    }
741
742    fn handle_messages(&mut self, event: MessagesEvent) {
743        let ts = self.ts();
744        let count = Diff::from(event.record_count);
745
746        if event.is_send {
747            let datum = MessageDatum {
748                channel: event.channel,
749                worker: event.target,
750            };
751            self.output.messages_sent.give(((datum, ()), ts, count));
752            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));
753
754            let sent_counts = self
755                .state
756                .messages_sent
757                .entry(event.channel)
758                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
759            sent_counts[event.target].records += count;
760            sent_counts[event.target].batches += 1;
761        } else {
762            let datum = MessageDatum {
763                channel: event.channel,
764                worker: event.source,
765            };
766            self.output.messages_received.give(((datum, ()), ts, count));
767            self.output
768                .batches_received
769                .give(((datum, ()), ts, Diff::ONE));
770
771            let received_counts = self
772                .state
773                .messages_received
774                .entry(event.channel)
775                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
776            received_counts[event.source].records += count;
777            received_counts[event.source].batches += 1;
778        }
779    }
780
781    fn handle_schedule(&mut self, event: ScheduleEvent) {
782        match event.start_stop {
783            timely::logging::StartStop::Start => {
784                self.state.schedule_starts.push((event.id, self.time));
785            }
786            timely::logging::StartStop::Stop => {
787                let Some((old_id, start_time)) = self.state.schedule_starts.pop() else {
788                    error!(operator_id = ?event.id, "schedule stop without preceding start");
789                    return;
790                };
791
792                if old_id != event.id {
793                    error!(start_id = ?old_id, stop_id = ?event.id, "schedule stop without preceding start");
794                    return;
795                }
796
797                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
798                let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
799                let elapsed_diff = Diff::from(elapsed_i64);
800                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");
801
802                let ts = self.ts();
803                let datum = event.id;
804                self.output
805                    .schedules_duration
806                    .give(((datum, ()), ts, elapsed_diff));
807
808                let datum = ScheduleHistogramDatum {
809                    operator: event.id,
810                    duration_pow: elapsed_pow,
811                };
812                self.output
813                    .schedules_histogram
814                    .give(((datum, ()), ts, Diff::ONE));
815
816                // Record count and elapsed time for later retraction.
817                let index = usize::cast_from(elapsed_pow.trailing_zeros());
818                let data = self.state.schedules_data.entry(event.id).or_default();
819                grow_vec(data, index);
820                let (count, duration) = &mut data[index];
821                *count += 1;
822                *duration += elapsed_diff;
823            }
824        }
825    }
826}
827
828/// Grow the given vector so it fits the given index.
829///
830/// This does nothing if the vector is already large enough.
/// Ensures that `index` is a valid position in `vec`, padding with `T::default()` as needed.
///
/// A vector that already has more than `index` elements is left untouched; existing elements
/// are never modified or removed.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    if index >= vec.len() {
        vec.resize(index + 1, T::default());
    }
}