1use std::collections::{BTreeMap, BTreeSet};
17use std::fmt::Write;
18
19use mz_ore::assert_none;
20use mz_ore::collections::CollectionExt;
21use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlScalarType};
22use mz_sql_parser::ast::display::{AstDisplay, FormatMode};
23use mz_sql_parser::ast::{
24 CreateSubsourceOptionName, ExternalReferenceExport, ExternalReferences, ObjectType,
25 ShowCreateClusterStatement, ShowCreateConnectionStatement, ShowCreateMaterializedViewStatement,
26 ShowObjectType, SqlServerConfigOptionName, SystemObjectType, UnresolvedItemName,
27 WithOptionValue,
28};
29use mz_sql_pretty::PrettyConfig;
30use query::QueryContext;
31
32use crate::ast::visit_mut::VisitMut;
33use crate::ast::{
34 SelectStatement, ShowColumnsStatement, ShowCreateIndexStatement, ShowCreateSinkStatement,
35 ShowCreateSourceStatement, ShowCreateTableStatement, ShowCreateViewStatement,
36 ShowObjectsStatement, ShowStatementFilter, Statement, Value,
37};
38use crate::catalog::{CatalogItemType, SessionCatalog};
39use crate::names::{
40 self, Aug, NameSimplifier, ObjectId, ResolvedClusterName, ResolvedDatabaseName, ResolvedIds,
41 ResolvedItemName, ResolvedRoleName, ResolvedSchemaName,
42};
43use crate::parse;
44use crate::plan::scope::Scope;
45use crate::plan::statement::ddl::unplan_create_cluster;
46use crate::plan::statement::{StatementContext, StatementDesc, dml};
47use crate::plan::{
48 HirRelationExpr, Params, Plan, PlanError, ShowColumnsPlan, ShowCreatePlan, query, transform_ast,
49};
50
51pub fn describe_show_create_view(
52 _: &StatementContext,
53 _: ShowCreateViewStatement<Aug>,
54) -> Result<StatementDesc, PlanError> {
55 Ok(StatementDesc::new(Some(
56 RelationDesc::builder()
57 .with_column("name", SqlScalarType::String.nullable(false))
58 .with_column("create_sql", SqlScalarType::String.nullable(false))
59 .finish(),
60 )))
61}
62
63pub fn plan_show_create_view(
64 scx: &StatementContext,
65 ShowCreateViewStatement {
66 view_name,
67 redacted,
68 }: ShowCreateViewStatement<Aug>,
69) -> Result<ShowCreatePlan, PlanError> {
70 plan_show_create_item(scx, &view_name, CatalogItemType::View, redacted)
71}
72
73pub fn describe_show_create_materialized_view(
74 _: &StatementContext,
75 _: ShowCreateMaterializedViewStatement<Aug>,
76) -> Result<StatementDesc, PlanError> {
77 Ok(StatementDesc::new(Some(
78 RelationDesc::builder()
79 .with_column("name", SqlScalarType::String.nullable(false))
80 .with_column("create_sql", SqlScalarType::String.nullable(false))
81 .finish(),
82 )))
83}
84
85pub fn plan_show_create_materialized_view(
86 scx: &StatementContext,
87 ShowCreateMaterializedViewStatement {
88 materialized_view_name,
89 redacted,
90 }: ShowCreateMaterializedViewStatement<Aug>,
91) -> Result<ShowCreatePlan, PlanError> {
92 plan_show_create_item(
93 scx,
94 &materialized_view_name,
95 CatalogItemType::MaterializedView,
96 redacted,
97 )
98}
99
100pub fn describe_show_create_table(
101 _: &StatementContext,
102 _: ShowCreateTableStatement<Aug>,
103) -> Result<StatementDesc, PlanError> {
104 Ok(StatementDesc::new(Some(
105 RelationDesc::builder()
106 .with_column("name", SqlScalarType::String.nullable(false))
107 .with_column("create_sql", SqlScalarType::String.nullable(false))
108 .finish(),
109 )))
110}
111
112fn plan_show_create_item(
113 scx: &StatementContext,
114 name: &ResolvedItemName,
115 expect_type: CatalogItemType,
116 redacted: bool,
117) -> Result<ShowCreatePlan, PlanError> {
118 let item = scx.get_item_by_resolved_name(name)?;
119 let name = name.full_name_str();
120 if item.id().is_system()
121 && matches!(
122 expect_type,
123 CatalogItemType::Table | CatalogItemType::Source
124 )
125 {
126 sql_bail!("cannot show create for system object {name}");
127 }
128 if item.item_type() == CatalogItemType::MaterializedView && expect_type == CatalogItemType::View
129 {
130 return Err(PlanError::ShowCreateViewOnMaterializedView(name));
131 }
132 if item.item_type() != expect_type {
133 sql_bail!("{name} is not a {expect_type}");
134 }
135 let create_sql =
136 humanize_sql_for_show_create(scx.catalog, item.id(), item.create_sql(), redacted)?;
137 Ok(ShowCreatePlan {
138 id: ObjectId::Item(item.id()),
139 row: Row::pack_slice(&[Datum::String(&name), Datum::String(&create_sql)]),
140 })
141}
142
143pub fn plan_show_create_table(
144 scx: &StatementContext,
145 ShowCreateTableStatement {
146 table_name,
147 redacted,
148 }: ShowCreateTableStatement<Aug>,
149) -> Result<ShowCreatePlan, PlanError> {
150 plan_show_create_item(scx, &table_name, CatalogItemType::Table, redacted)
151}
152
153pub fn describe_show_create_source(
154 _: &StatementContext,
155 _: ShowCreateSourceStatement<Aug>,
156) -> Result<StatementDesc, PlanError> {
157 Ok(StatementDesc::new(Some(
158 RelationDesc::builder()
159 .with_column("name", SqlScalarType::String.nullable(false))
160 .with_column("create_sql", SqlScalarType::String.nullable(false))
161 .finish(),
162 )))
163}
164
165pub fn plan_show_create_source(
166 scx: &StatementContext,
167 ShowCreateSourceStatement {
168 source_name,
169 redacted,
170 }: ShowCreateSourceStatement<Aug>,
171) -> Result<ShowCreatePlan, PlanError> {
172 plan_show_create_item(scx, &source_name, CatalogItemType::Source, redacted)
173}
174
175pub fn describe_show_create_sink(
176 _: &StatementContext,
177 _: ShowCreateSinkStatement<Aug>,
178) -> Result<StatementDesc, PlanError> {
179 Ok(StatementDesc::new(Some(
180 RelationDesc::builder()
181 .with_column("name", SqlScalarType::String.nullable(false))
182 .with_column("create_sql", SqlScalarType::String.nullable(false))
183 .finish(),
184 )))
185}
186
187pub fn plan_show_create_sink(
188 scx: &StatementContext,
189 ShowCreateSinkStatement {
190 sink_name,
191 redacted,
192 }: ShowCreateSinkStatement<Aug>,
193) -> Result<ShowCreatePlan, PlanError> {
194 plan_show_create_item(scx, &sink_name, CatalogItemType::Sink, redacted)
195}
196
197pub fn describe_show_create_index(
198 _: &StatementContext,
199 _: ShowCreateIndexStatement<Aug>,
200) -> Result<StatementDesc, PlanError> {
201 Ok(StatementDesc::new(Some(
202 RelationDesc::builder()
203 .with_column("name", SqlScalarType::String.nullable(false))
204 .with_column("create_sql", SqlScalarType::String.nullable(false))
205 .finish(),
206 )))
207}
208
209pub fn plan_show_create_index(
210 scx: &StatementContext,
211 ShowCreateIndexStatement {
212 index_name,
213 redacted,
214 }: ShowCreateIndexStatement<Aug>,
215) -> Result<ShowCreatePlan, PlanError> {
216 plan_show_create_item(scx, &index_name, CatalogItemType::Index, redacted)
217}
218
219pub fn describe_show_create_connection(
220 _: &StatementContext,
221 _: ShowCreateConnectionStatement<Aug>,
222) -> Result<StatementDesc, PlanError> {
223 Ok(StatementDesc::new(Some(
224 RelationDesc::builder()
225 .with_column("name", SqlScalarType::String.nullable(false))
226 .with_column("create_sql", SqlScalarType::String.nullable(false))
227 .finish(),
228 )))
229}
230
231pub fn plan_show_create_cluster(
232 scx: &StatementContext,
233 ShowCreateClusterStatement { cluster_name }: ShowCreateClusterStatement<Aug>,
234) -> Result<ShowCreatePlan, PlanError> {
235 let cluster = scx.get_cluster(&cluster_name.id);
236 let name = cluster.name().to_string();
237 let plan = cluster.try_to_plan()?;
238 let stmt = unplan_create_cluster(scx, plan)?;
239 let create_sql = stmt.to_ast_string_stable();
240 Ok(ShowCreatePlan {
241 id: ObjectId::Cluster(cluster_name.id),
242 row: Row::pack_slice(&[Datum::String(&name), Datum::String(&create_sql)]),
243 })
244}
245
246pub fn describe_show_create_cluster(
247 _: &StatementContext,
248 _: ShowCreateClusterStatement<Aug>,
249) -> Result<StatementDesc, PlanError> {
250 Ok(StatementDesc::new(Some(
251 RelationDesc::builder()
252 .with_column("name", SqlScalarType::String.nullable(false))
253 .with_column("create_sql", SqlScalarType::String.nullable(false))
254 .finish(),
255 )))
256}
257
258pub fn plan_show_create_connection(
259 scx: &StatementContext,
260 ShowCreateConnectionStatement {
261 connection_name,
262 redacted,
263 }: ShowCreateConnectionStatement<Aug>,
264) -> Result<ShowCreatePlan, PlanError> {
265 plan_show_create_item(scx, &connection_name, CatalogItemType::Connection, redacted)
266}
267
268pub fn show_databases<'a>(
269 scx: &'a StatementContext<'a>,
270 filter: Option<ShowStatementFilter<Aug>>,
271) -> Result<ShowSelect<'a>, PlanError> {
272 let query = "SELECT name, comment FROM mz_internal.mz_show_databases".to_string();
273 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
274}
275
276pub fn show_schemas<'a>(
277 scx: &'a StatementContext<'a>,
278 from: Option<ResolvedDatabaseName>,
279 filter: Option<ShowStatementFilter<Aug>>,
280) -> Result<ShowSelect<'a>, PlanError> {
281 let database_id = match from {
282 Some(ResolvedDatabaseName::Database { id, .. }) => id.to_string(),
283 None => match scx.active_database() {
284 Some(id) => id.to_string(),
285 None => sql_bail!("no database specified and no active database"),
286 },
287 Some(ResolvedDatabaseName::Error) => {
288 unreachable!("should have been handled in name resolution")
289 }
290 };
291 let query = format!(
292 "SELECT name, comment
293 FROM mz_internal.mz_show_schemas
294 WHERE database_id IS NULL OR database_id = '{database_id}'",
295 );
296 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
297}
298
299pub fn show_roles<'a>(
300 scx: &'a StatementContext<'a>,
301 filter: Option<ShowStatementFilter<Aug>>,
302) -> Result<ShowSelect<'a>, PlanError> {
303 let query = "SELECT name, comment FROM mz_internal.mz_show_roles".to_string();
304 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
305}
306
307pub fn show_network_policies<'a>(
308 scx: &'a StatementContext<'a>,
309 filter: Option<ShowStatementFilter<Aug>>,
310) -> Result<ShowSelect<'a>, PlanError> {
311 let query = "SELECT name, rules, comment FROM mz_internal.mz_show_network_policies".to_string();
312 ShowSelect::new(
313 scx,
314 query,
315 filter,
316 None,
317 Some(&["name", "rules", "comment"]),
318 )
319}
320
321pub fn show_objects<'a>(
322 scx: &'a StatementContext<'a>,
323 ShowObjectsStatement {
324 object_type,
325 from,
326 filter,
327 }: ShowObjectsStatement<Aug>,
328) -> Result<ShowSelect<'a>, PlanError> {
329 match object_type {
330 ShowObjectType::Table { on_source } => show_tables(scx, from, on_source, filter),
331 ShowObjectType::Source { in_cluster } => show_sources(scx, from, in_cluster, filter),
332 ShowObjectType::Subsource { on_source } => show_subsources(scx, from, on_source, filter),
333 ShowObjectType::View => show_views(scx, from, filter),
334 ShowObjectType::Sink { in_cluster } => show_sinks(scx, from, in_cluster, filter),
335 ShowObjectType::Type => show_types(scx, from, filter),
336 ShowObjectType::Object => show_all_objects(scx, from, filter),
337 ShowObjectType::Role => {
338 assert_none!(from, "parser should reject from");
339 show_roles(scx, filter)
340 }
341 ShowObjectType::Cluster => {
342 assert_none!(from, "parser should reject from");
343 show_clusters(scx, filter)
344 }
345 ShowObjectType::ClusterReplica => {
346 assert_none!(from, "parser should reject from");
347 show_cluster_replicas(scx, filter)
348 }
349 ShowObjectType::Secret => show_secrets(scx, from, filter),
350 ShowObjectType::Connection => show_connections(scx, from, filter),
351 ShowObjectType::MaterializedView { in_cluster } => {
352 show_materialized_views(scx, from, in_cluster, filter)
353 }
354 ShowObjectType::Index {
355 in_cluster,
356 on_object,
357 } => show_indexes(scx, from, on_object, in_cluster, filter),
358 ShowObjectType::Database => {
359 assert_none!(from, "parser should reject from");
360 show_databases(scx, filter)
361 }
362 ShowObjectType::Schema { from: db_from } => {
363 assert_none!(from, "parser should reject from");
364 show_schemas(scx, db_from, filter)
365 }
366 ShowObjectType::Privileges { object_type, role } => {
367 assert_none!(from, "parser should reject from");
368 show_privileges(scx, object_type, role, filter)
369 }
370 ShowObjectType::DefaultPrivileges { object_type, role } => {
371 assert_none!(from, "parser should reject from");
372 show_default_privileges(scx, object_type, role, filter)
373 }
374 ShowObjectType::RoleMembership { role } => {
375 assert_none!(from, "parser should reject from");
376 show_role_membership(scx, role, filter)
377 }
378 ShowObjectType::ContinualTask { in_cluster } => {
379 show_continual_tasks(scx, from, in_cluster, filter)
380 }
381 ShowObjectType::NetworkPolicy => {
382 assert_none!(from, "parser should reject from");
383 show_network_policies(scx, filter)
384 }
385 }
386}
387
388fn show_connections<'a>(
389 scx: &'a StatementContext<'a>,
390 from: Option<ResolvedSchemaName>,
391 filter: Option<ShowStatementFilter<Aug>>,
392) -> Result<ShowSelect<'a>, PlanError> {
393 let schema_spec = scx.resolve_optional_schema(&from)?;
394 let query = format!(
395 "SELECT name, type, comment
396 FROM mz_internal.mz_show_connections connections
397 WHERE schema_id = '{schema_spec}'",
398 );
399 ShowSelect::new(scx, query, filter, None, Some(&["name", "type", "comment"]))
400}
401
402fn show_tables<'a>(
403 scx: &'a StatementContext<'a>,
404 from: Option<ResolvedSchemaName>,
405 on_source: Option<ResolvedItemName>,
406 filter: Option<ShowStatementFilter<Aug>>,
407) -> Result<ShowSelect<'a>, PlanError> {
408 let schema_spec = scx.resolve_optional_schema(&from)?;
409 let mut query = format!(
410 "SELECT name, comment
411 FROM mz_internal.mz_show_tables tables
412 WHERE tables.schema_id = '{schema_spec}'",
413 );
414 if let Some(on_source) = &on_source {
415 let on_item = scx.get_item_by_resolved_name(on_source)?;
416 if on_item.item_type() != CatalogItemType::Source {
417 sql_bail!(
418 "cannot show tables on {} because it is a {}",
419 on_source.full_name_str(),
420 on_item.item_type(),
421 );
422 }
423 query += &format!(" AND tables.source_id = '{}'", on_item.id());
424 }
425 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
426}
427
428fn show_sources<'a>(
429 scx: &'a StatementContext<'a>,
430 from: Option<ResolvedSchemaName>,
431 in_cluster: Option<ResolvedClusterName>,
432 filter: Option<ShowStatementFilter<Aug>>,
433) -> Result<ShowSelect<'a>, PlanError> {
434 let schema_spec = scx.resolve_optional_schema(&from)?;
435 let mut where_clause = format!("schema_id = '{schema_spec}'");
436
437 if let Some(cluster) = in_cluster {
438 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
439 .expect("write on string cannot fail");
440 }
441
442 let query = format!(
443 "SELECT name, type, cluster, comment
444 FROM mz_internal.mz_show_sources
445 WHERE {where_clause}"
446 );
447 ShowSelect::new(
448 scx,
449 query,
450 filter,
451 None,
452 Some(&["name", "type", "cluster", "comment"]),
453 )
454}
455
456fn show_subsources<'a>(
457 scx: &'a StatementContext<'a>,
458 from_schema: Option<ResolvedSchemaName>,
459 on_source: Option<ResolvedItemName>,
460 filter: Option<ShowStatementFilter<Aug>>,
461) -> Result<ShowSelect<'a>, PlanError> {
462 let mut query_filter = Vec::new();
463
464 if on_source.is_none() && from_schema.is_none() {
465 query_filter.push("subsources.id NOT LIKE 's%'".into());
466 let schema_spec = scx.resolve_active_schema().map(|spec| spec.clone())?;
467 query_filter.push(format!("subsources.schema_id = '{schema_spec}'"));
468 }
469
470 if let Some(on_source) = &on_source {
471 let on_item = scx.get_item_by_resolved_name(on_source)?;
472 if on_item.item_type() != CatalogItemType::Source {
473 sql_bail!(
474 "cannot show subsources on {} because it is a {}",
475 on_source.full_name_str(),
476 on_item.item_type(),
477 );
478 }
479 query_filter.push(format!("sources.id = '{}'", on_item.id()));
480 }
481
482 if let Some(schema) = from_schema {
483 let schema_spec = schema.schema_spec();
484 query_filter.push(format!("subsources.schema_id = '{schema_spec}'"));
485 }
486
487 let query = format!(
490 "SELECT DISTINCT
491 subsources.name AS name,
492 subsources.type AS type
493 FROM
494 mz_sources AS subsources
495 JOIN mz_internal.mz_object_dependencies deps ON (subsources.id = deps.object_id OR subsources.id = deps.referenced_object_id)
496 JOIN mz_sources AS sources ON (sources.id = deps.object_id OR sources.id = deps.referenced_object_id)
497 WHERE (subsources.type = 'subsource' OR subsources.type = 'progress') AND {}",
498 itertools::join(query_filter, " AND "),
499 );
500 ShowSelect::new(scx, query, filter, None, None)
501}
502
503fn show_views<'a>(
504 scx: &'a StatementContext<'a>,
505 from: Option<ResolvedSchemaName>,
506 filter: Option<ShowStatementFilter<Aug>>,
507) -> Result<ShowSelect<'a>, PlanError> {
508 let schema_spec = scx.resolve_optional_schema(&from)?;
509 let query = format!(
510 "SELECT name, comment
511 FROM mz_internal.mz_show_views
512 WHERE schema_id = '{schema_spec}'"
513 );
514 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
515}
516
517fn show_materialized_views<'a>(
518 scx: &'a StatementContext<'a>,
519 from: Option<ResolvedSchemaName>,
520 in_cluster: Option<ResolvedClusterName>,
521 filter: Option<ShowStatementFilter<Aug>>,
522) -> Result<ShowSelect<'a>, PlanError> {
523 let schema_spec = scx.resolve_optional_schema(&from)?;
524 let mut where_clause = format!("schema_id = '{schema_spec}'");
525
526 if let Some(cluster) = in_cluster {
527 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
528 .expect("write on string cannot fail");
529 }
530
531 let query = format!(
532 "SELECT name, cluster, comment
533 FROM mz_internal.mz_show_materialized_views
534 WHERE {where_clause}"
535 );
536
537 ShowSelect::new(
538 scx,
539 query,
540 filter,
541 None,
542 Some(&["name", "cluster", "comment"]),
543 )
544}
545
546fn show_sinks<'a>(
547 scx: &'a StatementContext<'a>,
548 from: Option<ResolvedSchemaName>,
549 in_cluster: Option<ResolvedClusterName>,
550 filter: Option<ShowStatementFilter<Aug>>,
551) -> Result<ShowSelect<'a>, PlanError> {
552 let schema_spec = if let Some(ResolvedSchemaName::Schema { schema_spec, .. }) = from {
553 schema_spec.to_string()
554 } else {
555 scx.resolve_active_schema()?.to_string()
556 };
557
558 let mut where_clause = format!("schema_id = '{schema_spec}'");
559
560 if let Some(cluster) = in_cluster {
561 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
562 .expect("write on string cannot fail");
563 }
564
565 let query = format!(
566 "SELECT name, type, cluster, comment
567 FROM mz_internal.mz_show_sinks sinks
568 WHERE {where_clause}"
569 );
570 ShowSelect::new(
571 scx,
572 query,
573 filter,
574 None,
575 Some(&["name", "type", "cluster", "comment"]),
576 )
577}
578
579fn show_types<'a>(
580 scx: &'a StatementContext<'a>,
581 from: Option<ResolvedSchemaName>,
582 filter: Option<ShowStatementFilter<Aug>>,
583) -> Result<ShowSelect<'a>, PlanError> {
584 let schema_spec = scx.resolve_optional_schema(&from)?;
585 let query = format!(
586 "SELECT name, comment
587 FROM mz_internal.mz_show_types
588 WHERE schema_id = '{schema_spec}'"
589 );
590 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
591}
592
593fn show_all_objects<'a>(
594 scx: &'a StatementContext<'a>,
595 from: Option<ResolvedSchemaName>,
596 filter: Option<ShowStatementFilter<Aug>>,
597) -> Result<ShowSelect<'a>, PlanError> {
598 let schema_spec = scx.resolve_optional_schema(&from)?;
599 let query = format!(
600 "SELECT name, type, comment
601 FROM mz_internal.mz_show_all_objects
602 WHERE schema_id = '{schema_spec}'",
603 );
604 ShowSelect::new(scx, query, filter, None, Some(&["name", "type", "comment"]))
605}
606
607pub fn show_indexes<'a>(
608 scx: &'a StatementContext<'a>,
609 from_schema: Option<ResolvedSchemaName>,
610 on_object: Option<ResolvedItemName>,
611 in_cluster: Option<ResolvedClusterName>,
612 filter: Option<ShowStatementFilter<Aug>>,
613) -> Result<ShowSelect<'a>, PlanError> {
614 let mut query_filter = Vec::new();
615
616 if on_object.is_none() && from_schema.is_none() && in_cluster.is_none() {
617 query_filter.push("on_id NOT LIKE 's%'".into());
618 let schema_spec = scx.resolve_active_schema().map(|spec| spec.clone())?;
619 query_filter.push(format!("schema_id = '{schema_spec}'"));
620 }
621
622 if let Some(on_object) = &on_object {
623 let on_item = scx.get_item_by_resolved_name(on_object)?;
624 if on_item.item_type() != CatalogItemType::View
625 && on_item.item_type() != CatalogItemType::MaterializedView
626 && on_item.item_type() != CatalogItemType::Source
627 && on_item.item_type() != CatalogItemType::Table
628 {
629 sql_bail!(
630 "cannot show indexes on {} because it is a {}",
631 on_object.full_name_str(),
632 on_item.item_type(),
633 );
634 }
635 query_filter.push(format!("on_id = '{}'", on_item.id()));
636 }
637
638 if let Some(schema) = from_schema {
639 let schema_spec = schema.schema_spec();
640 query_filter.push(format!("schema_id = '{schema_spec}'"));
641 }
642
643 if let Some(cluster) = in_cluster {
644 query_filter.push(format!("cluster_id = '{}'", cluster.id))
645 };
646
647 let query = format!(
648 "SELECT name, on, cluster, key, comment
649 FROM mz_internal.mz_show_indexes
650 WHERE {}",
651 itertools::join(query_filter.iter(), " AND ")
652 );
653
654 ShowSelect::new(
655 scx,
656 query,
657 filter,
658 None,
659 Some(&["name", "on", "cluster", "key", "comment"]),
660 )
661}
662
663pub fn show_columns<'a>(
664 scx: &'a StatementContext<'a>,
665 ShowColumnsStatement { table_name, filter }: ShowColumnsStatement<Aug>,
666) -> Result<ShowColumnsSelect<'a>, PlanError> {
667 let entry = scx.get_item_by_resolved_name(&table_name)?;
668 let full_name = scx.catalog.resolve_full_name(entry.name());
669
670 match entry.item_type() {
671 CatalogItemType::Source
672 | CatalogItemType::Table
673 | CatalogItemType::View
674 | CatalogItemType::MaterializedView
675 | CatalogItemType::ContinualTask => (),
676 ty @ CatalogItemType::Connection
677 | ty @ CatalogItemType::Index
678 | ty @ CatalogItemType::Func
679 | ty @ CatalogItemType::Secret
680 | ty @ CatalogItemType::Type
681 | ty @ CatalogItemType::Sink => {
682 sql_bail!("{full_name} is a {ty} and so does not have columns");
683 }
684 }
685
686 let query = format!(
687 "SELECT name, nullable, type, position, comment
688 FROM mz_internal.mz_show_columns columns
689 WHERE columns.id = '{}'",
690 entry.id(),
691 );
692 let (show_select, new_resolved_ids) = ShowSelect::new_with_resolved_ids(
693 scx,
694 query,
695 filter,
696 Some("position"),
697 Some(&["name", "nullable", "type", "comment"]),
698 )?;
699 Ok(ShowColumnsSelect {
700 id: entry.id(),
701 show_select,
702 new_resolved_ids,
703 })
704}
705
706pub fn show_clusters<'a>(
710 scx: &'a StatementContext<'a>,
711 filter: Option<ShowStatementFilter<Aug>>,
712) -> Result<ShowSelect<'a>, PlanError> {
713 let query = "SELECT name, replicas, comment FROM mz_internal.mz_show_clusters".to_string();
714 ShowSelect::new(
715 scx,
716 query,
717 filter,
718 None,
719 Some(&["name", "replicas", "comment"]),
720 )
721}
722
723pub fn show_cluster_replicas<'a>(
724 scx: &'a StatementContext<'a>,
725 filter: Option<ShowStatementFilter<Aug>>,
726) -> Result<ShowSelect<'a>, PlanError> {
727 let query = "
728 SELECT cluster, replica, size, ready, comment
729 FROM mz_internal.mz_show_cluster_replicas
730 "
731 .to_string();
732
733 ShowSelect::new(
734 scx,
735 query,
736 filter,
737 None,
738 Some(&["cluster", "replica", "size", "ready", "comment"]),
739 )
740}
741
742pub fn show_secrets<'a>(
743 scx: &'a StatementContext<'a>,
744 from: Option<ResolvedSchemaName>,
745 filter: Option<ShowStatementFilter<Aug>>,
746) -> Result<ShowSelect<'a>, PlanError> {
747 let schema_spec = scx.resolve_optional_schema(&from)?;
748
749 let query = format!(
750 "SELECT name, comment
751 FROM mz_internal.mz_show_secrets
752 WHERE schema_id = '{schema_spec}'",
753 );
754
755 ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
756}
757
758pub fn show_privileges<'a>(
759 scx: &'a StatementContext<'a>,
760 object_type: Option<SystemObjectType>,
761 role: Option<ResolvedRoleName>,
762 filter: Option<ShowStatementFilter<Aug>>,
763) -> Result<ShowSelect<'a>, PlanError> {
764 let mut query_filter = Vec::new();
765 if let Some(object_type) = object_type {
766 query_filter.push(format!(
767 "object_type = '{}'",
768 object_type.to_string().to_lowercase()
769 ));
770 }
771 if let Some(role) = role {
772 let name = role.name;
773 query_filter.push(format!("CASE WHEN grantee = 'PUBLIC' THEN true ELSE pg_has_role('{name}', grantee, 'USAGE') END"));
774 }
775 let query_filter = if query_filter.len() > 0 {
776 format!("WHERE {}", itertools::join(query_filter, " AND "))
777 } else {
778 "".to_string()
779 };
780
781 let query = format!(
782 "SELECT grantor, grantee, database, schema, name, object_type, privilege_type
783 FROM mz_internal.mz_show_all_privileges
784 {query_filter}",
785 );
786
787 ShowSelect::new(
788 scx,
789 query,
790 filter,
791 None,
792 Some(&[
793 "grantor",
794 "grantee",
795 "database",
796 "schema",
797 "name",
798 "object_type",
799 "privilege_type",
800 ]),
801 )
802}
803
804pub fn show_default_privileges<'a>(
805 scx: &'a StatementContext<'a>,
806 object_type: Option<ObjectType>,
807 role: Option<ResolvedRoleName>,
808 filter: Option<ShowStatementFilter<Aug>>,
809) -> Result<ShowSelect<'a>, PlanError> {
810 let mut query_filter = Vec::new();
811 if let Some(object_type) = object_type {
812 query_filter.push(format!(
813 "object_type = '{}'",
814 object_type.to_string().to_lowercase()
815 ));
816 }
817 if let Some(role) = role {
818 let name = role.name;
819 query_filter.push(format!("CASE WHEN grantee = 'PUBLIC' THEN true ELSE pg_has_role('{name}', grantee, 'USAGE') END"));
820 }
821 let query_filter = if query_filter.len() > 0 {
822 format!("WHERE {}", itertools::join(query_filter, " AND "))
823 } else {
824 "".to_string()
825 };
826
827 let query = format!(
828 "SELECT object_owner, database, schema, object_type, grantee, privilege_type
829 FROM mz_internal.mz_show_default_privileges
830 {query_filter}",
831 );
832
833 ShowSelect::new(
834 scx,
835 query,
836 filter,
837 None,
838 Some(&[
839 "object_owner",
840 "database",
841 "schema",
842 "object_type",
843 "grantee",
844 "privilege_type",
845 ]),
846 )
847}
848
849pub fn show_role_membership<'a>(
850 scx: &'a StatementContext<'a>,
851 role: Option<ResolvedRoleName>,
852 filter: Option<ShowStatementFilter<Aug>>,
853) -> Result<ShowSelect<'a>, PlanError> {
854 let mut query_filter = Vec::new();
855 if let Some(role) = role {
856 let name = role.name;
857 query_filter.push(format!("pg_has_role('{name}', member, 'USAGE')"));
858 }
859 let query_filter = if query_filter.len() > 0 {
860 format!("WHERE {}", itertools::join(query_filter, " AND "))
861 } else {
862 "".to_string()
863 };
864
865 let query = format!(
866 "SELECT role, member, grantor
867 FROM mz_internal.mz_show_role_members
868 {query_filter}",
869 );
870
871 ShowSelect::new(
872 scx,
873 query,
874 filter,
875 None,
876 Some(&["role", "member", "grantor"]),
877 )
878}
879
880fn show_continual_tasks<'a>(
881 scx: &'a StatementContext<'a>,
882 from: Option<ResolvedSchemaName>,
883 in_cluster: Option<ResolvedClusterName>,
884 filter: Option<ShowStatementFilter<Aug>>,
885) -> Result<ShowSelect<'a>, PlanError> {
886 let schema_spec = scx.resolve_optional_schema(&from)?;
887 let mut where_clause = format!("schema_id = '{schema_spec}'");
888
889 if let Some(cluster) = in_cluster {
890 write!(where_clause, " AND cluster_id = '{}'", cluster.id)
891 .expect("write on string cannot fail");
892 }
893
894 let query = format!(
895 "SELECT name, cluster, comment
896 FROM mz_internal.mz_show_continual_tasks
897 WHERE {where_clause}"
898 );
899
900 ShowSelect::new(
901 scx,
902 query,
903 filter,
904 None,
905 Some(&["name", "cluster", "comment"]),
906 )
907}
908
/// A `SHOW`-style statement that has been rewritten into a `SELECT`.
///
/// Holds the statement context together with the resolved and transformed
/// `SELECT` statement, which can then be described or planned.
pub struct ShowSelect<'a> {
    /// Statement context used when describing or planning `stmt`.
    scx: &'a StatementContext<'a>,
    /// The name-resolved, AST-transformed `SELECT` statement.
    pub(crate) stmt: SelectStatement<Aug>,
}
916
917impl<'a> ShowSelect<'a> {
918 fn new(
927 scx: &'a StatementContext,
928 query: String,
929 filter: Option<ShowStatementFilter<Aug>>,
930 order: Option<&str>,
931 projection: Option<&[&str]>,
932 ) -> Result<ShowSelect<'a>, PlanError> {
933 Self::new_with_resolved_ids(scx, query, filter, order, projection)
934 .map(|(show_select, _)| show_select)
935 }
936
937 fn new_with_resolved_ids(
938 scx: &'a StatementContext,
939 query: String,
940 filter: Option<ShowStatementFilter<Aug>>,
941 order: Option<&str>,
942 projection: Option<&[&str]>,
943 ) -> Result<(ShowSelect<'a>, ResolvedIds), PlanError> {
944 let filter = match filter {
945 Some(ShowStatementFilter::Like(like)) => format!("name LIKE {}", Value::String(like)),
946 Some(ShowStatementFilter::Where(expr)) => expr.to_string(),
947 None => "true".to_string(),
948 };
949 let query = format!(
950 "SELECT {} FROM ({}) q WHERE {} ORDER BY {}",
951 projection
952 .map(|ps| ps.join(", "))
953 .unwrap_or_else(|| "*".into()),
954 query,
955 filter,
956 order.unwrap_or("q.*")
957 );
958
959 Self::new_from_bare_query(scx, query)
960 }
961
962 pub fn new_from_bare_query(
963 scx: &'a StatementContext,
964 query: String,
965 ) -> Result<(ShowSelect<'a>, ResolvedIds), PlanError> {
966 let stmts = parse::parse(&query).expect("ShowSelect::new called with invalid SQL");
967 let stmt = match stmts.into_element().ast {
968 Statement::Select(select) => select,
969 _ => panic!("ShowSelect::new called with non-SELECT statement"),
970 };
971 let (mut stmt, new_resolved_ids) = names::resolve(scx.catalog, stmt)?;
972 transform_ast::transform(scx, &mut stmt)?;
973 Ok((ShowSelect { scx, stmt }, new_resolved_ids))
974 }
975
976 pub fn describe(self) -> Result<StatementDesc, PlanError> {
978 dml::describe_select(self.scx, self.stmt)
979 }
980
981 pub fn plan(self) -> Result<Plan, PlanError> {
983 dml::plan_select(self.scx, self.stmt, &Params::empty(), None)
984 }
985
986 pub fn plan_hir(self, qcx: &QueryContext) -> Result<(HirRelationExpr, Scope), PlanError> {
988 query::plan_nested_query(&mut qcx.clone(), &self.stmt.query)
989 }
990}
991
/// A [`ShowSelect`] for a columns listing, paired with the id of the item
/// whose columns are shown and the ids resolved while building the query.
pub struct ShowColumnsSelect<'a> {
    /// Id of the catalog item whose columns are being listed.
    id: CatalogItemId,
    /// Ids resolved while constructing `show_select`.
    new_resolved_ids: ResolvedIds,
    /// The underlying `SELECT` that produces the column rows.
    show_select: ShowSelect<'a>,
}
997
998impl<'a> ShowColumnsSelect<'a> {
999 pub fn describe(self) -> Result<StatementDesc, PlanError> {
1000 self.show_select.describe()
1001 }
1002
1003 pub fn plan(self) -> Result<Plan, PlanError> {
1004 let select_plan = self.show_select.plan()?;
1005 match select_plan {
1006 Plan::Select(select_plan) => Ok(Plan::ShowColumns(ShowColumnsPlan {
1007 id: self.id,
1008 select_plan,
1009 new_resolved_ids: self.new_resolved_ids,
1010 })),
1011 _ => {
1012 tracing::error!(
1013 "SHOW COLUMNS produced a non select plan. plan: {:?}",
1014 select_plan
1015 );
1016 Err(PlanError::Unstructured(
1017 "SHOW COLUMNS produced an unexpected plan. Please file a bug.".to_string(),
1018 ))
1019 }
1020 }
1021 }
1022
1023 pub fn plan_hir(self, qcx: &QueryContext) -> Result<(HirRelationExpr, Scope), PlanError> {
1024 self.show_select.plan_hir(qcx)
1025 }
1026}
1027
1028fn humanize_sql_for_show_create(
1031 catalog: &dyn SessionCatalog,
1032 id: CatalogItemId,
1033 sql: &str,
1034 redacted: bool,
1035) -> Result<String, PlanError> {
1036 use mz_sql_parser::ast::{CreateSourceConnection, MySqlConfigOptionName, PgConfigOptionName};
1037
1038 let parsed = parse::parse(sql)?.into_element().ast;
1039 let (mut resolved, _) = names::resolve(catalog, parsed)?;
1040
1041 let mut simplifier = NameSimplifier { catalog };
1043 simplifier.visit_statement_mut(&mut resolved);
1044
1045 match &mut resolved {
1046 Statement::CreateMaterializedView(stmt) => stmt.as_of = None,
1048 Statement::CreateContinualTask(stmt) => stmt.as_of = None,
1049 Statement::CreateSource(stmt) => {
1058 let mut curr_references: BTreeMap<UnresolvedItemName, Vec<UnresolvedItemName>> =
1060 catalog
1061 .get_item(&id)
1062 .used_by()
1063 .into_iter()
1064 .filter_map(|subsource| {
1065 let item = catalog.get_item(subsource);
1066 item.subsource_details().map(|(_id, reference, _details)| {
1067 let name = item.name();
1068 let subsource_name = catalog.resolve_full_name(name);
1069 let subsource_name = UnresolvedItemName::from(subsource_name);
1070 (reference.clone(), subsource_name)
1071 })
1072 })
1073 .fold(BTreeMap::new(), |mut map, (reference, subsource_name)| {
1074 map.entry(reference)
1075 .or_insert_with(Vec::new)
1076 .push(subsource_name);
1077 map
1078 });
1079
1080 match &mut stmt.connection {
1081 CreateSourceConnection::Postgres { options, .. }
1082 | CreateSourceConnection::Yugabyte { options, .. } => {
1083 options.retain_mut(|o| {
1084 match o.name {
1085 PgConfigOptionName::TextColumns => {}
1089 PgConfigOptionName::Details => return false,
1091 _ => return true,
1092 };
1093 match &mut o.value {
1094 Some(WithOptionValue::Sequence(text_cols)) => {
1095 text_cols.retain(|v| match v {
1096 WithOptionValue::UnresolvedItemName(n) => {
1097 let mut name = n.clone();
1098 name.0.truncate(3);
1100 curr_references.contains_key(&name)
1101 }
1102 _ => unreachable!(
1103 "TEXT COLUMNS must be sequence of unresolved item names"
1104 ),
1105 });
1106 !text_cols.is_empty()
1107 }
1108 _ => unreachable!(
1109 "TEXT COLUMNS must be sequence of unresolved item names"
1110 ),
1111 }
1112 });
1113 }
1114 CreateSourceConnection::SqlServer { options, .. } => {
1115 let adjusted_references: BTreeSet<_> = curr_references
1120 .keys()
1121 .map(|name| {
1122 if name.0.len() == 3 {
1123 let adjusted_name = name.0[1..].to_vec();
1125 UnresolvedItemName(adjusted_name)
1126 } else {
1127 name.clone()
1128 }
1129 })
1130 .collect();
1131
1132 options.retain_mut(|o| {
1133 match o.name {
1134 SqlServerConfigOptionName::TextColumns
1138 | SqlServerConfigOptionName::ExcludeColumns => {}
1139 SqlServerConfigOptionName::Details => return false,
1141 };
1142
1143 match &mut o.value {
1144 Some(WithOptionValue::Sequence(seq_unresolved_item_names)) => {
1145 seq_unresolved_item_names.retain(|v| match v {
1146 WithOptionValue::UnresolvedItemName(n) => {
1147 let mut name = n.clone();
1148 name.0.truncate(2);
1150 adjusted_references.contains(&name)
1151 }
1152 _ => unreachable!(
1153 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1154 ),
1155 });
1156 !seq_unresolved_item_names.is_empty()
1157 }
1158 _ => unreachable!(
1159 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1160 ),
1161 }
1162 });
1163 }
1164 CreateSourceConnection::MySql { options, .. } => {
1165 options.retain_mut(|o| {
1166 match o.name {
1167 MySqlConfigOptionName::TextColumns
1171 | MySqlConfigOptionName::ExcludeColumns => {}
1172 MySqlConfigOptionName::Details => return false,
1174 };
1175
1176 match &mut o.value {
1177 Some(WithOptionValue::Sequence(seq_unresolved_item_names)) => {
1178 seq_unresolved_item_names.retain(|v| match v {
1179 WithOptionValue::UnresolvedItemName(n) => {
1180 let mut name = n.clone();
1181 name.0.truncate(2);
1183 curr_references.contains_key(&name)
1184 }
1185 _ => unreachable!(
1186 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1187 ),
1188 });
1189 !seq_unresolved_item_names.is_empty()
1190 }
1191 _ => unreachable!(
1192 "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1193 ),
1194 }
1195 });
1196 }
1197 CreateSourceConnection::LoadGenerator { .. } if !curr_references.is_empty() => {
1198 curr_references.clear();
1202 stmt.external_references = Some(ExternalReferences::All);
1203 }
1204 CreateSourceConnection::Kafka { .. }
1205 | CreateSourceConnection::LoadGenerator { .. } => {}
1206 }
1207
1208 if !curr_references.is_empty() {
1210 let mut subsources: Vec<_> = curr_references
1211 .into_iter()
1212 .flat_map(|(reference, names)| {
1213 names.into_iter().map(move |name| ExternalReferenceExport {
1214 reference: reference.clone(),
1215 alias: Some(name),
1216 })
1217 })
1218 .collect();
1219 subsources.sort();
1220 stmt.external_references = Some(ExternalReferences::SubsetTables(subsources));
1221 }
1222 }
1223 Statement::CreateSubsource(stmt) => {
1224 stmt.with_options.retain_mut(|o| {
1225 match o.name {
1226 CreateSubsourceOptionName::TextColumns => true,
1227 CreateSubsourceOptionName::RetainHistory => true,
1228 CreateSubsourceOptionName::ExcludeColumns => true,
1229 CreateSubsourceOptionName::Details => false,
1231 CreateSubsourceOptionName::ExternalReference => true,
1232 CreateSubsourceOptionName::Progress => true,
1233 }
1234 });
1235 }
1236
1237 _ => (),
1238 }
1239
1240 Ok(mz_sql_pretty::to_pretty(
1241 &resolved,
1242 PrettyConfig {
1243 width: mz_sql_pretty::DEFAULT_WIDTH,
1244 format_mode: if redacted {
1245 FormatMode::SimpleRedacted
1246 } else {
1247 FormatMode::Simple
1248 },
1249 },
1250 ))
1251}