Skip to main content

mz_compute/logging/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Initialization of logging dataflows.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_dyncfg::ConfigSet;
18use mz_ore::metrics::MetricsRegistry;
19use mz_repr::{Diff, Timestamp};
20use mz_storage_operators::persist_source::Subtime;
21use mz_timely_util::columnar::Column;
22use mz_timely_util::columnar::builder::ColumnBuilder;
23use mz_timely_util::columnation::ColumnationChunker;
24use mz_timely_util::operator::CollectionExt;
25use mz_timely_util::scope_label::ScopeExt;
26use timely::ContainerBuilder;
27use timely::container::{ContainerBuilder as _, PushInto};
28use timely::logging::{TimelyEvent, TimelyEventBuilder};
29use timely::logging_core::{Logger, Registry};
30use timely::order::Product;
31use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
32
33use crate::arrangement::manager::TraceBundle;
34use crate::extensions::arrange::{KeyCollection, MzArrange};
35use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
36use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
37use crate::render::errors::DataflowErrorSer;
38use crate::typedefs::{ErrBatcher, ErrBuilder};
39
40/// Initialize logging dataflows.
41///
42/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
43/// retrieving logged records as well as the index of the exporting dataflow.
44pub fn initialize(
45    worker: &mut timely::worker::Worker,
46    config: &LoggingConfig,
47    metrics_registry: MetricsRegistry,
48    worker_config: Rc<ConfigSet>,
49    workers_per_process: usize,
50    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
51) -> LoggingTraces {
52    let interval_ms = std::cmp::max(1, config.interval.as_millis());
53
54    // Track time relative to the Unix epoch, rather than when the server
55    // started, so that the logging sources can be joined with tables and
56    // other real time sources for semi-sensible results.
57    let now = Instant::now();
58    let start_offset = std::time::SystemTime::now()
59        .duration_since(std::time::SystemTime::UNIX_EPOCH)
60        .expect("Failed to get duration since Unix epoch");
61
62    let mut context = LoggingContext {
63        worker,
64        config,
65        interval_ms,
66        now,
67        start_offset,
68        t_event_queue: EventQueue::new("t"),
69        r_event_queue: EventQueue::new("r"),
70        d_event_queue: EventQueue::new("d"),
71        c_event_queue: EventQueue::new("c"),
72        shared_state: Default::default(),
73        metrics_registry,
74        worker_config,
75        workers_per_process,
76        storage_log_reader,
77    };
78
79    // Depending on whether we should log the creation of the logging dataflows, we register the
80    // loggers with timely either before or after creating them.
81    let dataflow_index = context.worker.next_dataflow_index();
82    let traces = if config.log_logging {
83        context.register_loggers();
84        context.construct_dataflow()
85    } else {
86        let traces = context.construct_dataflow();
87        context.register_loggers();
88        traces
89    };
90
91    let compute_logger = worker.logger_for("materialize/compute").unwrap();
92    LoggingTraces {
93        traces,
94        dataflow_index,
95        compute_logger,
96    }
97}
98
/// A reachability log event as recorded by the reachability loggers: `(tracker_id, updates)`,
/// where each update is `(node, port, is_source, time, diff)`. `is_source` distinguishes
/// source-port updates from target-port updates, and `time` is the rendering timestamp projected
/// to a plain `Timestamp` via `ExtractTimestamp`.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
100
/// State shared by the steps that build the logging dataflow and register its loggers.
struct LoggingContext<'a> {
    /// The timely worker on which the logging dataflow is built and loggers are registered.
    worker: &'a mut timely::worker::Worker,
    /// The logging configuration.
    config: &'a LoggingConfig,
    /// Batching interval, in milliseconds, handed to each `BatchLogger` (at least 1).
    interval_ms: u128,
    /// Reference instant passed to every `Logger::new` together with `start_offset`.
    now: Instant,
    /// Duration since the Unix epoch at (approximately) `now`.
    start_offset: Duration,
    /// Event queue fed by the `timely` logger.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Event queue fed by the reachability loggers. It has three links, one per timestamp type
    /// registered in `register_loggers`.
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Event queue fed by the `differential/arrange` logger.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Event queue fed by the `materialize/compute` logger.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// Logging state shared with the constructed logging operators; also receives the compute
    /// logger in `register_loggers`.
    shared_state: Rc<RefCell<SharedLoggingState>>,
    /// Registry passed to the Prometheus logging collections.
    metrics_registry: MetricsRegistry,
    /// Worker configuration passed to the Prometheus logging collections.
    worker_config: Rc<ConfigSet>,
    /// Number of workers per process, passed to the Prometheus logging collections.
    workers_per_process: usize,
    /// Optional reader for storage timely logging events.
    storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}
