mz_compute/logging/
reachability.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logging dataflows for events generated by timely dataflow.

use std::collections::BTreeMap;
use std::convert::TryInto;
use std::rc::Rc;
use std::time::Duration;

use columnar::Index;
use mz_compute_client::logging::LoggingConfig;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Row, Timestamp};
use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, Column, ColumnBuilder};
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::ExchangeCore;
use timely::dataflow::Scope;
use timely::Container;

use crate::extensions::arrange::MzArrangeCore;
use crate::logging::initialize::ReachabilityEvent;
use crate::logging::{consolidate_and_pack, EventQueue, LogCollection, LogVariant, TimelyLog};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;

/// The return type of [`construct`].
///
/// Bundles the log collections produced by the reachability logging dataflow fragment.
pub(super) struct Return {
    /// Collections to export, keyed by the log variant they contain.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}

/// Builds the dataflow fragment that exposes timely reachability logs.
///
/// Events are replayed from `event_queue` inside a dedicated
/// "timely reachability logging" sub-scope. Each update is stamped with the first multiple of
/// the logging interval that lies strictly after the event time, packed into a row, and
/// arranged for every reachability log variant that has an entry in `config.index_logs`.
///
/// Params
/// * `scope`: The Timely scope hosting the log analysis dataflow.
/// * `config`: Logging configuration
/// * `event_queue`: The source to read log events from.
///
/// Returns the arranged collections, keyed by log variant.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &LoggingConfig,
    event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
) -> Return {
    let collections = scope.scoped("timely reachability logging", move |scope| {
        // Key layout: (is source update, operator id, source, port, timestamp).
        type UpdatesKey = (bool, usize, usize, usize, Timestamp);
        type UpdateBuilder = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;

        let worker_id = scope.index();
        let logging_enabled = config.enable_logging;
        // Clamp to at least one millisecond so the rounding below never divides by zero.
        let interval_ms = config.interval.as_millis().max(1);

        let (raw_updates, token) = event_queue.links.mz_replay::<_, UpdateBuilder, _>(
            scope,
            "reachability logs",
            config.interval,
            event_queue.activator,
            move |mut session, data| {
                // The indexes have to be installed even when logging is disabled; they simply
                // stay empty in that case because every log event is dropped right here.
                if logging_enabled {
                    for (time, (operator_id, massaged)) in data.iter() {
                        // Advance the event time to the end of its logging interval.
                        let bucket_end = (time.as_millis() / interval_ms + 1) * interval_ms;
                        let bucket_end: Timestamp = bucket_end.try_into().expect("must fit");
                        for (source, port, update_type, ts, diff) in massaged.into_iter() {
                            let key = (update_type, operator_id, source, port, ts);
                            session.give(((key, ()), bucket_end, diff));
                        }
                    }
                }
            },
        );

        let packed_updates = consolidate_and_pack::<_, Col2ValBatcher<UpdatesKey, _, _, _>, ColumnBuilder<_>, _, _>(
            &raw_updates,
            TimelyLog::Reachability,
            move |((key, ()), time, diff), packer, session| {
                let (is_source, operator_id, source, port, ts) = key;
                let kind = if *is_source { "source" } else { "target" };
                // Column order: operator id, worker id, source, port, update type, timestamp.
                let row = packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(*operator_id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(*source)),
                    Datum::UInt64(u64::cast_from(*port)),
                    Datum::String(kind),
                    Datum::from(*ts),
                ]);
                session.give((row, time, diff));
            },
        );

        // Only the logs listed here are candidates for export, and each one is exported only
        // when the configuration requests an index for it.
        let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];

        let mut exports = BTreeMap::new();
        for variant in logs_active {
            if !config.index_logs.contains_key(&variant) {
                continue;
            }
            let arrangement_name = format!("Arrange {variant:?}");
            let trace = packed_updates
                .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
                    &arrangement_name,
                )
                .trace;
            // Every exported collection shares the replay token.
            let collection = LogCollection {
                trace,
                token: Rc::clone(&token),
            };
            exports.insert(variant, collection);
        }
        exports
    });

    Return { collections }
}