1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_dyncfg::ConfigSet;
18use mz_ore::metrics::MetricsRegistry;
19use mz_repr::{Diff, Timestamp};
20use mz_storage_operators::persist_source::Subtime;
21use mz_timely_util::columnar::Column;
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::operator::CollectionExt;
24use mz_timely_util::scope_label::ScopeExt;
25use timely::ContainerBuilder;
26use timely::container::{ContainerBuilder as _, PushInto};
27use timely::logging::{TimelyEvent, TimelyEventBuilder};
28use timely::logging_core::{Logger, Registry};
29use timely::order::Product;
30use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
31
32use crate::arrangement::manager::TraceBundle;
33use crate::extensions::arrange::{KeyCollection, MzArrange};
34use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
35use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
36use crate::render::errors::DataflowErrorSer;
37use crate::typedefs::{ErrBatcher, ErrBuilder};
38
39pub fn initialize(
44 worker: &mut timely::worker::Worker,
45 config: &LoggingConfig,
46 metrics_registry: MetricsRegistry,
47 worker_config: Rc<ConfigSet>,
48 workers_per_process: usize,
49 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
50) -> LoggingTraces {
51 let interval_ms = std::cmp::max(1, config.interval.as_millis());
52
53 let now = Instant::now();
57 let start_offset = std::time::SystemTime::now()
58 .duration_since(std::time::SystemTime::UNIX_EPOCH)
59 .expect("Failed to get duration since Unix epoch");
60
61 let mut context = LoggingContext {
62 worker,
63 config,
64 interval_ms,
65 now,
66 start_offset,
67 t_event_queue: EventQueue::new("t"),
68 r_event_queue: EventQueue::new("r"),
69 d_event_queue: EventQueue::new("d"),
70 c_event_queue: EventQueue::new("c"),
71 shared_state: Default::default(),
72 metrics_registry,
73 worker_config,
74 workers_per_process,
75 storage_log_reader,
76 };
77
78 let dataflow_index = context.worker.next_dataflow_index();
81 let traces = if config.log_logging {
82 context.register_loggers();
83 context.construct_dataflow()
84 } else {
85 let traces = context.construct_dataflow();
86 context.register_loggers();
87 traces
88 };
89
90 let compute_logger = worker.logger_for("materialize/compute").unwrap();
91 LoggingTraces {
92 traces,
93 dataflow_index,
94 compute_logger,
95 }
96}
97
/// A reachability log record: a tracker id paired with that tracker's updates.
///
/// Each update is `(node, port, is_source, timestamp, diff)`, where `is_source` is `true` for
/// updates taken from `TrackerEvent::SourceUpdate` and `false` for those taken from
/// `TrackerEvent::TargetUpdate`.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
99
/// State needed to build the logging dataflow and to register the loggers that feed it.
struct LoggingContext<'a> {
    /// Worker on which the logging dataflow is built and whose log register receives the loggers.
    worker: &'a mut timely::worker::Worker,
    /// Logging configuration, passed to every `construct` function.
    config: &'a LoggingConfig,
    /// Logging interval in milliseconds (at least 1), passed to every `BatchLogger`.
    interval_ms: u128,
    /// Instant captured at initialization, passed to every `Logger`.
    now: Instant,
    /// Time since the Unix epoch captured at initialization, passed to every `Logger`.
    start_offset: Duration,
    /// Queue carrying timely events from the `"timely"` logger to the dataflow.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue carrying reachability events; it has three links, one per reachability logger.
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue carrying differential events from the `"differential/arrange"` logger.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue carrying compute events from the `"materialize/compute"` logger.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared with the timely, differential, and compute logging constructors; it also
    /// receives the compute logger once the loggers are registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
    /// Metrics registry, passed to the prometheus logging constructor.
    metrics_registry: MetricsRegistry,
    /// Worker configuration, passed to the prometheus logging constructor.
    worker_config: Rc<ConfigSet>,
    /// Passed through to the prometheus logging constructor.
    workers_per_process: usize,
    /// Optional `StorageTimelyLogReader`; taken (leaving `None`) when the dataflow is constructed
    /// and handed to the timely logging constructor.
    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}