118
/// The result of `initialize`: exported logging traces plus the handles needed to use them.
pub(crate) struct LoggingTraces {
    /// Exported traces, by log variant.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The index of the dataflow that exports the traces.
    pub dataflow_index: usize,
    /// The compute logger.
    pub compute_logger: super::compute::Logger,
}
127
128impl LoggingContext<'_> {
129    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
130        self.worker.dataflow_named("Dataflow: logging", |scope| {
131            let scope = scope.with_label();
132
133            let mut collections = BTreeMap::new();
134
135            let super::timely::Return {
136                collections: timely_collections,
137            } = super::timely::construct(
138                scope,
139                self.config,
140                self.t_event_queue.clone(),
141                Rc::clone(&self.shared_state),
142                self.storage_log_reader.take(),
143            );
144            collections.extend(timely_collections);
145
146            let super::reachability::Return {
147                collections: reachability_collections,
148            } = super::reachability::construct(scope, self.config, self.r_event_queue.clone());
149            collections.extend(reachability_collections);
150
151            let super::differential::Return {
152                collections: differential_collections,
153            } = super::differential::construct(
154                scope,
155                self.config,
156                self.d_event_queue.clone(),
157                Rc::clone(&self.shared_state),
158            );
159            collections.extend(differential_collections);
160
161            let super::compute::Return {
162                collections: compute_collections,
163            } = super::compute::construct(
164                scope.clone(),
165                scope.activations(),
166                self.config,
167                self.c_event_queue.clone(),
168                Rc::clone(&self.shared_state),
169            );
170            collections.extend(compute_collections);
171
172            let super::prometheus::Return {
173                collections: prometheus_collections,
174            } = super::prometheus::construct(
175                scope,
176                self.config,
177                self.metrics_registry.clone(),
178                self.now,
179                self.start_offset,
180                Rc::clone(&self.worker_config),
181                self.workers_per_process,
182            );
183            collections.extend(prometheus_collections);
184
185            let errs = scope.scoped("logging errors", |scope| {
186                let collection: KeyCollection<_, DataflowErrorSer, Diff> =
187                    VecCollection::empty(scope).into();
188                collection
189                    .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
190                        "Arrange logging err",
191                    )
192                    .trace
193            });
194
195            let traces = collections
196                .into_iter()
197                .map(|(log, collection)| {
198                    let bundle = TraceBundle::new(collection.trace, errs.clone())
199                        .with_drop(collection.token);
200                    (log, bundle)
201                })
202                .collect();
203            traces
204        })
205    }
206
207    /// Construct a new reachability logger for timestamp type `T`.
208    ///
209    /// Inserts a logger with the name `timely/reachability/{type_name::<T>()}`, following
210    /// Timely naming convention.
211    fn register_reachability_logger<T: ExtractTimestamp>(
212        &self,
213        registry: &mut Registry,
214        index: usize,
215    ) {
216        let logger = self.reachability_logger::<T>(index);
217        let type_name = std::any::type_name::<T>();
218        registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
219    }
220
221    /// Register all loggers with the timely worker.
222    ///
223    /// Registers the timely, differential, compute, and reachability loggers.
224    fn register_loggers(&self) {
225        let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
226        let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
227        let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
228
229        let mut register = self.worker.log_register().expect("Logging must be enabled");
230        register.insert_logger("timely", t_logger);
231        // Note that each reachability logger has a unique index, this is crucial to avoid dropping
232        // data because the event link structure is not multi-producer safe.
233        self.register_reachability_logger::<Timestamp>(&mut register, 0);
234        self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
235        self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
236        register.insert_logger("differential/arrange", d_logger);
237        register.insert_logger("materialize/compute", c_logger.clone());
238
239        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
240    }
241
242    fn simple_logger<CB: ContainerBuilder>(
243        &self,
244        event_queue: EventQueue<CB::Container>,
245    ) -> Logger<CB> {
246        let [link] = event_queue.links;
247        let mut logger = BatchLogger::new(link, self.interval_ms);
248        let activator = event_queue.activator.clone();
249        Logger::new(
250            self.now,
251            self.start_offset,
252            move |time, data: &mut Option<CB::Container>| {
253                if let Some(data) = data.take() {
254                    logger.publish_batch(data);
255                } else if logger.report_progress(*time) {
256                    activator.activate();
257                }
258            },
259        )
260    }
261
262    /// Construct a reachability logger for timestamp type `T`. The index must
263    /// refer to a unique link in the reachability event queue.
264    fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
265    where
266        T: ExtractTimestamp,
267    {
268        let link = Rc::clone(&self.r_event_queue.links[index]);
269        let mut logger = BatchLogger::new(link, self.interval_ms);
270        let mut massaged = Vec::new();
271        let mut builder = ColumnBuilder::default();
272        let activator = self.r_event_queue.activator.clone();
273
274        let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
275            if let Some(data) = data {
276                // Handle data
277                for (time, event) in data.drain(..) {
278                    match event {
279                        TrackerEvent::SourceUpdate(update) => {
280                            massaged.extend(update.updates.iter().map(
281                                |(node, port, time, diff)| {
282                                    let is_source = true;
283                                    (*node, *port, is_source, T::extract(time), Diff::from(*diff))
284                                },
285                            ));
286
287                            builder.push_into((time, (update.tracker_id, &massaged)));
288                            massaged.clear();
289                        }
290                        TrackerEvent::TargetUpdate(update) => {
291                            massaged.extend(update.updates.iter().map(
292                                |(node, port, time, diff)| {
293                                    let is_source = false;
294                                    (*node, *port, is_source, time.extract(), Diff::from(*diff))
295                                },
296                            ));
297
298                            builder.push_into((time, (update.tracker_id, &massaged)));
299                            massaged.clear();
300                        }
301                    }
302                    while let Some(container) = builder.extract() {
303                        logger.publish_batch(std::mem::take(container));
304                    }
305                }
306            } else {
307                // Handle a flush
308                while let Some(container) = builder.finish() {
309                    logger.publish_batch(std::mem::take(container));
310                }
311
312                if logger.report_progress(*batch_time) {
313                    activator.activate();
314                }
315            }
316        };
317
318        Logger::new(self.now, self.start_offset, action)
319    }
320}
321
/// Helper trait to extract a timestamp from various types of timestamp used in rendering.
///
/// Implemented in this file for `Timestamp`, `Product<Timestamp, PointStamp<u64>>`, and
/// `(Timestamp, Subtime)`. The reachability loggers use it to project each update's time to a
/// plain `Timestamp`.
// NOTE(review): the `Clone + 'static` bounds are presumably required by the logger types handed
// to `Registry::insert_logger`; confirm against timely's API before relaxing them.
trait ExtractTimestamp: Clone + 'static {
    /// Extracts the timestamp from the type.
    fn extract(&self) -> Timestamp;
}
327
328impl ExtractTimestamp for Timestamp {
329    fn extract(&self) -> Timestamp {
330        *self
331    }
332}
333
334impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
335    fn extract(&self) -> Timestamp {
336        self.outer
337    }
338}
339
340impl ExtractTimestamp for (Timestamp, Subtime) {
341    fn extract(&self) -> Timestamp {
342        self.0
343    }
344}