1use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::Collection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::containers::{Column, ColumnBuilder};
21use mz_timely_util::operator::CollectionExt;
22use timely::communication::Allocate;
23use timely::container::{ContainerBuilder, PushInto};
24use timely::dataflow::Scope;
25use timely::logging::{TimelyEvent, TimelyEventBuilder};
26use timely::logging_core::{Logger, Registry};
27use timely::order::Product;
28use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
29
30use crate::arrangement::manager::TraceBundle;
31use crate::extensions::arrange::{KeyCollection, MzArrange};
32use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
33use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
34use crate::typedefs::{ErrBatcher, ErrBuilder};
35
36pub fn initialize<A: Allocate + 'static>(
41 worker: &mut timely::worker::Worker<A>,
42 config: &LoggingConfig,
43) -> LoggingTraces {
44 let interval_ms = std::cmp::max(1, config.interval.as_millis());
45
46 let now = Instant::now();
50 let start_offset = std::time::SystemTime::now()
51 .duration_since(std::time::SystemTime::UNIX_EPOCH)
52 .expect("Failed to get duration since Unix epoch");
53
54 let mut context = LoggingContext {
55 worker,
56 config,
57 interval_ms,
58 now,
59 start_offset,
60 t_event_queue: EventQueue::new("t"),
61 r_event_queue: EventQueue::new("r"),
62 d_event_queue: EventQueue::new("d"),
63 c_event_queue: EventQueue::new("c"),
64 shared_state: Default::default(),
65 };
66
67 let dataflow_index = context.worker.next_dataflow_index();
70 let traces = if config.log_logging {
71 context.register_loggers();
72 context.construct_dataflow()
73 } else {
74 let traces = context.construct_dataflow();
75 context.register_loggers();
76 traces
77 };
78
79 let compute_logger = worker.log_register().get("materialize/compute").unwrap();
80 LoggingTraces {
81 traces,
82 dataflow_index,
83 compute_logger,
84 }
85}
86
/// A batch of reachability updates for one tracker: the tracker id, followed by
/// `(node, port, is_source, timestamp, diff)` rows, where `is_source` distinguishes
/// source-port updates from target-port updates.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
88
/// State shared by the steps of [`initialize`]: dataflow construction and logger registration.
struct LoggingContext<'a, A: Allocate> {
    /// Worker on which the logging dataflow is built and whose log registry receives loggers.
    worker: &'a mut timely::worker::Worker<A>,
    /// Logging configuration passed to each log source's `construct`.
    config: &'a LoggingConfig,
    /// Batching interval (ms, at least 1) passed to every `BatchLogger`.
    interval_ms: u128,
    /// Reference instant passed to every `Logger::new`.
    now: Instant,
    /// Duration since the Unix epoch at startup, passed to every `Logger::new`.
    start_offset: Duration,
    /// Queue connecting the timely logger to the timely log dataflow.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue for reachability events; one link per registered timestamp type (indices 0..3).
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue connecting the differential logger to the differential log dataflow.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue connecting the compute logger to the compute log dataflow.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared with the timely, differential, and compute log dataflows; also holds the
    /// compute logger once it is registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
}
101
/// Result of [`initialize`].
pub(crate) struct LoggingTraces {
    /// Trace bundles for the arranged logging collections, keyed by log variant.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// Index of the logging dataflow on this worker.
    pub dataflow_index: usize,
    /// The logger registered under `materialize/compute`.
    pub compute_logger: super::compute::Logger,
}
110
111impl<A: Allocate + 'static> LoggingContext<'_, A> {
112 fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
113 self.worker.dataflow_named("Dataflow: logging", |scope| {
114 let mut collections = BTreeMap::new();
115
116 let super::timely::Return {
117 collections: timely_collections,
118 } = super::timely::construct(
119 scope.clone(),
120 self.config,
121 self.t_event_queue.clone(),
122 Rc::clone(&self.shared_state),
123 );
124 collections.extend(timely_collections);
125
126 let super::reachability::Return {
127 collections: reachability_collections,
128 } = super::reachability::construct(
129 scope.clone(),
130 self.config,
131 self.r_event_queue.clone(),
132 );
133 collections.extend(reachability_collections);
134
135 let super::differential::Return {
136 collections: differential_collections,
137 } = super::differential::construct(
138 scope.clone(),
139 self.config,
140 self.d_event_queue.clone(),
141 Rc::clone(&self.shared_state),
142 );
143 collections.extend(differential_collections);
144
145 let super::compute::Return {
146 collections: compute_collections,
147 } = super::compute::construct(
148 scope.clone(),
149 scope.parent.clone(),
150 self.config,
151 self.c_event_queue.clone(),
152 Rc::clone(&self.shared_state),
153 );
154 collections.extend(compute_collections);
155
156 let errs = scope.scoped("logging errors", |scope| {
157 let collection: KeyCollection<_, DataflowError, Diff> =
158 Collection::empty(scope).into();
159 collection
160 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
161 .trace
162 });
163
164 let traces = collections
165 .into_iter()
166 .map(|(log, collection)| {
167 let bundle = TraceBundle::new(collection.trace, errs.clone())
168 .with_drop(collection.token);
169 (log, bundle)
170 })
171 .collect();
172 traces
173 })
174 }
175
176 fn register_reachability_logger<T: ExtractTimestamp>(
181 &self,
182 registry: &mut Registry,
183 index: usize,
184 ) {
185 let logger = self.reachability_logger::<T>(index);
186 let type_name = std::any::type_name::<T>();
187 registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
188 }
189
190 fn register_loggers(&self) {
194 let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
195 let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
196 let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
197
198 let mut register = self.worker.log_register();
199 register.insert_logger("timely", t_logger);
200 self.register_reachability_logger::<Timestamp>(&mut register, 0);
203 self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
204 self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
205 register.insert_logger("differential/arrange", d_logger);
206 register.insert_logger("materialize/compute", c_logger.clone());
207
208 self.shared_state.borrow_mut().compute_logger = Some(c_logger);
209 }
210
211 fn simple_logger<CB: ContainerBuilder>(
212 &self,
213 event_queue: EventQueue<CB::Container>,
214 ) -> Logger<CB> {
215 let [link] = event_queue.links;
216 let mut logger = BatchLogger::new(link, self.interval_ms);
217 let activator = event_queue.activator.clone();
218 Logger::new(
219 self.now,
220 self.start_offset,
221 move |time, data: &mut Option<CB::Container>| {
222 if let Some(data) = data.take() {
223 logger.publish_batch(data);
224 } else if logger.report_progress(*time) {
225 activator.activate();
226 }
227 },
228 )
229 }
230
231 fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
234 where
235 T: ExtractTimestamp,
236 {
237 let link = Rc::clone(&self.r_event_queue.links[index]);
238 let mut logger = BatchLogger::new(link, self.interval_ms);
239 let mut massaged = Vec::new();
240 let mut builder = ColumnBuilder::default();
241 let activator = self.r_event_queue.activator.clone();
242
243 let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
244 if let Some(data) = data {
245 for (time, event) in data.drain(..) {
247 match event {
248 TrackerEvent::SourceUpdate(update) => {
249 massaged.extend(update.updates.iter().map(
250 |(node, port, time, diff)| {
251 let is_source = true;
252 (*node, *port, is_source, T::extract(time), Diff::from(*diff))
253 },
254 ));
255
256 builder.push_into((time, (update.tracker_id, &massaged)));
257 massaged.clear();
258 }
259 TrackerEvent::TargetUpdate(update) => {
260 massaged.extend(update.updates.iter().map(
261 |(node, port, time, diff)| {
262 let is_source = false;
263 (*node, *port, is_source, time.extract(), Diff::from(*diff))
264 },
265 ));
266
267 builder.push_into((time, (update.tracker_id, &massaged)));
268 massaged.clear();
269 }
270 }
271 while let Some(container) = builder.extract() {
272 logger.publish_batch(std::mem::take(container));
273 }
274 }
275 } else {
276 while let Some(container) = builder.finish() {
278 logger.publish_batch(std::mem::take(container));
279 }
280
281 if logger.report_progress(*batch_time) {
282 activator.activate();
283 }
284 }
285 };
286
287 Logger::new(self.now, self.start_offset, action)
288 }
289}
290
/// Extracts the outer `Timestamp` from a timestamp type used in reachability logging.
///
/// Implemented for each timestamp type whose reachability logger is registered in
/// `register_loggers`.
trait ExtractTimestamp: Clone + 'static {
    /// Returns the `Timestamp` component of `self`.
    fn extract(&self) -> Timestamp;
}
296
297impl ExtractTimestamp for Timestamp {
298 fn extract(&self) -> Timestamp {
299 *self
300 }
301}
302
303impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
304 fn extract(&self) -> Timestamp {
305 self.outer
306 }
307}
308
309impl ExtractTimestamp for (Timestamp, Subtime) {
310 fn extract(&self) -> Timestamp {
311 self.0
312 }
313}