mz_repr/explain/
tracing.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Tracing utilities for explainable plans.
11
12use std::fmt::{Debug, Display};
13use std::sync::Mutex;
14
15use mz_sql_parser::ast::NamedPlan;
16use tracing::{Level, span, subscriber};
17use tracing_core::{Interest, Metadata};
18use tracing_subscriber::{field, layer};
19
20use crate::explain::UsedIndexes;
21use smallvec::SmallVec;
22
23/// A tracing layer used to accumulate a sequence of explainable plans.
24#[allow(missing_debug_implementations)]
25pub struct PlanTrace<T> {
26    /// A specific concrete path to find in this trace. If present,
27    /// [`PlanTrace::push`] will only collect traces if the current path is a
28    /// prefix of find.
29    filter: Option<SmallVec<[NamedPlan; 4]>>,
30    /// A path of segments identifying the spans in the current ancestor-or-self
31    /// chain. The current path is used when accumulating new `entries`.
32    path: Mutex<String>,
33    /// The first time when entering a span (None no span was entered yet).
34    start: Mutex<Option<std::time::Instant>>,
35    /// A path of times at which the spans in the current ancestor-or-self chain
36    /// were started. The duration since the last time is used when accumulating
37    /// new `entries`.
38    times: Mutex<Vec<std::time::Instant>>,
39    /// A sequence of entries associating for a specific plan type `T`.
40    entries: Mutex<Vec<TraceEntry<T>>>,
41}
42
/// A single plan recorded by a [`trace_plan`] call.
#[derive(Clone, Debug)]
pub struct TraceEntry<T> {
    /// The instant at which the entry was created.
    ///
    /// Used to impose a global order when merging multiple `TraceEntry`
    /// arrays into a single array.
    pub instant: std::time::Instant,
    /// The time elapsed since the innermost enclosing span was entered.
    pub span_duration: std::time::Duration,
    /// The time elapsed since the first span was entered on the `PlanTrace`
    /// that recorded this entry.
    pub full_duration: std::time::Duration,
    /// The `/`-separated path of the spans enclosing the `trace_plan` call
    /// (root first, innermost span last). Each segment is the span's
    /// `path.segment` field if present, and the span name otherwise.
    pub path: String,
    /// The plan traced at this step.
    pub plan: T,
}
60
61/// Trace a fragment of type `T` to be emitted as part of an `EXPLAIN OPTIMIZER
62/// TRACE` output.
63///
64/// For best compatibility with the existing UI (which at the moment is the only
65/// sane way to look at such `EXPLAIN` traces), code instrumentation should
66/// adhere to the following constraints:
67///
68/// 1.  The plan type should be listed in the layers created in the
69///     `OptimizerTrace` constructor.
70/// 2.  Each `trace_plan` should be unique within it's enclosing span and should
71///     represent the result of the stage idenified by that span. In particular,
72///     this means that functions that call `trace_plan` more than once need to
73///     construct ad-hoc spans (see the iteration spans in the `Fixpoint`
74///     transform for example).
75///
76/// As a consequence of the second constraint, a sequence of paths such as
77/// ```text
78/// optimizer.foo.bar
79/// optimizer.foo.baz
80/// ```
81/// is not well-formed as it is missing the results of the prefix paths at the
82/// end:
83/// ```text
84/// optimizer.foo.bar
85/// optimizer.foo.baz
86/// optimizer.foo
87/// optimizer
88/// ```
89///
90/// Also, note that full paths can be repeated within a pipeline, but adjacent
91/// duplicates are interpreted as separete invocations. For example, the
92/// sub-sequence
93/// ```text
94/// ... // preceding stages
95/// optimizer.foo.bar // 1st call
96/// optimizer.foo.bar // 2nd call
97/// ... // following stages
98/// ```
99/// will be rendered by the UI as the following tree structure.
100/// ```text
101/// optimizer
102///   ... // following stages
103///   foo
104///     bar // 2nd call
105///     bar // 1st call
106///   ... // preceding stages
107/// ```
108pub fn trace_plan<T: Clone + 'static>(plan: &T) {
109    tracing::Span::current().with_subscriber(|(_id, subscriber)| {
110        if let Some(trace) = subscriber.downcast_ref::<PlanTrace<T>>() {
111            trace.push(plan)
112        }
113    });
114}
115
116/// Create a span identified by `segment` and trace `plan` in it.
117///
118/// This primitive is useful for instrumentic code, see this commit[^example]
119/// for an example.
120///
121/// [^example]: <https://github.com/MaterializeInc/materialize/commit/2ce93229>
122pub fn dbg_plan<S: Display, T: Clone + 'static>(segment: S, plan: &T) {
123    span!(target: "optimizer", Level::DEBUG, "segment", path.segment = %segment).in_scope(|| {
124        trace_plan(plan);
125    });
126}
127
128/// Create a span identified by `segment` and trace `misc` in it.
129///
130/// This primitive is useful for instrumentic code, see this commit[^example]
131/// for an example.
132///
133/// [^example]: <https://github.com/MaterializeInc/materialize/commit/2ce93229>
134pub fn dbg_misc<S: Display, T: Display>(segment: S, misc: T) {
135    span!(target: "optimizer", Level::DEBUG, "segment", path.segment = %segment).in_scope(|| {
136        trace_plan(&misc.to_string());
137    });
138}
139
/// A helper struct for wrapping entries that represent the invocation context
/// of a function or method call into an object that renders as their hash.
///
/// Useful when constructing path segments when instrumenting a function trace
/// with additional debugging information.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ContextHash(u64);
147
148impl ContextHash {
149    pub fn of<T: std::hash::Hash>(t: T) -> Self {
150        use std::collections::hash_map::DefaultHasher;
151        use std::hash::Hasher;
152
153        let mut h = DefaultHasher::new();
154        t.hash(&mut h);
155        ContextHash(h.finish())
156    }
157}
158
159impl Display for ContextHash {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        write!(f, "{:x}", self.0 & 0xFFFFFFFu64) // show last 28 bits
162    }
163}
164
165/// A [`layer::Layer`] implementation for [`PlanTrace`].
166///
167/// Populates the `data` wrapped by the [`PlanTrace`] instance with
168/// [`TraceEntry`] values, one for each span with attached plan in its
169/// extensions map.
170impl<S, T> layer::Layer<S> for PlanTrace<T>
171where
172    S: subscriber::Subscriber,
173    T: 'static,
174{
175    fn on_new_span(
176        &self,
177        attrs: &span::Attributes<'_>,
178        _id: &span::Id,
179        _ctx: layer::Context<'_, S>,
180    ) {
181        // add segment to path
182        let mut path = self.path.lock().expect("path shouldn't be poisoned");
183        let segment = attrs.get_str("path.segment");
184        let segment = segment.unwrap_or_else(|| attrs.metadata().name().to_string());
185        if !path.is_empty() {
186            path.push('/');
187        }
188        path.push_str(segment.as_str());
189    }
190
191    fn on_enter(&self, _id: &span::Id, _ctx: layer::Context<'_, S>) {
192        let now = std::time::Instant::now();
193        // set start value on first ever on_enter
194        let mut start = self.start.lock().expect("start shouldn't be poisoned");
195        start.get_or_insert(now);
196        // push to time stack
197        let mut times = self.times.lock().expect("times shouldn't be poisoned");
198        times.push(now);
199    }
200
201    fn on_exit(&self, _id: &span::Id, _ctx: layer::Context<'_, S>) {
202        // truncate last segment from path
203        let mut path = self.path.lock().expect("path shouldn't be poisoned");
204        let new_len = path.rfind('/').unwrap_or(0);
205        path.truncate(new_len);
206        // pop from time stack
207        let mut times = self.times.lock().expect("times shouldn't be poisoned");
208        times.pop();
209    }
210}
211
212impl<S, T> layer::Filter<S> for PlanTrace<T>
213where
214    S: subscriber::Subscriber,
215    T: 'static + Clone,
216{
217    fn enabled(&self, meta: &Metadata<'_>, _cx: &layer::Context<'_, S>) -> bool {
218        self.is_enabled(meta)
219    }
220
221    fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
222        if self.is_enabled(meta) {
223            Interest::always()
224        } else {
225            Interest::never()
226        }
227    }
228}
229
230impl<T: 'static + Clone> PlanTrace<T> {
231    /// Create a new trace for plans of type `T` that will only accumulate
232    /// [`TraceEntry`] instances along the prefix of the given `path`.
233    pub fn new(filter: Option<SmallVec<[NamedPlan; 4]>>) -> Self {
234        Self {
235            filter,
236            path: Mutex::new(String::with_capacity(256)),
237            start: Mutex::new(None),
238            times: Mutex::new(Default::default()),
239            entries: Mutex::new(Default::default()),
240        }
241    }
242
243    /// Check if a subscriber layer of this kind will be interested in tracing
244    /// spans and events with the given metadata.
245    fn is_enabled(&self, meta: &Metadata<'_>) -> bool {
246        meta.is_span() && meta.target() == "optimizer"
247    }
248
249    /// Drain the trace data collected so far.
250    ///
251    /// Note that this method will mutate the internal state of the enclosing
252    /// [`PlanTrace`] even though its receiver is not `&mut self`. This quirk is
253    /// required because the tracing `Dispatch` does not have `downcast_mut` method.
254    pub fn drain_as_vec(&self) -> Vec<TraceEntry<T>> {
255        let mut entries = self.entries.lock().expect("entries shouldn't be poisoned");
256        entries.split_off(0)
257    }
258
259    /// Retrieve the trace data collected so far while leaving it in place.
260    pub fn collect_as_vec(&self) -> Vec<TraceEntry<T>> {
261        let entries = self.entries.lock().expect("entries shouldn't be poisoned");
262        (*entries).clone()
263    }
264
265    /// Find and return a clone of the [`TraceEntry`] for the given `path`.
266    pub fn find(&self, path: &str) -> Option<TraceEntry<T>>
267    where
268        T: Clone,
269    {
270        let entries = self.entries.lock().expect("entries shouldn't be poisoned");
271        entries.iter().find(|entry| entry.path == path).cloned()
272    }
273
274    /// Push a trace entry for the given `plan` to the current trace.
275    ///
276    /// This is a noop if
277    /// 1. the call is within a context without an enclosing span, or if
278    /// 2. [`PlanTrace::filter`] is set not equal to [`PlanTrace::current_path`].
279    fn push(&self, plan: &T)
280    where
281        T: Clone,
282    {
283        if let Some(current_path) = self.current_path() {
284            let times = self.times.lock().expect("times shouldn't be poisoned");
285            let start = self.start.lock().expect("start shouldn't is poisoned");
286            if let (Some(full_start), Some(span_start)) = (start.as_ref(), times.last()) {
287                let mut entries = self.entries.lock().expect("entries shouldn't be poisoned");
288                let time = std::time::Instant::now();
289                entries.push(TraceEntry {
290                    instant: time,
291                    span_duration: time.duration_since(*span_start),
292                    full_duration: time.duration_since(*full_start),
293                    path: current_path,
294                    plan: plan.clone(),
295                });
296            }
297        }
298    }
299
300    /// Helper method: get a copy of the current path.
301    ///
302    /// If [`PlanTrace::filter`] is set, this will also check the current path
303    /// against the `find` entry and return `None` if the two differ.
304    fn current_path(&self) -> Option<String> {
305        let path = self.path.lock().expect("path shouldn't be poisoned");
306        let path = path.as_str();
307        match self.filter.as_ref() {
308            Some(named_paths) => {
309                if named_paths.iter().any(|named| path == named.path()) {
310                    Some(path.to_owned())
311                } else {
312                    None
313                }
314            }
315            None => Some(path.to_owned()),
316        }
317    }
318}
319
320impl PlanTrace<UsedIndexes> {
321    /// Get the [`UsedIndexes`] corresponding to the given `plan_path`.
322    ///
323    /// Note that the path under which a `UsedIndexes` entry is traced might
324    /// differ from the path of the `plan_path` of the plan that needs it.
325    pub fn used_indexes_for(&self, plan_path: &str) -> UsedIndexes {
326        // Compute the path from which we are going to lookup the `UsedIndexes`
327        // instance from the requested path.
328        let path = match NamedPlan::of_path(plan_path) {
329            Some(NamedPlan::Global) => Some(NamedPlan::Global),
330            Some(NamedPlan::Physical) => Some(NamedPlan::Global),
331            Some(NamedPlan::FastPath) => Some(NamedPlan::FastPath),
332            _ => None,
333        };
334        // Find the `TraceEntry` wrapping the `UsedIndexes` instance.
335        let entry = match path {
336            Some(path) => self.find(path.path()),
337            None => None,
338        };
339        // Either return the `UsedIndexes` wrapped by the found entry or a
340        // default `UsedIndexes` instance if such entry was not found.
341        entry.map_or(Default::default(), |e| e.plan)
342    }
343}
344
/// Helper trait used to look up the value of an attribute by its `'static`
/// field name.
trait GetStr {
    /// Return the value recorded for the field named `key`, rendered as a
    /// `String`, or `None` if no such field was recorded.
    fn get_str(&self, key: &'static str) -> Option<String>;
}
349
350impl<'a> GetStr for span::Attributes<'a> {
351    fn get_str(&self, key: &'static str) -> Option<String> {
352        let mut extract_str = ExtractStr::new(key);
353        self.record(&mut extract_str);
354        extract_str.val()
355    }
356}
357
/// Field visitor backing the `GetStr::get_str` implementation for
/// `span::Attributes`: it watches for a single field name and remembers the
/// value recorded under it.
struct ExtractStr {
    /// Name of the field to capture.
    key: &'static str,
    /// Captured value, or `None` if the field has not been visited.
    val: Option<String>,
}
364
365impl ExtractStr {
366    fn new(key: &'static str) -> Self {
367        Self { key, val: None }
368    }
369
370    fn val(self) -> Option<String> {
371        self.val
372    }
373}
374
375impl field::Visit for ExtractStr {
376    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
377        if field.name() == self.key {
378            self.val = Some(value.to_string())
379        }
380    }
381
382    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
383        if field.name() == self.key {
384            self.val = Some(format!("{value:?}"))
385        }
386    }
387}
388
#[cfg(test)]
mod test {
    use mz_ore::instrument;
    use tracing::dispatcher;
    use tracing_subscriber::prelude::*;

