1use crate::cli::CliError;
42use crate::cli::commands::compile;
43use crate::client::Client;
44use crate::client::quote_identifier;
45use crate::config::Settings;
46use crate::docker_runtime::{DockerRuntime, DockerRuntimeError};
47use crate::project::ast::Statement;
48use crate::project::compiler::cache::ProjectCache;
49use crate::project::ir::compiled::FullyQualifiedName;
50use crate::project::ir::graph;
51use crate::project::ir::object_id::ObjectId;
52use crate::project::resolve::normalize::NormalizingVisitor;
53use crate::types::{ColumnType, Types};
54use crate::verbose;
55use mz_sql_parser::ast::*;
56use serde::Serialize;
57use std::collections::{BTreeMap, BTreeSet};
58use std::fmt;
59use std::path::Path;
60use tokio_postgres::SimpleQueryMessage;
61
62struct ExplainTarget {
64 object_id: ObjectId,
65 index_name: Option<String>,
66}
67
68enum StagingAction {
70 StubTable {
72 object_id: ObjectId,
73 columns: BTreeMap<String, ColumnType>,
74 },
75 CreateIndex {
77 index: CreateIndexStatement<Raw>,
78 on_object: ObjectId,
79 },
80 CreateView {
82 object_id: ObjectId,
83 stmt: Statement,
84 },
85}
86
87#[derive(Serialize)]
89struct ExplainOutput {
90 object: String,
91 explain_output: String,
92}
93
94impl fmt::Display for ExplainOutput {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 write!(f, "{}", self.explain_output)
97 }
98}
99
100pub async fn run(
110 settings: &Settings,
111 target: &str,
112 overlay: Option<&Path>,
113) -> Result<(), CliError> {
114 let target = parse_target(target)?;
115
116 let fs = match overlay {
117 Some(p) => crate::fs::FileSystem::from_overlay_file(p).map_err(|e| {
118 CliError::Message(format!("failed to load overlay {}: {}", p.display(), e))
119 })?,
120 None => crate::fs::FileSystem::new(),
121 };
122 let project = compile::run_with_fs(settings, false, fs).await?;
123
124 let planned_obj = project.find_object(&target.object_id).ok_or_else(|| {
126 CliError::Message(format!(
127 "object '{}' not found in project",
128 target.object_id
129 ))
130 })?;
131
132 let target_cluster = validate_target(planned_obj, &target)?;
134
135 let (types_lock, types_cache) = load_types_and_cache(settings);
137
138 let get_columns = |id: &ObjectId| -> Option<BTreeMap<String, ColumnType>> {
139 types_cache
140 .as_ref()
141 .and_then(|tc| tc.get_columns(id))
142 .or_else(|| types_lock.get_table(id).cloned())
143 };
144
145 let runtime = DockerRuntime::new().with_image(&settings.docker_image);
147 let client = match runtime.get_client().await {
148 Ok(client) => client,
149 Err(DockerRuntimeError::ContainerStartFailed(e)) => {
150 return Err(CliError::Message(format!(
151 "Docker not available for running explain: {}",
152 e
153 )));
154 }
155 Err(e) => {
156 return Err(CliError::Message(format!(
157 "Failed to start explain environment: {}",
158 e
159 )));
160 }
161 };
162
163 let actions = plan_staging(&project, &target, &target_cluster, &get_columns)?;
165
166 let explain_schema = format!(
168 "_mz_explain_{}",
169 std::time::SystemTime::now()
170 .duration_since(std::time::UNIX_EPOCH)
171 .unwrap_or_default()
172 .as_millis()
173 );
174 let explain_db = target.object_id.expect_database();
175
176 let result = execute_explain(
178 &client,
179 explain_db,
180 &explain_schema,
181 &actions,
182 &target,
183 &planned_obj.typed_object,
184 &target_cluster,
185 )
186 .await;
187
188 let drop_sql = format!(
190 "DROP SCHEMA IF EXISTS {}.{} CASCADE",
191 quote_identifier(explain_db),
192 quote_identifier(&explain_schema),
193 );
194 verbose!("Cleanup: {}", drop_sql);
195 let _ = client.execute(&drop_sql, &[]).await;
196
197 let explain_text = result?;
198
199 let output = ExplainOutput {
200 object: target.object_id.to_string(),
201 explain_output: explain_text,
202 };
203 crate::log::output(&output);
204
205 Ok(())
206}
207
208fn parse_target(target: &str) -> Result<ExplainTarget, CliError> {
210 let (object_part, index_name) = match target.split_once('#') {
211 Some((obj, idx)) => (obj, Some(idx.to_string())),
212 None => (target, None),
213 };
214
215 let parts: Vec<&str> = object_part.split('.').collect();
216 if parts.len() != 3 {
217 return Err(CliError::Message(format!(
218 "expected fully qualified name 'database.schema.object', got '{}'",
219 object_part
220 )));
221 }
222
223 Ok(ExplainTarget {
224 object_id: ObjectId::new(
225 parts[0].to_string(),
226 parts[1].to_string(),
227 parts[2].to_string(),
228 ),
229 index_name,
230 })
231}
232
233fn validate_target(
235 planned_obj: &graph::DatabaseObject,
236 target: &ExplainTarget,
237) -> Result<String, CliError> {
238 match &target.index_name {
239 None => {
240 match &planned_obj.typed_object.stmt {
242 Statement::CreateMaterializedView(mv) => {
243 let cluster = mv
244 .in_cluster
245 .as_ref()
246 .expect("materialized view must have IN CLUSTER")
247 .to_string();
248 Ok(cluster)
249 }
250 other => Err(CliError::Message(format!(
251 "'{}' is a {}, but explain without #index only supports materialized views",
252 target.object_id,
253 other.kind()
254 ))),
255 }
256 }
257 Some(index_name) => {
258 let index = planned_obj
260 .typed_object
261 .indexes
262 .iter()
263 .find(|idx| {
264 idx.name
265 .as_ref()
266 .map(|n| n.to_string() == *index_name)
267 .unwrap_or(false)
268 })
269 .ok_or_else(|| {
270 let available: Vec<String> = planned_obj
271 .typed_object
272 .indexes
273 .iter()
274 .filter_map(|idx| idx.name.as_ref().map(|n| n.to_string()))
275 .collect();
276 CliError::Message(format!(
277 "index '{}' not found on '{}'. Available indexes: {}",
278 index_name,
279 target.object_id,
280 if available.is_empty() {
281 "(none)".to_string()
282 } else {
283 available.join(", ")
284 }
285 ))
286 })?;
287
288 let cluster = index
289 .in_cluster
290 .as_ref()
291 .expect("index must have IN CLUSTER")
292 .to_string();
293 Ok(cluster)
294 }
295 }
296}
297
298fn load_types_and_cache(settings: &Settings) -> (Types, Option<ProjectCache>) {
300 let types_lock = crate::types::load_types_lock(&settings.directory).unwrap_or_default();
301 let types_cache = ProjectCache::open(
302 &settings.directory,
303 settings.profile_name().unwrap_or(""),
304 settings.profile_suffix(),
305 settings.variables(),
306 )
307 .ok()
308 .flatten();
309 if types_cache.is_none() {
310 verbose!("No types cache found; stub tables will use types.lock and AST only");
311 }
312 (types_lock, types_cache)
313}
314
315fn plan_staging(
324 project: &graph::Project,
325 target: &ExplainTarget,
326 target_cluster: &str,
327 get_columns: &dyn Fn(&ObjectId) -> Option<BTreeMap<String, ColumnType>>,
328) -> Result<Vec<StagingAction>, CliError> {
329 let mut actions = Vec::new();
330 let mut visited = BTreeSet::new();
331
332 let target_deps = project
334 .dependency_graph
335 .get(&target.object_id)
336 .cloned()
337 .unwrap_or_default();
338
339 for dep_id in &target_deps {
340 plan_dep(
341 project,
342 dep_id,
343 target_cluster,
344 get_columns,
345 &mut actions,
346 &mut visited,
347 )?;
348 }
349
350 Ok(actions)
351}
352
353fn plan_dep(
355 project: &graph::Project,
356 dep_id: &ObjectId,
357 target_cluster: &str,
358 get_columns: &dyn Fn(&ObjectId) -> Option<BTreeMap<String, ColumnType>>,
359 actions: &mut Vec<StagingAction>,
360 visited: &mut BTreeSet<ObjectId>,
361) -> Result<(), CliError> {
362 if visited.contains(dep_id) {
363 return Ok(());
364 }
365 visited.insert(dep_id.clone());
366
367 if project.external_dependencies.contains(dep_id) {
369 let columns = get_columns_for_stub(dep_id, None, get_columns)?;
370 actions.push(StagingAction::StubTable {
371 object_id: dep_id.clone(),
372 columns,
373 });
374 return Ok(());
375 }
376
377 let planned_obj = project.find_object(dep_id).ok_or_else(|| {
378 CliError::Message(format!("dependency '{}' not found in project", dep_id))
379 })?;
380
381 let matching_indexes: Vec<_> = planned_obj
383 .typed_object
384 .indexes
385 .iter()
386 .filter(|idx| {
387 idx.in_cluster
388 .as_ref()
389 .map(|c| c.to_string() == target_cluster)
390 .unwrap_or(false)
391 })
392 .cloned()
393 .collect();
394
395 if !matching_indexes.is_empty() {
396 let columns =
398 get_columns_for_stub(dep_id, Some(&planned_obj.typed_object.stmt), get_columns)?;
399 actions.push(StagingAction::StubTable {
400 object_id: dep_id.clone(),
401 columns,
402 });
403 for index in matching_indexes {
404 actions.push(StagingAction::CreateIndex {
405 index,
406 on_object: dep_id.clone(),
407 });
408 }
409 } else {
410 match planned_obj.typed_object.stmt.kind() {
411 crate::types::ObjectKind::MaterializedView | crate::types::ObjectKind::Table => {
412 let columns = get_columns_for_stub(
414 dep_id,
415 Some(&planned_obj.typed_object.stmt),
416 get_columns,
417 )?;
418 actions.push(StagingAction::StubTable {
419 object_id: dep_id.clone(),
420 columns,
421 });
422 }
423 crate::types::ObjectKind::View => {
424 let view_deps = project
426 .dependency_graph
427 .get(dep_id)
428 .cloned()
429 .unwrap_or_default();
430 for sub_dep_id in &view_deps {
431 plan_dep(
432 project,
433 sub_dep_id,
434 target_cluster,
435 get_columns,
436 actions,
437 visited,
438 )?;
439 }
440 actions.push(StagingAction::CreateView {
441 object_id: dep_id.clone(),
442 stmt: planned_obj.typed_object.stmt.clone(),
443 });
444 }
445 kind => {
446 return Err(CliError::Message(format!(
447 "dependency '{}' is a {} which cannot be staged for explain",
448 dep_id, kind
449 )));
450 }
451 }
452 }
453
454 Ok(())
455}
456
457fn get_columns_for_stub(
463 object_id: &ObjectId,
464 stmt: Option<&Statement>,
465 get_columns: &dyn Fn(&ObjectId) -> Option<BTreeMap<String, ColumnType>>,
466) -> Result<BTreeMap<String, ColumnType>, CliError> {
467 if let Some(columns) = get_columns(object_id) {
468 return Ok(columns);
469 }
470
471 if let Some(Statement::CreateTable(table)) = stmt {
473 let mut columns = BTreeMap::new();
474 for (position, col) in table.columns.iter().enumerate() {
475 let nullable = !col
476 .options
477 .iter()
478 .any(|opt| matches!(opt.option, ColumnOption::NotNull));
479 columns.insert(
483 col.name.as_str().to_string(),
484 ColumnType {
485 r#type: col.data_type.to_string(),
486 nullable,
487 position,
488 comment: None,
489 },
490 );
491 }
492 return Ok(columns);
493 }
494
495 Err(CliError::Message(format!(
496 "no column schema available for '{}'. Run 'mz-deploy compile' to populate the type cache",
497 object_id
498 )))
499}
500
501async fn execute_explain(
503 client: &Client,
504 explain_db: &str,
505 explain_schema: &str,
506 actions: &[StagingAction],
507 target: &ExplainTarget,
508 target_typed_obj: &crate::project::ir::compiled::DatabaseObject,
509 target_cluster: &str,
510) -> Result<String, CliError> {
511 let create_db_sql = format!(
515 "CREATE DATABASE IF NOT EXISTS {}",
516 quote_identifier(explain_db),
517 );
518 verbose!("Creating explain database: {}", create_db_sql);
519 client
520 .execute(&create_db_sql, &[])
521 .await
522 .map_err(|e| CliError::Message(format!("failed to create explain database: {}", e)))?;
523
524 let create_schema_sql = format!(
526 "CREATE SCHEMA {}.{}",
527 quote_identifier(explain_db),
528 quote_identifier(explain_schema),
529 );
530 verbose!("Creating explain schema: {}", create_schema_sql);
531 client
532 .execute(&create_schema_sql, &[])
533 .await
534 .map_err(|e| CliError::Message(format!("failed to create explain schema: {}", e)))?;
535
536 for action in actions {
538 match action {
539 StagingAction::StubTable { object_id, columns } => {
540 let fqn = object_id.to_string();
541 let mut col_defs = Vec::new();
542 for (col_name, col_type) in columns {
543 let nullable = if col_type.nullable { "" } else { " NOT NULL" };
544 col_defs.push(format!(
545 "{} {}{}",
546 quote_identifier(col_name),
547 col_type.r#type,
548 nullable
549 ));
550 }
551 let sql = format!(
552 "CREATE TABLE {}.{}.{} ({})",
553 quote_identifier(explain_db),
554 quote_identifier(explain_schema),
555 quote_identifier(&fqn),
556 col_defs.join(", ")
557 );
558 verbose!("Stub table: {}", sql);
559 client.execute(&sql, &[]).await.map_err(|e| {
560 CliError::Message(format!(
561 "failed to create stub table for '{}': {}",
562 object_id, e
563 ))
564 })?;
565 }
566 StagingAction::CreateIndex { index, on_object } => {
567 let sql = build_index_sql(index, on_object, explain_db, explain_schema);
568 verbose!("Create index: {}", sql);
569 client
570 .execute(&sql, &[])
571 .await
572 .map_err(|e| CliError::Message(format!("failed to create index: {}", e)))?;
573 }
574 StagingAction::CreateView { object_id, stmt } => {
575 let sql = build_view_sql(stmt, object_id, explain_db, explain_schema);
576 verbose!("Create view: {}", sql);
577 client.execute(&sql, &[]).await.map_err(|e| {
578 CliError::Message(format!("failed to create view '{}': {}", object_id, e))
579 })?;
580 }
581 }
582 }
583
584 create_target(client, explain_db, explain_schema, target, target_typed_obj).await?;
586
587 let explain_sql = build_explain_sql(target, explain_db, explain_schema);
589 verbose!("Running: {}", explain_sql);
590 let messages = client
591 .simple_query(&explain_sql)
592 .await
593 .map_err(|e| CliError::Message(format!("EXPLAIN failed: {}", e)))?;
594
595 let lines = extract_text_from_messages(messages);
596 let text = lines.join("\n");
597
598 let quoted_prefix = format!(
601 "{}.{}.",
602 quote_identifier(explain_db),
603 quote_identifier(explain_schema),
604 );
605 let unquoted_prefix = format!("{}.{}.", explain_db, explain_schema);
606 let text = text
607 .replace("ed_prefix, "")
608 .replace(&unquoted_prefix, "")
609 .replace(
610 "Target cluster: quickstart",
611 &format!("Target cluster: {}", target_cluster),
612 );
613 Ok(text)
614}
615
616async fn create_target(
618 client: &Client,
619 explain_db: &str,
620 explain_schema: &str,
621 target: &ExplainTarget,
622 typed_obj: &crate::project::ir::compiled::DatabaseObject,
623) -> Result<(), CliError> {
624 let fqn: FullyQualifiedName = target.object_id.clone().into();
625 let mut visitor =
626 NormalizingVisitor::explain(&fqn, explain_db.to_string(), explain_schema.to_string());
627
628 match &typed_obj.stmt {
630 Statement::CreateMaterializedView(_) => {
631 let normalized = typed_obj
632 .stmt
633 .clone()
634 .normalize_name_with(&visitor, &fqn.to_item_name())
635 .normalize_dependencies_with(&mut visitor)
636 .normalize_cluster_with(&visitor);
637 let sql = normalized.to_string();
638 verbose!("Create target MV: {}", sql);
639 client.execute(&sql, &[]).await.map_err(|e| {
640 CliError::Message(format!(
641 "failed to create target '{}': {}",
642 target.object_id, e
643 ))
644 })?;
645 }
646 other => {
647 if target.index_name.is_some() {
650 match other.kind() {
656 crate::types::ObjectKind::MaterializedView
657 | crate::types::ObjectKind::Table => {
658 }
660 crate::types::ObjectKind::View => {
661 let normalized = other
662 .clone()
663 .normalize_name_with(&visitor, &fqn.to_item_name())
664 .normalize_dependencies_with(&mut visitor);
665 let sql = normalized.to_string();
666 verbose!("Create target view: {}", sql);
667 client.execute(&sql, &[]).await.map_err(|e| {
668 CliError::Message(format!(
669 "failed to create target '{}': {}",
670 target.object_id, e
671 ))
672 })?;
673 }
674 kind => {
675 return Err(CliError::Message(format!(
676 "'{}' is a {} — cannot create in explain schema",
677 target.object_id, kind
678 )));
679 }
680 }
681 } else {
682 return Err(CliError::Message(format!(
683 "'{}' is a {} — explain only supports materialized views",
684 target.object_id,
685 other.kind()
686 )));
687 }
688 }
689 }
690
691 if target.index_name.is_some() {
693 let mut indexes = typed_obj.indexes.clone();
694 visitor.normalize_index_references(&mut indexes);
695 visitor.normalize_index_clusters(&mut indexes);
696 for index in &indexes {
697 let sql = index.to_string();
698 verbose!("Create target index: {}", sql);
699 client
700 .execute(&sql, &[])
701 .await
702 .map_err(|e| CliError::Message(format!("failed to create index: {}", e)))?;
703 }
704 }
705
706 Ok(())
707}
708
709fn build_index_sql(
711 index: &CreateIndexStatement<Raw>,
712 on_object: &ObjectId,
713 explain_db: &str,
714 explain_schema: &str,
715) -> String {
716 let fqn: FullyQualifiedName = on_object.clone().into();
717 let visitor =
718 NormalizingVisitor::explain(&fqn, explain_db.to_string(), explain_schema.to_string());
719
720 let mut indexes = vec![index.clone()];
721 visitor.normalize_index_references(&mut indexes);
722 visitor.normalize_index_clusters(&mut indexes);
723 indexes.into_iter().next().unwrap().to_string()
724}
725
726fn build_view_sql(
728 stmt: &Statement,
729 object_id: &ObjectId,
730 explain_db: &str,
731 explain_schema: &str,
732) -> String {
733 let fqn: FullyQualifiedName = object_id.clone().into();
734 let mut visitor =
735 NormalizingVisitor::explain(&fqn, explain_db.to_string(), explain_schema.to_string());
736
737 let normalized = stmt
738 .clone()
739 .normalize_name_with(&visitor, &fqn.to_item_name())
740 .normalize_dependencies_with(&mut visitor);
741
742 normalized.to_string()
743}
744
745fn build_explain_sql(target: &ExplainTarget, explain_db: &str, explain_schema: &str) -> String {
747 let flattened_obj = target.object_id.to_string();
748 let qualified_name = format!(
749 "{}.{}.{}",
750 quote_identifier(explain_db),
751 quote_identifier(explain_schema),
752 quote_identifier(&flattened_obj),
753 );
754
755 match &target.index_name {
756 None => {
757 format!("EXPLAIN MATERIALIZED VIEW {}", qualified_name)
758 }
759 Some(index_name) => {
760 let qualified_index = format!(
763 "{}.{}.{}",
764 quote_identifier(explain_db),
765 quote_identifier(explain_schema),
766 quote_identifier(index_name),
767 );
768 format!("EXPLAIN INDEX {}", qualified_index)
769 }
770 }
771}
772
773fn extract_text_from_messages(messages: Vec<SimpleQueryMessage>) -> Vec<String> {
778 let mut lines = Vec::new();
779 for msg in messages {
780 if let SimpleQueryMessage::Row(row) = msg {
781 for i in 0..row.columns().len() {
782 let text: Option<&str> = row.get(i);
783 if let Some(t) = text {
784 lines.push(t.to_string());
785 }
786 }
787 }
788 }
789 lines
790}