mz_compute/logging/
reachability.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Logging dataflows for reachability events generated by timely dataflow.
11
12use std::collections::BTreeMap;
13use std::convert::TryInto;
14use std::rc::Rc;
15use std::time::Duration;
16
17use mz_compute_client::logging::LoggingConfig;
18use mz_ore::cast::CastFrom;
19use mz_repr::{Datum, Diff, Row, Timestamp};
20use mz_timely_util::columnar::builder::ColumnBuilder;
21use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
22use mz_timely_util::replay::MzReplay;
23use timely::Container;
24use timely::dataflow::Scope;
25use timely::dataflow::channels::pact::ExchangeCore;
26
27use crate::extensions::arrange::MzArrangeCore;
28use crate::logging::initialize::ReachabilityEvent;
29use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog, consolidate_and_pack};
30use crate::row_spine::RowRowBuilder;
31use crate::typedefs::RowRowSpine;
32
33/// The return type of [`construct`].
34pub(super) struct Return {
35    /// Collections to export.
36    pub collections: BTreeMap<LogVariant, LogCollection>,
37}
38
39/// Constructs the logging dataflow fragment for reachability logs.
40///
41/// Params
42/// * `scope`: The Timely scope hosting the log analysis dataflow.
43/// * `config`: Logging configuration
44/// * `event_queue`: The source to read log events from.
45pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
46    mut scope: G,
47    config: &LoggingConfig,
48    event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
49) -> Return {
50    let collections = scope.scoped("timely reachability logging", move |scope| {
51        let enable_logging = config.enable_logging;
52        let interval_ms = std::cmp::max(1, config.interval.as_millis());
53        type UpdatesKey = (bool, usize, usize, usize, Timestamp);
54
55        type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;
56        let (updates, token) = event_queue.links.mz_replay::<_, CB, _>(
57            scope,
58            "reachability logs",
59            config.interval,
60            event_queue.activator,
61            move |mut session, data| {
62                // If logging is disabled, we still need to install the indexes, but we can leave them
63                // empty. We do so by immediately filtering all logs events.
64                if !enable_logging {
65                    return;
66                }
67                for (time, (operator_id, massaged)) in data.iter() {
68                    let time_ms = ((time.as_millis() / interval_ms) + 1) * interval_ms;
69                    let time_ms: Timestamp = time_ms.try_into().expect("must fit");
70                    for (source, port, update_type, ts, diff) in massaged.into_iter() {
71                        let datum = (update_type, operator_id, source, port, ts);
72                        session.give(((datum, ()), time_ms, diff));
73                    }
74                }
75            },
76        );
77
78        // Restrict results by those logs that are meant to be active.
79        let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];
80        let worker_id = scope.index();
81
82        let updates = consolidate_and_pack::<_, Col2ValBatcher<UpdatesKey, _, _, _>, ColumnBuilder<_>, _, _>(
83            &updates,
84            TimelyLog::Reachability,
85            move |((datum, ()), time, diff), packer, session| {
86                let (update_type, operator_id, source, port, ts) = datum;
87                let update_type = if *update_type { "source" } else { "target" };
88                let data = packer.pack_slice(&[
89                    Datum::UInt64(u64::cast_from(*operator_id)),
90                    Datum::UInt64(u64::cast_from(worker_id)),
91                    Datum::UInt64(u64::cast_from(*source)),
92                    Datum::UInt64(u64::cast_from(*port)),
93                    Datum::String(update_type),
94                    Datum::from(*ts),
95                ]);
96                session.give((data, time, diff));
97            }
98        );
99
100        let mut result = BTreeMap::new();
101        for variant in logs_active {
102            if config.index_logs.contains_key(&variant) {
103                let trace = updates
104                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
105                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
106                        &format!("Arrange {variant:?}"),
107                    )
108                    .trace;
109                let collection = LogCollection {
110                    trace,
111                    token: Rc::clone(&token),
112                };
113                result.insert(variant, collection);
114            }
115        }
116        result
117    });
118
119    Return { collections }
120}