1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::Collection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::columnar::Column;
21use mz_timely_util::columnar::builder::ColumnBuilder;
22use mz_timely_util::operator::CollectionExt;
23use timely::communication::Allocate;
24use timely::container::{ContainerBuilder, PushInto};
25use timely::dataflow::Scope;
26use timely::logging::{TimelyEvent, TimelyEventBuilder};
27use timely::logging_core::{Logger, Registry};
28use timely::order::Product;
29use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
30use timely::worker::AsWorker;
31
32use crate::arrangement::manager::TraceBundle;
33use crate::extensions::arrange::{KeyCollection, MzArrange};
34use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
35use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
36use crate::typedefs::{ErrBatcher, ErrBuilder};
37
38pub fn initialize<A: Allocate + 'static>(
43 worker: &mut timely::worker::Worker<A>,
44 config: &LoggingConfig,
45) -> LoggingTraces {
46 let interval_ms = std::cmp::max(1, config.interval.as_millis());
47
48 let now = Instant::now();
52 let start_offset = std::time::SystemTime::now()
53 .duration_since(std::time::SystemTime::UNIX_EPOCH)
54 .expect("Failed to get duration since Unix epoch");
55
56 let mut context = LoggingContext {
57 worker,
58 config,
59 interval_ms,
60 now,
61 start_offset,
62 t_event_queue: EventQueue::new("t"),
63 r_event_queue: EventQueue::new("r"),
64 d_event_queue: EventQueue::new("d"),
65 c_event_queue: EventQueue::new("c"),
66 shared_state: Default::default(),
67 };
68
69 let dataflow_index = context.worker.next_dataflow_index();
72 let traces = if config.log_logging {
73 context.register_loggers();
74 context.construct_dataflow()
75 } else {
76 let traces = context.construct_dataflow();
77 context.register_loggers();
78 traces
79 };
80
81 let compute_logger = worker.logger_for("materialize/compute").unwrap();
82 LoggingTraces {
83 traces,
84 dataflow_index,
85 compute_logger,
86 }
87}
88
/// A reachability logging event as queued for the reachability logging operators.
///
/// The first component is the tracker id; the second is a list of updates laid
/// out as `(node, port, is_source, timestamp, diff)`, matching how
/// `reachability_logger` assembles them.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
90
91struct LoggingContext<'a, A: Allocate> {
92 worker: &'a mut timely::worker::Worker<A>,
93 config: &'a LoggingConfig,
94 interval_ms: u128,
95 now: Instant,
96 start_offset: Duration,
97 t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
98 r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
99 d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
100 c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
101 shared_state: Rc<RefCell<SharedLoggingState>>,
102}
103
/// The result of initializing logging.
pub(crate) struct LoggingTraces {
    /// One trace bundle per log variant exported by the logging dataflow.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The worker's next dataflow index, sampled immediately before the logging
    /// dataflow was constructed.
    pub dataflow_index: usize,
    /// The logger retrieved from the worker under `"materialize/compute"`.
    pub compute_logger: super::compute::Logger,
}
112
113impl<A: Allocate + 'static> LoggingContext<'_, A> {
114 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
115 self.worker.dataflow_named("Dataflow: logging", |scope| {
116 let mut collections = BTreeMap::new();
117
118 let super::timely::Return {
119 collections: timely_collections,
120 } = super::timely::construct(
121 scope.clone(),
122 self.config,
123 self.t_event_queue.clone(),
124 Rc::clone(&self.shared_state),
125 );
126 collections.extend(timely_collections);
127
128 let super::reachability::Return {
129 collections: reachability_collections,
130 } = super::reachability::construct(
131 scope.clone(),
132 self.config,
133 self.r_event_queue.clone(),
134 );
135 collections.extend(reachability_collections);
136
137 let super::differential::Return {
138 collections: differential_collections,
139 } = super::differential::construct(
140 scope.clone(),
141 self.config,
142 self.d_event_queue.clone(),
143 Rc::clone(&self.shared_state),
144 );
145 collections.extend(differential_collections);
146
147 let super::compute::Return {
148 collections: compute_collections,
149 } = super::compute::construct(
150 scope.clone(),
151 scope.parent.clone(),
152 self.config,
153 self.c_event_queue.clone(),
154 Rc::clone(&self.shared_state),
155 );
156 collections.extend(compute_collections);
157
158 let errs = scope.scoped("logging errors", |scope| {
159 let collection: KeyCollection<_, DataflowError, Diff> =
160 Collection::empty(scope).into();
161 collection
162 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
163 .trace
164 });
165
166 let traces = collections
167 .into_iter()
168 .map(|(log, collection)| {
169 let bundle = TraceBundle::new(collection.trace, errs.clone())
170 .with_drop(collection.token);
171 (log, bundle)
172 })
173 .collect();
174 traces
175 })
176 }
177
178 fn register_reachability_logger<T: ExtractTimestamp>(
183 &self,
184 registry: &mut Registry,
185 index: usize,
186 ) {
187 let logger = self.reachability_logger::<T>(index);
188 let type_name = std::any::type_name::<T>();
189 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
190 }
191
192 fn register_loggers(&self) {
196 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
197 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
198 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
199
200 let mut register = self.worker.log_register().expect("Logging must be enabled");
201 register.insert_logger("timely", t_logger);
202 self.register_reachability_logger::<Timestamp>(&mut register, 0);
205 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
206 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
207 register.insert_logger("differential/arrange", d_logger);
208 register.insert_logger("materialize/compute", c_logger.clone());
209
210 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
211 }
212
213 fn simple_logger<CB: ContainerBuilder>(
214 &self,
215 event_queue: EventQueue<CB::Container>,
216 ) -> Logger<CB> {
217 let [link] = event_queue.links;
218 let mut logger = BatchLogger::new(link, self.interval_ms);
219 let activator = event_queue.activator.clone();
220 Logger::new(
221 self.now,
222 self.start_offset,
223 move |time, data: &mut Option<CB::Container>| {
224 if let Some(data) = data.take() {
225 logger.publish_batch(data);
226 } else if logger.report_progress(*time) {
227 activator.activate();
228 }
229 },
230 )
231 }
232
233 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
236 where
237 T: ExtractTimestamp,
238 {
239 let link = Rc::clone(&self.r_event_queue.links[index]);
240 let mut logger = BatchLogger::new(link, self.interval_ms);
241 let mut massaged = Vec::new();
242 let mut builder = ColumnBuilder::default();
243 let activator = self.r_event_queue.activator.clone();
244
245 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
246 if let Some(data) = data {
247 for (time, event) in data.drain(..) {
249 match event {
250 TrackerEvent::SourceUpdate(update) => {
251 massaged.extend(update.updates.iter().map(
252 |(node, port, time, diff)| {
253 let is_source = true;
254 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
255 },
256 ));
257
258 builder.push_into((time, (update.tracker_id, &massaged)));
259 massaged.clear();
260 }
261 TrackerEvent::TargetUpdate(update) => {
262 massaged.extend(update.updates.iter().map(
263 |(node, port, time, diff)| {
264 let is_source = false;
265 (*node, *port, is_source, time.extract(), Diff::from(*diff))
266 },
267 ));
268
269 builder.push_into((time, (update.tracker_id, &massaged)));
270 massaged.clear();
271 }
272 }
273 while let Some(container) = builder.extract() {
274 logger.publish_batch(std::mem::take(container));
275 }
276 }
277 } else {
278 while let Some(container) = builder.finish() {
280 logger.publish_batch(std::mem::take(container));
281 }
282
283 if logger.report_progress(*batch_time) {
284 activator.activate();
285 }
286 }
287 };
288
289 Logger::new(self.now, self.start_offset, action)
290 }
291}
292
/// Conversion from the timestamp types used by reachability logging to a plain
/// [`Timestamp`].
trait ExtractTimestamp: Clone + 'static {
    /// Returns the [`Timestamp`] contained in `self`.
    fn extract(&self) -> Timestamp;
}
298
299impl ExtractTimestamp for Timestamp {
300 fn extract(&self) -> Timestamp {
301 *self
302 }
303}
304
305impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
306 fn extract(&self) -> Timestamp {
307 self.outer
308 }
309}
310
311impl ExtractTimestamp for (Timestamp, Subtime) {
312 fn extract(&self) -> Timestamp {
313 self.0
314 }
315}