1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_dyncfg::ConfigSet;
18use mz_ore::metrics::MetricsRegistry;
19use mz_repr::{Diff, Timestamp};
20use mz_storage_operators::persist_source::Subtime;
21use mz_timely_util::columnar::Column;
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnation::ColumnationChunker;
24use mz_timely_util::operator::CollectionExt;
25use mz_timely_util::scope_label::ScopeExt;
26use timely::ContainerBuilder;
27use timely::container::{ContainerBuilder as _, PushInto};
28use timely::logging::{TimelyEvent, TimelyEventBuilder};
29use timely::logging_core::{Logger, Registry};
30use timely::order::Product;
31use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
32
33use crate::arrangement::manager::TraceBundle;
34use crate::extensions::arrange::{KeyCollection, MzArrange};
35use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
36use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
37use crate::render::errors::DataflowErrorSer;
38use crate::typedefs::{ErrBatcher, ErrBuilder};
39
40pub fn initialize(
45 worker: &mut timely::worker::Worker,
46 config: &LoggingConfig,
47 metrics_registry: MetricsRegistry,
48 worker_config: Rc<ConfigSet>,
49 workers_per_process: usize,
50 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
51) -> LoggingTraces {
52 let interval_ms = std::cmp::max(1, config.interval.as_millis());
53
54 let now = Instant::now();
58 let start_offset = std::time::SystemTime::now()
59 .duration_since(std::time::SystemTime::UNIX_EPOCH)
60 .expect("Failed to get duration since Unix epoch");
61
62 let mut context = LoggingContext {
63 worker,
64 config,
65 interval_ms,
66 now,
67 start_offset,
68 t_event_queue: EventQueue::new("t"),
69 r_event_queue: EventQueue::new("r"),
70 d_event_queue: EventQueue::new("d"),
71 c_event_queue: EventQueue::new("c"),
72 shared_state: Default::default(),
73 metrics_registry,
74 worker_config,
75 workers_per_process,
76 storage_log_reader,
77 };
78
79 let dataflow_index = context.worker.next_dataflow_index();
82 let traces = if config.log_logging {
83 context.register_loggers();
84 context.construct_dataflow()
85 } else {
86 let traces = context.construct_dataflow();
87 context.register_loggers();
88 traces
89 };
90
91 let compute_logger = worker.logger_for("materialize/compute").unwrap();
92 LoggingTraces {
93 traces,
94 dataflow_index,
95 compute_logger,
96 }
97}
98
/// A logged reachability update batch, laid out as `(tracker_id, updates)`.
///
/// Each entry of `updates` is `(node, port, is_source, timestamp, diff)`, where
/// `is_source` is `true` for updates taken from a `TrackerEvent::SourceUpdate` and
/// `false` for those taken from a `TrackerEvent::TargetUpdate`.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
100
/// State needed to build the logging dataflow and to register the loggers that
/// feed it.
struct LoggingContext<'a> {
    /// The timely worker on which the logging dataflow is built and with which the
    /// loggers are registered.
    worker: &'a mut timely::worker::Worker,
    /// The logging configuration, forwarded to every logging dataflow fragment.
    config: &'a LoggingConfig,
    /// The logging interval in milliseconds, handed to every `BatchLogger`.
    interval_ms: u128,
    /// The start instant handed to every created `Logger`.
    now: Instant,
    /// The offset handed to every created `Logger` alongside `now`.
    start_offset: Duration,
    /// Queue carrying timely events from the `"timely"` logger to the dataflow.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue carrying reachability events to the dataflow. It has three links, one
    /// for each reachability logger that gets registered.
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue carrying differential events from the `"differential/arrange"` logger.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue carrying compute events from the `"materialize/compute"` logger.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared with the timely, differential, and compute dataflow fragments;
    /// it also receives a handle to the compute logger once loggers are registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
    /// Metrics registry forwarded to the prometheus logging fragment.
    metrics_registry: MetricsRegistry,
    /// Worker configuration forwarded to the prometheus logging fragment.
    worker_config: Rc<ConfigSet>,
    /// Worker count per process, forwarded to the prometheus logging fragment.
    workers_per_process: usize,
    /// Optional storage log reader; it is taken (leaving `None`) when the timely
    /// logging fragment is constructed.
    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}
