1use std::collections::BTreeMap;
13use std::convert::TryInto;
14use std::rc::Rc;
15use std::time::Duration;
16
17use mz_compute_client::logging::LoggingConfig;
18use mz_ore::cast::CastFrom;
19use mz_repr::{Datum, Diff, Row, Timestamp};
20use mz_timely_util::columnar::builder::ColumnBuilder;
21use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
22use mz_timely_util::replay::MzReplay;
23use timely::Container;
24use timely::dataflow::Scope;
25use timely::dataflow::channels::pact::ExchangeCore;
26
27use crate::extensions::arrange::MzArrangeCore;
28use crate::logging::initialize::ReachabilityEvent;
29use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog, consolidate_and_pack};
30use crate::row_spine::RowRowBuilder;
31use crate::typedefs::RowRowSpine;
32
33pub(super) struct Return {
35 pub collections: BTreeMap<LogVariant, LogCollection>,
37}
38
39pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
46 mut scope: G,
47 config: &LoggingConfig,
48 event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
49) -> Return {
50 let collections = scope.scoped("timely reachability logging", move |scope| {
51 let enable_logging = config.enable_logging;
52 let interval_ms = std::cmp::max(1, config.interval.as_millis());
53 type UpdatesKey = (bool, usize, usize, usize, Timestamp);
54
55 type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;
56 let (updates, token) = event_queue.links.mz_replay::<_, CB, _>(
57 scope,
58 "reachability logs",
59 config.interval,
60 event_queue.activator,
61 move |mut session, data| {
62 if !enable_logging {
65 return;
66 }
67 for (time, (operator_id, massaged)) in data.iter() {
68 let time_ms = ((time.as_millis() / interval_ms) + 1) * interval_ms;
69 let time_ms: Timestamp = time_ms.try_into().expect("must fit");
70 for (source, port, update_type, ts, diff) in massaged.into_iter() {
71 let datum = (update_type, operator_id, source, port, ts);
72 session.give(((datum, ()), time_ms, diff));
73 }
74 }
75 },
76 );
77
78 let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];
80 let worker_id = scope.index();
81
82 let updates = consolidate_and_pack::<_, Col2ValBatcher<UpdatesKey, _, _, _>, ColumnBuilder<_>, _, _>(
83 &updates,
84 TimelyLog::Reachability,
85 move |((datum, ()), time, diff), packer, session| {
86 let (update_type, operator_id, source, port, ts) = datum;
87 let update_type = if *update_type { "source" } else { "target" };
88 let data = packer.pack_slice(&[
89 Datum::UInt64(u64::cast_from(*operator_id)),
90 Datum::UInt64(u64::cast_from(worker_id)),
91 Datum::UInt64(u64::cast_from(*source)),
92 Datum::UInt64(u64::cast_from(*port)),
93 Datum::String(update_type),
94 Datum::from(*ts),
95 ]);
96 session.give((data, time, diff));
97 }
98 );
99
100 let mut result = BTreeMap::new();
101 for variant in logs_active {
102 if config.index_logs.contains_key(&variant) {
103 let trace = updates
104 .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
105 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
106 &format!("Arrange {variant:?}"),
107 )
108 .trace;
109 let collection = LogCollection {
110 trace,
111 token: Rc::clone(&token),
112 };
113 result.insert(variant, collection);
114 }
115 }
116 result
117 });
118
119 Return { collections }
120}