mz_compute/logging/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Initialization of logging dataflows.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::VecCollection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::columnar::Column;
21use mz_timely_util::columnar::builder::ColumnBuilder;
22use mz_timely_util::operator::CollectionExt;
23use timely::ContainerBuilder;
24use timely::communication::Allocate;
25use timely::container::{ContainerBuilder as _, PushInto};
26use timely::dataflow::Scope;
27use timely::logging::{TimelyEvent, TimelyEventBuilder};
28use timely::logging_core::{Logger, Registry};
29use timely::order::Product;
30use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
31use timely::worker::AsWorker;
32
33use crate::arrangement::manager::TraceBundle;
34use crate::extensions::arrange::{KeyCollection, MzArrange};
35use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
36use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
37use crate::typedefs::{ErrBatcher, ErrBuilder};
38
39/// Initialize logging dataflows.
40///
41/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
42/// retrieving logged records as well as the index of the exporting dataflow.
43pub fn initialize<A: Allocate + 'static>(
44    worker: &mut timely::worker::Worker<A>,
45    config: &LoggingConfig,
46) -> LoggingTraces {
47    let interval_ms = std::cmp::max(1, config.interval.as_millis());
48
49    // Track time relative to the Unix epoch, rather than when the server
50    // started, so that the logging sources can be joined with tables and
51    // other real time sources for semi-sensible results.
52    let now = Instant::now();
53    let start_offset = std::time::SystemTime::now()
54        .duration_since(std::time::SystemTime::UNIX_EPOCH)
55        .expect("Failed to get duration since Unix epoch");
56
57    let mut context = LoggingContext {
58        worker,
59        config,
60        interval_ms,
61        now,
62        start_offset,
63        t_event_queue: EventQueue::new("t"),
64        r_event_queue: EventQueue::new("r"),
65        d_event_queue: EventQueue::new("d"),
66        c_event_queue: EventQueue::new("c"),
67        shared_state: Default::default(),
68    };
69
70    // Depending on whether we should log the creation of the logging dataflows, we register the
71    // loggers with timely either before or after creating them.
72    let dataflow_index = context.worker.next_dataflow_index();
73    let traces = if config.log_logging {
74        context.register_loggers();
75        context.construct_dataflow()
76    } else {
77        let traces = context.construct_dataflow();
78        context.register_loggers();
79        traces
80    };
81
82    let compute_logger = worker.logger_for("materialize/compute").unwrap();
83    LoggingTraces {
84        traces,
85        dataflow_index,
86        compute_logger,
87    }
88}
89
/// A reachability event as written into the reachability event queue.
///
/// The first component is the tracker id. The second is the list of updates reported by that
/// tracker, each encoded as `(node, port, is_source, timestamp, diff)`, where `is_source` is
/// `true` for source updates and `false` for target updates.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
91
92struct LoggingContext<'a, A: Allocate> {
93    worker: &'a mut timely::worker::Worker<A>,
94    config: &'a LoggingConfig,
95    interval_ms: u128,
96    now: Instant,
97    start_offset: Duration,
98    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
99    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
100    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
101    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
102    shared_state: Rc<RefCell<SharedLoggingState>>,
103}
104
/// The result of [`initialize`]: everything a caller needs to read from and write to the
/// logging dataflow.
pub(crate) struct LoggingTraces {
    /// Exported traces, by log variant.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The index of the dataflow that exports the traces.
    pub dataflow_index: usize,
    /// The compute logger, as registered under the name "materialize/compute".
    pub compute_logger: super::compute::Logger,
}
113
114impl<A: Allocate + 'static> LoggingContext<'_, A> {
115    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
116        self.worker.dataflow_named("Dataflow: logging", |scope| {
117            let mut collections = BTreeMap::new();
118
119            let super::timely::Return {
120                collections: timely_collections,
121            } = super::timely::construct(
122                scope.clone(),
123                self.config,
124                self.t_event_queue.clone(),
125                Rc::clone(&self.shared_state),
126            );
127            collections.extend(timely_collections);
128
129            let super::reachability::Return {
130                collections: reachability_collections,
131            } = super::reachability::construct(
132                scope.clone(),
133                self.config,
134                self.r_event_queue.clone(),
135            );
136            collections.extend(reachability_collections);
137
138            let super::differential::Return {
139                collections: differential_collections,
140            } = super::differential::construct(
141                scope.clone(),
142                self.config,
143                self.d_event_queue.clone(),
144                Rc::clone(&self.shared_state),
145            );
146            collections.extend(differential_collections);
147
148            let super::compute::Return {
149                collections: compute_collections,
150            } = super::compute::construct(
151                scope.clone(),
152                scope.parent.clone(),
153                self.config,
154                self.c_event_queue.clone(),
155                Rc::clone(&self.shared_state),
156            );
157            collections.extend(compute_collections);
158
159            let errs = scope.scoped("logging errors", |scope| {
160                let collection: KeyCollection<_, DataflowError, Diff> =
161                    VecCollection::empty(scope).into();
162                collection
163                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
164                    .trace
165            });
166
167            let traces = collections
168                .into_iter()
169                .map(|(log, collection)| {
170                    let bundle = TraceBundle::new(collection.trace, errs.clone())
171                        .with_drop(collection.token);
172                    (log, bundle)
173                })
174                .collect();
175            traces
176        })
177    }
178
179    /// Construct a new reachability logger for timestamp type `T`.
180    ///
181    /// Inserts a logger with the name `timely/reachability/{type_name::<T>()}`, following
182    /// Timely naming convention.
183    fn register_reachability_logger<T: ExtractTimestamp>(
184        &self,
185        registry: &mut Registry,
186        index: usize,
187    ) {
188        let logger = self.reachability_logger::<T>(index);
189        let type_name = std::any::type_name::<T>();
190        registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
191    }
192
193    /// Register all loggers with the timely worker.
194    ///
195    /// Registers the timely, differential, compute, and reachability loggers.
196    fn register_loggers(&self) {
197        let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
198        let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
199        let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
200
201        let mut register = self.worker.log_register().expect("Logging must be enabled");
202        register.insert_logger("timely", t_logger);
203        // Note that each reachability logger has a unique index, this is crucial to avoid dropping
204        // data because the event link structure is not multi-producer safe.
205        self.register_reachability_logger::<Timestamp>(&mut register, 0);
206        self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
207        self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
208        register.insert_logger("differential/arrange", d_logger);
209        register.insert_logger("materialize/compute", c_logger.clone());
210
211        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
212    }
213
214    fn simple_logger<CB: ContainerBuilder>(
215        &self,
216        event_queue: EventQueue<CB::Container>,
217    ) -> Logger<CB> {
218        let [link] = event_queue.links;
219        let mut logger = BatchLogger::new(link, self.interval_ms);
220        let activator = event_queue.activator.clone();
221        Logger::new(
222            self.now,
223            self.start_offset,
224            move |time, data: &mut Option<CB::Container>| {
225                if let Some(data) = data.take() {
226                    logger.publish_batch(data);
227                } else if logger.report_progress(*time) {
228                    activator.activate();
229                }
230            },
231        )
232    }
233
234    /// Construct a reachability logger for timestamp type `T`. The index must
235    /// refer to a unique link in the reachability event queue.
236    fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
237    where
238        T: ExtractTimestamp,
239    {
240        let link = Rc::clone(&self.r_event_queue.links[index]);
241        let mut logger = BatchLogger::new(link, self.interval_ms);
242        let mut massaged = Vec::new();
243        let mut builder = ColumnBuilder::default();
244        let activator = self.r_event_queue.activator.clone();
245
246        let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
247            if let Some(data) = data {
248                // Handle data
249                for (time, event) in data.drain(..) {
250                    match event {
251                        TrackerEvent::SourceUpdate(update) => {
252                            massaged.extend(update.updates.iter().map(
253                                |(node, port, time, diff)| {
254                                    let is_source = true;
255                                    (*node, *port, is_source, T::extract(time), Diff::from(*diff))
256                                },
257                            ));
258
259                            builder.push_into((time, (update.tracker_id, &massaged)));
260                            massaged.clear();
261                        }
262                        TrackerEvent::TargetUpdate(update) => {
263                            massaged.extend(update.updates.iter().map(
264                                |(node, port, time, diff)| {
265                                    let is_source = false;
266                                    (*node, *port, is_source, time.extract(), Diff::from(*diff))
267                                },
268                            ));
269
270                            builder.push_into((time, (update.tracker_id, &massaged)));
271                            massaged.clear();
272                        }
273                    }
274                    while let Some(container) = builder.extract() {
275                        logger.publish_batch(std::mem::take(container));
276                    }
277                }
278            } else {
279                // Handle a flush
280                while let Some(container) = builder.finish() {
281                    logger.publish_batch(std::mem::take(container));
282                }
283
284                if logger.report_progress(*batch_time) {
285                    activator.activate();
286                }
287            }
288        };
289
290        Logger::new(self.now, self.start_offset, action)
291    }
292}
293
/// Helper trait to extract a [`Timestamp`] from the various timestamp types used in rendering.
///
/// The reachability loggers are generic over the timestamp type of the scope they observe,
/// but the logged records always carry a plain [`Timestamp`]; this trait performs that
/// conversion.
trait ExtractTimestamp: Clone + 'static {
    /// Extracts the [`Timestamp`] component from `self`.
    fn extract(&self) -> Timestamp;
}
299
300impl ExtractTimestamp for Timestamp {
301    fn extract(&self) -> Timestamp {
302        *self
303    }
304}
305
306impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
307    fn extract(&self) -> Timestamp {
308        self.outer
309    }
310}
311
312impl ExtractTimestamp for (Timestamp, Subtime) {
313    fn extract(&self) -> Timestamp {
314        self.0
315    }
316}