// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by timely dataflow.
12use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::{Columnar, Index};
18use columnation::{Columnation, CopyRegion};
19use mz_compute_client::logging::LoggingConfig;
20use mz_ore::cast::CastFrom;
21use mz_repr::{Datum, Diff, Timestamp};
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::OutputBuilder;
29use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
30use timely::dataflow::operators::generic::operator::empty;
31use timely::logging::{
32    ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
33    TimelyEvent,
34};
35use tracing::error;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::logging::compute::{ComputeEvent, DataflowShutdown};
39use crate::logging::{
40    EventQueue, LogVariant, OutputSessionColumnar, OutputSessionVec, PermutedRowPacker, TimelyLog,
41    Update,
42};
43use crate::logging::{LogCollection, SharedLoggingState, consolidate_and_pack};
44use crate::row_spine::RowRowBuilder;
45use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
46
/// The return type of [`construct`].
pub(super) struct Return {
    /// Collections to export, keyed by the log variant each one implements.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
52
/// Constructs the logging dataflow fragment for timely logs.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: Logging configuration
/// * `event_queue`: The source to read log events from.
/// * `shared_state`: State shared between logging dataflows; used here to notify the compute
///   logger about dataflow shutdowns.
/// * `storage_log_reader`: Optional source of timely log events from a co-located storage
///   instance; when present, its events are replayed and merged with the compute events.
pub(super) fn construct(
    scope: Scope<'_, Timestamp>,
    config: &LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
) -> Return {
    scope.scoped("timely logging", move |scope| {
        // Replay the logged events into a stream, or produce an empty stream when logging is
        // disabled. The returned token keeps the replay source alive.
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            event_queue.links.mz_replay(
                scope,
                "timely logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // If we have a storage log reader, replay it and concatenate with compute logs.
        let (logs, storage_token) = if let Some(reader) = storage_log_reader {
            use mz_timely_util::activator::RcActivator;
            use timely::dataflow::operators::Concatenate;

            let activator = RcActivator::new("storage_timely_activator".to_string(), 128);
            let (storage_logs, s_token) =
                [reader].mz_replay(scope, "storage timely logs", config.interval, activator);
            let merged = scope.concatenate([logs, storage_logs]);
            (merged, Some(s_token))
        } else {
            (logs, None)
        };
        // Convert storage token to Rc<dyn Any> so we can stash it alongside the compute token.
        let storage_token: Rc<dyn std::any::Any> = match storage_token {
            Some(t) => t,
            None => Rc::new(()),
        };

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(logs, Pipeline);
        let (operates_out, operates) = demux.new_output();
        let mut operates_out = OutputBuilder::from(operates_out);
        let (channels_out, channels) = demux.new_output();
        let mut channels_out = OutputBuilder::from(channels_out);
        let (addresses_out, addresses) = demux.new_output();
        let mut addresses_out = OutputBuilder::from(addresses_out);
        let (parks_out, parks) = demux.new_output();
        let mut parks_out = OutputBuilder::from(parks_out);
        let (messages_sent_out, messages_sent) = demux.new_output();
        let mut messages_sent_out = OutputBuilder::from(messages_sent_out);
        let (messages_received_out, messages_received) = demux.new_output();
        let mut messages_received_out = OutputBuilder::from(messages_received_out);
        let (schedules_duration_out, schedules_duration) = demux.new_output();
        let mut schedules_duration_out = OutputBuilder::from(schedules_duration_out);
        let (schedules_histogram_out, schedules_histogram) = demux.new_output();
        let mut schedules_histogram_out = OutputBuilder::from(schedules_histogram_out);
        let (batches_sent_out, batches_sent) = demux.new_output();
        let mut batches_sent_out = OutputBuilder::from(batches_sent_out);
        let (batches_received_out, batches_received) = demux.new_output();
        let mut batches_received_out = OutputBuilder::from(batches_received_out);

        let worker_id = scope.index();
        let mut demux_state = DemuxState::default();
        demux.build(|_capability| {
            let peers = scope.peers();
            // Clamp to at least 1 ms; `DemuxHandler::ts` divides by this value.
            let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
            move |_frontiers| {
                // Activate all outputs; per-batch sessions are opened below.
                let mut operates = operates_out.activate();
                let mut channels = channels_out.activate();
                let mut addresses = addresses_out.activate();
                let mut parks = parks_out.activate();
                let mut messages_sent = messages_sent_out.activate();
                let mut messages_received = messages_received_out.activate();
                let mut batches_sent = batches_sent_out.activate();
                let mut batches_received = batches_received_out.activate();
                let mut schedules_duration = schedules_duration_out.activate();
                let mut schedules_histogram = schedules_histogram_out.activate();

                input.for_each(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        operates: operates.session_with_builder(&cap),
                        channels: channels.session_with_builder(&cap),
                        addresses: addresses.session_with_builder(&cap),
                        parks: parks.session_with_builder(&cap),
                        messages_sent: messages_sent.session_with_builder(&cap),
                        messages_received: messages_received.session_with_builder(&cap),
                        schedules_duration: schedules_duration.session_with_builder(&cap),
                        schedules_histogram: schedules_histogram.session_with_builder(&cap),
                        batches_sent: batches_sent.session_with_builder(&cap),
                        batches_received: batches_received.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        // Sanity check: message events must name this worker as the
                        // sender (for sends) or receiver (for receives).
                        if let TimelyEvent::Messages(msg) = &event {
                            match msg.is_send {
                                true => assert_eq!(msg.source, worker_id),
                                false => assert_eq!(msg.target, worker_id),
                            }
                        }

                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_buffers,
                            logging_interval_ms,
                            peers,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row` format.
        // We pre-arrange the logging streams to force a consolidation and reduce the amount of
        // updates that reach `Row` encoding.

        let operates = consolidate_and_pack::<KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
            operates,
            TimelyLog::Operates,
            move |data, packer, session| {
                for ((id, name), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*id)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::String(name),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        // TODO: `consolidate_and_pack` requires columnation, which `ChannelDatum` does not
        // implement. Consider consolidating here once we support columnar.
        let channels = channels.unary::<ColumnBuilder<_>, _, _, _>(
            Pipeline,
            "ToRow Channels",
            |_cap, _info| {
                let mut packer = PermutedRowPacker::new(TimelyLog::Channels);
                move |input, output| {
                    input.for_each_time(|time, data| {
                        let mut session = output.session_with_builder(&time);
                        for d in data.flat_map(|c| c.borrow().into_index_iter()) {
                            let ((datum, ()), time, diff) = d;
                            let (source_node, source_port) = datum.source;
                            let (target_node, target_port) = datum.target;
                            let data = packer.pack_slice(&[
                                Datum::UInt64(u64::cast_from(datum.id)),
                                Datum::UInt64(u64::cast_from(worker_id)),
                                Datum::UInt64(u64::cast_from(source_node)),
                                Datum::UInt64(u64::cast_from(source_port)),
                                Datum::UInt64(u64::cast_from(target_node)),
                                Datum::UInt64(u64::cast_from(target_port)),
                                Datum::String(
                                    std::str::from_utf8(datum.typ).expect("valid string"),
                                ),
                            ]);
                            session.give((data, time, diff));
                        }
                    });
                }
            },
        );

        // Types to make rustfmt happy.
        type KVB<K, V, T, D> = KeyValBatcher<K, V, T, D>;
        type KB<K, T, D> = KeyBatcher<K, T, D>;

        let addresses = consolidate_and_pack::<KVB<_, _, _, _>, ColumnBuilder<_>, _, _>(
            addresses,
            TimelyLog::Addresses,
            move |data, packer, session| {
                for ((id, address), time, diff) in data.iter() {
                    // The address column is a list, so pack columns by index rather
                    // than from a flat datum slice.
                    let data = packer.pack_by_index(|packer, index| match index {
                        0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
                        1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
                        2 => {
                            let list = address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)));
                            packer.push_list(list)
                        }
                        _ => unreachable!("Addresses relation has three columns"),
                    });
                    session.give((data, time, diff));
                }
            },
        );

        let parks = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
            parks,
            TimelyLog::Parks,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(datum.duration_pow),
                        datum
                            .requested_pow
                            .map(Datum::UInt64)
                            .unwrap_or(Datum::Null),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let batches_sent = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
            batches_sent,
            TimelyLog::BatchesSent,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let batches_received = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
            batches_received,
            TimelyLog::BatchesReceived,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let messages_sent = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
            messages_sent,
            TimelyLog::MessagesSent,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let messages_received = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
            messages_received,
            TimelyLog::MessagesReceived,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.channel)),
                        Datum::UInt64(u64::cast_from(datum.worker)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let elapsed = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
            schedules_duration,
            TimelyLog::Elapsed,
            move |data, packer, session| {
                for ((operator, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*operator)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        let histogram = consolidate_and_pack::<KB<_, _, _>, ColumnBuilder<_>, _, _>(
            schedules_histogram,
            TimelyLog::Histogram,
            move |data, packer, session| {
                for ((datum, ()), time, diff) in data.iter() {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(datum.operator)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::UInt64(datum.duration_pow),
                    ]);
                    session.give((data, time, diff));
                }
            },
        );

        // Pair each packed stream with its log variant for the export loop below.
        let logs = {
            use TimelyLog::*;
            [
                (Operates, operates),
                (Channels, channels),
                (Elapsed, elapsed),
                (Histogram, histogram),
                (Addresses, addresses),
                (Parks, parks),
                (MessagesSent, messages_sent),
                (MessagesReceived, messages_received),
                (BatchesSent, batches_sent),
                (BatchesReceived, batches_received),
            ]
        };

        // Build the output arrangements.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Timely(variant);
            if config.index_logs.contains_key(&variant) {
                // Extract types to make rustfmt happy.
                type Batcher<K, V, T, R> = Col2ValBatcher<K, V, T, R>;
                type Builder<T, R> = RowRowBuilder<T, R>;
                let trace = collection
                    .mz_arrange_core::<_, Batcher<_, _, _, _>, Builder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                            columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, Diff>,
                        ),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                // Keep both replay tokens alive for as long as the exported trace is.
                let combined_token: Rc<dyn std::any::Any> =
                    Rc::new((Rc::clone(&token), Rc::clone(&storage_token)));
                let collection = LogCollection {
                    trace,
                    token: combined_token,
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
403
/// State maintained by the demux operator.
///
/// Most of this state exists so that previously emitted log records can be retracted when the
/// corresponding operators, channels, or dataflows shut down.
#[derive(Default)]
struct DemuxState {
    /// Information about live operators, indexed by operator ID.
    operators: BTreeMap<usize, OperatesEvent>,
    /// Maps dataflow IDs to channels in the dataflow.
    dataflow_channels: BTreeMap<usize, Vec<ChannelsEvent>>,
    /// Information about the last requested park.
    last_park: Option<Park>,
    /// Maps channel IDs to boxed slices counting the messages sent to each target worker.
    messages_sent: BTreeMap<usize, Box<[MessageCount]>>,
    /// Maps channel IDs to boxed slices counting the messages received from each source worker.
    messages_received: BTreeMap<usize, Box<[MessageCount]>>,
    /// Stores for scheduled operators the time when they were scheduled.
    ///
    /// Used as a stack: schedule starts are pushed and matched against pops at schedule stop.
    schedule_starts: Vec<(usize, Duration)>,
    /// Maps operator IDs to a vector recording the (count, elapsed_ns) values in each histogram
    /// bucket.
    ///
    /// Bucket `i` holds durations that round up to `2^i` nanoseconds.
    schedules_data: BTreeMap<usize, Vec<(isize, Diff)>>,
}
423
/// Information about an outstanding park, kept until the matching unpark arrives.
struct Park {
    /// Time when the park occurred.
    time: Duration,
    /// Requested park time, if one was given.
    requested: Option<Duration>,
}
430
/// Organize message counts into number of batches and records.
///
/// Accumulated per (channel, worker) pair so the totals can be retracted when the channel's
/// dataflow shuts down.
#[derive(Default, Copy, Clone, Debug)]
struct MessageCount {
    /// The number of batches sent across a channel.
    batches: i64,
    /// The number of records sent across a channel.
    records: Diff,
}
439
/// Bundled output buffers used by the demux operator.
//
// We use tuples rather than dedicated `*Datum` structs for `operates` and `addresses` to avoid
// having to manually implement `Columnation`. If `Columnation` could be `#[derive]`ed, that
// wouldn't be an issue.
struct DemuxOutput<'a, 'b> {
    /// `(operator ID, name)` additions and retractions.
    operates: OutputSessionVec<'a, 'b, Update<(usize, String)>>,
    /// Channel descriptions, transported in columnar form.
    channels: OutputSessionColumnar<'a, 'b, Update<(ChannelDatum, ())>>,
    /// `(ID, address)` pairs, fed by both operator and channel events.
    addresses: OutputSessionVec<'a, 'b, Update<(usize, Vec<usize>)>>,
    /// Park durations, bucketed by powers of two.
    parks: OutputSessionVec<'a, 'b, Update<(ParkDatum, ())>>,
    /// Batch counts for sends, keyed by channel and target worker.
    batches_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Batch counts for receives, keyed by channel and source worker.
    batches_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Record counts for sends, keyed by channel and target worker.
    messages_sent: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Record counts for receives, keyed by channel and source worker.
    messages_received: OutputSessionVec<'a, 'b, Update<(MessageDatum, ())>>,
    /// Elapsed scheduling time per operator; the diff component carries the nanoseconds.
    schedules_duration: OutputSessionVec<'a, 'b, Update<(usize, ())>>,
    /// Scheduling duration histogram updates per operator.
    schedules_histogram: OutputSessionVec<'a, 'b, Update<(ScheduleHistogramDatum, ())>>,
}
457
/// Description of a dataflow channel, for columnar transport in the demux output.
///
/// NOTE(review): the `Columnar` derive presumably also generates the by-reference companion
/// type `ChannelDatumReference` used by `DemuxHandler::handle_channels` — confirm against the
/// `columnar` crate docs.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Columnar)]
struct ChannelDatum {
    // Channel ID.
    id: usize,
    // Source as an (operator node, port) pair.
    source: (usize, usize),
    // Target as an (operator node, port) pair.
    target: (usize, usize),
    // String rendering of the channel's data type.
    typ: String,
}
465
/// Park durations, bucketed by rounding up to the next power of two (in nanoseconds).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ParkDatum {
    // Observed park duration, rounded up to a power of two.
    duration_pow: u64,
    // Requested park duration, rounded up likewise; `None` if no duration was requested.
    requested_pow: Option<u64>,
}

// `ParkDatum` is `Copy`, so a simple memcpy-able region suffices.
impl Columnation for ParkDatum {
    type InnerRegion = CopyRegion<Self>;
}
475
/// Identifies a (channel, remote worker) pair for message and batch count updates.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MessageDatum {
    // Channel ID.
    channel: usize,
    // The counterpart worker: the target for sends, the source for receives.
    worker: usize,
}

// `MessageDatum` is `Copy`, so a simple memcpy-able region suffices.
impl Columnation for MessageDatum {
    type InnerRegion = CopyRegion<Self>;
}
485
/// A bucket of the per-operator scheduling duration histogram.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ScheduleHistogramDatum {
    // Operator ID.
    operator: usize,
    // Scheduling duration rounded up to a power of two (nanoseconds).
    duration_pow: u64,
}

// `ScheduleHistogramDatum` is `Copy`, so a simple memcpy-able region suffices.
impl Columnation for ScheduleHistogramDatum {
    type InnerRegion = CopyRegion<Self>;
}
495
/// Event handler of the demux operator.
///
/// A fresh handler is constructed for each event inside the demux input loop; all of its fields
/// borrow state that outlives the single `handle` call.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
    /// Demux output buffers.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The number of timely workers.
    peers: usize,
    /// The current event time.
    time: Duration,
}
511
impl DemuxHandler<'_, '_, '_> {
    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.
    ///
    /// The event time is rounded up to the next interval boundary, so all updates within one
    /// interval share a timestamp and can consolidate.
    fn ts(&self) -> Timestamp {
        let time_ms = self.time.as_millis();
        let interval = self.logging_interval_ms;
        let rounded = (time_ms / interval + 1) * interval;
        rounded.try_into().expect("must fit")
    }

    /// Handle the given timely event.
    fn handle(&mut self, event: TimelyEvent) {
        use TimelyEvent::*;

        match event {
            Operates(e) => self.handle_operates(e),
            Channels(e) => self.handle_channels(e),
            Shutdown(e) => self.handle_shutdown(e),
            Park(e) => self.handle_park(e),
            Messages(e) => self.handle_messages(e),
            Schedule(e) => self.handle_schedule(e),
            // Other event types are not reflected in any log collection.
            _ => (),
        }
    }

    /// Handle an operator creation event.
    ///
    /// Emits additions to the `operates` and `addresses` collections and stashes the event so
    /// it can be retracted when the operator shuts down.
    fn handle_operates(&mut self, event: OperatesEvent) {
        let ts = self.ts();
        let datum = (event.id, event.name.clone());
        self.output.operates.give((datum, ts, Diff::ONE));

        let datum = (event.id, event.addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        self.state.operators.insert(event.id, event);
    }

    /// Handle a channel creation event.
    ///
    /// Emits additions to the `channels` and `addresses` collections and indexes the channel
    /// under its dataflow, for retraction when the dataflow shuts down.
    fn handle_channels(&mut self, event: ChannelsEvent) {
        let ts = self.ts();
        let datum = ChannelDatumReference {
            id: event.id,
            source: event.source,
            target: event.target,
            typ: &event.typ,
        };
        self.output.channels.give(((datum, ()), ts, Diff::ONE));

        let datum = (event.id, event.scope_addr.clone());
        self.output.addresses.give((datum, ts, Diff::ONE));

        // The first coordinate of the scope address identifies the enclosing dataflow.
        let dataflow_index = event.scope_addr[0];
        self.state
            .dataflow_channels
            .entry(dataflow_index)
            .or_default()
            .push(event);
    }

    /// Handle an operator shutdown event, retracting everything logged for the operator.
    fn handle_shutdown(&mut self, event: ShutdownEvent) {
        // Dropped operators should result in a negative record for
        // the `operates` collection, cancelling out the initial
        // operator announcement.
        // Remove logging for this operator.

        let Some(operator) = self.state.operators.remove(&event.id) else {
            error!(operator_id = ?event.id, "missing operator entry at time of shutdown");
            return;
        };

        // Retract operator information.
        let ts = self.ts();
        let datum = (operator.id, operator.name);
        self.output.operates.give((datum, ts, Diff::MINUS_ONE));

        // Retract schedules information for the operator
        if let Some(schedules) = self.state.schedules_data.remove(&event.id) {
            // Only buckets with a non-zero count have anything to retract.
            for (bucket, (count, elapsed_ns)) in IntoIterator::into_iter(schedules)
                .enumerate()
                .filter(|(_, (count, _))| *count != 0)
            {
                self.output
                    .schedules_duration
                    .give(((event.id, ()), ts, Diff::from(-elapsed_ns)));

                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    // Bucket `i` corresponds to durations rounded up to `2^i` ns.
                    duration_pow: 1 << bucket,
                };
                let diff = Diff::cast_from(-count);
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, diff));
            }
        }

        // A single-coordinate address means the whole dataflow is shutting down.
        if operator.addr.len() == 1 {
            let dataflow_index = operator.addr[0];
            self.handle_dataflow_shutdown(dataflow_index);
        }

        let datum = (operator.id, operator.addr);
        self.output.addresses.give((datum, ts, Diff::MINUS_ONE));
    }

    /// Handle the shutdown of the dataflow with the given index.
    ///
    /// Notifies the compute logger (if one is attached) and retracts all channel, address,
    /// message, and batch records previously logged for the dataflow's channels.
    fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) {
        // Notify compute logging about the shutdown.
        self.shared_state.compute_logger.as_ref().map(|logger| {
            logger.log(&(ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index })))
        });

