1use std::collections::BTreeMap;
13use std::convert::TryInto;
14use std::rc::Rc;
15use std::time::Duration;
16
17use columnar::Index;
18use mz_compute_client::logging::LoggingConfig;
19use mz_ore::cast::CastFrom;
20use mz_repr::{Datum, Diff, Row, Timestamp};
21use mz_timely_util::columnar::batcher;
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnar::{Col2ValBatcher, Column, columnar_exchange};
24use mz_timely_util::replay::MzReplay;
25use timely::dataflow::Scope;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::dataflow::operators::generic::operator::empty;
29
30use crate::extensions::arrange::MzArrangeCore;
31use crate::logging::initialize::ReachabilityEvent;
32use crate::logging::{EventQueue, LogCollection, LogVariant, TimelyLog, consolidate_and_pack};
33use crate::typedefs::RowRowSpine;
34use mz_row_spine::RowRowBuilder;
35
36pub(super) struct Return {
38 pub collections: BTreeMap<LogVariant, LogCollection>,
40}
41
42pub(super) fn construct(
49 scope: Scope<'_, Timestamp>,
50 config: &LoggingConfig,
51 event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
52) -> Return {
53 let collections = scope.scoped("timely reachability logging", move |scope| {
54 let enable_logging = config.enable_logging;
55 let interval_ms = std::cmp::max(1, config.interval.as_millis());
56 type UpdatesKey = (bool, usize, usize, usize, Timestamp);
57
58 type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;
59 let (logs, token) = if enable_logging {
60 event_queue.links.mz_replay(
61 scope,
62 "reachability logs",
63 config.interval,
64 event_queue.activator,
65 )
66 } else {
67 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
68 (empty(scope), token)
69 };
70 let logs = logs.unary::<CB, _, _, _>(Pipeline, "FlatMapReachability", move |_, _| {
71 move |input, output| {
72 input.for_each_time(|time, data| {
73 output
74 .session_with_builder(&time)
75 .give_iterator(data.flat_map(|d| {
76 d.borrow().into_index_iter().flat_map(
77 move |(time, (operator_id, massaged))| {
78 let time_ms =
79 ((time.as_millis() / interval_ms) + 1) * interval_ms;
80 let time_ms: Timestamp = time_ms.try_into().expect("must fit");
81 massaged.into_iter().map(
82 move |(source, port, update_type, ts, diff)| {
83 let datum =
84 (update_type, operator_id, source, port, ts);
85 ((datum, ()), time_ms, diff)
86 },
87 )
88 },
89 )
90 }));
91 });
92 }
93 });
94
95 let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];
97 let worker_id = scope.index();
98
99 let updates = consolidate_and_pack::<
100 batcher::Chunker<_>,
101 Col2ValBatcher<UpdatesKey, _, _, _>,
102 ColumnBuilder<_>,
103 _,
104 _,
105 _,
106 >(
107 logs,
108 TimelyLog::Reachability,
109 move |data, packer, session| {
110 for ((datum, ()), time, diff) in data.iter() {
111 let (update_type, operator_id, source, port, ts) = datum;
112 let update_type = if *update_type { "source" } else { "target" };
113 let data = packer.pack_slice(&[
114 Datum::UInt64(u64::cast_from(*operator_id)),
115 Datum::UInt64(u64::cast_from(worker_id)),
116 Datum::UInt64(u64::cast_from(*source)),
117 Datum::UInt64(u64::cast_from(*port)),
118 Datum::String(update_type),
119 Datum::from(*ts),
120 ]);
121 session.give((data, time, diff));
122 }
123 },
124 );
125
126 let mut result = BTreeMap::new();
127 for variant in logs_active {
128 if config.index_logs.contains_key(&variant) {
129 let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
130 columnar_exchange::<Row, Row, Timestamp, Diff>,
131 );
132 let trace = updates
133 .clone()
134 .mz_arrange_core::<
135 _,
136 batcher::Chunker<_>,
137 Col2ValBatcher<_, _, _, _>,
138 RowRowBuilder<_, _>,
139 RowRowSpine<_, _>,
140 >(exchange, &format!("Arrange {variant:?}"))
141 .trace;
142 let collection = LogCollection {
143 trace,
144 token: Rc::clone(&token),
145 };
146 result.insert(variant, collection);
147 }
148 }
149 result
150 });
151
152 Return { collections }
153}