1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use crate::arrangement::manager::TraceBundle;
14use crate::extensions::arrange::{KeyCollection, MzArrange};
15use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
16use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
17use crate::typedefs::{ErrBatcher, ErrBuilder};
18use differential_dataflow::VecCollection;
19use differential_dataflow::dynamic::pointstamp::PointStamp;
20use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
21use mz_compute_client::logging::{LogVariant, LoggingConfig};
22use mz_dyncfg::ConfigSet;
23use mz_ore::metrics::MetricsRegistry;
24use mz_repr::{Diff, Timestamp};
25use mz_storage_operators::persist_source::Subtime;
26use mz_storage_types::errors::DataflowError;
27use mz_timely_util::columnar::Column;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::operator::CollectionExt;
30use mz_timely_util::scope_label::ScopeExt;
31use timely::ContainerBuilder;
32use timely::container::{ContainerBuilder as _, PushInto};
33use timely::logging::{TimelyEvent, TimelyEventBuilder};
34use timely::logging_core::{Logger, Registry};
35use timely::order::Product;
36use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
37
38pub fn initialize(
43 worker: &mut timely::worker::Worker,
44 config: &LoggingConfig,
45 metrics_registry: MetricsRegistry,
46 worker_config: Rc<ConfigSet>,
47 workers_per_process: usize,
48) -> LoggingTraces {
49 let interval_ms = std::cmp::max(1, config.interval.as_millis());
50
51 let now = Instant::now();
55 let start_offset = std::time::SystemTime::now()
56 .duration_since(std::time::SystemTime::UNIX_EPOCH)
57 .expect("Failed to get duration since Unix epoch");
58
59 let mut context = LoggingContext {
60 worker,
61 config,
62 interval_ms,
63 now,
64 start_offset,
65 t_event_queue: EventQueue::new("t"),
66 r_event_queue: EventQueue::new("r"),
67 d_event_queue: EventQueue::new("d"),
68 c_event_queue: EventQueue::new("c"),
69 shared_state: Default::default(),
70 metrics_registry,
71 worker_config,
72 workers_per_process,
73 };
74
75 let dataflow_index = context.worker.next_dataflow_index();
78 let traces = if config.log_logging {
79 context.register_loggers();
80 context.construct_dataflow()
81 } else {
82 let traces = context.construct_dataflow();
83 context.register_loggers();
84 traces
85 };
86
87 let compute_logger = worker.logger_for("materialize/compute").unwrap();
88 LoggingTraces {
89 traces,
90 dataflow_index,
91 compute_logger,
92 }
93}
94
95pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
96
97struct LoggingContext<'a> {
98 worker: &'a mut timely::worker::Worker,
99 config: &'a LoggingConfig,
100 interval_ms: u128,
101 now: Instant,
102 start_offset: Duration,
103 t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
104 r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
105 d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
106 c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
107 shared_state: Rc<RefCell<SharedLoggingState>>,
108 metrics_registry: MetricsRegistry,
109 worker_config: Rc<ConfigSet>,
110 workers_per_process: usize,
111}
112
113pub(crate) struct LoggingTraces {
114 pub traces: BTreeMap<LogVariant, TraceBundle>,
116 pub dataflow_index: usize,
118 pub compute_logger: super::compute::Logger,
120}
121
122impl LoggingContext<'_> {
123 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
124 self.worker.dataflow_named("Dataflow: logging", |scope| {
125 let scope = scope.with_label();
126
127 let mut collections = BTreeMap::new();
128
129 let super::timely::Return {
130 collections: timely_collections,
131 } = super::timely::construct(
132 scope,
133 self.config,
134 self.t_event_queue.clone(),
135 Rc::clone(&self.shared_state),
136 );
137 collections.extend(timely_collections);
138
139 let super::reachability::Return {
140 collections: reachability_collections,
141 } = super::reachability::construct(scope, self.config, self.r_event_queue.clone());
142 collections.extend(reachability_collections);
143
144 let super::differential::Return {
145 collections: differential_collections,
146 } = super::differential::construct(
147 scope,
148 self.config,
149 self.d_event_queue.clone(),
150 Rc::clone(&self.shared_state),
151 );
152 collections.extend(differential_collections);
153
154 let super::compute::Return {
155 collections: compute_collections,
156 } = super::compute::construct(
157 scope.clone(),
158 scope.activations(),
159 self.config,
160 self.c_event_queue.clone(),
161 Rc::clone(&self.shared_state),
162 );
163 collections.extend(compute_collections);
164
165 let super::prometheus::Return {
166 collections: prometheus_collections,
167 } = super::prometheus::construct(
168 scope,
169 self.config,
170 self.metrics_registry.clone(),
171 self.now,
172 self.start_offset,
173 Rc::clone(&self.worker_config),
174 self.workers_per_process,
175 );
176 collections.extend(prometheus_collections);
177
178 let errs = scope.scoped("logging errors", |scope| {
179 let collection: KeyCollection<_, DataflowError, Diff> =
180 VecCollection::empty(scope).into();
181 collection
182 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
183 .trace
184 });
185
186 let traces = collections
187 .into_iter()
188 .map(|(log, collection)| {
189 let bundle = TraceBundle::new(collection.trace, errs.clone())
190 .with_drop(collection.token);
191 (log, bundle)
192 })
193 .collect();
194 traces
195 })
196 }
197
198 fn register_reachability_logger<T: ExtractTimestamp>(
203 &self,
204 registry: &mut Registry,
205 index: usize,
206 ) {
207 let logger = self.reachability_logger::<T>(index);
208 let type_name = std::any::type_name::<T>();
209 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
210 }
211
212 fn register_loggers(&self) {
216 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
217 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
218 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
219
220 let mut register = self.worker.log_register().expect("Logging must be enabled");
221 register.insert_logger("timely", t_logger);
222 self.register_reachability_logger::<Timestamp>(&mut register, 0);
225 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
226 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
227 register.insert_logger("differential/arrange", d_logger);
228 register.insert_logger("materialize/compute", c_logger.clone());
229
230 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
231 }
232
233 fn simple_logger<CB: ContainerBuilder>(
234 &self,
235 event_queue: EventQueue<CB::Container>,
236 ) -> Logger<CB> {
237 let [link] = event_queue.links;
238 let mut logger = BatchLogger::new(link, self.interval_ms);
239 let activator = event_queue.activator.clone();
240 Logger::new(
241 self.now,
242 self.start_offset,
243 move |time, data: &mut Option<CB::Container>| {
244 if let Some(data) = data.take() {
245 logger.publish_batch(data);
246 } else if logger.report_progress(*time) {
247 activator.activate();
248 }
249 },
250 )
251 }
252
253 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
256 where
257 T: ExtractTimestamp,
258 {
259 let link = Rc::clone(&self.r_event_queue.links[index]);
260 let mut logger = BatchLogger::new(link, self.interval_ms);
261 let mut massaged = Vec::new();
262 let mut builder = ColumnBuilder::default();
263 let activator = self.r_event_queue.activator.clone();
264
265 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
266 if let Some(data) = data {
267 for (time, event) in data.drain(..) {
269 match event {
270 TrackerEvent::SourceUpdate(update) => {
271 massaged.extend(update.updates.iter().map(
272 |(node, port, time, diff)| {
273 let is_source = true;
274 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
275 },
276 ));
277
278 builder.push_into((time, (update.tracker_id, &massaged)));
279 massaged.clear();
280 }
281 TrackerEvent::TargetUpdate(update) => {
282 massaged.extend(update.updates.iter().map(
283 |(node, port, time, diff)| {
284 let is_source = false;
285 (*node, *port, is_source, time.extract(), Diff::from(*diff))
286 },
287 ));
288
289 builder.push_into((time, (update.tracker_id, &massaged)));
290 massaged.clear();
291 }
292 }
293 while let Some(container) = builder.extract() {
294 logger.publish_batch(std::mem::take(container));
295 }
296 }
297 } else {
298 while let Some(container) = builder.finish() {
300 logger.publish_batch(std::mem::take(container));
301 }
302
303 if logger.report_progress(*batch_time) {
304 activator.activate();
305 }
306 }
307 };
308
309 Logger::new(self.now, self.start_offset, action)
310 }
311}
312
313trait ExtractTimestamp: Clone + 'static {
315 fn extract(&self) -> Timestamp;
317}
318
319impl ExtractTimestamp for Timestamp {
320 fn extract(&self) -> Timestamp {
321 *self
322 }
323}
324
325impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
326 fn extract(&self) -> Timestamp {
327 self.outer
328 }
329}
330
331impl ExtractTimestamp for (Timestamp, Subtime) {
332 fn extract(&self) -> Timestamp {
333 self.0
334 }
335}