mz_compute/logging/
initialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Initialization of logging dataflows.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::Collection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::columnar::Column;
21use mz_timely_util::columnar::builder::ColumnBuilder;
22use mz_timely_util::operator::CollectionExt;
23use timely::communication::Allocate;
24use timely::container::{ContainerBuilder, PushInto};
25use timely::dataflow::Scope;
26use timely::logging::{TimelyEvent, TimelyEventBuilder};
27use timely::logging_core::{Logger, Registry};
28use timely::order::Product;
29use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
30use timely::worker::AsWorker;
31
32use crate::arrangement::manager::TraceBundle;
33use crate::extensions::arrange::{KeyCollection, MzArrange};
34use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
35use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
36use crate::typedefs::{ErrBatcher, ErrBuilder};
37
38/// Initialize logging dataflows.
39///
40/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
41/// retrieving logged records as well as the index of the exporting dataflow.
42pub fn initialize<A: Allocate + 'static>(
43    worker: &mut timely::worker::Worker<A>,
44    config: &LoggingConfig,
45) -> LoggingTraces {
46    let interval_ms = std::cmp::max(1, config.interval.as_millis());
47
48    // Track time relative to the Unix epoch, rather than when the server
49    // started, so that the logging sources can be joined with tables and
50    // other real time sources for semi-sensible results.
51    let now = Instant::now();
52    let start_offset = std::time::SystemTime::now()
53        .duration_since(std::time::SystemTime::UNIX_EPOCH)
54        .expect("Failed to get duration since Unix epoch");
55
56    let mut context = LoggingContext {
57        worker,
58        config,
59        interval_ms,
60        now,
61        start_offset,
62        t_event_queue: EventQueue::new("t"),
63        r_event_queue: EventQueue::new("r"),
64        d_event_queue: EventQueue::new("d"),
65        c_event_queue: EventQueue::new("c"),
66        shared_state: Default::default(),
67    };
68
69    // Depending on whether we should log the creation of the logging dataflows, we register the
70    // loggers with timely either before or after creating them.
71    let dataflow_index = context.worker.next_dataflow_index();
72    let traces = if config.log_logging {
73        context.register_loggers();
74        context.construct_dataflow()
75    } else {
76        let traces = context.construct_dataflow();
77        context.register_loggers();
78        traces
79    };
80
81    let compute_logger = worker.logger_for("materialize/compute").unwrap();
82    LoggingTraces {
83        traces,
84        dataflow_index,
85        compute_logger,
86    }
87}
88
/// A batch of reachability updates as recorded by the reachability loggers.
///
/// The first component is the tracker identifier. Each entry of the vector is
/// `(node, port, is_source, timestamp, diff)`, where `is_source` is `true` for updates taken
/// from a `TrackerEvent::SourceUpdate` and `false` for a `TrackerEvent::TargetUpdate`.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
90
/// State shared by the steps of logging initialization: constructing the logging dataflow and
/// registering the loggers that feed it.
struct LoggingContext<'a, A: Allocate> {
    /// The timely worker on which the dataflow is built and the loggers are registered.
    worker: &'a mut timely::worker::Worker<A>,
    /// The logging configuration handed to each logging dataflow fragment.
    config: &'a LoggingConfig,
    /// The logging interval in milliseconds, passed to every `BatchLogger`.
    interval_ms: u128,
    /// The instant captured at initialization; passed to every `Logger`.
    now: Instant,
    /// The duration since the Unix epoch captured at initialization; passed to every `Logger`.
    start_offset: Duration,
    /// Queue carrying timely events from the timely logger to the logging dataflow.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue carrying reachability events. It has three links, one for each registered
    /// reachability logger.
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue carrying differential events from the differential logger to the logging dataflow.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue carrying compute events from the compute logger to the logging dataflow.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared with the timely, differential, and compute logging fragments; also receives
    /// the compute logger once it has been registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
}
103
/// The result of initializing logging: the exported traces, the index of the dataflow that
/// exports them, and the logger for compute events.
pub(crate) struct LoggingTraces {
    /// Exported traces, by log variant.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The index of the dataflow that exports the traces.
    pub dataflow_index: usize,
    /// The compute logger.
    pub compute_logger: super::compute::Logger,
}
112
113impl<A: Allocate + 'static> LoggingContext<'_, A> {
114    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
115        self.worker.dataflow_named("Dataflow: logging", |scope| {
116            let mut collections = BTreeMap::new();
117
118            let super::timely::Return {
119                collections: timely_collections,
120            } = super::timely::construct(
121                scope.clone(),
122                self.config,
123                self.t_event_queue.clone(),
124                Rc::clone(&self.shared_state),
125            );
126            collections.extend(timely_collections);
127
128            let super::reachability::Return {
129                collections: reachability_collections,
130            } = super::reachability::construct(
131                scope.clone(),
132                self.config,
133                self.r_event_queue.clone(),
134            );
135            collections.extend(reachability_collections);
136
137            let super::differential::Return {
138                collections: differential_collections,
139            } = super::differential::construct(
140                scope.clone(),
141                self.config,
142                self.d_event_queue.clone(),
143                Rc::clone(&self.shared_state),
144            );
145            collections.extend(differential_collections);
146
147            let super::compute::Return {
148                collections: compute_collections,
149            } = super::compute::construct(
150                scope.clone(),
151                scope.parent.clone(),
152                self.config,
153                self.c_event_queue.clone(),
154                Rc::clone(&self.shared_state),
155            );
156            collections.extend(compute_collections);
157
158            let errs = scope.scoped("logging errors", |scope| {
159                let collection: KeyCollection<_, DataflowError, Diff> =
160                    Collection::empty(scope).into();
161                collection
162                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
163                    .trace
164            });
165
166            let traces = collections
167                .into_iter()
168                .map(|(log, collection)| {
169                    let bundle = TraceBundle::new(collection.trace, errs.clone())
170                        .with_drop(collection.token);
171                    (log, bundle)
172                })
173                .collect();
174            traces
175        })
176    }
177
178    /// Construct a new reachability logger for timestamp type `T`.
179    ///
180    /// Inserts a logger with the name `timely/reachability/{type_name::<T>()}`, following
181    /// Timely naming convention.
182    fn register_reachability_logger<T: ExtractTimestamp>(
183        &self,
184        registry: &mut Registry,
185        index: usize,
186    ) {
187        let logger = self.reachability_logger::<T>(index);
188        let type_name = std::any::type_name::<T>();
189        registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
190    }
191
192    /// Register all loggers with the timely worker.
193    ///
194    /// Registers the timely, differential, compute, and reachability loggers.
195    fn register_loggers(&self) {
196        let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
197        let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
198        let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
199
200        let mut register = self.worker.log_register().expect("Logging must be enabled");
201        register.insert_logger("timely", t_logger);
202        // Note that each reachability logger has a unique index, this is crucial to avoid dropping
203        // data because the event link structure is not multi-producer safe.
204        self.register_reachability_logger::<Timestamp>(&mut register, 0);
205        self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
206        self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
207        register.insert_logger("differential/arrange", d_logger);
208        register.insert_logger("materialize/compute", c_logger.clone());
209
210        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
211    }
212
213    fn simple_logger<CB: ContainerBuilder>(
214        &self,
215        event_queue: EventQueue<CB::Container>,
216    ) -> Logger<CB> {
217        let [link] = event_queue.links;
218        let mut logger = BatchLogger::new(link, self.interval_ms);
219        let activator = event_queue.activator.clone();
220        Logger::new(
221            self.now,
222            self.start_offset,
223            move |time, data: &mut Option<CB::Container>| {
224                if let Some(data) = data.take() {
225                    logger.publish_batch(data);
226                } else if logger.report_progress(*time) {
227                    activator.activate();
228                }
229            },
230        )
231    }
232
233    /// Construct a reachability logger for timestamp type `T`. The index must
234    /// refer to a unique link in the reachability event queue.
235    fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
236    where
237        T: ExtractTimestamp,
238    {
239        let link = Rc::clone(&self.r_event_queue.links[index]);
240        let mut logger = BatchLogger::new(link, self.interval_ms);
241        let mut massaged = Vec::new();
242        let mut builder = ColumnBuilder::default();
243        let activator = self.r_event_queue.activator.clone();
244
245        let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
246            if let Some(data) = data {
247                // Handle data
248                for (time, event) in data.drain(..) {
249                    match event {
250                        TrackerEvent::SourceUpdate(update) => {
251                            massaged.extend(update.updates.iter().map(
252                                |(node, port, time, diff)| {
253                                    let is_source = true;
254                                    (*node, *port, is_source, T::extract(time), Diff::from(*diff))
255                                },
256                            ));
257
258                            builder.push_into((time, (update.tracker_id, &massaged)));
259                            massaged.clear();
260                        }
261                        TrackerEvent::TargetUpdate(update) => {
262                            massaged.extend(update.updates.iter().map(
263                                |(node, port, time, diff)| {
264                                    let is_source = false;
265                                    (*node, *port, is_source, time.extract(), Diff::from(*diff))
266                                },
267                            ));
268
269                            builder.push_into((time, (update.tracker_id, &massaged)));
270                            massaged.clear();
271                        }
272                    }
273                    while let Some(container) = builder.extract() {
274                        logger.publish_batch(std::mem::take(container));
275                    }
276                }
277            } else {
278                // Handle a flush
279                while let Some(container) = builder.finish() {
280                    logger.publish_batch(std::mem::take(container));
281                }
282
283                if logger.report_progress(*batch_time) {
284                    activator.activate();
285                }
286            }
287        };
288
289        Logger::new(self.now, self.start_offset, action)
290    }
291}
292
/// Helper trait to extract a timestamp from various types of timestamp used in rendering.
///
/// The reachability loggers use it to reduce the timestamp of each logged update to a plain
/// `Timestamp`.
trait ExtractTimestamp: Clone + 'static {
    /// Extracts the timestamp from the type.
    fn extract(&self) -> Timestamp;
}
298
299impl ExtractTimestamp for Timestamp {
300    fn extract(&self) -> Timestamp {
301        *self
302    }
303}
304
305impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
306    fn extract(&self) -> Timestamp {
307        self.outer
308    }
309}
310
311impl ExtractTimestamp for (Timestamp, Subtime) {
312    fn extract(&self) -> Timestamp {
313        self.0
314    }
315}