Skip to main content

mz_compute/logging/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Initialization of logging dataflows.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use crate::arrangement::manager::TraceBundle;
14use crate::extensions::arrange::{KeyCollection, MzArrange};
15use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
16use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
17use crate::typedefs::{ErrBatcher, ErrBuilder};
18use differential_dataflow::VecCollection;
19use differential_dataflow::dynamic::pointstamp::PointStamp;
20use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
21use mz_compute_client::logging::{LogVariant, LoggingConfig};
22use mz_dyncfg::ConfigSet;
23use mz_ore::metrics::MetricsRegistry;
24use mz_repr::{Diff, Timestamp};
25use mz_storage_operators::persist_source::Subtime;
26use mz_storage_types::errors::DataflowError;
27use mz_timely_util::columnar::Column;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::operator::CollectionExt;
30use mz_timely_util::scope_label::ScopeExt;
31use timely::ContainerBuilder;
32use timely::container::{ContainerBuilder as _, PushInto};
33use timely::logging::{TimelyEvent, TimelyEventBuilder};
34use timely::logging_core::{Logger, Registry};
35use timely::order::Product;
36use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
37
38/// Initialize logging dataflows.
39///
40/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
41/// retrieving logged records as well as the index of the exporting dataflow.
42pub fn initialize(
43    worker: &mut timely::worker::Worker,
44    config: &LoggingConfig,
45    metrics_registry: MetricsRegistry,
46    worker_config: Rc<ConfigSet>,
47    workers_per_process: usize,
48) -> LoggingTraces {
49    let interval_ms = std::cmp::max(1, config.interval.as_millis());
50
51    // Track time relative to the Unix epoch, rather than when the server
52    // started, so that the logging sources can be joined with tables and
53    // other real time sources for semi-sensible results.
54    let now = Instant::now();
55    let start_offset = std::time::SystemTime::now()
56        .duration_since(std::time::SystemTime::UNIX_EPOCH)
57        .expect("Failed to get duration since Unix epoch");
58
59    let mut context = LoggingContext {
60        worker,
61        config,
62        interval_ms,
63        now,
64        start_offset,
65        t_event_queue: EventQueue::new("t"),
66        r_event_queue: EventQueue::new("r"),
67        d_event_queue: EventQueue::new("d"),
68        c_event_queue: EventQueue::new("c"),
69        shared_state: Default::default(),
70        metrics_registry,
71        worker_config,
72        workers_per_process,
73    };
74
75    // Depending on whether we should log the creation of the logging dataflows, we register the
76    // loggers with timely either before or after creating them.
77    let dataflow_index = context.worker.next_dataflow_index();
78    let traces = if config.log_logging {
79        context.register_loggers();
80        context.construct_dataflow()
81    } else {
82        let traces = context.construct_dataflow();
83        context.register_loggers();
84        traces
85    };
86
87    let compute_logger = worker.logger_for("materialize/compute").unwrap();
88    LoggingTraces {
89        traces,
90        dataflow_index,
91        compute_logger,
92    }
93}
94
/// A batch of reachability updates logged for one progress tracker.
///
/// Layout: `(tracker_id, updates)`, where each update is
/// `(node, port, is_source, timestamp, diff)`. `is_source` is `true` for updates made at a
/// source and `false` for updates made at a target; `timestamp` has been reduced to a plain
/// `Timestamp` regardless of the tracker's native timestamp type.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
96
/// State shared by the steps that build the logging dataflow and register its loggers.
struct LoggingContext<'a> {
    /// The worker on which the logging dataflow is built and the loggers are registered.
    worker: &'a mut timely::worker::Worker,
    /// The logging configuration.
    config: &'a LoggingConfig,
    /// Interval handed to every `BatchLogger`, in milliseconds (clamped to at least 1).
    interval_ms: u128,
    /// Reference instant handed to every timely `Logger` and to the prometheus collection.
    now: Instant,
    /// Duration since the Unix epoch, captured alongside `now`, so logged times are
    /// wall-clock based.
    start_offset: Duration,
    /// Event queue for timely events.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Event queue for reachability events. It has three links, one per reachability logger
    /// (one per timestamp type), because a link must not be shared by multiple producers.
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Event queue for differential arrangement events.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Event queue for compute events.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// Logging state shared with the dataflow fragments; the compute logger is stored here
    /// once registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
    /// Metrics registry passed to the prometheus logging collection.
    metrics_registry: MetricsRegistry,
    /// Worker configuration passed to the prometheus logging collection.
    worker_config: Rc<ConfigSet>,
    /// Number of workers per process, passed to the prometheus logging collection.
    workers_per_process: usize,
}
107    shared_state: Rc<RefCell<SharedLoggingState>>,
108    metrics_registry: MetricsRegistry,
109    worker_config: Rc<ConfigSet>,
110    workers_per_process: usize,
111}
112
113pub(crate) struct LoggingTraces {
114    /// Exported traces, by log variant.
115    pub traces: BTreeMap<LogVariant, TraceBundle>,
116    /// The index of the dataflow that exports the traces.
117    pub dataflow_index: usize,
118    /// The compute logger.
119    pub compute_logger: super::compute::Logger,
120}
121
122impl LoggingContext<'_> {
123    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
124        self.worker.dataflow_named("Dataflow: logging", |scope| {
125            let scope = scope.with_label();
126
127            let mut collections = BTreeMap::new();
128
129            let super::timely::Return {
130                collections: timely_collections,
131            } = super::timely::construct(
132                scope,
133                self.config,
134                self.t_event_queue.clone(),
135                Rc::clone(&self.shared_state),
136            );
137            collections.extend(timely_collections);
138
139            let super::reachability::Return {
140                collections: reachability_collections,
141            } = super::reachability::construct(scope, self.config, self.r_event_queue.clone());
142            collections.extend(reachability_collections);
143
144            let super::differential::Return {
145                collections: differential_collections,
146            } = super::differential::construct(
147                scope,
148                self.config,
149                self.d_event_queue.clone(),
150                Rc::clone(&self.shared_state),
151            );
152            collections.extend(differential_collections);
153
154            let super::compute::Return {
155                collections: compute_collections,
156            } = super::compute::construct(
157                scope.clone(),
158                scope.activations(),
159                self.config,
160                self.c_event_queue.clone(),
161                Rc::clone(&self.shared_state),
162            );
163            collections.extend(compute_collections);
164
165            let super::prometheus::Return {
166                collections: prometheus_collections,
167            } = super::prometheus::construct(
168                scope,
169                self.config,
170                self.metrics_registry.clone(),
171                self.now,
172                self.start_offset,
173                Rc::clone(&self.worker_config),
174                self.workers_per_process,
175            );
176            collections.extend(prometheus_collections);
177
178            let errs = scope.scoped("logging errors", |scope| {
179                let collection: KeyCollection<_, DataflowError, Diff> =
180                    VecCollection::empty(scope).into();
181                collection
182                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
183                    .trace
184            });
185
186            let traces = collections
187                .into_iter()
188                .map(|(log, collection)| {
189                    let bundle = TraceBundle::new(collection.trace, errs.clone())
190                        .with_drop(collection.token);
191                    (log, bundle)
192                })
193                .collect();
194            traces
195        })
196    }
197
198    /// Construct a new reachability logger for timestamp type `T`.
199    ///
200    /// Inserts a logger with the name `timely/reachability/{type_name::<T>()}`, following
201    /// Timely naming convention.
202    fn register_reachability_logger<T: ExtractTimestamp>(
203        &self,
204        registry: &mut Registry,
205        index: usize,
206    ) {
207        let logger = self.reachability_logger::<T>(index);
208        let type_name = std::any::type_name::<T>();
209        registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
210    }
211
212    /// Register all loggers with the timely worker.
213    ///
214    /// Registers the timely, differential, compute, and reachability loggers.
215    fn register_loggers(&self) {
216        let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
217        let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
218        let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
219
220        let mut register = self.worker.log_register().expect("Logging must be enabled");
221        register.insert_logger("timely", t_logger);
222        // Note that each reachability logger has a unique index, this is crucial to avoid dropping
223        // data because the event link structure is not multi-producer safe.
224        self.register_reachability_logger::<Timestamp>(&mut register, 0);
225        self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
226        self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
227        register.insert_logger("differential/arrange", d_logger);
228        register.insert_logger("materialize/compute", c_logger.clone());
229
230        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
231    }
232
233    fn simple_logger<CB: ContainerBuilder>(
234        &self,
235        event_queue: EventQueue<CB::Container>,
236    ) -> Logger<CB> {
237        let [link] = event_queue.links;
238        let mut logger = BatchLogger::new(link, self.interval_ms);
239        let activator = event_queue.activator.clone();
240        Logger::new(
241            self.now,
242            self.start_offset,
243            move |time, data: &mut Option<CB::Container>| {
244                if let Some(data) = data.take() {
245                    logger.publish_batch(data);
246                } else if logger.report_progress(*time) {
247                    activator.activate();
248                }
249            },
250        )
251    }
252
253    /// Construct a reachability logger for timestamp type `T`. The index must
254    /// refer to a unique link in the reachability event queue.
255    fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
256    where
257        T: ExtractTimestamp,
258    {
259        let link = Rc::clone(&self.r_event_queue.links[index]);
260        let mut logger = BatchLogger::new(link, self.interval_ms);
261        let mut massaged = Vec::new();
262        let mut builder = ColumnBuilder::default();
263        let activator = self.r_event_queue.activator.clone();
264
265        let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
266            if let Some(data) = data {
267                // Handle data
268                for (time, event) in data.drain(..) {
269                    match event {
270                        TrackerEvent::SourceUpdate(update) => {
271                            massaged.extend(update.updates.iter().map(
272                                |(node, port, time, diff)| {
273                                    let is_source = true;
274                                    (*node, *port, is_source, T::extract(time), Diff::from(*diff))
275                                },
276                            ));
277
278                            builder.push_into((time, (update.tracker_id, &massaged)));
279                            massaged.clear();
280                        }
281                        TrackerEvent::TargetUpdate(update) => {
282                            massaged.extend(update.updates.iter().map(
283                                |(node, port, time, diff)| {
284                                    let is_source = false;
285                                    (*node, *port, is_source, time.extract(), Diff::from(*diff))
286                                },
287                            ));
288
289                            builder.push_into((time, (update.tracker_id, &massaged)));
290                            massaged.clear();
291                        }
292                    }
293                    while let Some(container) = builder.extract() {
294                        logger.publish_batch(std::mem::take(container));
295                    }
296                }
297            } else {
298                // Handle a flush
299                while let Some(container) = builder.finish() {
300                    logger.publish_batch(std::mem::take(container));
301                }
302
303                if logger.report_progress(*batch_time) {
304                    activator.activate();
305                }
306            }
307        };
308
309        Logger::new(self.now, self.start_offset, action)
310    }
311}
312
/// Helper trait to extract a timestamp from various types of timestamp used in rendering.
///
/// The reachability loggers use it to reduce each logged update's timestamp to a plain
/// [`Timestamp`]. This file implements it for `Timestamp`, `Product<Timestamp, PointStamp<u64>>`,
/// and `(Timestamp, Subtime)`, one per registered reachability logger.
trait ExtractTimestamp: Clone + 'static {
    /// Extracts the timestamp from the type.
    fn extract(&self) -> Timestamp;
}
318
impl ExtractTimestamp for Timestamp {
    /// A `Timestamp` already is the timestamp to extract; returns a copy of `self`.
    fn extract(&self) -> Timestamp {
        *self
    }
}
324
325impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
326    fn extract(&self) -> Timestamp {
327        self.outer
328    }
329}
330
331impl ExtractTimestamp for (Timestamp, Subtime) {
332    fn extract(&self) -> Timestamp {
333        self.0
334    }
335}