1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::columnar::Column;
21use mz_timely_util::columnar::builder::ColumnBuilder;
22use mz_timely_util::operator::CollectionExt;
23use mz_timely_util::scope_label::ScopeExt;
24use timely::ContainerBuilder;
25use timely::communication::Allocate;
26use timely::container::{ContainerBuilder as _, PushInto};
27use timely::dataflow::Scope;
28use timely::logging::{TimelyEvent, TimelyEventBuilder};
29use timely::logging_core::{Logger, Registry};
30use timely::order::Product;
31use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
32use timely::worker::AsWorker;
33
34use crate::arrangement::manager::TraceBundle;
35use crate::extensions::arrange::{KeyCollection, MzArrange};
36use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
37use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
38use crate::typedefs::{ErrBatcher, ErrBuilder};
39
40pub fn initialize<A: Allocate + 'static>(
45 worker: &mut timely::worker::Worker<A>,
46 config: &LoggingConfig,
47) -> LoggingTraces {
48 let interval_ms = std::cmp::max(1, config.interval.as_millis());
49
50 let now = Instant::now();
54 let start_offset = std::time::SystemTime::now()
55 .duration_since(std::time::SystemTime::UNIX_EPOCH)
56 .expect("Failed to get duration since Unix epoch");
57
58 let mut context = LoggingContext {
59 worker,
60 config,
61 interval_ms,
62 now,
63 start_offset,
64 t_event_queue: EventQueue::new("t"),
65 r_event_queue: EventQueue::new("r"),
66 d_event_queue: EventQueue::new("d"),
67 c_event_queue: EventQueue::new("c"),
68 shared_state: Default::default(),
69 };
70
71 let dataflow_index = context.worker.next_dataflow_index();
74 let traces = if config.log_logging {
75 context.register_loggers();
76 context.construct_dataflow()
77 } else {
78 let traces = context.construct_dataflow();
79 context.register_loggers();
80 traces
81 };
82
83 let compute_logger = worker.logger_for("materialize/compute").unwrap();
84 LoggingTraces {
85 traces,
86 dataflow_index,
87 compute_logger,
88 }
89}
90
/// The reachability updates of a single tracker event, as logged by the reachability loggers.
///
/// The tuple is `(tracker_id, updates)`. Each update is `(node, port, is_source, time, diff)`:
/// `is_source` is `true` for source updates and `false` for target updates, and `time` is the
/// tracker's timestamp reduced to a [`Timestamp`].
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
92
/// State used while initializing logging: building the logging dataflow and registering the
/// loggers that feed it.
struct LoggingContext<'a, A: Allocate> {
    /// Worker on which the logging dataflow is built and the loggers are registered.
    worker: &'a mut timely::worker::Worker<A>,
    /// Logging configuration, passed to each logging sub-dataflow constructor.
    config: &'a LoggingConfig,
    /// Interval in milliseconds passed to every `BatchLogger` created here.
    interval_ms: u128,
    /// Reference instant passed to every `Logger` created here.
    now: Instant,
    /// Time since the Unix epoch, captured alongside `now`; passed to every `Logger` as its
    /// offset.
    start_offset: Duration,
    /// Queue carrying timely events from the timely logger into the logging dataflow.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue carrying reachability events into the logging dataflow. It has three links, one
    /// per reachability logger (one per reachability timestamp type).
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue carrying differential events from the differential logger into the logging
    /// dataflow.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue carrying compute events from the compute logger into the logging dataflow.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared with the timely, differential, and compute logging sub-dataflows; it also
    /// stores the compute logger once registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
}
105
/// The result of logging initialization.
pub(crate) struct LoggingTraces {
    /// One trace bundle per log variant, arranged by the logging dataflow.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// Index of the logging dataflow on the worker.
    pub dataflow_index: usize,
    /// The logger registered under `"materialize/compute"`.
    pub compute_logger: super::compute::Logger,
}
114
115impl<A: Allocate + 'static> LoggingContext<'_, A> {
116 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
117 self.worker.dataflow_named("Dataflow: logging", |scope| {
118 let scope = &mut scope.with_label();
119
120 let mut collections = BTreeMap::new();
121
122 let super::timely::Return {
123 collections: timely_collections,
124 } = super::timely::construct(
125 scope.clone(),
126 self.config,
127 self.t_event_queue.clone(),
128 Rc::clone(&self.shared_state),
129 );
130 collections.extend(timely_collections);
131
132 let super::reachability::Return {
133 collections: reachability_collections,
134 } = super::reachability::construct(
135 scope.clone(),
136 self.config,
137 self.r_event_queue.clone(),
138 );
139 collections.extend(reachability_collections);
140
141 let super::differential::Return {
142 collections: differential_collections,
143 } = super::differential::construct(
144 scope.clone(),
145 self.config,
146 self.d_event_queue.clone(),
147 Rc::clone(&self.shared_state),
148 );
149 collections.extend(differential_collections);
150
151 let super::compute::Return {
152 collections: compute_collections,
153 } = super::compute::construct(
154 scope.clone(),
155 scope.parent().clone(),
156 self.config,
157 self.c_event_queue.clone(),
158 Rc::clone(&self.shared_state),
159 );
160 collections.extend(compute_collections);
161
162 let errs = scope.scoped("logging errors", |scope| {
163 let collection: KeyCollection<_, DataflowError, Diff> =
164 VecCollection::empty(scope).into();
165 collection
166 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
167 .trace
168 });
169
170 let traces = collections
171 .into_iter()
172 .map(|(log, collection)| {
173 let bundle = TraceBundle::new(collection.trace, errs.clone())
174 .with_drop(collection.token);
175 (log, bundle)
176 })
177 .collect();
178 traces
179 })
180 }
181
182 fn register_reachability_logger<T: ExtractTimestamp>(
187 &self,
188 registry: &mut Registry,
189 index: usize,
190 ) {
191 let logger = self.reachability_logger::<T>(index);
192 let type_name = std::any::type_name::<T>();
193 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
194 }
195
196 fn register_loggers(&self) {
200 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
201 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
202 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
203
204 let mut register = self.worker.log_register().expect("Logging must be enabled");
205 register.insert_logger("timely", t_logger);
206 self.register_reachability_logger::<Timestamp>(&mut register, 0);
209 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
210 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
211 register.insert_logger("differential/arrange", d_logger);
212 register.insert_logger("materialize/compute", c_logger.clone());
213
214 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
215 }
216
217 fn simple_logger<CB: ContainerBuilder>(
218 &self,
219 event_queue: EventQueue<CB::Container>,
220 ) -> Logger<CB> {
221 let [link] = event_queue.links;
222 let mut logger = BatchLogger::new(link, self.interval_ms);
223 let activator = event_queue.activator.clone();
224 Logger::new(
225 self.now,
226 self.start_offset,
227 move |time, data: &mut Option<CB::Container>| {
228 if let Some(data) = data.take() {
229 logger.publish_batch(data);
230 } else if logger.report_progress(*time) {
231 activator.activate();
232 }
233 },
234 )
235 }
236
237 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
240 where
241 T: ExtractTimestamp,
242 {
243 let link = Rc::clone(&self.r_event_queue.links[index]);
244 let mut logger = BatchLogger::new(link, self.interval_ms);
245 let mut massaged = Vec::new();
246 let mut builder = ColumnBuilder::default();
247 let activator = self.r_event_queue.activator.clone();
248
249 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
250 if let Some(data) = data {
251 for (time, event) in data.drain(..) {
253 match event {
254 TrackerEvent::SourceUpdate(update) => {
255 massaged.extend(update.updates.iter().map(
256 |(node, port, time, diff)| {
257 let is_source = true;
258 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
259 },
260 ));
261
262 builder.push_into((time, (update.tracker_id, &massaged)));
263 massaged.clear();
264 }
265 TrackerEvent::TargetUpdate(update) => {
266 massaged.extend(update.updates.iter().map(
267 |(node, port, time, diff)| {
268 let is_source = false;
269 (*node, *port, is_source, time.extract(), Diff::from(*diff))
270 },
271 ));
272
273 builder.push_into((time, (update.tracker_id, &massaged)));
274 massaged.clear();
275 }
276 }
277 while let Some(container) = builder.extract() {
278 logger.publish_batch(std::mem::take(container));
279 }
280 }
281 } else {
282 while let Some(container) = builder.finish() {
284 logger.publish_batch(std::mem::take(container));
285 }
286
287 if logger.report_progress(*batch_time) {
288 activator.activate();
289 }
290 }
291 };
292
293 Logger::new(self.now, self.start_offset, action)
294 }
295}
296
297trait ExtractTimestamp: Clone + 'static {
299 fn extract(&self) -> Timestamp;
301}
302
303impl ExtractTimestamp for Timestamp {
304 fn extract(&self) -> Timestamp {
305 *self
306 }
307}
308
309impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
310 fn extract(&self) -> Timestamp {
311 self.outer
312 }
313}
314
315impl ExtractTimestamp for (Timestamp, Subtime) {
316 fn extract(&self) -> Timestamp {
317 self.0
318 }
319}