mz_adapter/explain/
optimizer_trace.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Tracing utilities for explainable plans.
11
12use std::fmt::{Debug, Display};
13use std::sync::Arc;
14
15use mz_catalog::memory::objects::Cluster;
16use mz_compute_types::dataflows::DataflowDescription;
17use mz_compute_types::plan::Plan;
18use mz_expr::explain::ExplainContext;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::collections::CollectionExt;
21use mz_repr::explain::tracing::{PlanTrace, TraceEntry};
22use mz_repr::explain::{
23    Explain, ExplainConfig, ExplainError, ExplainFormat, ExprHumanizer, UsedIndexes,
24};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Datum, Row};
27use mz_sql::ast::display::AstDisplay;
28use mz_sql::plan::{self, HirRelationExpr, HirScalarExpr};
29use mz_sql_parser::ast::{ExplainStage, NamedPlan};
30use mz_transform::dataflow::DataflowMetainfo;
31use mz_transform::notice::RawOptimizerNotice;
32use smallvec::SmallVec;
33use tracing::dispatcher;
34use tracing_subscriber::prelude::*;
35
36use crate::AdapterError;
37use crate::coord::peek::FastPathPlan;
38use crate::explain::Explainable;
39use crate::explain::insights::{self, PlanInsightsContext};
40
41/// Provides functionality for tracing plans generated by the execution of an
42/// optimization pipeline.
43///
44/// Internally, this will create a layered [`tracing::subscriber::Subscriber`]
45/// consisting of one layer for each supported plan type `T` and wrap it into a
46/// [`dispatcher::Dispatch`] instance.
47///
48/// Use [`OptimizerTrace::as_guard`] to activate the [`dispatcher::Dispatch`]
49/// and collect a trace.
50///
51/// Use [`OptimizerTrace::into_rows`] or [`OptimizerTrace::into_plan_insights`]
52/// to cleanly destroy the [`OptimizerTrace`] instance and obtain the tracing
53/// result.
54pub struct OptimizerTrace(dispatcher::Dispatch);
55
56impl std::fmt::Debug for OptimizerTrace {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_tuple("OptimizerTrace").finish() // Skip the dispatch field
59    }
60}
61
62impl OptimizerTrace {
63    /// Create a new [`OptimizerTrace`].
64    ///
65    /// The instance will only accumulate [`TraceEntry`] instances along
66    /// the prefix of the given `path` if `path` is present, or it will
67    /// accumulate all [`TraceEntry`] instances otherwise.
68    pub fn new(filter: Option<SmallVec<[NamedPlan; 4]>>) -> OptimizerTrace {
69        let filter = || filter.clone();
70        if let Some(global_subscriber) = mz_ore::tracing::GLOBAL_SUBSCRIBER.get() {
71            let subscriber = Arc::clone(global_subscriber)
72                // Collect `explain_plan` types that are not used in the regular explain
73                // path, but are useful when instrumenting code for debugging purposes.
74                .with(PlanTrace::<String>::new(filter()))
75                .with(PlanTrace::<HirScalarExpr>::new(filter()))
76                .with(PlanTrace::<MirScalarExpr>::new(filter()))
77                // Collect `explain_plan` types that are used in the regular explain path.
78                .with(PlanTrace::<HirRelationExpr>::new(filter()))
79                .with(PlanTrace::<MirRelationExpr>::new(filter()))
80                .with(PlanTrace::<DataflowDescription<OptimizedMirRelationExpr>>::new(filter()))
81                .with(PlanTrace::<DataflowDescription<Plan>>::new(filter()))
82                // Don't filter for FastPathPlan entries (there can be at most one).
83                .with(PlanTrace::<FastPathPlan>::new(None))
84                .with(PlanTrace::<UsedIndexes>::new(None))
85                // All optimizer spans are `TRACE` and up. Technically this slows down the system
86                // by skipping the tracing fast path DURING an `EXPLAIN`, but we haven't
87                // seen this be a problem (yet).
88                //
89                // Note that we typically do NOT use global filters like this, preferring
90                // per-layer ones, but we are forced to because per-layer filters
91                // require an `Arc<dyn Subscriber + LookupSpan>`, which isn't a trait
92                // exposed by tracing, for now.
93                .with(tracing::level_filters::LevelFilter::TRACE);
94
95            OptimizerTrace(dispatcher::Dispatch::new(subscriber))
96        } else {
97            // This codepath should not be taken except in tests, and is left here as a
98            // convenience.
99            let subscriber = tracing_subscriber::registry()
100                .with(PlanTrace::<String>::new(filter()))
101                .with(PlanTrace::<HirScalarExpr>::new(filter()))
102                .with(PlanTrace::<MirScalarExpr>::new(filter()))
103                .with(PlanTrace::<HirRelationExpr>::new(filter()))
104                .with(PlanTrace::<MirRelationExpr>::new(filter()))
105                .with(PlanTrace::<DataflowDescription<OptimizedMirRelationExpr>>::new(filter()))
106                .with(PlanTrace::<DataflowDescription<Plan>>::new(filter()))
107                .with(PlanTrace::<FastPathPlan>::new(None))
108                .with(PlanTrace::<UsedIndexes>::new(None))
109                .with(tracing::level_filters::LevelFilter::TRACE);
110
111            OptimizerTrace(dispatcher::Dispatch::new(subscriber))
112        }
113    }
114
115    /// Enter this [`OptimizerTrace`]'s tracing [`dispatcher::Dispatch`], returning a guard.
116    ///
117    /// Linked to this [`OptimizerTrace`] with a lifetime to ensure
118    /// [`OptimizerTrace::into_rows`] isn't called until the guard is dropped.
119    pub fn as_guard<'s>(&'s self) -> DispatchGuard<'s> {
120        let dispatch = self.0.clone();
121        let tracing_guard = tracing::dispatcher::set_default(&dispatch);
122
123        DispatchGuard {
124            _tracing_guard: tracing_guard,
125            _life: std::marker::PhantomData,
126        }
127    }
128
129    /// Convert the optimizer trace into a vector or rows that can be returned
130    /// to the client.
131    pub async fn into_rows(
132        self,
133        format: ExplainFormat,
134        config: &ExplainConfig,
135        features: &OptimizerFeatures,
136        humanizer: &dyn ExprHumanizer,
137        row_set_finishing: Option<RowSetFinishing>,
138        target_cluster: Option<&Cluster>,
139        dataflow_metainfo: DataflowMetainfo,
140        stage: ExplainStage,
141        stmt_kind: plan::ExplaineeStatementKind,
142        insights_ctx: Option<Box<PlanInsightsContext>>,
143    ) -> Result<Vec<Row>, AdapterError> {
144        let collect_all = |format| {
145            self.collect_all(
146                format,
147                config,
148                features,
149                humanizer,
150                row_set_finishing.clone(),
151                target_cluster.map(|c| c.name.as_str()),
152                dataflow_metainfo.clone(),
153            )
154        };
155
156        let rows = match stage {
157            ExplainStage::Trace => {
158                // For the `Trace` (pseudo-)stage, return the entire trace as
159                // triples of (time, path, plan) values.
160                let rows = collect_all(format)?
161                    .0
162                    .into_iter()
163                    .map(|entry| {
164                        // The trace would have to take over 584 years to overflow a u64.
165                        let span_duration = u64::try_from(entry.span_duration.as_nanos());
166                        Row::pack_slice(&[
167                            Datum::from(span_duration.unwrap_or(u64::MAX)),
168                            Datum::from(entry.path.as_str()),
169                            Datum::from(entry.plan.as_str()),
170                        ])
171                    })
172                    .collect();
173                rows
174            }
175            ExplainStage::PlanInsights => {
176                if format != ExplainFormat::Json {
177                    coord_bail!("EXPLAIN PLAN INSIGHTS only supports JSON format");
178                }
179
180                let mut text_traces = collect_all(ExplainFormat::Text)?;
181                let mut json_traces = collect_all(ExplainFormat::Json)?;
182                let global_plan = self.collect_global_plan();
183                let fast_path_plan = self.collect_fast_path_plan();
184
185                // Plans can be very large and exhaust the json serialization recursion limit.
186                // Convert those into error objects.
187                let mut get_plan = |name: NamedPlan| {
188                    let text_plan = match text_traces.remove(name.path()) {
189                        None => "<unknown>".into(),
190                        Some(entry) => entry.plan,
191                    };
192                    let json_plan = match json_traces.remove(name.path()) {
193                        None => serde_json::Value::Null,
194                        Some(entry) => serde_json::from_str(&entry.plan).unwrap_or_else(|e| {
195                            serde_json::json!({
196                                "error": format!("internal error: {e}"),
197                            })
198                        }),
199                    };
200                    serde_json::json!({
201                        "text": text_plan,
202                        "json": json_plan,
203                    })
204                };
205
206                let is_fast_path = fast_path_plan.is_some();
207                let mut plan_insights =
208                    insights::plan_insights(humanizer, global_plan, fast_path_plan);
209                let mut redacted_sql = None;
210                if let Some(insights_ctx) = insights_ctx {
211                    redacted_sql = insights_ctx
212                        .stmt
213                        .as_ref()
214                        .map(|s| Some(s.to_ast_string_redacted()));
215                    if let (Some(plan_insights), false) = (plan_insights.as_mut(), is_fast_path) {
216                        if insights_ctx.enable_re_optimize {
217                            plan_insights
218                                .compute_fast_path_clusters(humanizer, insights_ctx)
219                                .await;
220                        }
221                    }
222                }
223                let cluster = target_cluster.map(|c| {
224                    serde_json::json!({
225                        "name": c.name,
226                        "id": c.id,
227                    })
228                });
229
230                let output = serde_json::json!({
231                    "plans": {
232                        "raw": get_plan(NamedPlan::Raw),
233                        "optimized": {
234                            "global": get_plan(NamedPlan::Global),
235                            "fast_path": get_plan(NamedPlan::FastPath),
236                        }
237                    },
238                    "insights": plan_insights,
239                    "cluster": cluster,
240                    "redacted_sql": redacted_sql,
241                });
242                let output = serde_json::to_string_pretty(&output).expect("JSON string");
243                vec![Row::pack_slice(&[Datum::from(output.as_str())])]
244            }
245            _ => {
246                // For everything else, return the plan for the stage identified
247                // by the corresponding path.
248
249                let path = stage
250                    .paths()
251                    .map(|path| path.into_element().path())
252                    .ok_or_else(|| {
253                        AdapterError::Internal("explain stage unexpectedly missing path".into())
254                    })?;
255                let mut traces = collect_all(format)?;
256
257                // For certain stages we want to return the resulting fast path
258                // plan instead of the selected stage if it is present.
259                let plan = if stage.show_fast_path() && !config.no_fast_path {
260                    traces
261                        .remove(NamedPlan::FastPath.path())
262                        .or_else(|| traces.remove(path))
263                } else {
264                    traces.remove(path)
265                };
266
267                let row = plan
268                    .map(|entry| Row::pack_slice(&[Datum::from(entry.plan.as_str())]))
269                    .ok_or_else(|| {
270                        if !stmt_kind.supports(&stage) {
271                            // Print a nicer error for unsupported stages.
272                            AdapterError::Unstructured(anyhow::anyhow!(format!(
273                                "cannot EXPLAIN {stage} FOR {stmt_kind}"
274                            )))
275                        } else {
276                            // We don't expect this stage to be missing.
277                            AdapterError::Internal(format!(
278                                "stage `{path}` not present in the collected optimizer trace",
279                            ))
280                        }
281                    })?;
282                vec![row]
283            }
284        };
285
286        // We assume that any `Dispatch` cloned from this `OptimizerTrace` has long been dropped
287        // (`as_guard` tries to ensure this.). We rebuild the tracing interest cache, as
288        // this `OptimizerTrace` is acting like a reload-layer, and tracing needs to
289        // recalculate what the max level is, using this often-unknown
290        // API. Note that the reference to the `Dispatch` in self MUST be dropped before
291        // re-calculating interest.
292        //
293        // Before this is dropped and rebuilt, there is small extra cost to all `DEBUG` spans and
294        // events, if the other layers (otel and stderr) are only interested in `INFO`.
295        drop(self);
296        tracing_core::callsite::rebuild_interest_cache();
297        Ok(rows)
298    }
299
300    /// Collect a [`insights::PlanInsights`] with insights about the the
301    /// optimized plans rendered as a JSON `String`.
302    pub async fn into_plan_insights(
303        self,
304        features: &OptimizerFeatures,
305        humanizer: &dyn ExprHumanizer,
306        row_set_finishing: Option<RowSetFinishing>,
307        target_cluster: Option<&Cluster>,
308        dataflow_metainfo: DataflowMetainfo,
309        insights_ctx: Option<Box<PlanInsightsContext>>,
310    ) -> Result<String, AdapterError> {
311        let rows = self
312            .into_rows(
313                ExplainFormat::Json,
314                &ExplainConfig::default(),
315                features,
316                humanizer,
317                row_set_finishing,
318                target_cluster,
319                dataflow_metainfo,
320                ExplainStage::PlanInsights,
321                plan::ExplaineeStatementKind::Select,
322                insights_ctx,
323            )
324            .await?;
325
326        // When using `ExplainStage::PlanInsights`, we're guaranteed that the
327        // output is a single row containing a single column containing the plan
328        // insights as a string.
329        Ok(rows.into_element().into_element().unwrap_str().into())
330    }
331
332    /// Collect all traced plans for all plan types `T` that are available in
333    /// the wrapped [`dispatcher::Dispatch`].
334    fn collect_all(
335        &self,
336        format: ExplainFormat,
337        config: &ExplainConfig,
338        features: &OptimizerFeatures,
339        humanizer: &dyn ExprHumanizer,
340        row_set_finishing: Option<RowSetFinishing>,
341        target_cluster: Option<&str>,
342        dataflow_metainfo: DataflowMetainfo,
343    ) -> Result<TraceEntries<String>, ExplainError> {
344        let mut results = vec![];
345
346        // First, create an ExplainContext without `used_indexes`. We'll use this to, e.g., collect
347        // HIR plans.
348        let mut context = ExplainContext {
349            config,
350            features,
351            humanizer,
352            cardinality_stats: Default::default(), // empty stats
353            used_indexes: Default::default(),
354            finishing: row_set_finishing.clone(),
355            duration: Default::default(),
356            target_cluster,
357            optimizer_notices: RawOptimizerNotice::explain(
358                &dataflow_metainfo.optimizer_notices,
359                humanizer,
360                config.redacted,
361            )?,
362        };
363
364        // Collect trace entries of types produced by local optimizer stages.
365        results.extend(itertools::chain!(
366            self.collect_explainable_entries::<HirRelationExpr>(&format, &mut context)?,
367            self.collect_explainable_entries::<MirRelationExpr>(&format, &mut context)?,
368        ));
369
370        // Collect trace entries of types produced by global optimizer stages.
371        let mut context = ExplainContext {
372            config,
373            features,
374            humanizer,
375            cardinality_stats: Default::default(), // empty stats
376            used_indexes: Default::default(),
377            finishing: row_set_finishing,
378            duration: Default::default(),
379            target_cluster,
380            optimizer_notices: RawOptimizerNotice::explain(
381                &dataflow_metainfo.optimizer_notices,
382                humanizer,
383                config.redacted,
384            )?,
385        };
386        results.extend(itertools::chain!(
387            self.collect_explainable_entries::<DataflowDescription<OptimizedMirRelationExpr>>(
388                &format,
389                &mut context,
390            )?,
391            self.collect_explainable_entries::<DataflowDescription<Plan>>(&format, &mut context)?,
392            self.collect_explainable_entries::<FastPathPlan>(&format, &mut context)?,
393        ));
394
395        // Collect trace entries of type String, HirScalarExpr, MirScalarExpr
396        // which are useful for ad-hoc debugging.
397        results.extend(itertools::chain!(
398            self.collect_scalar_entries::<HirScalarExpr>(),
399            self.collect_scalar_entries::<MirScalarExpr>(),
400            self.collect_string_entries(),
401        ));
402
403        // sort plans by instant (TODO: this can be implemented in a more
404        // efficient way, as we can assume that each of the runs that are used
405        // to `*.extend` the `results` vector is already sorted).
406        results.sort_by_key(|x| x.instant);
407
408        Ok(TraceEntries(results))
409    }
410
411    /// Collects the global optimized plan from the trace, if it exists.
412    fn collect_global_plan(&self) -> Option<DataflowDescription<OptimizedMirRelationExpr>> {
413        self.0
414            .downcast_ref::<PlanTrace<DataflowDescription<OptimizedMirRelationExpr>>>()
415            .and_then(|trace| trace.find(NamedPlan::Global.path()))
416            .map(|entry| entry.plan)
417    }
418
419    /// Collects the fast path plan from the trace, if it exists.
420    fn collect_fast_path_plan(&self) -> Option<FastPathPlan> {
421        self.0
422            .downcast_ref::<PlanTrace<FastPathPlan>>()
423            .and_then(|trace| trace.find(NamedPlan::FastPath.path()))
424            .map(|entry| entry.plan)
425    }
426
427    /// Collect all trace entries of a plan type `T` that implements
428    /// [`Explainable`].
429    fn collect_explainable_entries<T>(
430        &self,
431        format: &ExplainFormat,
432        context: &mut ExplainContext,
433    ) -> Result<Vec<TraceEntry<String>>, ExplainError>
434    where
435        T: Clone + Debug + 'static,
436        for<'a> Explainable<'a, T>: Explain<'a, Context = ExplainContext<'a>>,
437    {
438        if let Some(trace) = self.0.downcast_ref::<PlanTrace<T>>() {
439            // Get a handle of the associated `PlanTrace<UsedIndexes>`.
440            let used_indexes_trace = self.0.downcast_ref::<PlanTrace<UsedIndexes>>();
441
442            trace
443                .collect_as_vec()
444                .into_iter()
445                .map(|mut entry| {
446                    // Update the context with the current time.
447                    context.duration = entry.full_duration;
448
449                    // Try to find the UsedIndexes instance for this entry.
450                    let used_indexes = used_indexes_trace.map(|t| t.used_indexes_for(&entry.path));
451
452                    // Render the EXPLAIN output string for this entry.
453                    let plan = if let Some(mut used_indexes) = used_indexes {
454                        // Temporary swap the found UsedIndexes with the default
455                        // one in the ExplainContext while explaining the plan
456                        // for this entry.
457                        std::mem::swap(&mut context.used_indexes, &mut used_indexes);
458                        let plan = Explainable::new(&mut entry.plan).explain(format, context)?;
459                        std::mem::swap(&mut context.used_indexes, &mut used_indexes);
460                        plan
461                    } else {
462                        // No UsedIndexes instance for this entry found - use
463                        // the default UsedIndexes in the ExplainContext.
464                        Explainable::new(&mut entry.plan).explain(format, context)?
465                    };
466
467                    Ok(TraceEntry {
468                        instant: entry.instant,
469                        span_duration: entry.span_duration,
470                        full_duration: entry.full_duration,
471                        path: entry.path,
472                        plan,
473                    })
474                })
475                .collect()
476        } else {
477            unreachable!("collect_explainable_entries called with wrong plan type T");
478        }
479    }
480
481    /// Collect all trace entries of a plan type `T`.
482    fn collect_scalar_entries<T>(&self) -> Vec<TraceEntry<String>>
483    where
484        T: Clone + Debug + 'static,
485        T: Display,
486    {
487        if let Some(trace) = self.0.downcast_ref::<PlanTrace<T>>() {
488            trace
489                .collect_as_vec()
490                .into_iter()
491                .map(|entry| TraceEntry {
492                    instant: entry.instant,
493                    span_duration: entry.span_duration,
494                    full_duration: entry.full_duration,
495                    path: entry.path,
496                    plan: entry.plan.to_string(),
497                })
498                .collect()
499        } else {
500            vec![]
501        }
502    }
503
504    /// Collect all trace entries with plans of type [`String`].
505    fn collect_string_entries(&self) -> Vec<TraceEntry<String>> {
506        if let Some(trace) = self.0.downcast_ref::<PlanTrace<String>>() {
507            trace.collect_as_vec()
508        } else {
509            vec![]
510        }
511    }
512}
513
514/// A wrapper around a `tracing::subscriber::DefaultGuard`.
515pub struct DispatchGuard<'a> {
516    _tracing_guard: tracing::subscriber::DefaultGuard,
517    _life: std::marker::PhantomData<&'a ()>,
518}
519
520/// A collection of optimizer trace entries with convenient accessor methods.
521pub struct TraceEntries<T>(pub Vec<TraceEntry<T>>);
522
523impl<T> TraceEntries<T> {
524    // Removes the first (and by assumption the only) trace that matches the
525    // given path from the collected trace.
526    pub fn remove(&mut self, path: &'static str) -> Option<TraceEntry<T>> {
527        let index = self.0.iter().position(|entry| entry.path == path);
528        index.map(|index| self.0.remove(index))
529    }
530}