118
/// The result of initializing logging on a worker.
pub(crate) struct LoggingTraces {
    /// One trace bundle for each log variant exported by the logging dataflow.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The dataflow index the worker reported as "next" immediately before the
    /// logging dataflow was constructed.
    pub dataflow_index: usize,
    /// The logger registered under the name `"materialize/compute"`.
    pub compute_logger: super::compute::Logger,
}
127
128impl LoggingContext<'_> {
129 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
130 self.worker.dataflow_named("Dataflow: logging", |scope| {
131 let scope = scope.with_label();
132
133 let mut collections = BTreeMap::new();
134
135 let super::timely::Return {
136 collections: timely_collections,
137 } = super::timely::construct(
138 scope,
139 self.config,
140 self.t_event_queue.clone(),
141 Rc::clone(&self.shared_state),
142 self.storage_log_reader.take(),
143 );
144 collections.extend(timely_collections);
145
146 let super::reachability::Return {
147 collections: reachability_collections,
148 } = super::reachability::construct(scope, self.config, self.r_event_queue.clone());
149 collections.extend(reachability_collections);
150
151 let super::differential::Return {
152 collections: differential_collections,
153 } = super::differential::construct(
154 scope,
155 self.config,
156 self.d_event_queue.clone(),
157 Rc::clone(&self.shared_state),
158 );
159 collections.extend(differential_collections);
160
161 let super::compute::Return {
162 collections: compute_collections,
163 } = super::compute::construct(
164 scope.clone(),
165 scope.activations(),
166 self.config,
167 self.c_event_queue.clone(),
168 Rc::clone(&self.shared_state),
169 );
170 collections.extend(compute_collections);
171
172 let super::prometheus::Return {
173 collections: prometheus_collections,
174 } = super::prometheus::construct(
175 scope,
176 self.config,
177 self.metrics_registry.clone(),
178 self.now,
179 self.start_offset,
180 Rc::clone(&self.worker_config),
181 self.workers_per_process,
182 );
183 collections.extend(prometheus_collections);
184
185 let errs = scope.scoped("logging errors", |scope| {
186 let collection: KeyCollection<_, DataflowErrorSer, Diff> =
187 VecCollection::empty(scope).into();
188 collection
189 .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
190 "Arrange logging err",
191 )
192 .trace
193 });
194
195 let traces = collections
196 .into_iter()
197 .map(|(log, collection)| {
198 let bundle = TraceBundle::new(collection.trace, errs.clone())
199 .with_drop(collection.token);
200 (log, bundle)
201 })
202 .collect();
203 traces
204 })
205 }
206
207 fn register_reachability_logger<T: ExtractTimestamp>(
212 &self,
213 registry: &mut Registry,
214 index: usize,
215 ) {
216 let logger = self.reachability_logger::<T>(index);
217 let type_name = std::any::type_name::<T>();
218 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
219 }
220
221 fn register_loggers(&self) {
225 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
226 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
227 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
228
229 let mut register = self.worker.log_register().expect("Logging must be enabled");
230 register.insert_logger("timely", t_logger);
231 self.register_reachability_logger::<Timestamp>(&mut register, 0);
234 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
235 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
236 register.insert_logger("differential/arrange", d_logger);
237 register.insert_logger("materialize/compute", c_logger.clone());
238
239 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
240 }
241
242 fn simple_logger<CB: ContainerBuilder>(
243 &self,
244 event_queue: EventQueue<CB::Container>,
245 ) -> Logger<CB> {
246 let [link] = event_queue.links;
247 let mut logger = BatchLogger::new(link, self.interval_ms);
248 let activator = event_queue.activator.clone();
249 Logger::new(
250 self.now,
251 self.start_offset,
252 move |time, data: &mut Option<CB::Container>| {
253 if let Some(data) = data.take() {
254 logger.publish_batch(data);
255 } else if logger.report_progress(*time) {
256 activator.activate();
257 }
258 },
259 )
260 }
261
262 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
265 where
266 T: ExtractTimestamp,
267 {
268 let link = Rc::clone(&self.r_event_queue.links[index]);
269 let mut logger = BatchLogger::new(link, self.interval_ms);
270 let mut massaged = Vec::new();
271 let mut builder = ColumnBuilder::default();
272 let activator = self.r_event_queue.activator.clone();
273
274 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
275 if let Some(data) = data {
276 for (time, event) in data.drain(..) {
278 match event {
279 TrackerEvent::SourceUpdate(update) => {
280 massaged.extend(update.updates.iter().map(
281 |(node, port, time, diff)| {
282 let is_source = true;
283 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
284 },
285 ));
286
287 builder.push_into((time, (update.tracker_id, &massaged)));
288 massaged.clear();
289 }
290 TrackerEvent::TargetUpdate(update) => {
291 massaged.extend(update.updates.iter().map(
292 |(node, port, time, diff)| {
293 let is_source = false;
294 (*node, *port, is_source, time.extract(), Diff::from(*diff))
295 },
296 ));
297
298 builder.push_into((time, (update.tracker_id, &massaged)));
299 massaged.clear();
300 }
301 }
302 while let Some(container) = builder.extract() {
303 logger.publish_batch(std::mem::take(container));
304 }
305 }
306 } else {
307 while let Some(container) = builder.finish() {
309 logger.publish_batch(std::mem::take(container));
310 }
311
312 if logger.report_progress(*batch_time) {
313 activator.activate();
314 }
315 }
316 };
317
318 Logger::new(self.now, self.start_offset, action)
319 }
320}
321
/// Reduces a timestamp type seen by a reachability logger to a plain [`Timestamp`].
trait ExtractTimestamp: Clone + 'static {
    /// Returns the [`Timestamp`] contained in `self`.
    fn extract(&self) -> Timestamp;
}
327
328impl ExtractTimestamp for Timestamp {
329 fn extract(&self) -> Timestamp {
330 *self
331 }
332}
333
334impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
335 fn extract(&self) -> Timestamp {
336 self.outer
337 }
338}
339
340impl ExtractTimestamp for (Timestamp, Subtime) {
341 fn extract(&self) -> Timestamp {
342 self.0
343 }
344}