1use std::fmt::{Debug, Display};
13use std::sync::Arc;
14
15use mz_catalog::memory::objects::Cluster;
16use mz_compute_types::dataflows::DataflowDescription;
17use mz_compute_types::plan::Plan;
18use mz_expr::explain::ExplainContext;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::collections::CollectionExt;
21use mz_repr::explain::tracing::{PlanTrace, TraceEntry};
22use mz_repr::explain::{
23 Explain, ExplainConfig, ExplainError, ExplainFormat, ExprHumanizer, UsedIndexes,
24};
25use mz_repr::optimize::OptimizerFeatures;
26use mz_repr::{Datum, Row};
27use mz_sql::ast::display::AstDisplay;
28use mz_sql::plan::{self, HirRelationExpr, HirScalarExpr};
29use mz_sql_parser::ast::{ExplainStage, NamedPlan};
30use mz_transform::dataflow::DataflowMetainfo;
31use mz_transform::notice::RawOptimizerNotice;
32use smallvec::SmallVec;
33use tracing::dispatcher;
34use tracing_subscriber::prelude::*;
35
36use crate::AdapterError;
37use crate::coord::peek::FastPathPlan;
38use crate::explain::Explainable;
39use crate::explain::insights::{self, PlanInsightsContext};
40
41pub struct OptimizerTrace(dispatcher::Dispatch);
55
56impl std::fmt::Debug for OptimizerTrace {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_tuple("OptimizerTrace").finish() }
60}
61
62impl OptimizerTrace {
63 pub fn new(filter: Option<SmallVec<[NamedPlan; 4]>>) -> OptimizerTrace {
69 let filter = || filter.clone();
70 if let Some(global_subscriber) = mz_ore::tracing::GLOBAL_SUBSCRIBER.get() {
71 let subscriber = Arc::clone(global_subscriber)
72 .with(PlanTrace::<String>::new(filter()))
75 .with(PlanTrace::<HirScalarExpr>::new(filter()))
76 .with(PlanTrace::<MirScalarExpr>::new(filter()))
77 .with(PlanTrace::<HirRelationExpr>::new(filter()))
79 .with(PlanTrace::<MirRelationExpr>::new(filter()))
80 .with(PlanTrace::<DataflowDescription<OptimizedMirRelationExpr>>::new(filter()))
81 .with(PlanTrace::<DataflowDescription<Plan>>::new(filter()))
82 .with(PlanTrace::<FastPathPlan>::new(None))
84 .with(PlanTrace::<UsedIndexes>::new(None))
85 .with(tracing::level_filters::LevelFilter::TRACE);
94
95 OptimizerTrace(dispatcher::Dispatch::new(subscriber))
96 } else {
97 let subscriber = tracing_subscriber::registry()
100 .with(PlanTrace::<String>::new(filter()))
101 .with(PlanTrace::<HirScalarExpr>::new(filter()))
102 .with(PlanTrace::<MirScalarExpr>::new(filter()))
103 .with(PlanTrace::<HirRelationExpr>::new(filter()))
104 .with(PlanTrace::<MirRelationExpr>::new(filter()))
105 .with(PlanTrace::<DataflowDescription<OptimizedMirRelationExpr>>::new(filter()))
106 .with(PlanTrace::<DataflowDescription<Plan>>::new(filter()))
107 .with(PlanTrace::<FastPathPlan>::new(None))
108 .with(PlanTrace::<UsedIndexes>::new(None))
109 .with(tracing::level_filters::LevelFilter::TRACE);
110
111 OptimizerTrace(dispatcher::Dispatch::new(subscriber))
112 }
113 }
114
115 pub fn as_guard<'s>(&'s self) -> DispatchGuard<'s> {
120 let dispatch = self.0.clone();
121 let tracing_guard = tracing::dispatcher::set_default(&dispatch);
122
123 DispatchGuard {
124 _tracing_guard: tracing_guard,
125 _life: std::marker::PhantomData,
126 }
127 }
128
129 pub async fn into_rows(
132 self,
133 format: ExplainFormat,
134 config: &ExplainConfig,
135 features: &OptimizerFeatures,
136 humanizer: &dyn ExprHumanizer,
137 row_set_finishing: Option<RowSetFinishing>,
138 target_cluster: Option<&Cluster>,
139 dataflow_metainfo: DataflowMetainfo,
140 stage: ExplainStage,
141 stmt_kind: plan::ExplaineeStatementKind,
142 insights_ctx: Option<Box<PlanInsightsContext>>,
143 ) -> Result<Vec<Row>, AdapterError> {
144 let collect_all = |format| {
145 self.collect_all(
146 format,
147 config,
148 features,
149 humanizer,
150 row_set_finishing.clone(),
151 target_cluster.map(|c| c.name.as_str()),
152 dataflow_metainfo.clone(),
153 )
154 };
155
156 let rows = match stage {
157 ExplainStage::Trace => {
158 let rows = collect_all(format)?
161 .0
162 .into_iter()
163 .map(|entry| {
164 let span_duration = u64::try_from(entry.span_duration.as_nanos());
166 Row::pack_slice(&[
167 Datum::from(span_duration.unwrap_or(u64::MAX)),
168 Datum::from(entry.path.as_str()),
169 Datum::from(entry.plan.as_str()),
170 ])
171 })
172 .collect();
173 rows
174 }
175 ExplainStage::PlanInsights => {
176 if format != ExplainFormat::Json {
177 coord_bail!("EXPLAIN PLAN INSIGHTS only supports JSON format");
178 }
179
180 let mut text_traces = collect_all(ExplainFormat::Text)?;
181 let mut json_traces = collect_all(ExplainFormat::Json)?;
182 let global_plan = self.collect_global_plan();
183 let fast_path_plan = self.collect_fast_path_plan();
184
185 let mut get_plan = |name: NamedPlan| {
188 let text_plan = match text_traces.remove(name.path()) {
189 None => "<unknown>".into(),
190 Some(entry) => entry.plan,
191 };
192 let json_plan = match json_traces.remove(name.path()) {
193 None => serde_json::Value::Null,
194 Some(entry) => serde_json::from_str(&entry.plan).unwrap_or_else(|e| {
195 serde_json::json!({
196 "error": format!("internal error: {e}"),
197 })
198 }),
199 };
200 serde_json::json!({
201 "text": text_plan,
202 "json": json_plan,
203 })
204 };
205
206 let is_fast_path = fast_path_plan.is_some();
207 let mut plan_insights =
208 insights::plan_insights(humanizer, global_plan, fast_path_plan);
209 let mut redacted_sql = None;
210 if let Some(insights_ctx) = insights_ctx {
211 redacted_sql = insights_ctx
212 .stmt
213 .as_ref()
214 .map(|s| Some(s.to_ast_string_redacted()));
215 if let (Some(plan_insights), false) = (plan_insights.as_mut(), is_fast_path) {
216 if insights_ctx.enable_re_optimize {
217 plan_insights
218 .compute_fast_path_clusters(humanizer, insights_ctx)
219 .await;
220 }
221 }
222 }
223 let cluster = target_cluster.map(|c| {
224 serde_json::json!({
225 "name": c.name,
226 "id": c.id,
227 })
228 });
229
230 let output = serde_json::json!({
231 "plans": {
232 "raw": get_plan(NamedPlan::Raw),
233 "optimized": {
234 "global": get_plan(NamedPlan::Global),
235 "fast_path": get_plan(NamedPlan::FastPath),
236 }
237 },
238 "insights": plan_insights,
239 "cluster": cluster,
240 "redacted_sql": redacted_sql,
241 });
242 let output = serde_json::to_string_pretty(&output).expect("JSON string");
243 vec![Row::pack_slice(&[Datum::from(output.as_str())])]
244 }
245 _ => {
246 let path = stage
250 .paths()
251 .map(|path| path.into_element().path())
252 .ok_or_else(|| {
253 AdapterError::Internal("explain stage unexpectedly missing path".into())
254 })?;
255 let mut traces = collect_all(format)?;
256
257 let plan = if stage.show_fast_path() && !config.no_fast_path {
260 traces
261 .remove(NamedPlan::FastPath.path())
262 .or_else(|| traces.remove(path))
263 } else {
264 traces.remove(path)
265 };
266
267 let row = plan
268 .map(|entry| Row::pack_slice(&[Datum::from(entry.plan.as_str())]))
269 .ok_or_else(|| {
270 if !stmt_kind.supports(&stage) {
271 AdapterError::Unstructured(anyhow::anyhow!(format!(
273 "cannot EXPLAIN {stage} FOR {stmt_kind}"
274 )))
275 } else {
276 AdapterError::Internal(format!(
278 "stage `{path}` not present in the collected optimizer trace",
279 ))
280 }
281 })?;
282 vec![row]
283 }
284 };
285
286 drop(self);
296 tracing_core::callsite::rebuild_interest_cache();
297 Ok(rows)
298 }
299
300 pub async fn into_plan_insights(
303 self,
304 features: &OptimizerFeatures,
305 humanizer: &dyn ExprHumanizer,
306 row_set_finishing: Option<RowSetFinishing>,
307 target_cluster: Option<&Cluster>,
308 dataflow_metainfo: DataflowMetainfo,
309 insights_ctx: Option<Box<PlanInsightsContext>>,
310 ) -> Result<String, AdapterError> {
311 let rows = self
312 .into_rows(
313 ExplainFormat::Json,
314 &ExplainConfig::default(),
315 features,
316 humanizer,
317 row_set_finishing,
318 target_cluster,
319 dataflow_metainfo,
320 ExplainStage::PlanInsights,
321 plan::ExplaineeStatementKind::Select,
322 insights_ctx,
323 )
324 .await?;
325
326 Ok(rows.into_element().into_element().unwrap_str().into())
330 }
331
332 fn collect_all(
335 &self,
336 format: ExplainFormat,
337 config: &ExplainConfig,
338 features: &OptimizerFeatures,
339 humanizer: &dyn ExprHumanizer,
340 row_set_finishing: Option<RowSetFinishing>,
341 target_cluster: Option<&str>,
342 dataflow_metainfo: DataflowMetainfo,
343 ) -> Result<TraceEntries<String>, ExplainError> {
344 let mut results = vec![];
345
346 let mut context = ExplainContext {
349 config,
350 features,
351 humanizer,
352 cardinality_stats: Default::default(), used_indexes: Default::default(),
354 finishing: row_set_finishing.clone(),
355 duration: Default::default(),
356 target_cluster,
357 optimizer_notices: RawOptimizerNotice::explain(
358 &dataflow_metainfo.optimizer_notices,
359 humanizer,
360 config.redacted,
361 )?,
362 };
363
364 results.extend(itertools::chain!(
366 self.collect_explainable_entries::<HirRelationExpr>(&format, &mut context)?,
367 self.collect_explainable_entries::<MirRelationExpr>(&format, &mut context)?,
368 ));
369
370 let mut context = ExplainContext {
372 config,
373 features,
374 humanizer,
375 cardinality_stats: Default::default(), used_indexes: Default::default(),
377 finishing: row_set_finishing,
378 duration: Default::default(),
379 target_cluster,
380 optimizer_notices: RawOptimizerNotice::explain(
381 &dataflow_metainfo.optimizer_notices,
382 humanizer,
383 config.redacted,
384 )?,
385 };
386 results.extend(itertools::chain!(
387 self.collect_explainable_entries::<DataflowDescription<OptimizedMirRelationExpr>>(
388 &format,
389 &mut context,
390 )?,
391 self.collect_explainable_entries::<DataflowDescription<Plan>>(&format, &mut context)?,
392 self.collect_explainable_entries::<FastPathPlan>(&format, &mut context)?,
393 ));
394
395 results.extend(itertools::chain!(
398 self.collect_scalar_entries::<HirScalarExpr>(),
399 self.collect_scalar_entries::<MirScalarExpr>(),
400 self.collect_string_entries(),
401 ));
402
403 results.sort_by_key(|x| x.instant);
407
408 Ok(TraceEntries(results))
409 }
410
411 fn collect_global_plan(&self) -> Option<DataflowDescription<OptimizedMirRelationExpr>> {
413 self.0
414 .downcast_ref::<PlanTrace<DataflowDescription<OptimizedMirRelationExpr>>>()
415 .and_then(|trace| trace.find(NamedPlan::Global.path()))
416 .map(|entry| entry.plan)
417 }
418
419 fn collect_fast_path_plan(&self) -> Option<FastPathPlan> {
421 self.0
422 .downcast_ref::<PlanTrace<FastPathPlan>>()
423 .and_then(|trace| trace.find(NamedPlan::FastPath.path()))
424 .map(|entry| entry.plan)
425 }
426
427 fn collect_explainable_entries<T>(
430 &self,
431 format: &ExplainFormat,
432 context: &mut ExplainContext,
433 ) -> Result<Vec<TraceEntry<String>>, ExplainError>
434 where
435 T: Clone + Debug + 'static,
436 for<'a> Explainable<'a, T>: Explain<'a, Context = ExplainContext<'a>>,
437 {
438 if let Some(trace) = self.0.downcast_ref::<PlanTrace<T>>() {
439 let used_indexes_trace = self.0.downcast_ref::<PlanTrace<UsedIndexes>>();
441
442 trace
443 .collect_as_vec()
444 .into_iter()
445 .map(|mut entry| {
446 context.duration = entry.full_duration;
448
449 let used_indexes = used_indexes_trace.map(|t| t.used_indexes_for(&entry.path));
451
452 let plan = if let Some(mut used_indexes) = used_indexes {
454 std::mem::swap(&mut context.used_indexes, &mut used_indexes);
458 let plan = Explainable::new(&mut entry.plan).explain(format, context)?;
459 std::mem::swap(&mut context.used_indexes, &mut used_indexes);
460 plan
461 } else {
462 Explainable::new(&mut entry.plan).explain(format, context)?
465 };
466
467 Ok(TraceEntry {
468 instant: entry.instant,
469 span_duration: entry.span_duration,
470 full_duration: entry.full_duration,
471 path: entry.path,
472 plan,
473 })
474 })
475 .collect()
476 } else {
477 unreachable!("collect_explainable_entries called with wrong plan type T");
478 }
479 }
480
481 fn collect_scalar_entries<T>(&self) -> Vec<TraceEntry<String>>
483 where
484 T: Clone + Debug + 'static,
485 T: Display,
486 {
487 if let Some(trace) = self.0.downcast_ref::<PlanTrace<T>>() {
488 trace
489 .collect_as_vec()
490 .into_iter()
491 .map(|entry| TraceEntry {
492 instant: entry.instant,
493 span_duration: entry.span_duration,
494 full_duration: entry.full_duration,
495 path: entry.path,
496 plan: entry.plan.to_string(),
497 })
498 .collect()
499 } else {
500 vec![]
501 }
502 }
503
504 fn collect_string_entries(&self) -> Vec<TraceEntry<String>> {
506 if let Some(trace) = self.0.downcast_ref::<PlanTrace<String>>() {
507 trace.collect_as_vec()
508 } else {
509 vec![]
510 }
511 }
512}
513
514pub struct DispatchGuard<'a> {
516 _tracing_guard: tracing::subscriber::DefaultGuard,
517 _life: std::marker::PhantomData<&'a ()>,
518}
519
520pub struct TraceEntries<T>(pub Vec<TraceEntry<T>>);
522
523impl<T> TraceEntries<T> {
524 pub fn remove(&mut self, path: &'static str) -> Option<TraceEntry<T>> {
527 let index = self.0.iter().position(|entry| entry.path == path);
528 index.map(|index| self.0.remove(index))
529 }
530}