Skip to main content

mz_compute/logging/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Initialization of logging dataflows.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::columnar::Column;
21use mz_timely_util::columnar::builder::ColumnBuilder;
22use mz_timely_util::operator::CollectionExt;
23use mz_timely_util::scope_label::ScopeExt;
24use timely::ContainerBuilder;
25use timely::communication::Allocate;
26use timely::container::{ContainerBuilder as _, PushInto};
27use timely::dataflow::Scope;
28use timely::logging::{TimelyEvent, TimelyEventBuilder};
29use timely::logging_core::{Logger, Registry};
30use timely::order::Product;
31use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
32use timely::worker::AsWorker;
33
34use crate::arrangement::manager::TraceBundle;
35use crate::extensions::arrange::{KeyCollection, MzArrange};
36use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
37use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
38use crate::typedefs::{ErrBatcher, ErrBuilder};
39
40/// Initialize logging dataflows.
41///
42/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
43/// retrieving logged records as well as the index of the exporting dataflow.
44pub fn initialize<A: Allocate + 'static>(
45    worker: &mut timely::worker::Worker<A>,
46    config: &LoggingConfig,
47) -> LoggingTraces {
48    let interval_ms = std::cmp::max(1, config.interval.as_millis());
49
50    // Track time relative to the Unix epoch, rather than when the server
51    // started, so that the logging sources can be joined with tables and
52    // other real time sources for semi-sensible results.
53    let now = Instant::now();
54    let start_offset = std::time::SystemTime::now()
55        .duration_since(std::time::SystemTime::UNIX_EPOCH)
56        .expect("Failed to get duration since Unix epoch");
57
58    let mut context = LoggingContext {
59        worker,
60        config,
61        interval_ms,
62        now,
63        start_offset,
64        t_event_queue: EventQueue::new("t"),
65        r_event_queue: EventQueue::new("r"),
66        d_event_queue: EventQueue::new("d"),
67        c_event_queue: EventQueue::new("c"),
68        shared_state: Default::default(),
69    };
70
71    // Depending on whether we should log the creation of the logging dataflows, we register the
72    // loggers with timely either before or after creating them.
73    let dataflow_index = context.worker.next_dataflow_index();
74    let traces = if config.log_logging {
75        context.register_loggers();
76        context.construct_dataflow()
77    } else {
78        let traces = context.construct_dataflow();
79        context.register_loggers();
80        traces
81    };
82
83    let compute_logger = worker.logger_for("materialize/compute").unwrap();
84    LoggingTraces {
85        traces,
86        dataflow_index,
87        compute_logger,
88    }
89}
90
/// A reachability event in the form pushed into the reachability event queue: the tracker ID
/// paired with its updates, each update being `(node, port, is_source, timestamp, diff)`.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
92
93struct LoggingContext<'a, A: Allocate> {
94    worker: &'a mut timely::worker::Worker<A>,
95    config: &'a LoggingConfig,
96    interval_ms: u128,
97    now: Instant,
98    start_offset: Duration,
99    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
100    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
101    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
102    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
103    shared_state: Rc<RefCell<SharedLoggingState>>,
104}
105
106pub(crate) struct LoggingTraces {
107    /// Exported traces, by log variant.
108    pub traces: BTreeMap<LogVariant, TraceBundle>,
109    /// The index of the dataflow that exports the traces.
110    pub dataflow_index: usize,
111    /// The compute logger.
112    pub compute_logger: super::compute::Logger,
113}
114
115impl<A: Allocate + 'static> LoggingContext<'_, A> {
116    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
117        self.worker.dataflow_named("Dataflow: logging", |scope| {
118            let scope = &mut scope.with_label();
119
120            let mut collections = BTreeMap::new();
121
122            let super::timely::Return {
123                collections: timely_collections,
124            } = super::timely::construct(
125                scope.clone(),
126                self.config,
127                self.t_event_queue.clone(),
128                Rc::clone(&self.shared_state),
129            );
130            collections.extend(timely_collections);
131
132            let super::reachability::Return {
133                collections: reachability_collections,
134            } = super::reachability::construct(
135                scope.clone(),
136                self.config,
137                self.r_event_queue.clone(),
138            );
139            collections.extend(reachability_collections);
140
141            let super::differential::Return {
142                collections: differential_collections,
143            } = super::differential::construct(
144                scope.clone(),
145                self.config,
146                self.d_event_queue.clone(),
147                Rc::clone(&self.shared_state),
148            );
149            collections.extend(differential_collections);
150
151            let super::compute::Return {
152                collections: compute_collections,
153            } = super::compute::construct(
154                scope.clone(),
155                scope.parent().clone(),
156                self.config,
157                self.c_event_queue.clone(),
158                Rc::clone(&self.shared_state),
159            );
160            collections.extend(compute_collections);
161
162            let errs = scope.scoped("logging errors", |scope| {
163                let collection: KeyCollection<_, DataflowError, Diff> =
164                    VecCollection::empty(scope).into();
165                collection
166                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
167                    .trace
168            });
169
170            let traces = collections
171                .into_iter()
172                .map(|(log, collection)| {
173                    let bundle = TraceBundle::new(collection.trace, errs.clone())
174                        .with_drop(collection.token);
175                    (log, bundle)
176                })
177                .collect();
178            traces
179        })
180    }
181
182    /// Construct a new reachability logger for timestamp type `T`.
183    ///
184    /// Inserts a logger with the name `timely/reachability/{type_name::<T>()}`, following
185    /// Timely naming convention.
186    fn register_reachability_logger<T: ExtractTimestamp>(
187        &self,
188        registry: &mut Registry,
189        index: usize,
190    ) {
191        let logger = self.reachability_logger::<T>(index);
192        let type_name = std::any::type_name::<T>();
193        registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
194    }
195
196    /// Register all loggers with the timely worker.
197    ///
198    /// Registers the timely, differential, compute, and reachability loggers.
199    fn register_loggers(&self) {
200        let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
201        let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
202        let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
203
204        let mut register = self.worker.log_register().expect("Logging must be enabled");
205        register.insert_logger("timely", t_logger);
206        // Note that each reachability logger has a unique index, this is crucial to avoid dropping
207        // data because the event link structure is not multi-producer safe.
208        self.register_reachability_logger::<Timestamp>(&mut register, 0);
209        self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
210        self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
211        register.insert_logger("differential/arrange", d_logger);
212        register.insert_logger("materialize/compute", c_logger.clone());
213
214        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
215    }
216
217    fn simple_logger<CB: ContainerBuilder>(
218        &self,
219        event_queue: EventQueue<CB::Container>,
220    ) -> Logger<CB> {
221        let [link] = event_queue.links;
222        let mut logger = BatchLogger::new(link, self.interval_ms);
223        let activator = event_queue.activator.clone();
224        Logger::new(
225            self.now,
226            self.start_offset,
227            move |time, data: &mut Option<CB::Container>| {
228                if let Some(data) = data.take() {
229                    logger.publish_batch(data);
230                } else if logger.report_progress(*time) {
231                    activator.activate();
232                }
233            },
234        )
235    }
236
237    /// Construct a reachability logger for timestamp type `T`. The index must
238    /// refer to a unique link in the reachability event queue.
239    fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
240    where
241        T: ExtractTimestamp,
242    {
243        let link = Rc::clone(&self.r_event_queue.links[index]);
244        let mut logger = BatchLogger::new(link, self.interval_ms);
245        let mut massaged = Vec::new();
246        let mut builder = ColumnBuilder::default();
247        let activator = self.r_event_queue.activator.clone();
248
249        let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
250            if let Some(data) = data {
251                // Handle data
252                for (time, event) in data.drain(..) {
253                    match event {
254                        TrackerEvent::SourceUpdate(update) => {
255                            massaged.extend(update.updates.iter().map(
256                                |(node, port, time, diff)| {
257                                    let is_source = true;
258                                    (*node, *port, is_source, T::extract(time), Diff::from(*diff))
259                                },
260                            ));
261
262                            builder.push_into((time, (update.tracker_id, &massaged)));
263                            massaged.clear();
264                        }
265                        TrackerEvent::TargetUpdate(update) => {
266                            massaged.extend(update.updates.iter().map(
267                                |(node, port, time, diff)| {
268                                    let is_source = false;
269                                    (*node, *port, is_source, time.extract(), Diff::from(*diff))
270                                },
271                            ));
272
273                            builder.push_into((time, (update.tracker_id, &massaged)));
274                            massaged.clear();
275                        }
276                    }
277                    while let Some(container) = builder.extract() {
278                        logger.publish_batch(std::mem::take(container));
279                    }
280                }
281            } else {
282                // Handle a flush
283                while let Some(container) = builder.finish() {
284                    logger.publish_batch(std::mem::take(container));
285                }
286
287                if logger.report_progress(*batch_time) {
288                    activator.activate();
289                }
290            }
291        };
292
293        Logger::new(self.now, self.start_offset, action)
294    }
295}
296
/// Helper trait to extract a timestamp from various types of timestamp used in rendering.
///
/// Implemented in this file for `Timestamp` itself, for `Product<Timestamp, PointStamp<u64>>`,
/// and for `(Timestamp, Subtime)`; the reachability loggers use it to record a plain
/// `Timestamp` regardless of the timestamp type being tracked.
trait ExtractTimestamp: Clone + 'static {
    /// Extracts the timestamp from the type.
    fn extract(&self) -> Timestamp;
}
302
/// A plain `Timestamp` is its own extracted timestamp.
impl ExtractTimestamp for Timestamp {
    /// Returns a copy of `self`.
    fn extract(&self) -> Timestamp {
        *self
    }
}
308
309impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
310    fn extract(&self) -> Timestamp {
311        self.outer
312    }
313}
314
315impl ExtractTimestamp for (Timestamp, Subtime) {
316    fn extract(&self) -> Timestamp {
317        self.0
318    }
319}