1use std::collections::BTreeMap;
13use std::convert::TryInto;
14use std::rc::Rc;
15use std::time::Duration;
16
17use mz_compute_client::logging::LoggingConfig;
18use mz_ore::cast::CastFrom;
19use mz_repr::{Datum, Diff, Row, Timestamp};
20use mz_timely_util::containers::{Col2ValBatcher, Column, ColumnBuilder, columnar_exchange};
21use mz_timely_util::replay::MzReplay;
22use timely::Container;
23use timely::dataflow::Scope;
24use timely::dataflow::channels::pact::ExchangeCore;
25
26use crate::extensions::arrange::MzArrangeCore;
27use crate::logging::initialize::ReachabilityEvent;
28use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog, consolidate_and_pack};
29use crate::row_spine::RowRowBuilder;
30use crate::typedefs::RowRowSpine;
31
32pub(super) struct Return {
34 pub collections: BTreeMap<LogVariant, LogCollection>,
36}
37
38pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
45 mut scope: G,
46 config: &LoggingConfig,
47 event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
48) -> Return {
49 let collections = scope.scoped("timely reachability logging", move |scope| {
50 let enable_logging = config.enable_logging;
51 let interval_ms = std::cmp::max(1, config.interval.as_millis());
52 type UpdatesKey = (bool, usize, usize, usize, Timestamp);
53
54 type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;
55 let (updates, token) = event_queue.links.mz_replay::<_, CB, _>(
56 scope,
57 "reachability logs",
58 config.interval,
59 event_queue.activator,
60 move |mut session, data| {
61 if !enable_logging {
64 return;
65 }
66 for (time, (operator_id, massaged)) in data.iter() {
67 let time_ms = ((time.as_millis() / interval_ms) + 1) * interval_ms;
68 let time_ms: Timestamp = time_ms.try_into().expect("must fit");
69 for (source, port, update_type, ts, diff) in massaged.into_iter() {
70 let datum = (update_type, operator_id, source, port, ts);
71 session.give(((datum, ()), time_ms, diff));
72 }
73 }
74 },
75 );
76
77 let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];
79 let worker_id = scope.index();
80
81 let updates = consolidate_and_pack::<_, Col2ValBatcher<UpdatesKey, _, _, _>, ColumnBuilder<_>, _, _>(
82 &updates,
83 TimelyLog::Reachability,
84 move |((datum, ()), time, diff), packer, session| {
85 let (update_type, operator_id, source, port, ts) = datum;
86 let update_type = if *update_type { "source" } else { "target" };
87 let data = packer.pack_slice(&[
88 Datum::UInt64(u64::cast_from(*operator_id)),
89 Datum::UInt64(u64::cast_from(worker_id)),
90 Datum::UInt64(u64::cast_from(*source)),
91 Datum::UInt64(u64::cast_from(*port)),
92 Datum::String(update_type),
93 Datum::from(*ts),
94 ]);
95 session.give((data, time, diff));
96 }
97 );
98
99 let mut result = BTreeMap::new();
100 for variant in logs_active {
101 if config.index_logs.contains_key(&variant) {
102 let trace = updates
103 .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
104 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
105 &format!("Arrange {variant:?}"),
106 )
107 .trace;
108 let collection = LogCollection {
109 trace,
110 token: Rc::clone(&token),
111 };
112 result.insert(variant, collection);
113 }
114 }
115 result
116 });
117
118 Return { collections }
119}