1use std::collections::{BTreeMap, BTreeSet};
17use std::fmt::Write;
18
19use mz_ore::assert_none;
20use mz_ore::collections::CollectionExt;
21use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlScalarType};
22use mz_sql_parser::ast::display::{AstDisplay, FormatMode};
23use mz_sql_parser::ast::{
24 CreateSubsourceOptionName, ExternalReferenceExport, ExternalReferences, ObjectType,
25 ShowCreateClusterStatement, ShowCreateConnectionStatement, ShowCreateMaterializedViewStatement,
26 ShowCreateTypeStatement, ShowObjectType, SqlServerConfigOptionName, SystemObjectType,
27 UnresolvedItemName, WithOptionValue,
28};
29use mz_sql_pretty::PrettyConfig;
30use query::QueryContext;
31
32use crate::ast::visit_mut::VisitMut;
33use crate::ast::{
34 SelectStatement, ShowColumnsStatement, ShowCreateIndexStatement, ShowCreateSinkStatement,
35 ShowCreateSourceStatement, ShowCreateTableStatement, ShowCreateViewStatement,
36 ShowObjectsStatement, ShowStatementFilter, Statement, Value,
37};
38use crate::catalog::{CatalogItemType, SessionCatalog};
39use crate::names::{
40 self, Aug, NameSimplifier, ObjectId, ResolvedClusterName, ResolvedDataType,
41 ResolvedDatabaseName, ResolvedIds, ResolvedItemName, ResolvedRoleName, ResolvedSchemaName,
42};
43use crate::parse;
44use crate::plan::scope::Scope;
45use crate::plan::statement::ddl::unplan_create_cluster;
46use crate::plan::statement::{StatementContext, StatementDesc, dml};
47use crate::plan::{
48 HirRelationExpr, Params, Plan, PlanError, ShowColumnsPlan, ShowCreatePlan, query, transform_ast,
49};
50
51pub fn describe_show_create_view(
52 _: &StatementContext,
53 _: ShowCreateViewStatement<Aug>,
54) -> Result<StatementDesc, PlanError> {
55 Ok(StatementDesc::new(Some(
56 RelationDesc::builder()
57 .with_column("name", SqlScalarType::String.nullable(false))
58 .with_column("create_sql", SqlScalarType::String.nullable(false))
59 .finish(),
60 )))
61}
62
63pub fn plan_show_create_view(
64 scx: &StatementContext,
65 ShowCreateViewStatement {
66 view_name,
67 redacted,
68 }: ShowCreateViewStatement<Aug>,
69) -> Result<ShowCreatePlan, PlanError> {
70 plan_show_create_item(scx, &view_name, CatalogItemType::View, redacted)
71}
72
73pub fn describe_show_create_materialized_view(
74 _: &StatementContext,
75 _: ShowCreateMaterializedViewStatement<Aug>,
76) -> Result<StatementDesc, PlanError> {
77 Ok(StatementDesc::new(Some(
78 RelationDesc::builder()
79 .with_column("name", SqlScalarType::String.nullable(false))
80 .with_column("create_sql", SqlScalarType::String.nullable(false))
81 .finish(),
82 )))
83}
84
85pub fn plan_show_create_materialized_view(
86 scx: &StatementContext,
87 ShowCreateMaterializedViewStatement {
88 materialized_view_name,
89 redacted,
90 }: ShowCreateMaterializedViewStatement<Aug>,
91) -> Result<ShowCreatePlan, PlanError> {
92 plan_show_create_item(
93 scx,
94 &materialized_view_name,
95 CatalogItemType::MaterializedView,
96 redacted,
97 )
98}
99
100pub fn describe_show_create_table(
101 _: &StatementContext,
102 _: ShowCreateTableStatement<Aug>,
103) -> Result<StatementDesc, PlanError> {
104 Ok(StatementDesc::new(Some(
105 RelationDesc::builder()
106 .with_column("name", SqlScalarType::String.nullable(false))
107 .with_column("create_sql", SqlScalarType::String.nullable(false))
108 .finish(),
109 )))
110}
111
112fn plan_show_create_item(
113 scx: &StatementContext,
114 name: &ResolvedItemName,
115 expect_type: CatalogItemType,
116 redacted: bool,
117) -> Result<ShowCreatePlan, PlanError> {
118 let item = scx.get_item_by_resolved_name(name)?;
119 let name = name.full_name_str();
120 if item.id().is_system()
121 && matches!(
122 expect_type,
123 CatalogItemType::Table | CatalogItemType::Source
124 )
125 {
126 sql_bail!("cannot show create for system object {name}");
127 }
128 if item.item_type() == CatalogItemType::MaterializedView && expect_type == CatalogItemType::View
129 {
130 return Err(PlanError::ShowCreateViewOnMaterializedView(name));
131 }
132 if item.item_type() != expect_type {
133 sql_bail!("{name} is not a {expect_type}");
134 }
135 let create_sql =
136 humanize_sql_for_show_create(scx.catalog, item.id(), item.create_sql(), redacted)?;
137 Ok(ShowCreatePlan {
138 id: ObjectId::Item(item.id()),
139 row: Row::pack_slice(&[Datum::String(&name), Datum::String(&create_sql)]),
140 })
141}
142
143pub fn plan_show_create_table(
144 scx: &StatementContext,
145 ShowCreateTableStatement {
146 table_name,
147 redacted,
148 }: ShowCreateTableStatement<Aug>,
149) -> Result<ShowCreatePlan, PlanError> {
150 plan_show_create_item(scx, &table_name, CatalogItemType::Table, redacted)
151}
152
153pub fn describe_show_create_source(
154 _: &StatementContext,
155 _: ShowCreateSourceStatement<Aug>,
156) -> Result<StatementDesc, PlanError> {
157 Ok(StatementDesc::new(Some(
158 RelationDesc::builder()
159 .with_column("name", SqlScalarType::String.nullable(false))
160 .with_column("create_sql", SqlScalarType::String.nullable(false))
161 .finish(),
162 )))
163}
164
165pub fn plan_show_create_source(
166 scx: &StatementContext,
167 ShowCreateSourceStatement {
168 source_name,
169 redacted,
170 }: ShowCreateSourceStatement<Aug>,
171) -> Result<ShowCreatePlan, PlanError> {
172 plan_show_create_item(scx, &source_name, CatalogItemType::Source, redacted)
173}
174
175pub fn describe_show_create_sink(
176 _: &StatementContext,
177 _: ShowCreateSinkStatement<Aug>,
178) -> Result<StatementDesc, PlanError> {
179 Ok(StatementDesc::new(Some(
180 RelationDesc::builder()
181 .with_column("name", SqlScalarType::String.nullable(false))
182 .with_column("create_sql", SqlScalarType::String.nullable(false))
183 .finish(),
184 )))
185}
186
187pub fn plan_show_create_sink(
188 scx: &StatementContext,
189 ShowCreateSinkStatement {
190 sink_name,
191 redacted,
192 }: ShowCreateSinkStatement<Aug>,
193) -> Result<ShowCreatePlan, PlanError> {
194 plan_show_create_item(scx, &sink_name, CatalogItemType::Sink, redacted)
195}
196
197pub fn describe_show_create_index(
198 _: &StatementContext,
199 _: ShowCreateIndexStatement<Aug>,
200) -> Result<StatementDesc, PlanError> {
201 Ok(StatementDesc::new(Some(
202 RelationDesc::builder()
203 .with_column("name", SqlScalarType::String.nullable(false))
204 .with_column("create_sql", SqlScalarType::String.nullable(false))
205 .finish(),
206 )))
207}
208
209pub fn plan_show_create_index(
210 scx: &StatementContext,
211 ShowCreateIndexStatement {
212 index_name,
213 redacted,
214 }: ShowCreateIndexStatement<Aug>,
215) -> Result<ShowCreatePlan, PlanError> {
216 plan_show_create_item(scx, &index_name, CatalogItemType::Index, redacted)
217}
218
219pub fn describe_show_create_connection(
220 _: &StatementContext,
221 _: ShowCreateConnectionStatement<Aug>,
222) -> Result<StatementDesc, PlanError> {
223 Ok(StatementDesc::new(Some(
224 RelationDesc::builder()
225 .with_column("name", SqlScalarType::String.nullable(false))
226 .with_column("create_sql", SqlScalarType::String.nullable(false))
227 .finish(),
228 )))
229}
230
231pub fn plan_show_create_cluster(
232 scx: &StatementContext,
233 ShowCreateClusterStatement { cluster_name }: ShowCreateClusterStatement<Aug>,
234) -> Result<ShowCreatePlan, PlanError> {
235 let cluster = scx.get_cluster(&cluster_name.id);
236 let name = cluster.name().to_string();
237 let plan = cluster.try_to_plan()?;
238 let stmt = unplan_create_cluster(scx, plan)?;
239 let create_sql = stmt.to_ast_string_stable();
240 Ok(ShowCreatePlan {
241 id: ObjectId::Cluster(cluster_name.id),
242 row: Row::pack_slice(&[Datum::String(&name), Datum::String(&create_sql)]),
243 })
244}
245
246pub fn describe_show_create_cluster(
247 _: &StatementContext,
248 _: ShowCreateClusterStatement<Aug>,
249) -> Result<StatementDesc, PlanError> {
250 Ok(StatementDesc::new(Some(
251 RelationDesc::builder()
252 .with_column("name", SqlScalarType::String.nullable(false))
253 .with_column("create_sql", SqlScalarType::String.nullable(false))
254 .finish(),
255 )))
256}
257
258pub fn plan_show_create_type(
259 scx: &StatementContext,
260 ShowCreateTypeStatement {
261 type_name,
262 redacted,
263 }: ShowCreateTypeStatement<Aug>,
264) -> Result<ShowCreatePlan, PlanError> {
265 let ResolvedDataType::Named { id, full_name, .. } = type_name else {
266 sql_bail!("{type_name} is not a named type");
267 };
268
269 let type_item = scx.get_item(&id);
270
271 if id.is_system() {
273 sql_bail!("cannot show create for system type {full_name}");
275 }
276
277 let name = type_item.name().item.as_ref();
278
279 let create_sql = humanize_sql_for_show_create(
281 scx.catalog,
282 type_item.id(),
283 type_item.create_sql(),
284 redacted,
285 )?;
286
287 Ok(ShowCreatePlan {
288 id: ObjectId::Item(id),
289 row: Row::pack_slice(&[Datum::String(name), Datum::String(&create_sql)]),
290 })
291}
292
293pub fn describe_show_create_type(
294 _: &StatementContext,
295 _: ShowCreateTypeStatement<Aug>,
296) -> Result<StatementDesc, PlanError> {
297 Ok(StatementDesc::new(Some(
298 RelationDesc::builder()
299 .with_column("name", SqlScalarType::String.nullable(false))
300 .with_column("create_sql", SqlScalarType::String.nullable(false))
301 .finish(),
302 )))
303}
304
305pub fn plan_show_create_connection(
306 scx: &StatementContext,
307 ShowCreateConnectionStatement {
308 connection_name,
309 redacted,
310 }: ShowCreateConnectionStatement<Aug>,
311) -> Result<ShowCreatePlan, PlanError> {
312 plan_show_create_item(scx, &connection_name, CatalogItemType::Connection, redacted)
313}
314
315pub fn show_databases<'a>(
316 scx: &'a StatementContext<'a>,
317 filter: Option<ShowStatementFilter<Aug>>,
318) -> Result<ShowSelect<'a>, PlanError> {
319 let query = "SELECT name, comment FROM mz_internal.mz_show_databases".to_string();
320 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
321}
322
323pub fn show_schemas<'a>(
324 scx: &'a StatementContext<'a>,
325 from: Option<ResolvedDatabaseName>,
326 filter: Option<ShowStatementFilter<Aug>>,
327) -> Result<ShowSelect<'a>, PlanError> {
328 let database_id = match from {
329 Some(ResolvedDatabaseName::Database { id, .. }) => id.to_string(),
330 None => match scx.active_database() {
331 Some(id) => id.to_string(),
332 None => sql_bail!("no database specified and no active database"),
333 },
334 Some(ResolvedDatabaseName::Error) => {
335 unreachable!("should have been handled in name resolution")
336 }
337 };
338 let query = format!(
339 "SELECT name, comment
340 FROM mz_internal.mz_show_schemas
341 WHERE database_id IS NULL OR database_id = '{database_id}'",
342 );
343 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
344}
345
346pub fn show_roles<'a>(
347 scx: &'a StatementContext<'a>,
348 filter: Option<ShowStatementFilter<Aug>>,
349) -> Result<ShowSelect<'a>, PlanError> {
350 let query = "SELECT name, comment FROM mz_internal.mz_show_roles".to_string();
351 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
352}
353
354pub fn show_network_policies<'a>(
355 scx: &'a StatementContext<'a>,
356 filter: Option<ShowStatementFilter<Aug>>,
357) -> Result<ShowSelect<'a>, PlanError> {
358 let query = "SELECT name, rules, comment FROM mz_internal.mz_show_network_policies".to_string();
359 ShowSelect::new(
360 scx,
361 query,
362 filter,
363 None,
364 Some(&["name", "rules", "comment"]),
365 )
366}
367
368pub fn show_objects<'a>(
369 scx: &'a StatementContext<'a>,
370 ShowObjectsStatement {
371 object_type,
372 from,
373 filter,
374 }: ShowObjectsStatement<Aug>,
375) -> Result<ShowSelect<'a>, PlanError> {
376 match object_type {
377 ShowObjectType::Table { on_source } => show_tables(scx, from, on_source, filter),
378 ShowObjectType::Source { in_cluster } => show_sources(scx, from, in_cluster, filter),
379 ShowObjectType::Subsource { on_source } => show_subsources(scx, from, on_source, filter),
380 ShowObjectType::View => show_views(scx, from, filter),
381 ShowObjectType::Sink { in_cluster } => show_sinks(scx, from, in_cluster, filter),
382 ShowObjectType::Type => show_types(scx, from, filter),
383 ShowObjectType::Object => show_all_objects(scx, from, filter),
384 ShowObjectType::Role => {
385 assert_none!(from, "parser should reject from");
386 show_roles(scx, filter)
387 }
388 ShowObjectType::Cluster => {
389 assert_none!(from, "parser should reject from");
390 show_clusters(scx, filter)
391 }
392 ShowObjectType::ClusterReplica => {
393 assert_none!(from, "parser should reject from");
394 show_cluster_replicas(scx, filter)
395 }
396 ShowObjectType::Secret => show_secrets(scx, from, filter),
397 ShowObjectType::Connection => show_connections(scx, from, filter),
398 ShowObjectType::MaterializedView { in_cluster } => {
399 show_materialized_views(scx, from, in_cluster, filter)
400 }
401 ShowObjectType::Index {
402 in_cluster,
403 on_object,
404 } => show_indexes(scx, from, on_object, in_cluster, filter),
405 ShowObjectType::Database => {
406 assert_none!(from, "parser should reject from");
407 show_databases(scx, filter)
408 }
409 ShowObjectType::Schema { from: db_from } => {
410 assert_none!(from, "parser should reject from");
411 show_schemas(scx, db_from, filter)
412 }
413 ShowObjectType::Privileges { object_type, role } => {
414 assert_none!(from, "parser should reject from");
415 show_privileges(scx, object_type, role, filter)
416 }
417 ShowObjectType::DefaultPrivileges { object_type, role } => {
418 assert_none!(from, "parser should reject from");
419 show_default_privileges(scx, object_type, role, filter)
420 }
421 ShowObjectType::RoleMembership { role } => {
422 assert_none!(from, "parser should reject from");
423 show_role_membership(scx, role, filter)
424 }
425 ShowObjectType::ContinualTask { in_cluster } => {
426 show_continual_tasks(scx, from, in_cluster, filter)
427 }
428 ShowObjectType::NetworkPolicy => {
429 assert_none!(from, "parser should reject from");
430 show_network_policies(scx, filter)
431 }
432 }
433}
434
435fn show_connections<'a>(
436 scx: &'a StatementContext<'a>,
437 from: Option<ResolvedSchemaName>,
438 filter: Option<ShowStatementFilter<Aug>>,
439) -> Result<ShowSelect<'a>, PlanError> {
440 let schema_spec = scx.resolve_optional_schema(&from)?;
441 let query = format!(
442 "SELECT name, type, comment
443 FROM mz_internal.mz_show_connections connections
444 WHERE schema_id = '{schema_spec}'",
445 );
446 ShowSelect::new(scx, query, filter, None, Some(&["name", "type", "comment"]))
447}
448
449fn show_tables<'a>(
450 scx: &'a StatementContext<'a>,
451 from: Option<ResolvedSchemaName>,
452 on_source: Option<ResolvedItemName>,
453 filter: Option<ShowStatementFilter<Aug>>,
454) -> Result<ShowSelect<'a>, PlanError> {
455 let schema_spec = scx.resolve_optional_schema(&from)?;
456 let mut query = format!(
457 "SELECT name, comment
458 FROM mz_internal.mz_show_tables tables
459 WHERE tables.schema_id = '{schema_spec}'",
460 );
461 if let Some(on_source) = &on_source {
462 let on_item = scx.get_item_by_resolved_name(on_source)?;
463 if on_item.item_type() != CatalogItemType::Source {
464 sql_bail!(
465 "cannot show tables on {} because it is a {}",
466 on_source.full_name_str(),
467 on_item.item_type(),
468 );
469 }
470 query += &format!(" AND tables.source_id = '{}'", on_item.id());
471 }
472 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
473}
474
475fn show_sources<'a>(
476 scx: &'a StatementContext<'a>,
477 from: Option<ResolvedSchemaName>,
478 in_cluster: Option<ResolvedClusterName>,
479 filter: Option<ShowStatementFilter<Aug>>,
480) -> Result<ShowSelect<'a>, PlanError> {
481 let schema_spec = scx.resolve_optional_schema(&from)?;
482 let mut where_clause = format!("schema_id = '{schema_spec}'");
483
484 if let Some(cluster) = in_cluster {
485 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
486 .expect("write on string cannot fail");
487 }
488
489 let query = format!(
490 "SELECT name, type, cluster, comment
491 FROM mz_internal.mz_show_sources
492 WHERE {where_clause}"
493 );
494 ShowSelect::new(
495 scx,
496 query,
497 filter,
498 None,
499 Some(&["name", "type", "cluster", "comment"]),
500 )
501}
502
503fn show_subsources<'a>(
504 scx: &'a StatementContext<'a>,
505 from_schema: Option<ResolvedSchemaName>,
506 on_source: Option<ResolvedItemName>,
507 filter: Option<ShowStatementFilter<Aug>>,
508) -> Result<ShowSelect<'a>, PlanError> {
509 let mut query_filter = Vec::new();
510
511 if on_source.is_none() && from_schema.is_none() {
512 query_filter.push("subsources.id NOT LIKE 's%'".into());
513 let schema_spec = scx.resolve_active_schema().map(|spec| spec.clone())?;
514 query_filter.push(format!("subsources.schema_id = '{schema_spec}'"));
515 }
516
517 if let Some(on_source) = &on_source {
518 let on_item = scx.get_item_by_resolved_name(on_source)?;
519 if on_item.item_type() != CatalogItemType::Source {
520 sql_bail!(
521 "cannot show subsources on {} because it is a {}",
522 on_source.full_name_str(),
523 on_item.item_type(),
524 );
525 }
526 query_filter.push(format!("sources.id = '{}'", on_item.id()));
527 }
528
529 if let Some(schema) = from_schema {
530 let schema_spec = schema.schema_spec();
531 query_filter.push(format!("subsources.schema_id = '{schema_spec}'"));
532 }
533
534 let query = format!(
537 "SELECT DISTINCT
538 subsources.name AS name,
539 subsources.type AS type
540 FROM
541 mz_sources AS subsources
542 JOIN mz_internal.mz_object_dependencies deps ON (subsources.id = deps.object_id OR subsources.id = deps.referenced_object_id)
543 JOIN mz_sources AS sources ON (sources.id = deps.object_id OR sources.id = deps.referenced_object_id)
544 WHERE (subsources.type = 'subsource' OR subsources.type = 'progress') AND {}",
545 itertools::join(query_filter, " AND "),
546 );
547 ShowSelect::new(scx, query, filter, None, None)
548}
549
550fn show_views<'a>(
551 scx: &'a StatementContext<'a>,
552 from: Option<ResolvedSchemaName>,
553 filter: Option<ShowStatementFilter<Aug>>,
554) -> Result<ShowSelect<'a>, PlanError> {
555 let schema_spec = scx.resolve_optional_schema(&from)?;
556 let query = format!(
557 "SELECT name, comment
558 FROM mz_internal.mz_show_views
559 WHERE schema_id = '{schema_spec}'"
560 );
561 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
562}
563
564fn show_materialized_views<'a>(
565 scx: &'a StatementContext<'a>,
566 from: Option<ResolvedSchemaName>,
567 in_cluster: Option<ResolvedClusterName>,
568 filter: Option<ShowStatementFilter<Aug>>,
569) -> Result<ShowSelect<'a>, PlanError> {
570 let schema_spec = scx.resolve_optional_schema(&from)?;
571 let mut where_clause = format!("schema_id = '{schema_spec}'");
572
573 if let Some(cluster) = in_cluster {
574 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
575 .expect("write on string cannot fail");
576 }
577
578 let query = format!(
579 "SELECT name, cluster, comment
580 FROM mz_internal.mz_show_materialized_views
581 WHERE {where_clause}"
582 );
583
584 ShowSelect::new(
585 scx,
586 query,
587 filter,
588 None,
589 Some(&["name", "cluster", "comment"]),
590 )
591}
592
593fn show_sinks<'a>(
594 scx: &'a StatementContext<'a>,
595 from: Option<ResolvedSchemaName>,
596 in_cluster: Option<ResolvedClusterName>,
597 filter: Option<ShowStatementFilter<Aug>>,
598) -> Result<ShowSelect<'a>, PlanError> {
599 let schema_spec = if let Some(ResolvedSchemaName::Schema { schema_spec, .. }) = from {
600 schema_spec.to_string()
601 } else {
602 scx.resolve_active_schema()?.to_string()
603 };
604
605 let mut where_clause = format!("schema_id = '{schema_spec}'");
606
607 if let Some(cluster) = in_cluster {
608 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
609 .expect("write on string cannot fail");
610 }
611
612 let query = format!(
613 "SELECT name, type, cluster, comment
614 FROM mz_internal.mz_show_sinks sinks
615 WHERE {where_clause}"
616 );
617 ShowSelect::new(
618 scx,
619 query,
620 filter,
621 None,
622 Some(&["name", "type", "cluster", "comment"]),
623 )
624}
625
626fn show_types<'a>(
627 scx: &'a StatementContext<'a>,
628 from: Option<ResolvedSchemaName>,
629 filter: Option<ShowStatementFilter<Aug>>,
630) -> Result<ShowSelect<'a>, PlanError> {
631 let schema_spec = scx.resolve_optional_schema(&from)?;
632 let query = format!(
633 "SELECT name, comment
634 FROM mz_internal.mz_show_types
635 WHERE schema_id = '{schema_spec}'"
636 );
637 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
638}
639
640fn show_all_objects<'a>(
641 scx: &'a StatementContext<'a>,
642 from: Option<ResolvedSchemaName>,
643 filter: Option<ShowStatementFilter<Aug>>,
644) -> Result<ShowSelect<'a>, PlanError> {
645 let schema_spec = scx.resolve_optional_schema(&from)?;
646 let query = format!(
647 "SELECT name, type, comment
648 FROM mz_internal.mz_show_all_objects
649 WHERE schema_id = '{schema_spec}'",
650 );
651 ShowSelect::new(scx, query, filter, None, Some(&["name", "type", "comment"]))
652}
653
654pub fn show_indexes<'a>(
655 scx: &'a StatementContext<'a>,
656 from_schema: Option<ResolvedSchemaName>,
657 on_object: Option<ResolvedItemName>,
658 in_cluster: Option<ResolvedClusterName>,
659 filter: Option<ShowStatementFilter<Aug>>,
660) -> Result<ShowSelect<'a>, PlanError> {
661 let mut query_filter = Vec::new();
662
663 if on_object.is_none() && from_schema.is_none() && in_cluster.is_none() {
664 query_filter.push("on_id NOT LIKE 's%'".into());
665 let schema_spec = scx.resolve_active_schema().map(|spec| spec.clone())?;
666 query_filter.push(format!("schema_id = '{schema_spec}'"));
667 }
668
669 if let Some(on_object) = &on_object {
670 let on_item = scx.get_item_by_resolved_name(on_object)?;
671 if on_item.item_type() != CatalogItemType::View
672 && on_item.item_type() != CatalogItemType::MaterializedView
673 && on_item.item_type() != CatalogItemType::Source
674 && on_item.item_type() != CatalogItemType::Table
675 {
676 sql_bail!(
677 "cannot show indexes on {} because it is a {}",
678 on_object.full_name_str(),
679 on_item.item_type(),
680 );
681 }
682 query_filter.push(format!("on_id = '{}'", on_item.id()));
683 }
684
685 if let Some(schema) = from_schema {
686 let schema_spec = schema.schema_spec();
687 query_filter.push(format!("schema_id = '{schema_spec}'"));
688 }
689
690 if let Some(cluster) = in_cluster {
691 query_filter.push(format!("cluster_id = '{}'", cluster.id))
692 };
693
694 let query = format!(
695 "SELECT name, on, cluster, key, comment
696 FROM mz_internal.mz_show_indexes
697 WHERE {}",
698 itertools::join(query_filter.iter(), " AND ")
699 );
700
701 ShowSelect::new(
702 scx,
703 query,
704 filter,
705 None,
706 Some(&["name", "on", "cluster", "key", "comment"]),
707 )
708}
709
710pub fn show_columns<'a>(
711 scx: &'a StatementContext<'a>,
712 ShowColumnsStatement { table_name, filter }: ShowColumnsStatement<Aug>,
713) -> Result<ShowColumnsSelect<'a>, PlanError> {
714 let entry = scx.get_item_by_resolved_name(&table_name)?;
715 let full_name = scx.catalog.resolve_full_name(entry.name());
716
717 match entry.item_type() {
718 CatalogItemType::Source
719 | CatalogItemType::Table
720 | CatalogItemType::View
721 | CatalogItemType::MaterializedView
722 | CatalogItemType::ContinualTask => (),
723 ty @ CatalogItemType::Connection
724 | ty @ CatalogItemType::Index
725 | ty @ CatalogItemType::Func
726 | ty @ CatalogItemType::Secret
727 | ty @ CatalogItemType::Type
728 | ty @ CatalogItemType::Sink => {
729 sql_bail!("{full_name} is a {ty} and so does not have columns");
730 }
731 }
732
733 let query = format!(
734 "SELECT name, nullable, type, position, comment
735 FROM mz_internal.mz_show_columns columns
736 WHERE columns.id = '{}'",
737 entry.id(),
738 );
739 let (show_select, new_resolved_ids) = ShowSelect::new_with_resolved_ids(
740 scx,
741 query,
742 filter,
743 Some("position"),
744 Some(&["name", "nullable", "type", "comment"]),
745 )?;
746 Ok(ShowColumnsSelect {
747 id: entry.id(),
748 show_select,
749 new_resolved_ids,
750 })
751}
752
753pub fn show_clusters<'a>(
757 scx: &'a StatementContext<'a>,
758 filter: Option<ShowStatementFilter<Aug>>,
759) -> Result<ShowSelect<'a>, PlanError> {
760 let query = "SELECT name, replicas, comment FROM mz_internal.mz_show_clusters".to_string();
761 ShowSelect::new(
762 scx,
763 query,
764 filter,
765 None,
766 Some(&["name", "replicas", "comment"]),
767 )
768}
769
770pub fn show_cluster_replicas<'a>(
771 scx: &'a StatementContext<'a>,
772 filter: Option<ShowStatementFilter<Aug>>,
773) -> Result<ShowSelect<'a>, PlanError> {
774 let query = "
775 SELECT cluster, replica, size, ready, comment
776 FROM mz_internal.mz_show_cluster_replicas
777 "
778 .to_string();
779
780 ShowSelect::new(
781 scx,
782 query,
783 filter,
784 None,
785 Some(&["cluster", "replica", "size", "ready", "comment"]),
786 )
787}
788
789pub fn show_secrets<'a>(
790 scx: &'a StatementContext<'a>,
791 from: Option<ResolvedSchemaName>,
792 filter: Option<ShowStatementFilter<Aug>>,
793) -> Result<ShowSelect<'a>, PlanError> {
794 let schema_spec = scx.resolve_optional_schema(&from)?;
795
796 let query = format!(
797 "SELECT name, comment
798 FROM mz_internal.mz_show_secrets
799 WHERE schema_id = '{schema_spec}'",
800 );
801
802 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
803}
804
805pub fn show_privileges<'a>(
806 scx: &'a StatementContext<'a>,
807 object_type: Option<SystemObjectType>,
808 role: Option<ResolvedRoleName>,
809 filter: Option<ShowStatementFilter<Aug>>,
810) -> Result<ShowSelect<'a>, PlanError> {
811 let mut query_filter = Vec::new();
812 if let Some(object_type) = object_type {
813 query_filter.push(format!(
814 "object_type = '{}'",
815 object_type.to_string().to_lowercase()
816 ));
817 }
818 if let Some(role) = role {
819 let name = role.name;
820 query_filter.push(format!("CASE WHEN grantee = 'PUBLIC' THEN true ELSE pg_has_role('{name}', grantee, 'USAGE') END"));
821 }
822 let query_filter = if query_filter.len() > 0 {
823 format!("WHERE {}", itertools::join(query_filter, " AND "))
824 } else {
825 "".to_string()
826 };
827
828 let query = format!(
829 "SELECT grantor, grantee, database, schema, name, object_type, privilege_type
830 FROM mz_internal.mz_show_all_privileges
831 {query_filter}",
832 );
833
834 ShowSelect::new(
835 scx,
836 query,
837 filter,
838 None,
839 Some(&[
840 "grantor",
841 "grantee",
842 "database",
843 "schema",
844 "name",
845 "object_type",
846 "privilege_type",
847 ]),
848 )
849}
850
851pub fn show_default_privileges<'a>(
852 scx: &'a StatementContext<'a>,
853 object_type: Option<ObjectType>,
854 role: Option<ResolvedRoleName>,
855 filter: Option<ShowStatementFilter<Aug>>,
856) -> Result<ShowSelect<'a>, PlanError> {
857 let mut query_filter = Vec::new();
858 if let Some(object_type) = object_type {
859 query_filter.push(format!(
860 "object_type = '{}'",
861 object_type.to_string().to_lowercase()
862 ));
863 }
864 if let Some(role) = role {
865 let name = role.name;
866 query_filter.push(format!("CASE WHEN grantee = 'PUBLIC' THEN true ELSE pg_has_role('{name}', grantee, 'USAGE') END"));
867 }
868 let query_filter = if query_filter.len() > 0 {
869 format!("WHERE {}", itertools::join(query_filter, " AND "))
870 } else {
871 "".to_string()
872 };
873
874 let query = format!(
875 "SELECT object_owner, database, schema, object_type, grantee, privilege_type
876 FROM mz_internal.mz_show_default_privileges
877 {query_filter}",
878 );
879
880 ShowSelect::new(
881 scx,
882 query,
883 filter,
884 None,
885 Some(&[
886 "object_owner",
887 "database",
888 "schema",
889 "object_type",
890 "grantee",
891 "privilege_type",
892 ]),
893 )
894}
895
896pub fn show_role_membership<'a>(
897 scx: &'a StatementContext<'a>,
898 role: Option<ResolvedRoleName>,
899 filter: Option<ShowStatementFilter<Aug>>,
900) -> Result<ShowSelect<'a>, PlanError> {
901 let mut query_filter = Vec::new();
902 if let Some(role) = role {
903 let name = role.name;
904 query_filter.push(format!("pg_has_role('{name}', member, 'USAGE')"));
905 }
906 let query_filter = if query_filter.len() > 0 {
907 format!("WHERE {}", itertools::join(query_filter, " AND "))
908 } else {
909 "".to_string()
910 };
911
912 let query = format!(
913 "SELECT role, member, grantor
914 FROM mz_internal.mz_show_role_members
915 {query_filter}",
916 );
917
918 ShowSelect::new(
919 scx,
920 query,
921 filter,
922 None,
923 Some(&["role", "member", "grantor"]),
924 )
925}
926
927fn show_continual_tasks<'a>(
928 scx: &'a StatementContext<'a>,
929 from: Option<ResolvedSchemaName>,
930 in_cluster: Option<ResolvedClusterName>,
931 filter: Option<ShowStatementFilter<Aug>>,
932) -> Result<ShowSelect<'a>, PlanError> {
933 let schema_spec = scx.resolve_optional_schema(&from)?;
934 let mut where_clause = format!("schema_id = '{schema_spec}'");
935
936 if let Some(cluster) = in_cluster {
937 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
938 .expect("write on string cannot fail");
939 }
940
941 let query = format!(
942 "SELECT name, cluster, comment
943 FROM mz_internal.mz_show_continual_tasks
944 WHERE {where_clause}"
945 );
946
947 ShowSelect::new(
948 scx,
949 query,
950 filter,
951 None,
952 Some(&["name", "cluster", "comment"]),
953 )
954}
955
/// A `SHOW` command expressed as a resolved `SELECT` statement, bundled with
/// the statement context needed to describe or plan it.
pub struct ShowSelect<'a> {
    /// Statement context handed to the describe/plan routines.
    scx: &'a StatementContext<'a>,
    /// The name-resolved `SELECT` statement backing the `SHOW` command.
    pub(crate) stmt: SelectStatement<Aug>,
}
963
964impl<'a> ShowSelect<'a> {
965 fn new(
974 scx: &'a StatementContext,
975 query: String,
976 filter: Option<ShowStatementFilter<Aug>>,
977 order: Option<&str>,
978 projection: Option<&[&str]>,
979 ) -> Result<ShowSelect<'a>, PlanError> {
980 Self::new_with_resolved_ids(scx, query, filter, order, projection)
981 .map(|(show_select, _)| show_select)
982 }
983
984 fn new_with_resolved_ids(
985 scx: &'a StatementContext,
986 query: String,
987 filter: Option<ShowStatementFilter<Aug>>,
988 order: Option<&str>,
989 projection: Option<&[&str]>,
990 ) -> Result<(ShowSelect<'a>, ResolvedIds), PlanError> {
991 let filter = match filter {
992 Some(ShowStatementFilter::Like(like)) => format!("name LIKE {}", Value::String(like)),
993 Some(ShowStatementFilter::Where(expr)) => expr.to_string(),
994 None => "true".to_string(),
995 };
996 let query = format!(
997 "SELECT {} FROM ({}) q WHERE {} ORDER BY {}",
998 projection
999 .map(|ps| ps.join(", "))
1000 .unwrap_or_else(|| "*".into()),
1001 query,
1002 filter,
1003 order.unwrap_or("q.*")
1004 );
1005
1006 Self::new_from_bare_query(scx, query)
1007 }
1008
1009 pub fn new_from_bare_query(
1010 scx: &'a StatementContext,
1011 query: String,
1012 ) -> Result<(ShowSelect<'a>, ResolvedIds), PlanError> {
1013 let stmts = parse::parse(&query).expect("ShowSelect::new called with invalid SQL");
1014 let stmt = match stmts.into_element().ast {
1015 Statement::Select(select) => select,
1016 _ => panic!("ShowSelect::new called with non-SELECT statement"),
1017 };
1018 let (mut stmt, new_resolved_ids) = names::resolve(scx.catalog, stmt)?;
1019 transform_ast::transform(scx, &mut stmt)?;
1020 Ok((ShowSelect { scx, stmt }, new_resolved_ids))
1021 }
1022
1023 pub fn describe(self) -> Result<StatementDesc, PlanError> {
1025 dml::describe_select(self.scx, self.stmt)
1026 }
1027
1028 pub fn plan(self) -> Result<Plan, PlanError> {
1030 dml::plan_select(self.scx, self.stmt, &Params::empty(), None)
1031 }
1032
1033 pub fn plan_hir(self, qcx: &QueryContext) -> Result<(HirRelationExpr, Scope), PlanError> {
1035 query::plan_nested_query(&mut qcx.clone(), &self.stmt.query)
1036 }
1037}
1038
/// The query behind `SHOW COLUMNS`, together with the id of the item whose
/// columns are listed.
pub struct ShowColumnsSelect<'a> {
    /// Catalog id of the item whose columns are being shown.
    id: CatalogItemId,
    /// Ids returned by name resolution when the backing query was built.
    new_resolved_ids: ResolvedIds,
    /// The `SELECT` that produces the column listing.
    show_select: ShowSelect<'a>,
}
1044
1045impl<'a> ShowColumnsSelect<'a> {
1046 pub fn describe(self) -> Result<StatementDesc, PlanError> {
1047 self.show_select.describe()
1048 }
1049
1050 pub fn plan(self) -> Result<Plan, PlanError> {
1051 let select_plan = self.show_select.plan()?;
1052 match select_plan {
1053 Plan::Select(select_plan) => Ok(Plan::ShowColumns(ShowColumnsPlan {
1054 id: self.id,
1055 select_plan,
1056 new_resolved_ids: self.new_resolved_ids,
1057 })),
1058 _ => {
1059 tracing::error!(
1060 "SHOW COLUMNS produced a non select plan. plan: {:?}",
1061 select_plan
1062 );
1063 Err(PlanError::Unstructured(
1064 "SHOW COLUMNS produced an unexpected plan. Please file a bug.".to_string(),
1065 ))
1066 }
1067 }
1068 }
1069
1070 pub fn plan_hir(self, qcx: &QueryContext) -> Result<(HirRelationExpr, Scope), PlanError> {
1071 self.show_select.plan_hir(qcx)
1072 }
1073}
1074
1075fn humanize_sql_for_show_create(
1078 catalog: &dyn SessionCatalog,
1079 id: CatalogItemId,
1080 sql: &str,
1081 redacted: bool,
1082) -> Result<String, PlanError> {
1083 use mz_sql_parser::ast::{CreateSourceConnection, MySqlConfigOptionName, PgConfigOptionName};
1084
1085 let parsed = parse::parse(sql)?.into_element().ast;
1086 let (mut resolved, _) = names::resolve(catalog, parsed)?;
1087
1088 let mut simplifier = NameSimplifier { catalog };
1090 simplifier.visit_statement_mut(&mut resolved);
1091
1092 match &mut resolved {
1093 Statement::CreateMaterializedView(stmt) => stmt.as_of = None,
1095 Statement::CreateContinualTask(stmt) => stmt.as_of = None,
1096 Statement::CreateSource(stmt) => {
1105 let mut curr_references: BTreeMap<UnresolvedItemName, Vec<UnresolvedItemName>> =
1107 catalog
1108 .get_item(&id)
1109 .used_by()
1110 .into_iter()
1111 .filter_map(|subsource| {
1112 let item = catalog.get_item(subsource);
1113 item.subsource_details().map(|(_id, reference, _details)| {
1114 let name = item.name();
1115 let subsource_name = catalog.resolve_full_name(name);
1116 let subsource_name = UnresolvedItemName::from(subsource_name);
1117 (reference.clone(), subsource_name)
1118 })
1119 })
1120 .fold(BTreeMap::new(), |mut map, (reference, subsource_name)| {
1121 map.entry(reference)
1122 .or_insert_with(Vec::new)
1123 .push(subsource_name);
1124 map
1125 });
1126
1127 match &mut stmt.connection {
1128 CreateSourceConnection::Postgres { options, .. } => {
1129 options.retain_mut(|o| {
1130 match o.name {
1131 PgConfigOptionName::TextColumns => {}
1135 PgConfigOptionName::Details => return false,
1137 _ => return true,
1138 };
1139 match &mut o.value {
1140 Some(WithOptionValue::Sequence(text_cols)) => {
1141 text_cols.retain(|v| match v {
1142 WithOptionValue::UnresolvedItemName(n) => {
1143 let mut name = n.clone();
1144 name.0.truncate(3);
1146 curr_references.contains_key(&name)
1147 }
1148 _ => unreachable!(
1149 "TEXT COLUMNS must be sequence of unresolved item names"
1150 ),
1151 });
1152 !text_cols.is_empty()
1153 }
1154 _ => unreachable!(
1155 "TEXT COLUMNS must be sequence of unresolved item names"
1156 ),
1157 }
1158 });
1159 }
1160 CreateSourceConnection::SqlServer { options, .. } => {
1161 let adjusted_references: BTreeSet<_> = curr_references
1166 .keys()
1167 .map(|name| {
1168 if name.0.len() == 3 {
1169 let adjusted_name = name.0[1..].to_vec();
1171 UnresolvedItemName(adjusted_name)
1172 } else {
1173 name.clone()
1174 }
1175 })
1176 .collect();
1177
1178 options.retain_mut(|o| {
1179 match o.name {
1180 SqlServerConfigOptionName::TextColumns
1184 | SqlServerConfigOptionName::ExcludeColumns => {}
1185 SqlServerConfigOptionName::Details => return false,
1187 };
1188
1189 match &mut o.value {
1190 Some(WithOptionValue::Sequence(seq_unresolved_item_names)) => {
1191 seq_unresolved_item_names.retain(|v| match v {
1192 WithOptionValue::UnresolvedItemName(n) => {
1193 let mut name = n.clone();
1194 name.0.truncate(2);
1196 adjusted_references.contains(&name)
1197 }
1198 _ => unreachable!(
1199 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1200 ),
1201 });
1202 !seq_unresolved_item_names.is_empty()
1203 }
1204 _ => unreachable!(
1205 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1206 ),
1207 }
1208 });
1209 }
1210 CreateSourceConnection::MySql { options, .. } => {
1211 options.retain_mut(|o| {
1212 match o.name {
1213 MySqlConfigOptionName::TextColumns
1217 | MySqlConfigOptionName::ExcludeColumns => {}
1218 MySqlConfigOptionName::Details => return false,
1220 };
1221
1222 match &mut o.value {
1223 Some(WithOptionValue::Sequence(seq_unresolved_item_names)) => {
1224 seq_unresolved_item_names.retain(|v| match v {
1225 WithOptionValue::UnresolvedItemName(n) => {
1226 let mut name = n.clone();
1227 name.0.truncate(2);
1229 curr_references.contains_key(&name)
1230 }
1231 _ => unreachable!(
1232 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1233 ),
1234 });
1235 !seq_unresolved_item_names.is_empty()
1236 }
1237 _ => unreachable!(
1238 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1239 ),
1240 }
1241 });
1242 }
1243 CreateSourceConnection::LoadGenerator { .. } if !curr_references.is_empty() => {
1244 curr_references.clear();
1248 stmt.external_references = Some(ExternalReferences::All);
1249 }
1250 CreateSourceConnection::Kafka { .. }
1251 | CreateSourceConnection::LoadGenerator { .. } => {}
1252 }
1253
1254 if !curr_references.is_empty() {
1256 let mut subsources: Vec<_> = curr_references
1257 .into_iter()
1258 .flat_map(|(reference, names)| {
1259 names.into_iter().map(move |name| ExternalReferenceExport {
1260 reference: reference.clone(),
1261 alias: Some(name),
1262 })
1263 })
1264 .collect();
1265 subsources.sort();
1266 stmt.external_references = Some(ExternalReferences::SubsetTables(subsources));
1267 }
1268 }
1269 Statement::CreateSubsource(stmt) => {
1270 stmt.with_options.retain_mut(|o| {
1271 match o.name {
1272 CreateSubsourceOptionName::TextColumns => true,
1273 CreateSubsourceOptionName::RetainHistory => true,
1274 CreateSubsourceOptionName::ExcludeColumns => true,
1275 CreateSubsourceOptionName::Details => false,
1277 CreateSubsourceOptionName::ExternalReference => true,
1278 CreateSubsourceOptionName::Progress => true,
1279 }
1280 });
1281 }
1282
1283 _ => (),
1284 }
1285
1286 Ok(mz_sql_pretty::to_pretty(
1287 &resolved,
1288 PrettyConfig {
1289 width: mz_sql_pretty::DEFAULT_WIDTH,
1290 format_mode: if redacted {
1291 FormatMode::SimpleRedacted
1292 } else {
1293 FormatMode::Simple
1294 },
1295 },
1296 ))
1297}