mz_compute/logging/
initialize.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Initialization of logging dataflows.
7
8use std::cell::RefCell;
9use std::collections::BTreeMap;
10use std::rc::Rc;
11use std::time::{Duration, Instant};
12
13use differential_dataflow::Collection;
14use differential_dataflow::dynamic::pointstamp::PointStamp;
15use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder};
16use mz_compute_client::logging::{LogVariant, LoggingConfig};
17use mz_repr::{Diff, Timestamp};
18use mz_storage_operators::persist_source::Subtime;
19use mz_storage_types::errors::DataflowError;
20use mz_timely_util::containers::{Column, ColumnBuilder};
21use mz_timely_util::operator::CollectionExt;
22use timely::communication::Allocate;
23use timely::container::{ContainerBuilder, PushInto};
24use timely::dataflow::Scope;
25use timely::logging::{TimelyEvent, TimelyEventBuilder};
26use timely::logging_core::{Logger, Registry};
27use timely::order::Product;
28use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder};
29
30use crate::arrangement::manager::TraceBundle;
31use crate::extensions::arrange::{KeyCollection, MzArrange};
32use crate::logging::compute::{ComputeEvent, ComputeEventBuilder};
33use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
34use crate::typedefs::{ErrBatcher, ErrBuilder};
35
36/// Initialize logging dataflows.
37///
38/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
39/// retrieving logged records as well as the index of the exporting dataflow.
40pub fn initialize<A: Allocate + 'static>(
41    worker: &mut timely::worker::Worker<A>,
42    config: &LoggingConfig,
43) -> LoggingTraces {
44    let interval_ms = std::cmp::max(1, config.interval.as_millis());
45
46    // Track time relative to the Unix epoch, rather than when the server
47    // started, so that the logging sources can be joined with tables and
48    // other real time sources for semi-sensible results.
49    let now = Instant::now();
50    let start_offset = std::time::SystemTime::now()
51        .duration_since(std::time::SystemTime::UNIX_EPOCH)
52        .expect("Failed to get duration since Unix epoch");
53
54    let mut context = LoggingContext {
55        worker,
56        config,
57        interval_ms,
58        now,
59        start_offset,
60        t_event_queue: EventQueue::new("t"),
61        r_event_queue: EventQueue::new("r"),
62        d_event_queue: EventQueue::new("d"),
63        c_event_queue: EventQueue::new("c"),
64        shared_state: Default::default(),
65    };
66
67    // Depending on whether we should log the creation of the logging dataflows, we register the
68    // loggers with timely either before or after creating them.
69    let dataflow_index = context.worker.next_dataflow_index();
70    let traces = if config.log_logging {
71        context.register_loggers();
72        context.construct_dataflow()
73    } else {
74        let traces = context.construct_dataflow();
75        context.register_loggers();
76        traces
77    };
78
79    let compute_logger = worker.log_register().get("materialize/compute").unwrap();
80    LoggingTraces {
81        traces,
82        dataflow_index,
83        compute_logger,
84    }
85}
86
/// A reachability event in the form the reachability loggers publish it: the tracker ID,
/// followed by that tracker's updates as `(node, port, is_source, timestamp, diff)` tuples.
pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>);
88
/// State needed while initializing logging: the worker and configuration, the clock anchors
/// handed to every logger, and the event queues through which the loggers hand their events
/// to the logging dataflow.
struct LoggingContext<'a, A: Allocate> {
    /// The timely worker on which the logging dataflow is built and the loggers are registered.
    worker: &'a mut timely::worker::Worker<A>,
    /// The logging configuration, forwarded to each `construct` function.
    config: &'a LoggingConfig,
    /// The logging interval in milliseconds, handed to every `BatchLogger`. `initialize`
    /// clamps it to at least 1.
    interval_ms: u128,
    /// The instant captured during initialization; passed to `Logger::new`.
    now: Instant,
    /// The time since the Unix epoch captured during initialization; passed to `Logger::new`
    /// together with `now`.
    start_offset: Duration,
    /// Queue for timely events.
    t_event_queue: EventQueue<Vec<(Duration, TimelyEvent)>>,
    /// Queue for reachability events. It has three links, one for each reachability logger
    /// that gets registered.
    r_event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
    /// Queue for differential events.
    d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    /// Queue for compute events.
    c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
    /// State shared with the timely, differential, and compute parts of the logging dataflow.
    /// The compute logger is stored here once the loggers are registered.
    shared_state: Rc<RefCell<SharedLoggingState>>,
}
101
/// The result of [`initialize`]: the exported logging traces, the index of the dataflow that
/// exports them, and the compute logger.
pub(crate) struct LoggingTraces {
    /// Exported traces, by log variant.
    pub traces: BTreeMap<LogVariant, TraceBundle>,
    /// The index of the dataflow that exports the traces.
    pub dataflow_index: usize,
    /// The compute logger.
    pub compute_logger: super::compute::Logger,
}
110
111impl<A: Allocate + 'static> LoggingContext<'_, A> {
112    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
113        self.worker.dataflow_named("Dataflow: logging", |scope| {
114            let mut collections = BTreeMap::new();
115
116            let super::timely::Return {
117                collections: timely_collections,
118            } = super::timely::construct(
119                scope.clone(),
120                self.config,
121                self.t_event_queue.clone(),
122                Rc::clone(&self.shared_state),
123            );
124            collections.extend(timely_collections);
125
126            let super::reachability::Return {
127                collections: reachability_collections,
128            } = super::reachability::construct(
129                scope.clone(),
130                self.config,
131                self.r_event_queue.clone(),
132            );
133            collections.extend(reachability_collections);
134
135            let super::differential::Return {
136                collections: differential_collections,
137            } = super::differential::construct(
138                scope.clone(),
139                self.config,
140                self.d_event_queue.clone(),
141                Rc::clone(&self.shared_state),
142            );
143            collections.extend(differential_collections);
144
145            let super::compute::Return {
146                collections: compute_collections,
147            } = super::compute::construct(
148                scope.clone(),
149                scope.parent.clone(),
150                self.config,
151                self.c_event_queue.clone(),
152                Rc::clone(&self.shared_state),
153            );
154            collections.extend(compute_collections);
155
156            let errs = scope.scoped("logging errors", |scope| {
157                let collection: KeyCollection<_, DataflowError, Diff> =
158                    Collection::empty(scope).into();
159                collection
160                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange logging err")
161                    .trace
162            });
163
164            let traces = collections
165                .into_iter()
166                .map(|(log, collection)| {
167                    let bundle = TraceBundle::new(collection.trace, errs.clone())
168                        .with_drop(collection.token);
169                    (log, bundle)
170                })
171                .collect();
172            traces
173        })
174    }
175
176    /// Construct a new reachability logger for timestamp type `T`.
177    ///
178    /// Inserts a logger with the name `timely/reachability/{type_name::<T>()}`, following
179    /// Timely naming convention.
180    fn register_reachability_logger<T: ExtractTimestamp>(
181        &self,
182        registry: &mut Registry,
183        index: usize,
184    ) {
185        let logger = self.reachability_logger::<T>(index);
186        let type_name = std::any::type_name::<T>();
187        registry.insert_logger(&format!("timely/reachability/{type_name}"), logger);
188    }
189
190    /// Register all loggers with the timely worker.
191    ///
192    /// Registers the timely, differential, compute, and reachability loggers.
193    fn register_loggers(&self) {
194        let t_logger = self.simple_logger::<TimelyEventBuilder>(self.t_event_queue.clone());
195        let d_logger = self.simple_logger::<DifferentialEventBuilder>(self.d_event_queue.clone());
196        let c_logger = self.simple_logger::<ComputeEventBuilder>(self.c_event_queue.clone());
197
198        let mut register = self.worker.log_register();
199        register.insert_logger("timely", t_logger);
200        // Note that each reachability logger has a unique index, this is crucial to avoid dropping
201        // data because the event link structure is not multi-producer safe.
202        self.register_reachability_logger::<Timestamp>(&mut register, 0);
203        self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
204        self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
205        register.insert_logger("differential/arrange", d_logger);
206        register.insert_logger("materialize/compute", c_logger.clone());
207
208        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
209    }
210
211    fn simple_logger<CB: ContainerBuilder>(
212        &self,
213        event_queue: EventQueue<CB::Container>,
214    ) -> Logger<CB> {
215        let [link] = event_queue.links;
216        let mut logger = BatchLogger::new(link, self.interval_ms);
217        let activator = event_queue.activator.clone();
218        Logger::new(
219            self.now,
220            self.start_offset,
221            move |time, data: &mut Option<CB::Container>| {
222                if let Some(data) = data.take() {
223                    logger.publish_batch(data);
224                } else if logger.report_progress(*time) {
225                    activator.activate();
226                }
227            },
228        )
229    }
230
231    /// Construct a reachability logger for timestamp type `T`. The index must
232    /// refer to a unique link in the reachability event queue.
233    fn reachability_logger<T>(&self, index: usize) -> Logger<TrackerEventBuilder<T>>
234    where
235        T: ExtractTimestamp,
236    {
237        let link = Rc::clone(&self.r_event_queue.links[index]);
238        let mut logger = BatchLogger::new(link, self.interval_ms);
239        let mut massaged = Vec::new();
240        let mut builder = ColumnBuilder::default();
241        let activator = self.r_event_queue.activator.clone();
242
243        let action = move |batch_time: &Duration, data: &mut Option<Vec<_>>| {
244            if let Some(data) = data {
245                // Handle data
246                for (time, event) in data.drain(..) {
247                    match event {
248                        TrackerEvent::SourceUpdate(update) => {
249                            massaged.extend(update.updates.iter().map(
250                                |(node, port, time, diff)| {
251                                    let is_source = true;
252                                    (*node, *port, is_source, T::extract(time), Diff::from(*diff))
253                                },
254                            ));
255
256                            builder.push_into((time, (update.tracker_id, &massaged)));
257                            massaged.clear();
258                        }
259                        TrackerEvent::TargetUpdate(update) => {
260                            massaged.extend(update.updates.iter().map(
261                                |(node, port, time, diff)| {
262                                    let is_source = false;
263                                    (*node, *port, is_source, time.extract(), Diff::from(*diff))
264                                },
265                            ));
266
267                            builder.push_into((time, (update.tracker_id, &massaged)));
268                            massaged.clear();
269                        }
270                    }
271                    while let Some(container) = builder.extract() {
272                        logger.publish_batch(std::mem::take(container));
273                    }
274                }
275            } else {
276                // Handle a flush
277                while let Some(container) = builder.finish() {
278                    logger.publish_batch(std::mem::take(container));
279                }
280
281                if logger.report_progress(*batch_time) {
282                    activator.activate();
283                }
284            }
285        };
286
287        Logger::new(self.now, self.start_offset, action)
288    }
289}
290
/// Helper trait to extract a timestamp from various types of timestamp used in rendering.
///
/// It is implemented for each timestamp type a reachability logger is registered for, so that
/// reachability updates can be reported in terms of a plain [`Timestamp`].
trait ExtractTimestamp: Clone + 'static {
    /// Extracts the timestamp from the type.
    fn extract(&self) -> Timestamp;
}
296
/// A plain `Timestamp` is its own extracted timestamp.
impl ExtractTimestamp for Timestamp {
    fn extract(&self) -> Timestamp {
        *self
    }
}
302
303impl ExtractTimestamp for Product<Timestamp, PointStamp<u64>> {
304    fn extract(&self) -> Timestamp {
305        self.outer
306    }
307}
308
309impl ExtractTimestamp for (Timestamp, Subtime) {
310    fn extract(&self) -> Timestamp {
311        self.0
312    }
313}