    use super::{PlanTrace, trace_plan};

    #[mz_ore::test]
    fn test_optimizer_trace() {
        let subscriber = tracing_subscriber::registry().with(Some(PlanTrace::<String>::new(None)));
        let dispatch = dispatcher::Dispatch::new(subscriber);

        dispatcher::with_default(&dispatch, || {
            optimize();
        });

        // Fail loudly (instead of silently passing) if the layer is missing.
        let trace = dispatch
            .downcast_ref::<PlanTrace<String>>()
            .expect("the PlanTrace<String> layer should be installed")
            .drain_as_vec();

        let actual: Vec<(&str, &str)> = trace
            .iter()
            .map(|entry| (entry.path.as_str(), entry.plan.as_str()))
            .collect();
        let expected = vec![
            ("optimize", "RawPlan(#42)"),
            ("optimize/logical/my_optimization", "RawPlan(#47)"),
            ("optimize/logical", "LogicalPlan(#47)"),
            ("optimize/physical", "PhysicalPlan(#47)"),
            ("optimize", "PhysicalPlan(#47)"),
        ];
        assert_eq!(actual, expected);
    }

    #[instrument(level = "info")]
    fn optimize() {
        let mut plan = constant_plan(42);
        trace_plan(&plan);
        logical_optimizer(&mut plan);
        physical_optimizer(&mut plan);
        trace_plan(&plan);
    }

    #[instrument(level = "info", name = "logical")]
    fn logical_optimizer(plan: &mut String) {
        some_optimization(plan);
        *plan = plan.replace("RawPlan", "LogicalPlan");
        trace_plan(plan);
    }

    #[instrument(level = "info", name = "physical")]
    fn physical_optimizer(plan: &mut String) {
        *plan = plan.replace("LogicalPlan", "PhysicalPlan");
        trace_plan(plan);
    }

    #[instrument(level = "debug", fields(path.segment = "my_optimization"))]
    fn some_optimization(plan: &mut String) {
        *plan = plan.replace("42", "47");
        trace_plan(plan);
    }

    fn constant_plan(i: usize) -> String {
        format!("RawPlan(#{})", i)
    }
}