Skip to main content

mz_compute/logging/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Initialization of logging dataflows.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_dyncfg::ConfigSet;
18use mz_ore::metrics::MetricsRegistry;
19use mz_repr::{Diff, Timestamp};
20use mz_storage_operators::persist_source::Subtime;
21use mz_timely_util::columnar::Column;
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::operator::CollectionExt;
24use mz_timely_util::scope_label::ScopeExt;
25use timely::ContainerBuilder;
26use timely::container::{ContainerBuilder as _, PushInto};
27use timely::logging::{TimelyEvent, TimelyEventBuilder};
28use timely::logging_core::{Logger, Registry};
29use timely::order::Product;
30use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
31
32use crate::arrangement::manager::TraceBundle;
33use crate::extensions::arrange::{KeyCollection, MzArrange};
34use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
35use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
36use crate::render::errors::DataflowErrorSer;
37use crate::typedefs::{ErrBatcher, ErrBuilder};
38
39/// Initialize logging dataflows.
40///
41/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
42/// retrieving logged records as well as the index of the exporting dataflow.
43pub fn initialize(
44    worker: &mut timely::worker::Worker,
45    config: &LoggingConfig,
46    metrics_registry: MetricsRegistry,
47    worker_config: Rc<ConfigSet>,
48    workers_per_process: usize,
49    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
50) -> LoggingTraces {
51    let interval_ms = std::cmp::max(1, config.interval.as_millis());
52
53    // Track time relative to the Unix epoch, rather than when the server
54    // started, so that the logging sources can be joined with tables and
55    // other real time sources for semi-sensible results.
56    let now = Instant::now();
57    let start_offset = std::time::SystemTime::now()
58        .duration_since(std::time::SystemTime::UNIX_EPOCH)
59        .expect("Failed to get duration since Unix epoch");
60
61    let mut context = LoggingContext {
62        worker,
63        config,
64        interval_ms,
65        now,
66        start_offset,
67        t_event_queue: EventQueue::new("t"),
68        r_event_queue: EventQueue::new("r"),
69        d_event_queue: EventQueue::new("d"),
70        c_event_queue: EventQueue::new("c"),
71        shared_state: Default::default(),
72        metrics_registry,
73        worker_config,
74        workers_per_process,
75        storage_log_reader,
76    };
77
78    // Depending on whether we should log the creation of the logging dataflows, we register the
79    // loggers with timely either before or after creating them.
80    let dataflow_index = context.worker.next_dataflow_index();
81    let traces = if config.log_logging {
82        context.register_loggers();
83        context.construct_dataflow()
84    } else {
85        let traces = context.construct_dataflow();
86        context.register_loggers();
87        traces
88    };
89
90    let compute_logger = worker.logger_for("materialize/compute").unwrap();
91    LoggingTraces {
92        traces,
93        dataflow_index,
94        compute_logger,
95    }
96}
97
98pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
99
100struct LoggingContext<'a> {
101    worker: &'a mut timely::worker::Worker,
102    config: &'a LoggingConfig,
103    interval_ms: u128,
104    now: Instant,
105    start_offset: Duration,
106    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
107    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
108    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
109    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
110    shared_state: Rc<RefCell<SharedLoggingState>>,
111    metrics_registry: MetricsRegistry,
112    worker_config: Rc<ConfigSet>,
113    workers_per_process: usize,
114    /// Optional reader for storage timely logging events.
115    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
116}
117
118pub(crate) struct LoggingTraces {
119    /// Exported traces, by log variant.
120    pub traces: BTreeMap<LogVariant, TraceBundle>,
121    /// The index of the dataflow that exports the traces.
122    pub dataflow_index: usize,
123    /// The compute logger.
124    pub compute_logger: super::compute::Logger,
125}
126
127impl LoggingContext<'_> {
128    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
129        self.worker.dataflow_named("Dataflow: logging", |scope| {
130            let scope = scope.with_label();
131
132            let mut collections = BTreeMap::new();
133
134            let super::timely::Return {
135                collections: timely_collections,
136            } = super::timely::construct(
137                scope,
138                self.config,
139                self.t_event_queue.clone(),
140                Rc::clone(&self.shared_state),
141                self.storage_log_reader.take(),
142            );
143            collections.extend(timely_collections);
144
145            let super::reachability::Return {
146                collections: reachability_collections,
147            } = super::reachability::construct(scope, self.config, self.r_event_queue.clone());
148            collections.extend(reachability_collections);
149
150            let super::differential::Return {
151                collections: differential_collections,
152            } = super::differential::construct(
153                scope,
154                self.config,
155                self.d_event_queue.clone(),
156                Rc::clone(&self.shared_state),
157            );
158            collections.extend(differential_collections);
159
160            let super::compute::Return {
161                collections: compute_collections,
162            } = super::compute::construct(
163                scope.clone(),
164                scope.activations(),
165                self.config,
166                self.c_event_queue.clone(),
167                Rc::clone(&self.shared_state),
168            );
169            collections.extend(compute_collections);
170
171            let super::prometheus::Return {
172                collections: prometheus_collections,
173            } = super::prometheus::construct(
174                scope,
175                self.config,
176                self.metrics_registry.clone(),
177                self.now,
178                self.start_offset,
179                Rc::clone(&self.worker_config),
180                self.workers_per_process,
181            );
182            collections.extend(prometheus_collections);
183
184            let errs = scope.scoped("logging errors", |scope| {
185                let collection: KeyCollection<_, DataflowErrorSer, Diff> =
186                    VecCollection::empty(scope).into();
187                collection
188                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
189                    .trace
190            });
191
192            let traces = collections
193                .into_iter()
194                .map(|(log, collection)| {
195                    let bundle = TraceBundle::new(collection.trace, errs.clone())
196                        .with_drop(collection.token);
197                    (log, bundle)
198                })
199                .collect();
200            traces
201        })
202    }
203
204    /// Construct a new reachability logger for timestamp type `T`.
205    ///
206    /// Inserts a logger with the name `timely/reachability/{type_name::<T>()}`, following
207    /// Timely naming convention.
208    fn register_reachability_logger<T: ExtractTimestamp>(
209        &self,
210        registry: &mut Registry,
211        index: usize,
212    ) {
213        let logger = self.reachability_logger::<T>(index);
214        let type_name = std::any::type_name::<T>();
215        registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
216    }
217
218    /// Register all loggers with the timely worker.
219    ///
220    /// Registers the timely, differential, compute, and reachability loggers.
221    fn register_loggers(&self) {
222        let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
223        let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
224        let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
225
226        let mut register = self.worker.log_register().expect("Logging must be enabled");
227        register.insert_logger("timely", t_logger);
228        // Note that each reachability logger has a unique index, this is crucial to avoid dropping
229        // data because the event link structure is not multi-producer safe.
230        self.register_reachability_logger::<Timestamp>(&mut register, 0);
231        self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
232        self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
233        register.insert_logger("differential/arrange", d_logger);
234        register.insert_logger("materialize/compute", c_logger.clone());
235
236        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
237    }
238
239    fn simple_logger<CB: ContainerBuilder>(
240        &self,
241        event_queue: EventQueue<CB::Container>,
242    ) -> Logger<CB> {
243        let [link] = event_queue.links;
244        let mut logger = BatchLogger::new(link, self.interval_ms);
245        let activator = event_queue.activator.clone();
246        Logger::new(
247            self.now,
248            self.start_offset,
249            move |time, data: &mut Option<CB::Container>| {
250                if let Some(data) = data.take() {
251                    logger.publish_batch(data);
252                } else if logger.report_progress(*time) {
253                    activator.activate();
254                }
255            },
256        )
257    }
258
259    /// Construct a reachability logger for timestamp type `T`. The index must
260    /// refer to a unique link in the reachability event queue.
261    fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
262    where
263        T: ExtractTimestamp,
264    {
265        let link = Rc::clone(&self.r_event_queue.links[index]);
266        let mut logger = BatchLogger::new(link, self.interval_ms);
267        let mut massaged = Vec::new();
268        let mut builder = ColumnBuilder::default();
269        let activator = self.r_event_queue.activator.clone();
270
271        let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
272            if let Some(data) = data {
273                // Handle data
274                for (time, event) in data.drain(..) {
275                    match event {
276                        TrackerEvent::SourceUpdate(update) => {
277                            massaged.extend(update.updates.iter().map(
278                                |(node, port, time, diff)| {
279                                    let is_source = true;
280                                    (*node, *port, is_source, T::extract(time), Diff::from(*diff))
281                                },
282                            ));
283
284                            builder.push_into((time, (update.tracker_id, &massaged)));
285                            massaged.clear();
286                        }
287                        TrackerEvent::TargetUpdate(update) => {
288                            massaged.extend(update.updates.iter().map(
289                                |(node, port, time, diff)| {
290                                    let is_source = false;
291                                    (*node, *port, is_source, time.extract(), Diff::from(*diff))
292                                },
293                            ));
294
295                            builder.push_into((time, (update.tracker_id, &massaged)));
296                            massaged.clear();
297                        }
298                    }
299                    while let Some(container) = builder.extract() {
300                        logger.publish_batch(std::mem::take(container));
301                    }
302                }
303            } else {
304                // Handle a flush
305                while let Some(container) = builder.finish() {
306                    logger.publish_batch(std::mem::take(container));
307                }
308
309                if logger.report_progress(*batch_time) {
310                    activator.activate();
311                }
312            }
313        };
314
315        Logger::new(self.now, self.start_offset, action)
316    }
317}
318
319/// Helper trait to extract a timestamp from various types of timestamp used in rendering.
320trait ExtractTimestamp: Clone + 'static {
321    /// Extracts the timestamp from the type.
322    fn extract(&self) -> Timestamp;
323}
324
325impl ExtractTimestamp for Timestamp {
326    fn extract(&self) -> Timestamp {
327        *self
328    }
329}
330
331impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
332    fn extract(&self) -> Timestamp {
333        self.outer
334    }
335}
336
337impl ExtractTimestamp for (Timestamp, Subtime) {
338    fn extract(&self) -> Timestamp {
339        self.0
340    }
341}