        // When a dataflow shuts down, we need to retract all its channels.
        let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else {
            return;
        };

        let ts = self.ts();
        for channel in channels {
            // Retract channel description.
            let datum = ChannelDatumReference {
                id: channel.id,
                source: channel.source,
                target: channel.target,
                typ: &channel.typ,
            };
            self.output
                .channels
                .give(((datum, ()), ts, Diff::MINUS_ONE));

            let datum = (channel.id, channel.scope_addr);
            self.output.addresses.give((datum, ts, Diff::MINUS_ONE));

            // Retract messages logged for this channel.
            if let Some(sent) = self.state.messages_sent.remove(&channel.id) {
                for (target_worker, count) in sent.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: target_worker,
                    };
                    self.output
                        .messages_sent
                        .give(((datum, ()), ts, Diff::from(-count.records)));
                    self.output
                        .batches_sent
                        .give(((datum, ()), ts, Diff::from(-count.batches)));
                }
            }
            if let Some(received) = self.state.messages_received.remove(&channel.id) {
                for (source_worker, count) in received.iter().enumerate() {
                    let datum = MessageDatum {
                        channel: channel.id,
                        worker: source_worker,
                    };
                    self.output.messages_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.records),
                    ));
                    self.output.batches_received.give((
                        (datum, ()),
                        ts,
                        Diff::from(-count.batches),
                    ));
                }
            }
        }
    }

    /// Handle a park or unpark event.
    ///
    /// Parks only remember their start time; the log update is emitted on unpark, with the
    /// observed and requested durations rounded up to the next power of two nanoseconds.
    fn handle_park(&mut self, event: ParkEvent) {
        match event {
            ParkEvent::Park(requested) => {
                let park = Park {
                    time: self.time,
                    requested,
                };
                let existing = self.state.last_park.replace(park);
                if existing.is_some() {
                    error!("park without a succeeding unpark");
                }
            }
            ParkEvent::Unpark => {
                let Some(park) = self.state.last_park.take() else {
                    error!("unpark without a preceding park");
                    return;
                };

                let duration_ns = self.time.saturating_sub(park.time).as_nanos();
                let duration_pow =
                    u64::try_from(duration_ns.next_power_of_two()).expect("must fit");
                let requested_pow = park
                    .requested
                    .map(|r| u64::try_from(r.as_nanos().next_power_of_two()).expect("must fit"));

                let ts = self.ts();
                let datum = ParkDatum {
                    duration_pow,
                    requested_pow,
                };
                self.output.parks.give(((datum, ()), ts, Diff::ONE));
            }
        }
    }

    /// Handle a message send/receive event.
    ///
    /// Emits additions to the message and batch collections and accumulates per-channel counts
    /// in `DemuxState`, to support retraction when the channel's dataflow shuts down.
    fn handle_messages(&mut self, event: MessagesEvent) {
        let ts = self.ts();
        let count = Diff::from(event.record_count);

        if event.is_send {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.target,
            };
            self.output.messages_sent.give(((datum, ()), ts, count));
            self.output.batches_sent.give(((datum, ()), ts, Diff::ONE));

            // One count slot per peer worker, allocated on first use.
            let sent_counts = self
                .state
                .messages_sent
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            sent_counts[event.target].records += count;
            sent_counts[event.target].batches += 1;
        } else {
            let datum = MessageDatum {
                channel: event.channel,
                worker: event.source,
            };
            self.output.messages_received.give(((datum, ()), ts, count));
            self.output
                .batches_received
                .give(((datum, ()), ts, Diff::ONE));

            // One count slot per peer worker, allocated on first use.
            let received_counts = self
                .state
                .messages_received
                .entry(event.channel)
                .or_insert_with(|| vec![Default::default(); self.peers].into_boxed_slice());
            received_counts[event.source].records += count;
            received_counts[event.source].batches += 1;
        }
    }

    /// Handle an operator schedule start/stop event.
    ///
    /// Starts are pushed onto a stack; the matching stop pops it, emits elapsed-time and
    /// histogram updates, and accumulates them for retraction on operator shutdown.
    fn handle_schedule(&mut self, event: ScheduleEvent) {
        match event.start_stop {
            timely::logging::StartStop::Start => {
                self.state.schedule_starts.push((event.id, self.time));
            }
            timely::logging::StartStop::Stop => {
                let Some((old_id, start_time)) = self.state.schedule_starts.pop() else {
                    error!(operator_id = ?event.id, "schedule stop without preceding start");
                    return;
                };

                // The popped start must belong to the same operator as this stop.
                if old_id != event.id {
                    error!(start_id = ?old_id, stop_id = ?event.id, "schedule stop without preceding start");
                    return;
                }

                let elapsed_ns = self.time.saturating_sub(start_time).as_nanos();
                let elapsed_i64 = i64::try_from(elapsed_ns).expect("must fit");
                let elapsed_diff = Diff::from(elapsed_i64);
                let elapsed_pow = u64::try_from(elapsed_ns.next_power_of_two()).expect("must fit");

                let ts = self.ts();
                let datum = event.id;
                self.output
                    .schedules_duration
                    .give(((datum, ()), ts, elapsed_diff));

                let datum = ScheduleHistogramDatum {
                    operator: event.id,
                    duration_pow: elapsed_pow,
                };
                self.output
                    .schedules_histogram
                    .give(((datum, ()), ts, Diff::ONE));

                // Record count and elapsed time for later retraction.
                // The bucket index is the log2 of the power-of-two duration.
                let index = usize::cast_from(elapsed_pow.trailing_zeros());
                let data = self.state.schedules_data.entry(event.id).or_default();
                grow_vec(data, index);
                let (count, duration) = &mut data[index];
                *count += 1;
                *duration += elapsed_diff;
            }
        }
    }
}
798
/// Ensure `vec` has an entry at `index`, appending default values as needed.
///
/// Does nothing when `index` is already within bounds.
fn grow_vec<T>(vec: &mut Vec<T>, index: usize)
where
    T: Clone + Default,
{
    let required_len = index + 1;
    if vec.len() < required_len {
        vec.resize(required_len, T::default());
    }
}
809}