1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_dyncfg::ConfigSet;
18use mz_ore::metrics::MetricsRegistry;
19use mz_repr::{Diff, Timestamp};
20use mz_storage_operators::persist_source::Subtime;
21use mz_storage_types::errors::DataflowError;
22use mz_timely_util::columnar::Column;
23use mz_timely_util::columnar::builder::ColumnBuilder;
24use mz_timely_util::operator::CollectionExt;
25use mz_timely_util::scope_label::ScopeExt;
26use timely::ContainerBuilder;
27use timely::communication::Allocate;
28use timely::container::{ContainerBuilder as _, PushInto};
29use timely::dataflow::Scope;
30use timely::logging::{TimelyEvent, TimelyEventBuilder};
31use timely::logging_core::{Logger, Registry};
32use timely::order::Product;
33use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
34use timely::worker::AsWorker;
35
36use crate::arrangement::manager::TraceBundle;
37use crate::extensions::arrange::{KeyCollection, MzArrange};
38use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
39use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
40use crate::typedefs::{ErrBatcher, ErrBuilder};
41
42pub fn initialize<A: Allocate + 'static>(
47 worker: &mut timely::worker::Worker<A>,
48 config: &LoggingConfig,
49 metrics_registry: MetricsRegistry,
50 worker_config: Rc<ConfigSet>,
51 workers_per_process: usize,
52) -> LoggingTraces {
53 let interval_ms = std::cmp::max(1, config.interval.as_millis());
54
55 let now = Instant::now();
59 let start_offset = std::time::SystemTime::now()
60 .duration_since(std::time::SystemTime::UNIX_EPOCH)
61 .expect("Failed to get duration since Unix epoch");
62
63 let mut context = LoggingContext {
64 worker,
65 config,
66 interval_ms,
67 now,
68 start_offset,
69 t_event_queue: EventQueue::new("t"),
70 r_event_queue: EventQueue::new("r"),
71 d_event_queue: EventQueue::new("d"),
72 c_event_queue: EventQueue::new("c"),
73 shared_state: Default::default(),
74 metrics_registry,
75 worker_config,
76 workers_per_process,
77 };
78
79 let dataflow_index = context.worker.next_dataflow_index();
82 let traces = if config.log_logging {
83 context.register_loggers();
84 context.construct_dataflow()
85 } else {
86 let traces = context.construct_dataflow();
87 context.register_loggers();
88 traces
89 };
90
91 let compute_logger = worker.logger_for("materialize/compute").unwrap();
92 LoggingTraces {
93 traces,
94 dataflow_index,
95 compute_logger,
96 }
97}
98
/// A reachability event as logged: a tracker id together with that tracker's updates, each of
/// the form `(node, port, is_source, timestamp, diff)`.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
100
/// State needed to build the logging dataflow and to register its loggers on a worker.
struct LoggingContext<'a, A: Allocate> {
    /// The worker on which the logging dataflow is built and the loggers are registered.
    worker: &'a mut timely::worker::Worker<A>,
    /// Logging configuration, passed to each log constructor.
    config: &'a LoggingConfig,
    /// Batching interval in milliseconds (at least 1), passed to every `BatchLogger`.
    interval_ms: u128,
    /// Reference instant passed to every `Logger::new` and to the prometheus log.
    now: Instant,
    /// Duration since the Unix epoch at initialization, passed alongside `now`.
    start_offset: Duration,
    /// Queue connecting the `timely` logger to the timely log dataflow.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue for reachability events, with one link per logged timestamp type (three).
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue connecting the `differential/arrange` logger to the differential log dataflow.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue connecting the `materialize/compute` logger to the compute log dataflow.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared by the timely, differential and compute logs; also receives the compute
    /// logger once it is registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
    /// Registry whose metrics feed the prometheus log.
    metrics_registry: MetricsRegistry,
    /// Dynamic configuration passed to the prometheus log constructor.
    worker_config: Rc<ConfigSet>,
    /// Number of workers per process, passed to the prometheus log constructor.
    workers_per_process: usize,
}
116
/// The output of [`initialize`].
pub(crate) struct LoggingTraces {
    /// A trace bundle for each logged [`LogVariant`].
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The dataflow index obtained from the worker before the logging dataflow was built.
    pub dataflow_index: usize,
    /// The worker's `materialize/compute` logger.
    pub compute_logger: super::compute::Logger,
}
125
126impl<A: Allocate + 'static> LoggingContext<'_, A> {
127 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
128 self.worker.dataflow_named("Dataflow: logging", |scope| {
129 let scope = &mut scope.with_label();
130
131 let mut collections = BTreeMap::new();
132
133 let super::timely::Return {
134 collections: timely_collections,
135 } = super::timely::construct(
136 scope.clone(),
137 self.config,
138 self.t_event_queue.clone(),
139 Rc::clone(&self.shared_state),
140 );
141 collections.extend(timely_collections);
142
143 let super::reachability::Return {
144 collections: reachability_collections,
145 } = super::reachability::construct(
146 scope.clone(),
147 self.config,
148 self.r_event_queue.clone(),
149 );
150 collections.extend(reachability_collections);
151
152 let super::differential::Return {
153 collections: differential_collections,
154 } = super::differential::construct(
155 scope.clone(),
156 self.config,
157 self.d_event_queue.clone(),
158 Rc::clone(&self.shared_state),
159 );
160 collections.extend(differential_collections);
161
162 let super::compute::Return {
163 collections: compute_collections,
164 } = super::compute::construct(
165 scope.clone(),
166 scope.parent().clone(),
167 self.config,
168 self.c_event_queue.clone(),
169 Rc::clone(&self.shared_state),
170 );
171 collections.extend(compute_collections);
172
173 let super::prometheus::Return {
174 collections: prometheus_collections,
175 } = super::prometheus::construct(
176 scope.clone(),
177 self.config,
178 self.metrics_registry.clone(),
179 self.now,
180 self.start_offset,
181 Rc::clone(&self.worker_config),
182 self.workers_per_process,
183 );
184 collections.extend(prometheus_collections);
185
186 let errs = scope.scoped("logging errors", |scope| {
187 let collection: KeyCollection<_, DataflowError, Diff> =
188 VecCollection::empty(scope).into();
189 collection
190 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
191 .trace
192 });
193
194 let traces = collections
195 .into_iter()
196 .map(|(log, collection)| {
197 let bundle = TraceBundle::new(collection.trace, errs.clone())
198 .with_drop(collection.token);
199 (log, bundle)
200 })
201 .collect();
202 traces
203 })
204 }
205
206 fn register_reachability_logger<T: ExtractTimestamp>(
211 &self,
212 registry: &mut Registry,
213 index: usize,
214 ) {
215 let logger = self.reachability_logger::<T>(index);
216 let type_name = std::any::type_name::<T>();
217 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
218 }
219
220 fn register_loggers(&self) {
224 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
225 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
226 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
227
228 let mut register = self.worker.log_register().expect("Logging must be enabled");
229 register.insert_logger("timely", t_logger);
230 self.register_reachability_logger::<Timestamp>(&mut register, 0);
233 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
234 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
235 register.insert_logger("differential/arrange", d_logger);
236 register.insert_logger("materialize/compute", c_logger.clone());
237
238 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
239 }
240
241 fn simple_logger<CB: ContainerBuilder>(
242 &self,
243 event_queue: EventQueue<CB::Container>,
244 ) -> Logger<CB> {
245 let [link] = event_queue.links;
246 let mut logger = BatchLogger::new(link, self.interval_ms);
247 let activator = event_queue.activator.clone();
248 Logger::new(
249 self.now,
250 self.start_offset,
251 move |time, data: &mut Option<CB::Container>| {
252 if let Some(data) = data.take() {
253 logger.publish_batch(data);
254 } else if logger.report_progress(*time) {
255 activator.activate();
256 }
257 },
258 )
259 }
260
261 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
264 where
265 T: ExtractTimestamp,
266 {
267 let link = Rc::clone(&self.r_event_queue.links[index]);
268 let mut logger = BatchLogger::new(link, self.interval_ms);
269 let mut massaged = Vec::new();
270 let mut builder = ColumnBuilder::default();
271 let activator = self.r_event_queue.activator.clone();
272
273 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
274 if let Some(data) = data {
275 for (time, event) in data.drain(..) {
277 match event {
278 TrackerEvent::SourceUpdate(update) => {
279 massaged.extend(update.updates.iter().map(
280 |(node, port, time, diff)| {
281 let is_source = true;
282 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
283 },
284 ));
285
286 builder.push_into((time, (update.tracker_id, &massaged)));
287 massaged.clear();
288 }
289 TrackerEvent::TargetUpdate(update) => {
290 massaged.extend(update.updates.iter().map(
291 |(node, port, time, diff)| {
292 let is_source = false;
293 (*node, *port, is_source, time.extract(), Diff::from(*diff))
294 },
295 ));
296
297 builder.push_into((time, (update.tracker_id, &massaged)));
298 massaged.clear();
299 }
300 }
301 while let Some(container) = builder.extract() {
302 logger.publish_batch(std::mem::take(container));
303 }
304 }
305 } else {
306 while let Some(container) = builder.finish() {
308 logger.publish_batch(std::mem::take(container));
309 }
310
311 if logger.report_progress(*batch_time) {
312 activator.activate();
313 }
314 }
315 };
316
317 Logger::new(self.now, self.start_offset, action)
318 }
319}
320
/// Timestamp types whose reachability events are logged; each reduces to a plain [`Timestamp`].
trait ExtractTimestamp: Clone + 'static {
    /// Returns the [`Timestamp`] component of `self` that is recorded in the reachability logs.
    fn extract(&self) -> Timestamp;
}
326
/// A plain [`Timestamp`] is its own extracted timestamp.
impl ExtractTimestamp for Timestamp {
    fn extract(&self) -> Timestamp {
        *self
    }
}
332
333impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
334 fn extract(&self) -> Timestamp {
335 self.outer
336 }
337}
338
339impl ExtractTimestamp for (Timestamp, Subtime) {
340 fn extract(&self) -> Timestamp {
341 self.0
342 }
343}