1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::columnar::Column;
21use mz_timely_util::columnar::builder::ColumnBuilder;
22use mz_timely_util::operator::CollectionExt;
23use timely::ContainerBuilder;
24use timely::communication::Allocate;
25use timely::container::{ContainerBuilder as _, PushInto};
26use timely::dataflow::Scope;
27use timely::logging::{TimelyEvent, TimelyEventBuilder};
28use timely::logging_core::{Logger, Registry};
29use timely::order::Product;
30use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
31use timely::worker::AsWorker;
32
33use crate::arrangement::manager::TraceBundle;
34use crate::extensions::arrange::{KeyCollection, MzArrange};
35use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
36use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
37use crate::typedefs::{ErrBatcher, ErrBuilder};
38
39pub fn initialize<A: Allocate + 'static>(
44 worker: &mut timely::worker::Worker<A>,
45 config: &LoggingConfig,
46) -> LoggingTraces {
47 let interval_ms = std::cmp::max(1, config.interval.as_millis());
48
49 let now = Instant::now();
53 let start_offset = std::time::SystemTime::now()
54 .duration_since(std::time::SystemTime::UNIX_EPOCH)
55 .expect("Failed to get duration since Unix epoch");
56
57 let mut context = LoggingContext {
58 worker,
59 config,
60 interval_ms,
61 now,
62 start_offset,
63 t_event_queue: EventQueue::new("t"),
64 r_event_queue: EventQueue::new("r"),
65 d_event_queue: EventQueue::new("d"),
66 c_event_queue: EventQueue::new("c"),
67 shared_state: Default::default(),
68 };
69
70 let dataflow_index = context.worker.next_dataflow_index();
73 let traces = if config.log_logging {
74 context.register_loggers();
75 context.construct_dataflow()
76 } else {
77 let traces = context.construct_dataflow();
78 context.register_loggers();
79 traces
80 };
81
82 let compute_logger = worker.logger_for("materialize/compute").unwrap();
83 LoggingTraces {
84 traces,
85 dataflow_index,
86 compute_logger,
87 }
88}
89
/// A batch of reachability updates reported by one progress tracker.
///
/// Layout: `(tracker_id, updates)`, where each update is
/// `(node, port, is_source, time, diff)`. `is_source` distinguishes source-side from
/// target-side updates, and `time` is the timestamp projected to [`Timestamp`].
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
91
/// State used while setting up logging for one worker: the worker and configuration, the
/// timing parameters handed to every logger, one event queue per log source, and state
/// shared between the log constructors.
struct LoggingContext<'a, A: Allocate> {
    /// The worker whose log registry and dataflows are being configured.
    worker: &'a mut timely::worker::Worker<A>,
    /// The logging configuration passed to the log constructors.
    config: &'a LoggingConfig,
    /// Batch interval in milliseconds handed to every `BatchLogger`.
    interval_ms: u128,
    /// Reference instant passed to every `Logger::new`.
    now: Instant,
    /// Time offset passed to every `Logger::new`.
    start_offset: Duration,
    /// Queue carrying timely events.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue carrying reachability events; it has three links, one per tracked timestamp type.
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue carrying differential events.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue carrying compute events.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared with the log constructors; also receives the compute logger.
    shared_state: Rc<RefCell<SharedLoggingState>>,
}
104
/// The result of initializing logging on a worker.
pub(crate) struct LoggingTraces {
    /// Arranged traces of the logging dataflow, one bundle per log variant.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The worker's next dataflow index as observed just before the logging dataflow was built.
    pub dataflow_index: usize,
    /// Handle to the logger registered under `materialize/compute`.
    pub compute_logger: super::compute::Logger,
}
113
114impl<A: Allocate + 'static> LoggingContext<'_, A> {
115 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
116 self.worker.dataflow_named("Dataflow: logging", |scope| {
117 let mut collections = BTreeMap::new();
118
119 let super::timely::Return {
120 collections: timely_collections,
121 } = super::timely::construct(
122 scope.clone(),
123 self.config,
124 self.t_event_queue.clone(),
125 Rc::clone(&self.shared_state),
126 );
127 collections.extend(timely_collections);
128
129 let super::reachability::Return {
130 collections: reachability_collections,
131 } = super::reachability::construct(
132 scope.clone(),
133 self.config,
134 self.r_event_queue.clone(),
135 );
136 collections.extend(reachability_collections);
137
138 let super::differential::Return {
139 collections: differential_collections,
140 } = super::differential::construct(
141 scope.clone(),
142 self.config,
143 self.d_event_queue.clone(),
144 Rc::clone(&self.shared_state),
145 );
146 collections.extend(differential_collections);
147
148 let super::compute::Return {
149 collections: compute_collections,
150 } = super::compute::construct(
151 scope.clone(),
152 scope.parent.clone(),
153 self.config,
154 self.c_event_queue.clone(),
155 Rc::clone(&self.shared_state),
156 );
157 collections.extend(compute_collections);
158
159 let errs = scope.scoped("logging errors", |scope| {
160 let collection: KeyCollection<_, DataflowError, Diff> =
161 VecCollection::empty(scope).into();
162 collection
163 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
164 .trace
165 });
166
167 let traces = collections
168 .into_iter()
169 .map(|(log, collection)| {
170 let bundle = TraceBundle::new(collection.trace, errs.clone())
171 .with_drop(collection.token);
172 (log, bundle)
173 })
174 .collect();
175 traces
176 })
177 }
178
179 fn register_reachability_logger<T: ExtractTimestamp>(
184 &self,
185 registry: &mut Registry,
186 index: usize,
187 ) {
188 let logger = self.reachability_logger::<T>(index);
189 let type_name = std::any::type_name::<T>();
190 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
191 }
192
193 fn register_loggers(&self) {
197 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
198 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
199 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
200
201 let mut register = self.worker.log_register().expect("Logging must be enabled");
202 register.insert_logger("timely", t_logger);
203 self.register_reachability_logger::<Timestamp>(&mut register, 0);
206 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
207 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
208 register.insert_logger("differential/arrange", d_logger);
209 register.insert_logger("materialize/compute", c_logger.clone());
210
211 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
212 }
213
214 fn simple_logger<CB: ContainerBuilder>(
215 &self,
216 event_queue: EventQueue<CB::Container>,
217 ) -> Logger<CB> {
218 let [link] = event_queue.links;
219 let mut logger = BatchLogger::new(link, self.interval_ms);
220 let activator = event_queue.activator.clone();
221 Logger::new(
222 self.now,
223 self.start_offset,
224 move |time, data: &mut Option<CB::Container>| {
225 if let Some(data) = data.take() {
226 logger.publish_batch(data);
227 } else if logger.report_progress(*time) {
228 activator.activate();
229 }
230 },
231 )
232 }
233
234 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
237 where
238 T: ExtractTimestamp,
239 {
240 let link = Rc::clone(&self.r_event_queue.links[index]);
241 let mut logger = BatchLogger::new(link, self.interval_ms);
242 let mut massaged = Vec::new();
243 let mut builder = ColumnBuilder::default();
244 let activator = self.r_event_queue.activator.clone();
245
246 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
247 if let Some(data) = data {
248 for (time, event) in data.drain(..) {
250 match event {
251 TrackerEvent::SourceUpdate(update) => {
252 massaged.extend(update.updates.iter().map(
253 |(node, port, time, diff)| {
254 let is_source = true;
255 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
256 },
257 ));
258
259 builder.push_into((time, (update.tracker_id, &massaged)));
260 massaged.clear();
261 }
262 TrackerEvent::TargetUpdate(update) => {
263 massaged.extend(update.updates.iter().map(
264 |(node, port, time, diff)| {
265 let is_source = false;
266 (*node, *port, is_source, time.extract(), Diff::from(*diff))
267 },
268 ));
269
270 builder.push_into((time, (update.tracker_id, &massaged)));
271 massaged.clear();
272 }
273 }
274 while let Some(container) = builder.extract() {
275 logger.publish_batch(std::mem::take(container));
276 }
277 }
278 } else {
279 while let Some(container) = builder.finish() {
281 logger.publish_batch(std::mem::take(container));
282 }
283
284 if logger.report_progress(*batch_time) {
285 activator.activate();
286 }
287 }
288 };
289
290 Logger::new(self.now, self.start_offset, action)
291 }
292}
293
294trait ExtractTimestamp: Clone + 'static {
296 fn extract(&self) -> Timestamp;
298}
299
300impl ExtractTimestamp for Timestamp {
301 fn extract(&self) -> Timestamp {
302 *self
303 }
304}
305
306impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
307 fn extract(&self) -> Timestamp {
308 self.outer
309 }
310}
311
312impl ExtractTimestamp for (Timestamp, Subtime) {
313 fn extract(&self) -> Timestamp {
314 self.0
315 }
316}