1use std::collections::BTreeMap;
13use std::convert::TryInto;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::Index;
18use mz_compute_client::logging::LoggingConfig;
19use mz_ore::cast::CastFrom;
20use mz_repr::{Datum, Diff, Row, Timestamp};
21use mz_timely_util::columnar::builder::ColumnBuilder;
22use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
23use mz_timely_util::replay::MzReplay;
24use timely::dataflow::Scope;
25use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
26use timely::dataflow::operators::Operator;
27use timely::dataflow::operators::generic::operator::empty;
28
29use crate::extensions::arrange::MzArrangeCore;
30use crate::logging::initialize::ReachabilityEvent;
31use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog, consolidate_and_pack};
32use crate::row_spine::RowRowBuilder;
33use crate::typedefs::RowRowSpine;
34
35pub(super) struct Return {
37 pub collections: BTreeMap<LogVariant, LogCollection>,
39}
40
41pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
48 mut scope: G,
49 config: &LoggingConfig,
50 event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
51) -> Return {
52 let collections = scope.scoped("timely reachability logging", move |scope| {
53 let enable_logging = config.enable_logging;
54 let interval_ms = std::cmp::max(1, config.interval.as_millis());
55 type UpdatesKey = (bool, usize, usize, usize, Timestamp);
56
57 type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;
58 let (logs, token) = if enable_logging {
59 event_queue.links.mz_replay(
60 scope,
61 "reachability logs",
62 config.interval,
63 event_queue.activator,
64 )
65 } else {
66 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
67 (empty(scope), token)
68 };
69 let logs = logs.unary::<CB, _, _, _>(Pipeline, "FlatMapReachability", move |_, _| {
70 move |input, output| {
71 input.for_each_time(|time, data| {
72 output
73 .session_with_builder(&time)
74 .give_iterator(data.flat_map(|d| {
75 d.borrow().into_index_iter().flat_map(
76 move |(time, (operator_id, massaged))| {
77 let time_ms =
78 ((time.as_millis() / interval_ms) + 1) * interval_ms;
79 let time_ms: Timestamp = time_ms.try_into().expect("must fit");
80 massaged.into_iter().map(
81 move |(source, port, update_type, ts, diff)| {
82 let datum =
83 (update_type, operator_id, source, port, ts);
84 ((datum, ()), time_ms, diff)
85 },
86 )
87 },
88 )
89 }));
90 });
91 }
92 });
93
94 let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];
96 let worker_id = scope.index();
97
98 let updates =
99 consolidate_and_pack::<_, Col2ValBatcher<UpdatesKey, _, _, _>, ColumnBuilder<_>, _, _>(
100 &logs,
101 TimelyLog::Reachability,
102 move |data, packer, session| {
103 for ((datum, ()), time, diff) in data.iter() {
104 let (update_type, operator_id, source, port, ts) = datum;
105 let update_type = if *update_type { "source" } else { "target" };
106 let data = packer.pack_slice(&[
107 Datum::UInt64(u64::cast_from(*operator_id)),
108 Datum::UInt64(u64::cast_from(worker_id)),
109 Datum::UInt64(u64::cast_from(*source)),
110 Datum::UInt64(u64::cast_from(*port)),
111 Datum::String(update_type),
112 Datum::from(*ts),
113 ]);
114 session.give((data, time, diff));
115 }
116 },
117 );
118
119 let mut result = BTreeMap::new();
120 for variant in logs_active {
121 if config.index_logs.contains_key(&variant) {
122 let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
123 columnar_exchange::<Row, Row, Timestamp, Diff>,
124 );
125 let trace = updates
126 .mz_arrange_core::<
127 _,
128 Col2ValBatcher<_, _, _, _>,
129 RowRowBuilder<_, _>,
130 RowRowSpine<_, _>,
131 >(exchange, &format!("Arrange {variant:?}"))
132 .trace;
133 let collection = LogCollection {
134 trace,
135 token: Rc::clone(&token),
136 };
137 result.insert(variant, collection);
138 }
139 }
140 result
141 });
142
143 Return { collections }
144}