Skip to main content

mz_compute/logging/
reachability.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Logging dataflows for reachability events generated by timely dataflow.
11
12use std::collections::BTreeMap;
13use std::convert::TryInto;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::Index;
18use mz_compute_client::logging::LoggingConfig;
19use mz_ore::cast::CastFrom;
20use mz_repr::{Datum, Diff, Row, Timestamp};
21use mz_timely_util::columnar::batcher;
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::operator::empty;
29
30use crate::extensions::arrange::MzArrangeCore;
31use crate::logging::initialize::ReachabilityEvent;
32use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog, consolidate_and_pack};
33use crate::typedefs::RowRowSpine;
34use mz_row_spine::RowRowBuilder;
35
36/// The return type of [`construct`].
37pub(super) struct Return {
38    /// Collections to export.
39    pub collections: BTreeMap<LogVariant, LogCollection>,
40}
41
42/// Constructs the logging dataflow fragment for reachability logs.
43///
44/// Params
45/// * `scope`: The Timely scope hosting the log analysis dataflow.
46/// * `config`: Logging configuration
47/// * `event_queue`: The source to read log events from.
48pub(super) fn construct(
49    scope: Scope<'_, Timestamp>,
50    config: &LoggingConfig,
51    event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
52) -> Return {
53    let collections = scope.scoped("timely reachability logging", move |scope| {
54        let enable_logging = config.enable_logging;
55        let interval_ms = std::cmp::max(1, config.interval.as_millis());
56        type UpdatesKey = (bool, usize, usize, usize, Timestamp);
57
58        type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;
59        let (logs, token) = if enable_logging {
60            event_queue.links.mz_replay(
61                scope,
62                "reachability logs",
63                config.interval,
64                event_queue.activator,
65            )
66        } else {
67            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
68            (empty(scope), token)
69        };
70        let logs = logs.unary::<CB, _, _, _>(Pipeline, "FlatMapReachability", move |_, _| {
71            move |input, output| {
72                input.for_each_time(|time, data| {
73                    output
74                        .session_with_builder(&time)
75                        .give_iterator(data.flat_map(|d| {
76                            d.borrow().into_index_iter().flat_map(
77                                move |(time, (operator_id, massaged))| {
78                                    let time_ms =
79                                        ((time.as_millis() / interval_ms) + 1) * interval_ms;
80                                    let time_ms: Timestamp = time_ms.try_into().expect("must fit");
81                                    massaged.into_iter().map(
82                                        move |(source, port, update_type, ts, diff)| {
83                                            let datum =
84                                                (update_type, operator_id, source, port, ts);
85                                            ((datum, ()), time_ms, diff)
86                                        },
87                                    )
88                                },
89                            )
90                        }));
91                });
92            }
93        });
94
95        // Restrict results by those logs that are meant to be active.
96        let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];
97        let worker_id = scope.index();
98
99        let updates = consolidate_and_pack::<
100            batcher::Chunker<_>,
101            Col2ValBatcher<UpdatesKey, _, _, _>,
102            ColumnBuilder<_>,
103            _,
104            _,
105            _,
106        >(
107            logs,
108            TimelyLog::Reachability,
109            move |data, packer, session| {
110                for ((datum, ()), time, diff) in data.iter() {
111                    let (update_type, operator_id, source, port, ts) = datum;
112                    let update_type = if *update_type { "source" } else { "target" };
113                    let data = packer.pack_slice(&[
114                        Datum::UInt64(u64::cast_from(*operator_id)),
115                        Datum::UInt64(u64::cast_from(worker_id)),
116                        Datum::UInt64(u64::cast_from(*source)),
117                        Datum::UInt64(u64::cast_from(*port)),
118                        Datum::String(update_type),
119                        Datum::from(*ts),
120                    ]);
121                    session.give((data, time, diff));
122                }
123            },
124        );
125
126        let mut result = BTreeMap::new();
127        for variant in logs_active {
128            if config.index_logs.contains_key(&variant) {
129                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
130                    columnar_exchange::<Row, Row, Timestamp, Diff>,
131                );
132                let trace = updates
133                    .clone()
134                    .mz_arrange_core::<
135                        _,
136                        batcher::Chunker<_>,
137                        Col2ValBatcher<_, _, _, _>,
138                        RowRowBuilder<_, _>,
139                        RowRowSpine<_, _>,
140                    >(exchange, &format!("Arrange {variant:?}"))
141                    .trace;
142                let collection = LogCollection {
143                    trace,
144                    token: Rc::clone(&token),
145                };
146                result.insert(variant, collection);
147            }
148        }
149        result
150    });
151
152    Return { collections }
153}