117
/// The result of initializing logging.
pub(crate) struct LoggingTraces {
    /// Traces exported by the logging dataflow, keyed by log variant.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The worker's next dataflow index, read immediately before the logging dataflow was built.
    pub dataflow_index: usize,
    /// The logger registered under `"materialize/compute"`.
    pub compute_logger: super::compute::Logger,
}
126
127impl LoggingContext<'_> {
128 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
129 self.worker.dataflow_named("Dataflow: logging", |scope| {
130 let scope = scope.with_label();
131
132 let mut collections = BTreeMap::new();
133
134 let super::timely::Return {
135 collections: timely_collections,
136 } = super::timely::construct(
137 scope,
138 self.config,
139 self.t_event_queue.clone(),
140 Rc::clone(&self.shared_state),
141 self.storage_log_reader.take(),
142 );
143 collections.extend(timely_collections);
144
145 let super::reachability::Return {
146 collections: reachability_collections,
147 } = super::reachability::construct(scope, self.config, self.r_event_queue.clone());
148 collections.extend(reachability_collections);
149
150 let super::differential::Return {
151 collections: differential_collections,
152 } = super::differential::construct(
153 scope,
154 self.config,
155 self.d_event_queue.clone(),
156 Rc::clone(&self.shared_state),
157 );
158 collections.extend(differential_collections);
159
160 let super::compute::Return {
161 collections: compute_collections,
162 } = super::compute::construct(
163 scope.clone(),
164 scope.activations(),
165 self.config,
166 self.c_event_queue.clone(),
167 Rc::clone(&self.shared_state),
168 );
169 collections.extend(compute_collections);
170
171 let super::prometheus::Return {
172 collections: prometheus_collections,
173 } = super::prometheus::construct(
174 scope,
175 self.config,
176 self.metrics_registry.clone(),
177 self.now,
178 self.start_offset,
179 Rc::clone(&self.worker_config),
180 self.workers_per_process,
181 );
182 collections.extend(prometheus_collections);
183
184 let errs = scope.scoped("logging errors", |scope| {
185 let collection: KeyCollection<_, DataflowErrorSer, Diff> =
186 VecCollection::empty(scope).into();
187 collection
188 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
189 .trace
190 });
191
192 let traces = collections
193 .into_iter()
194 .map(|(log, collection)| {
195 let bundle = TraceBundle::new(collection.trace, errs.clone())
196 .with_drop(collection.token);
197 (log, bundle)
198 })
199 .collect();
200 traces
201 })
202 }
203
204 fn register_reachability_logger<T: ExtractTimestamp>(
209 &self,
210 registry: &mut Registry,
211 index: usize,
212 ) {
213 let logger = self.reachability_logger::<T>(index);
214 let type_name = std::any::type_name::<T>();
215 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
216 }
217
218 fn register_loggers(&self) {
222 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
223 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
224 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
225
226 let mut register = self.worker.log_register().expect("Logging must be enabled");
227 register.insert_logger("timely", t_logger);
228 self.register_reachability_logger::<Timestamp>(&mut register, 0);
231 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
232 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
233 register.insert_logger("differential/arrange", d_logger);
234 register.insert_logger("materialize/compute", c_logger.clone());
235
236 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
237 }
238
239 fn simple_logger<CB: ContainerBuilder>(
240 &self,
241 event_queue: EventQueue<CB::Container>,
242 ) -> Logger<CB> {
243 let [link] = event_queue.links;
244 let mut logger = BatchLogger::new(link, self.interval_ms);
245 let activator = event_queue.activator.clone();
246 Logger::new(
247 self.now,
248 self.start_offset,
249 move |time, data: &mut Option<CB::Container>| {
250 if let Some(data) = data.take() {
251 logger.publish_batch(data);
252 } else if logger.report_progress(*time) {
253 activator.activate();
254 }
255 },
256 )
257 }
258
259 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
262 where
263 T: ExtractTimestamp,
264 {
265 let link = Rc::clone(&self.r_event_queue.links[index]);
266 let mut logger = BatchLogger::new(link, self.interval_ms);
267 let mut massaged = Vec::new();
268 let mut builder = ColumnBuilder::default();
269 let activator = self.r_event_queue.activator.clone();
270
271 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
272 if let Some(data) = data {
273 for (time, event) in data.drain(..) {
275 match event {
276 TrackerEvent::SourceUpdate(update) => {
277 massaged.extend(update.updates.iter().map(
278 |(node, port, time, diff)| {
279 let is_source = true;
280 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
281 },
282 ));
283
284 builder.push_into((time, (update.tracker_id, &massaged)));
285 massaged.clear();
286 }
287 TrackerEvent::TargetUpdate(update) => {
288 massaged.extend(update.updates.iter().map(
289 |(node, port, time, diff)| {
290 let is_source = false;
291 (*node, *port, is_source, time.extract(), Diff::from(*diff))
292 },
293 ));
294
295 builder.push_into((time, (update.tracker_id, &massaged)));
296 massaged.clear();
297 }
298 }
299 while let Some(container) = builder.extract() {
300 logger.publish_batch(std::mem::take(container));
301 }
302 }
303 } else {
304 while let Some(container) = builder.finish() {
306 logger.publish_batch(std::mem::take(container));
307 }
308
309 if logger.report_progress(*batch_time) {
310 activator.activate();
311 }
312 }
313 };
314
315 Logger::new(self.now, self.start_offset, action)
316 }
317}
318
/// Reduces a dataflow timestamp type to a plain [`Timestamp`].
///
/// Implemented for the timestamp types for which reachability loggers are registered.
trait ExtractTimestamp: Clone + 'static {
    /// Returns the [`Timestamp`] component of `self`.
    fn extract(&self) -> Timestamp;
}
324
325impl ExtractTimestamp for Timestamp {
326 fn extract(&self) -> Timestamp {
327 *self
328 }
329}
330
331impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
332 fn extract(&self) -> Timestamp {
333 self.outer
334 }
335}
336
337impl ExtractTimestamp for (Timestamp, Subtime) {
338 fn extract(&self) -> Timestamp {
339 self.0
340 }
341}