Skip to main content

mz_sql/plan/statement/
show.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Queries that show the state of the database system.
11//!
12//! This module houses the handlers for the `SHOW` suite of statements, like
13//! `SHOW CREATE TABLE` and `SHOW VIEWS`. Note that `SHOW <var>` is considered
14//! an SCL statement.
15
16use std::collections::{BTreeMap, BTreeSet};
17use std::fmt::Write;
18
19use mz_ore::collections::CollectionExt;
20use mz_repr::{CatalogItemId, Datum, RelationDesc, Row, SqlScalarType};
21use mz_sql_parser::ast::display::{AstDisplay, FormatMode};
22use mz_sql_parser::ast::{
23    CreateSinkOptionName, CreateSubsourceOptionName, ExternalReferenceExport, ExternalReferences,
24    ObjectType, ShowCreateClusterStatement, ShowCreateConnectionStatement,
25    ShowCreateMaterializedViewStatement, ShowCreateTypeStatement, ShowObjectType,
26    SqlServerConfigOptionName, SystemObjectType, UnresolvedItemName, WithOptionValue,
27};
28use mz_sql_pretty::PrettyConfig;
29use query::QueryContext;
30
31use crate::ast::display::escaped_string_literal;
32use crate::ast::visit_mut::VisitMut;
33use crate::ast::{
34    SelectStatement, ShowColumnsStatement, ShowCreateIndexStatement, ShowCreateSinkStatement,
35    ShowCreateSourceStatement, ShowCreateTableStatement, ShowCreateViewStatement,
36    ShowObjectsStatement, ShowStatementFilter, Statement, Value,
37};
38use crate::catalog::{CatalogItemType, SessionCatalog};
39use crate::names::{
40    self, Aug, NameSimplifier, ObjectId, ResolvedClusterName, ResolvedDataType,
41    ResolvedDatabaseName, ResolvedIds, ResolvedItemName, ResolvedRoleName, ResolvedSchemaName,
42};
43use crate::parse;
44use crate::plan::scope::Scope;
45use crate::plan::statement::ddl::unplan_create_cluster;
46use crate::plan::statement::{StatementContext, StatementDesc, dml};
47use crate::plan::{
48    HirRelationExpr, Params, Plan, PlanError, ShowColumnsPlan, ShowCreatePlan, query, transform_ast,
49};
50
51pub fn describe_show_create_view(
52    _: &StatementContext,
53    _: ShowCreateViewStatement<Aug>,
54) -> Result<StatementDesc, PlanError> {
55    Ok(StatementDesc::new(Some(
56        RelationDesc::builder()
57            .with_column("name", SqlScalarType::String.nullable(false))
58            .with_column("create_sql", SqlScalarType::String.nullable(false))
59            .finish(),
60    )))
61}
62
63pub fn plan_show_create_view(
64    scx: &StatementContext,
65    ShowCreateViewStatement {
66        view_name,
67        redacted,
68    }: ShowCreateViewStatement<Aug>,
69) -> Result<ShowCreatePlan, PlanError> {
70    plan_show_create_item(scx, &view_name, CatalogItemType::View, redacted)
71}
72
73pub fn describe_show_create_materialized_view(
74    _: &StatementContext,
75    _: ShowCreateMaterializedViewStatement<Aug>,
76) -> Result<StatementDesc, PlanError> {
77    Ok(StatementDesc::new(Some(
78        RelationDesc::builder()
79            .with_column("name", SqlScalarType::String.nullable(false))
80            .with_column("create_sql", SqlScalarType::String.nullable(false))
81            .finish(),
82    )))
83}
84
85pub fn plan_show_create_materialized_view(
86    scx: &StatementContext,
87    ShowCreateMaterializedViewStatement {
88        materialized_view_name,
89        redacted,
90    }: ShowCreateMaterializedViewStatement<Aug>,
91) -> Result<ShowCreatePlan, PlanError> {
92    plan_show_create_item(
93        scx,
94        &materialized_view_name,
95        CatalogItemType::MaterializedView,
96        redacted,
97    )
98}
99
100pub fn describe_show_create_table(
101    _: &StatementContext,
102    _: ShowCreateTableStatement<Aug>,
103) -> Result<StatementDesc, PlanError> {
104    Ok(StatementDesc::new(Some(
105        RelationDesc::builder()
106            .with_column("name", SqlScalarType::String.nullable(false))
107            .with_column("create_sql", SqlScalarType::String.nullable(false))
108            .finish(),
109    )))
110}
111
112fn plan_show_create_item(
113    scx: &StatementContext,
114    name: &ResolvedItemName,
115    expect_type: CatalogItemType,
116    redacted: bool,
117) -> Result<ShowCreatePlan, PlanError> {
118    let item = scx.get_item_by_resolved_name(name)?;
119    let name = name.full_name_str();
120    if item.id().is_system()
121        && matches!(
122            expect_type,
123            CatalogItemType::Table | CatalogItemType::Source
124        )
125    {
126        sql_bail!("cannot show create for system object {name}");
127    }
128    if item.item_type() == CatalogItemType::MaterializedView && expect_type == CatalogItemType::View
129    {
130        return Err(PlanError::ShowCreateViewOnMaterializedView(name));
131    }
132    if item.item_type() != expect_type {
133        sql_bail!("{name} is not a {expect_type}");
134    }
135    let create_sql =
136        humanize_sql_for_show_create(scx.catalog, item.id(), item.create_sql(), redacted)?;
137    Ok(ShowCreatePlan {
138        id: ObjectId::Item(item.id()),
139        row: Row::pack_slice(&[Datum::String(&name), Datum::String(&create_sql)]),
140    })
141}
142
143pub fn plan_show_create_table(
144    scx: &StatementContext,
145    ShowCreateTableStatement {
146        table_name,
147        redacted,
148    }: ShowCreateTableStatement<Aug>,
149) -> Result<ShowCreatePlan, PlanError> {
150    plan_show_create_item(scx, &table_name, CatalogItemType::Table, redacted)
151}
152
153pub fn describe_show_create_source(
154    _: &StatementContext,
155    _: ShowCreateSourceStatement<Aug>,
156) -> Result<StatementDesc, PlanError> {
157    Ok(StatementDesc::new(Some(
158        RelationDesc::builder()
159            .with_column("name", SqlScalarType::String.nullable(false))
160            .with_column("create_sql", SqlScalarType::String.nullable(false))
161            .finish(),
162    )))
163}
164
165pub fn plan_show_create_source(
166    scx: &StatementContext,
167    ShowCreateSourceStatement {
168        source_name,
169        redacted,
170    }: ShowCreateSourceStatement<Aug>,
171) -> Result<ShowCreatePlan, PlanError> {
172    plan_show_create_item(scx, &source_name, CatalogItemType::Source, redacted)
173}
174
175pub fn describe_show_create_sink(
176    _: &StatementContext,
177    _: ShowCreateSinkStatement<Aug>,
178) -> Result<StatementDesc, PlanError> {
179    Ok(StatementDesc::new(Some(
180        RelationDesc::builder()
181            .with_column("name", SqlScalarType::String.nullable(false))
182            .with_column("create_sql", SqlScalarType::String.nullable(false))
183            .finish(),
184    )))
185}
186
187pub fn plan_show_create_sink(
188    scx: &StatementContext,
189    ShowCreateSinkStatement {
190        sink_name,
191        redacted,
192    }: ShowCreateSinkStatement<Aug>,
193) -> Result<ShowCreatePlan, PlanError> {
194    plan_show_create_item(scx, &sink_name, CatalogItemType::Sink, redacted)
195}
196
197pub fn describe_show_create_index(
198    _: &StatementContext,
199    _: ShowCreateIndexStatement<Aug>,
200) -> Result<StatementDesc, PlanError> {
201    Ok(StatementDesc::new(Some(
202        RelationDesc::builder()
203            .with_column("name", SqlScalarType::String.nullable(false))
204            .with_column("create_sql", SqlScalarType::String.nullable(false))
205            .finish(),
206    )))
207}
208
209pub fn plan_show_create_index(
210    scx: &StatementContext,
211    ShowCreateIndexStatement {
212        index_name,
213        redacted,
214    }: ShowCreateIndexStatement<Aug>,
215) -> Result<ShowCreatePlan, PlanError> {
216    plan_show_create_item(scx, &index_name, CatalogItemType::Index, redacted)
217}
218
219pub fn describe_show_create_connection(
220    _: &StatementContext,
221    _: ShowCreateConnectionStatement<Aug>,
222) -> Result<StatementDesc, PlanError> {
223    Ok(StatementDesc::new(Some(
224        RelationDesc::builder()
225            .with_column("name", SqlScalarType::String.nullable(false))
226            .with_column("create_sql", SqlScalarType::String.nullable(false))
227            .finish(),
228    )))
229}
230
231pub fn plan_show_create_cluster(
232    scx: &StatementContext,
233    ShowCreateClusterStatement { cluster_name }: ShowCreateClusterStatement<Aug>,
234) -> Result<ShowCreatePlan, PlanError> {
235    let cluster = scx.get_cluster(&cluster_name.id);
236    let name = cluster.name().to_string();
237    let plan = cluster.try_to_plan()?;
238    let stmt = unplan_create_cluster(scx, plan)?;
239    let create_sql = stmt.to_ast_string_stable();
240    Ok(ShowCreatePlan {
241        id: ObjectId::Cluster(cluster_name.id),
242        row: Row::pack_slice(&[Datum::String(&name), Datum::String(&create_sql)]),
243    })
244}
245
246pub fn describe_show_create_cluster(
247    _: &StatementContext,
248    _: ShowCreateClusterStatement<Aug>,
249) -> Result<StatementDesc, PlanError> {
250    Ok(StatementDesc::new(Some(
251        RelationDesc::builder()
252            .with_column("name", SqlScalarType::String.nullable(false))
253            .with_column("create_sql", SqlScalarType::String.nullable(false))
254            .finish(),
255    )))
256}
257
258pub fn plan_show_create_type(
259    scx: &StatementContext,
260    ShowCreateTypeStatement {
261        type_name,
262        redacted,
263    }: ShowCreateTypeStatement<Aug>,
264) -> Result<ShowCreatePlan, PlanError> {
265    let ResolvedDataType::Named { id, full_name, .. } = type_name else {
266        sql_bail!("{type_name} is not a named type");
267    };
268
269    let type_item = scx.get_item(&id);
270
271    if id.is_system() {
272        sql_bail!("cannot show create for system type {full_name}");
273    }
274
275    let name = full_name.to_string();
276
277    let create_sql = humanize_sql_for_show_create(
278        scx.catalog,
279        type_item.id(),
280        type_item.create_sql(),
281        redacted,
282    )?;
283
284    Ok(ShowCreatePlan {
285        id: ObjectId::Item(id),
286        row: Row::pack_slice(&[Datum::String(&name), Datum::String(&create_sql)]),
287    })
288}
289
290pub fn describe_show_create_type(
291    _: &StatementContext,
292    _: ShowCreateTypeStatement<Aug>,
293) -> Result<StatementDesc, PlanError> {
294    Ok(StatementDesc::new(Some(
295        RelationDesc::builder()
296            .with_column("name", SqlScalarType::String.nullable(false))
297            .with_column("create_sql", SqlScalarType::String.nullable(false))
298            .finish(),
299    )))
300}
301
302pub fn plan_show_create_connection(
303    scx: &StatementContext,
304    ShowCreateConnectionStatement {
305        connection_name,
306        redacted,
307    }: ShowCreateConnectionStatement<Aug>,
308) -> Result<ShowCreatePlan, PlanError> {
309    plan_show_create_item(scx, &connection_name, CatalogItemType::Connection, redacted)
310}
311
312pub fn show_databases<'a>(
313    scx: &'a StatementContext<'a>,
314    filter: Option<ShowStatementFilter<Aug>>,
315) -> Result<ShowSelect<'a>, PlanError> {
316    let query = "SELECT name, comment FROM mz_internal.mz_show_databases".to_string();
317    ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
318}
319
320pub fn show_schemas<'a>(
321    scx: &'a StatementContext<'a>,
322    from: Option<ResolvedDatabaseName>,
323    filter: Option<ShowStatementFilter<Aug>>,
324) -> Result<ShowSelect<'a>, PlanError> {
325    let database_id = match from {
326        Some(ResolvedDatabaseName::Database { id, .. }) => id.to_string(),
327        None => match scx.active_database() {
328            Some(id) => id.to_string(),
329            None => sql_bail!("no database specified and no active database"),
330        },
331        Some(ResolvedDatabaseName::Error) => {
332            bail_internal!("unresolved database name")
333        }
334    };
335    let query = format!(
336        "SELECT name, comment
337        FROM mz_internal.mz_show_schemas
338        WHERE database_id IS NULL OR database_id = '{database_id}'",
339    );
340    ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
341}
342
343pub fn show_roles<'a>(
344    scx: &'a StatementContext<'a>,
345    filter: Option<ShowStatementFilter<Aug>>,
346) -> Result<ShowSelect<'a>, PlanError> {
347    let query = "SELECT name, comment FROM mz_internal.mz_show_roles".to_string();
348    ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
349}
350
351pub fn show_network_policies<'a>(
352    scx: &'a StatementContext<'a>,
353    filter: Option<ShowStatementFilter<Aug>>,
354) -> Result<ShowSelect<'a>, PlanError> {
355    let query = "SELECT name, rules, comment FROM mz_internal.mz_show_network_policies".to_string();
356    ShowSelect::new(
357        scx,
358        query,
359        filter,
360        None,
361        Some(&["name", "rules", "comment"]),
362    )
363}
364
365/// Ensures that the `FROM` clause was not provided for `SHOW` commands that
366/// don't accept it. The parser is supposed to reject such cases, so this is an
367/// internal-only invariant.
368fn ensure_no_from<T>(from: Option<T>) -> Result<(), PlanError> {
369    if from.is_some() {
370        bail_internal!("FROM not supported for this SHOW command");
371    }
372    Ok(())
373}
374
375pub fn show_objects<'a>(
376    scx: &'a StatementContext<'a>,
377    ShowObjectsStatement {
378        object_type,
379        from,
380        filter,
381    }: ShowObjectsStatement<Aug>,
382) -> Result<ShowSelect<'a>, PlanError> {
383    match object_type {
384        ShowObjectType::Table { on_source } => show_tables(scx, from, on_source, filter),
385        ShowObjectType::Source { in_cluster } => show_sources(scx, from, in_cluster, filter),
386        ShowObjectType::Subsource { on_source } => show_subsources(scx, from, on_source, filter),
387        ShowObjectType::View => show_views(scx, from, filter),
388        ShowObjectType::Sink { in_cluster } => show_sinks(scx, from, in_cluster, filter),
389        ShowObjectType::Type => show_types(scx, from, filter),
390        ShowObjectType::Object => show_all_objects(scx, from, filter),
391        ShowObjectType::Role => {
392            ensure_no_from(from)?;
393            show_roles(scx, filter)
394        }
395        ShowObjectType::Cluster => {
396            ensure_no_from(from)?;
397            show_clusters(scx, filter)
398        }
399        ShowObjectType::ClusterReplica => {
400            ensure_no_from(from)?;
401            show_cluster_replicas(scx, filter)
402        }
403        ShowObjectType::Secret => show_secrets(scx, from, filter),
404        ShowObjectType::Connection => show_connections(scx, from, filter),
405        ShowObjectType::MaterializedView { in_cluster } => {
406            show_materialized_views(scx, from, in_cluster, filter)
407        }
408        ShowObjectType::Index {
409            in_cluster,
410            on_object,
411        } => show_indexes(scx, from, on_object, in_cluster, filter),
412        ShowObjectType::Database => {
413            ensure_no_from(from)?;
414            show_databases(scx, filter)
415        }
416        ShowObjectType::Schema { from: db_from } => {
417            ensure_no_from(from)?;
418            show_schemas(scx, db_from, filter)
419        }
420        ShowObjectType::Privileges { object_type, role } => {
421            ensure_no_from(from)?;
422            show_privileges(scx, object_type, role, filter)
423        }
424        ShowObjectType::DefaultPrivileges { object_type, role } => {
425            ensure_no_from(from)?;
426            show_default_privileges(scx, object_type, role, filter)
427        }
428        ShowObjectType::RoleMembership { role } => {
429            ensure_no_from(from)?;
430            show_role_membership(scx, role, filter)
431        }
432        ShowObjectType::NetworkPolicy => {
433            ensure_no_from(from)?;
434            show_network_policies(scx, filter)
435        }
436    }
437}
438
439fn show_connections<'a>(
440    scx: &'a StatementContext<'a>,
441    from: Option<ResolvedSchemaName>,
442    filter: Option<ShowStatementFilter<Aug>>,
443) -> Result<ShowSelect<'a>, PlanError> {
444    let schema_spec = scx.resolve_optional_schema(&from)?;
445    let query = format!(
446        "SELECT name, type, comment
447        FROM mz_internal.mz_show_connections connections
448        WHERE schema_id = '{schema_spec}'",
449    );
450    ShowSelect::new(scx, query, filter, None, Some(&["name", "type", "comment"]))
451}
452
453fn show_tables<'a>(
454    scx: &'a StatementContext<'a>,
455    from: Option<ResolvedSchemaName>,
456    on_source: Option<ResolvedItemName>,
457    filter: Option<ShowStatementFilter<Aug>>,
458) -> Result<ShowSelect<'a>, PlanError> {
459    let schema_spec = scx.resolve_optional_schema(&from)?;
460    let mut query = format!(
461        "SELECT name, comment
462        FROM mz_internal.mz_show_tables tables
463        WHERE tables.schema_id = '{schema_spec}'",
464    );
465    if let Some(on_source) = &on_source {
466        let on_item = scx.get_item_by_resolved_name(on_source)?;
467        if on_item.item_type() != CatalogItemType::Source {
468            sql_bail!(
469                "cannot show tables on {} because it is a {}",
470                on_source.full_name_str(),
471                on_item.item_type(),
472            );
473        }
474        query += &format!(" AND tables.source_id = '{}'", on_item.id());
475    }
476    ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
477}
478
479fn show_sources<'a>(
480    scx: &'a StatementContext<'a>,
481    from: Option<ResolvedSchemaName>,
482    in_cluster: Option<ResolvedClusterName>,
483    filter: Option<ShowStatementFilter<Aug>>,
484) -> Result<ShowSelect<'a>, PlanError> {
485    let schema_spec = scx.resolve_optional_schema(&from)?;
486    let mut where_clause = format!("schema_id = '{schema_spec}'");
487
488    if let Some(cluster) = in_cluster {
489        write!(where_clause, " AND cluster_id = '{}'", cluster.id)
490            .expect("write on string cannot fail");
491    }
492
493    let query = format!(
494        "SELECT name, type, cluster, comment
495        FROM mz_internal.mz_show_sources
496        WHERE {where_clause}"
497    );
498    ShowSelect::new(
499        scx,
500        query,
501        filter,
502        None,
503        Some(&["name", "type", "cluster", "comment"]),
504    )
505}
506
507fn show_subsources<'a>(
508    scx: &'a StatementContext<'a>,
509    from_schema: Option<ResolvedSchemaName>,
510    on_source: Option<ResolvedItemName>,
511    filter: Option<ShowStatementFilter<Aug>>,
512) -> Result<ShowSelect<'a>, PlanError> {
513    let mut query_filter = Vec::new();
514
515    if on_source.is_none() && from_schema.is_none() {
516        query_filter.push("subsources.id NOT LIKE 's%'".into());
517        let schema_spec = scx.resolve_active_schema().map(|spec| spec.clone())?;
518        query_filter.push(format!("subsources.schema_id = '{schema_spec}'"));
519    }
520
521    if let Some(on_source) = &on_source {
522        let on_item = scx.get_item_by_resolved_name(on_source)?;
523        if on_item.item_type() != CatalogItemType::Source {
524            sql_bail!(
525                "cannot show subsources on {} because it is a {}",
526                on_source.full_name_str(),
527                on_item.item_type(),
528            );
529        }
530        query_filter.push(format!("sources.id = '{}'", on_item.id()));
531    }
532
533    if let Some(schema) = from_schema {
534        let schema_spec = schema.schema_spec();
535        query_filter.push(format!("subsources.schema_id = '{schema_spec}'"));
536    }
537
538    // TODO(database-issues#8322): this looks in both directions for subsources as long as
539    // progress collections still exist
540    let query = format!(
541        "SELECT DISTINCT
542            subsources.name AS name,
543            subsources.type AS type
544        FROM
545            mz_sources AS subsources
546            JOIN mz_internal.mz_object_dependencies deps ON (subsources.id = deps.object_id OR subsources.id = deps.referenced_object_id)
547            JOIN mz_sources AS sources ON (sources.id = deps.object_id OR sources.id = deps.referenced_object_id)
548        WHERE (subsources.type = 'subsource' OR subsources.type = 'progress') AND {}",
549        itertools::join(query_filter, " AND "),
550    );
551    ShowSelect::new(scx, query, filter, None, None)
552}
553
554fn show_views<'a>(
555    scx: &'a StatementContext<'a>,
556    from: Option<ResolvedSchemaName>,
557    filter: Option<ShowStatementFilter<Aug>>,
558) -> Result<ShowSelect<'a>, PlanError> {
559    let schema_spec = scx.resolve_optional_schema(&from)?;
560    let query = format!(
561        "SELECT name, comment
562        FROM mz_internal.mz_show_views
563        WHERE schema_id = '{schema_spec}'"
564    );
565    ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
566}
567
568fn show_materialized_views<'a>(
569    scx: &'a StatementContext<'a>,
570    from: Option<ResolvedSchemaName>,
571    in_cluster: Option<ResolvedClusterName>,
572    filter: Option<ShowStatementFilter<Aug>>,
573) -> Result<ShowSelect<'a>, PlanError> {
574    let schema_spec = scx.resolve_optional_schema(&from)?;
575    let mut where_clause = format!("schema_id = '{schema_spec}'");
576
577    if let Some(cluster) = in_cluster {
578        write!(where_clause, " AND cluster_id = '{}'", cluster.id)
579            .expect("write on string cannot fail");
580    }
581
582    let query = format!(
583        "SELECT name, cluster, comment
584            FROM mz_internal.mz_show_materialized_views
585            WHERE {where_clause}"
586    );
587
588    let projection = vec!["name", "cluster", "comment"];
589
590    ShowSelect::new(scx, query, filter, None, Some(&projection))
591}
592
593fn show_sinks<'a>(
594    scx: &'a StatementContext<'a>,
595    from: Option<ResolvedSchemaName>,
596    in_cluster: Option<ResolvedClusterName>,
597    filter: Option<ShowStatementFilter<Aug>>,
598) -> Result<ShowSelect<'a>, PlanError> {
599    let schema_spec = if let Some(ResolvedSchemaName::Schema { schema_spec, .. }) = from {
600        schema_spec.to_string()
601    } else {
602        scx.resolve_active_schema()?.to_string()
603    };
604
605    let mut where_clause = format!("schema_id = '{schema_spec}'");
606
607    if let Some(cluster) = in_cluster {
608        write!(where_clause, " AND cluster_id = '{}'", cluster.id)
609            .expect("write on string cannot fail");
610    }
611
612    let query = format!(
613        "SELECT name, type, cluster, comment
614        FROM mz_internal.mz_show_sinks sinks
615        WHERE {where_clause}"
616    );
617    ShowSelect::new(
618        scx,
619        query,
620        filter,
621        None,
622        Some(&["name", "type", "cluster", "comment"]),
623    )
624}
625
626fn show_types<'a>(
627    scx: &'a StatementContext<'a>,
628    from: Option<ResolvedSchemaName>,
629    filter: Option<ShowStatementFilter<Aug>>,
630) -> Result<ShowSelect<'a>, PlanError> {
631    let schema_spec = scx.resolve_optional_schema(&from)?;
632    let query = format!(
633        "SELECT name, comment
634        FROM mz_internal.mz_show_types
635        WHERE schema_id = '{schema_spec}'"
636    );
637    ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
638}
639
640fn show_all_objects<'a>(
641    scx: &'a StatementContext<'a>,
642    from: Option<ResolvedSchemaName>,
643    filter: Option<ShowStatementFilter<Aug>>,
644) -> Result<ShowSelect<'a>, PlanError> {
645    let schema_spec = scx.resolve_optional_schema(&from)?;
646    let query = format!(
647        "SELECT name, type, comment
648         FROM mz_internal.mz_show_all_objects
649         WHERE schema_id = '{schema_spec}'",
650    );
651    ShowSelect::new(scx, query, filter, None, Some(&["name", "type", "comment"]))
652}
653
654pub fn show_indexes<'a>(
655    scx: &'a StatementContext<'a>,
656    from_schema: Option<ResolvedSchemaName>,
657    on_object: Option<ResolvedItemName>,
658    in_cluster: Option<ResolvedClusterName>,
659    filter: Option<ShowStatementFilter<Aug>>,
660) -> Result<ShowSelect<'a>, PlanError> {
661    let mut query_filter = Vec::new();
662
663    if on_object.is_none() && from_schema.is_none() && in_cluster.is_none() {
664        query_filter.push("on_id NOT LIKE 's%'".into());
665        let schema_spec = scx.resolve_active_schema().map(|spec| spec.clone())?;
666        query_filter.push(format!("schema_id = '{schema_spec}'"));
667    }
668
669    if let Some(on_object) = &on_object {
670        let on_item = scx.get_item_by_resolved_name(on_object)?;
671        if on_item.item_type() != CatalogItemType::View
672            && on_item.item_type() != CatalogItemType::MaterializedView
673            && on_item.item_type() != CatalogItemType::Source
674            && on_item.item_type() != CatalogItemType::Table
675        {
676            sql_bail!(
677                "cannot show indexes on {} because it is a {}",
678                on_object.full_name_str(),
679                on_item.item_type(),
680            );
681        }
682        query_filter.push(format!("on_id = '{}'", on_item.id()));
683    }
684
685    if let Some(schema) = from_schema {
686        let schema_spec = schema.schema_spec();
687        query_filter.push(format!("schema_id = '{schema_spec}'"));
688    }
689
690    if let Some(cluster) = in_cluster {
691        query_filter.push(format!("cluster_id = '{}'", cluster.id))
692    };
693
694    let query = format!(
695        "SELECT name, on, cluster, key, comment
696        FROM mz_internal.mz_show_indexes
697        WHERE {}",
698        itertools::join(query_filter.iter(), " AND ")
699    );
700
701    ShowSelect::new(
702        scx,
703        query,
704        filter,
705        None,
706        Some(&["name", "on", "cluster", "key", "comment"]),
707    )
708}
709
710pub fn show_columns<'a>(
711    scx: &'a StatementContext<'a>,
712    ShowColumnsStatement { table_name, filter }: ShowColumnsStatement<Aug>,
713) -> Result<ShowColumnsSelect<'a>, PlanError> {
714    let entry = scx.get_item_by_resolved_name(&table_name)?;
715    let full_name = scx.catalog.resolve_full_name(entry.name());
716
717    match entry.item_type() {
718        CatalogItemType::Source
719        | CatalogItemType::Table
720        | CatalogItemType::View
721        | CatalogItemType::MaterializedView => (),
722        ty @ CatalogItemType::Connection
723        | ty @ CatalogItemType::Index
724        | ty @ CatalogItemType::Func
725        | ty @ CatalogItemType::Secret
726        | ty @ CatalogItemType::Type
727        | ty @ CatalogItemType::Sink => {
728            sql_bail!("{full_name} is a {ty} and so does not have columns");
729        }
730    }
731
732    let query = format!(
733        "SELECT name, nullable, type, position, comment
734         FROM mz_internal.mz_show_columns columns
735         WHERE columns.id = '{}'",
736        entry.id(),
737    );
738    let (show_select, new_resolved_ids) = ShowSelect::new_with_resolved_ids(
739        scx,
740        query,
741        filter,
742        Some("position"),
743        Some(&["name", "nullable", "type", "comment"]),
744    )?;
745    scx.record_sql_impl_ids(&new_resolved_ids);
746    Ok(ShowColumnsSelect {
747        id: entry.id(),
748        show_select,
749        new_resolved_ids,
750    })
751}
752
753// The rationale for which fields to include in the tuples are those
754// that are mandatory when creating a replica as part of the CREATE
755// CLUSTER command, i.e., name and size.
756pub fn show_clusters<'a>(
757    scx: &'a StatementContext<'a>,
758    filter: Option<ShowStatementFilter<Aug>>,
759) -> Result<ShowSelect<'a>, PlanError> {
760    let query = "SELECT name, replicas, comment FROM mz_internal.mz_show_clusters".to_string();
761    ShowSelect::new(
762        scx,
763        query,
764        filter,
765        None,
766        Some(&["name", "replicas", "comment"]),
767    )
768}
769
770pub fn show_cluster_replicas<'a>(
771    scx: &'a StatementContext<'a>,
772    filter: Option<ShowStatementFilter<Aug>>,
773) -> Result<ShowSelect<'a>, PlanError> {
774    let query = "
775    SELECT cluster, replica, size, ready, comment
776    FROM mz_internal.mz_show_cluster_replicas
777    "
778    .to_string();
779
780    ShowSelect::new(
781        scx,
782        query,
783        filter,
784        None,
785        Some(&["cluster", "replica", "size", "ready", "comment"]),
786    )
787}
788
789pub fn show_secrets<'a>(
790    scx: &'a StatementContext<'a>,
791    from: Option<ResolvedSchemaName>,
792    filter: Option<ShowStatementFilter<Aug>>,
793) -> Result<ShowSelect<'a>, PlanError> {
794    let schema_spec = scx.resolve_optional_schema(&from)?;
795
796    let query = format!(
797        "SELECT name, comment
798        FROM mz_internal.mz_show_secrets
799        WHERE schema_id = '{schema_spec}'",
800    );
801
802    ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"]))
803}
804
805pub fn show_privileges<'a>(
806    scx: &'a StatementContext<'a>,
807    object_type: Option<SystemObjectType>,
808    role: Option<ResolvedRoleName>,
809    filter: Option<ShowStatementFilter<Aug>>,
810) -> Result<ShowSelect<'a>, PlanError> {
811    let mut query_filter = Vec::new();
812    if let Some(object_type) = object_type {
813        query_filter.push(format!(
814            "object_type = '{}'",
815            object_type.to_string().to_lowercase()
816        ));
817    }
818    if let Some(role) = role {
819        let name = escaped_string_literal(&role.name);
820        query_filter.push(format!(
821            "CASE WHEN grantee = 'PUBLIC' THEN true ELSE pg_has_role({name}, grantee, 'USAGE') END"
822        ));
823    }
824    let query_filter = if query_filter.len() > 0 {
825        format!("WHERE {}", itertools::join(query_filter, " AND "))
826    } else {
827        "".to_string()
828    };
829
830    let query = format!(
831        "SELECT grantor, grantee, database, schema, name, object_type, privilege_type
832        FROM mz_internal.mz_show_all_privileges
833        {query_filter}",
834    );
835
836    ShowSelect::new(
837        scx,
838        query,
839        filter,
840        None,
841        Some(&[
842            "grantor",
843            "grantee",
844            "database",
845            "schema",
846            "name",
847            "object_type",
848            "privilege_type",
849        ]),
850    )
851}
852
853pub fn show_default_privileges<'a>(
854    scx: &'a StatementContext<'a>,
855    object_type: Option<ObjectType>,
856    role: Option<ResolvedRoleName>,
857    filter: Option<ShowStatementFilter<Aug>>,
858) -> Result<ShowSelect<'a>, PlanError> {
859    let mut query_filter = Vec::new();
860    if let Some(object_type) = object_type {
861        query_filter.push(format!(
862            "object_type = '{}'",
863            object_type.to_string().to_lowercase()
864        ));
865    }
866    if let Some(role) = role {
867        let name = escaped_string_literal(&role.name);
868        query_filter.push(format!(
869            "CASE WHEN grantee = 'PUBLIC' THEN true ELSE pg_has_role({name}, grantee, 'USAGE') END"
870        ));
871    }
872    let query_filter = if query_filter.len() > 0 {
873        format!("WHERE {}", itertools::join(query_filter, " AND "))
874    } else {
875        "".to_string()
876    };
877
878    let query = format!(
879        "SELECT object_owner, database, schema, object_type, grantee, privilege_type
880        FROM mz_internal.mz_show_default_privileges
881        {query_filter}",
882    );
883
884    ShowSelect::new(
885        scx,
886        query,
887        filter,
888        None,
889        Some(&[
890            "object_owner",
891            "database",
892            "schema",
893            "object_type",
894            "grantee",
895            "privilege_type",
896        ]),
897    )
898}
899
900pub fn show_role_membership<'a>(
901    scx: &'a StatementContext<'a>,
902    role: Option<ResolvedRoleName>,
903    filter: Option<ShowStatementFilter<Aug>>,
904) -> Result<ShowSelect<'a>, PlanError> {
905    let mut query_filter = Vec::new();
906    if let Some(role) = role {
907        let name = escaped_string_literal(&role.name);
908        query_filter.push(format!("pg_has_role({name}, member, 'USAGE')"));
909    }
910    let query_filter = if query_filter.len() > 0 {
911        format!("WHERE {}", itertools::join(query_filter, " AND "))
912    } else {
913        "".to_string()
914    };
915
916    let query = format!(
917        "SELECT role, member, grantor
918        FROM mz_internal.mz_show_role_members
919        {query_filter}",
920    );
921
922    ShowSelect::new(
923        scx,
924        query,
925        filter,
926        None,
927        Some(&["role", "member", "grantor"]),
928    )
929}
930
931/// An intermediate result when planning a `SHOW` query.
932///
933/// Can be interrogated for its columns, or converted into a proper [`Plan`].
934pub struct ShowSelect<'a> {
935    scx: &'a StatementContext<'a>,
936    pub(crate) stmt: SelectStatement<Aug>,
937}
938
939impl<'a> ShowSelect<'a> {
940    /// Constructs a new [`ShowSelect`] from a query that provides the base
941    /// data and an optional user-supplied filter, order column, and
942    /// projection on that data.
943    ///
944    /// Note that the query must return a column named `name`, as the filter
945    /// may implicitly reference this column. Any `ORDER BY` in the query is
946    /// ignored. `ShowSelects`s are always ordered in ascending order by all
947    /// columns from left to right unless an order field is supplied.
948    fn new(
949        scx: &'a StatementContext,
950        query: String,
951        filter: Option<ShowStatementFilter<Aug>>,
952        order: Option<&str>,
953        projection: Option<&[&str]>,
954    ) -> Result<ShowSelect<'a>, PlanError> {
955        let (show_select, new_resolved_ids) =
956            Self::new_with_resolved_ids(scx, query, filter, order, projection)?;
957        scx.sql_impl_resolved_ids
958            .lock()
959            .expect("planning is single-threaded")
960            .extend_from(&new_resolved_ids);
961        Ok(show_select)
962    }
963
964    fn new_with_resolved_ids(
965        scx: &'a StatementContext,
966        query: String,
967        filter: Option<ShowStatementFilter<Aug>>,
968        order: Option<&str>,
969        projection: Option<&[&str]>,
970    ) -> Result<(ShowSelect<'a>, ResolvedIds), PlanError> {
971        let filter = match filter {
972            Some(ShowStatementFilter::Like(like)) => format!("name LIKE {}", Value::String(like)),
973            Some(ShowStatementFilter::Where(expr)) => expr.to_string(),
974            None => "true".to_string(),
975        };
976        let query = format!(
977            "SELECT {} FROM ({}) q WHERE {} ORDER BY {}",
978            projection
979                .map(|ps| ps.join(", "))
980                .unwrap_or_else(|| "*".into()),
981            query,
982            filter,
983            order.unwrap_or("q.*")
984        );
985
986        Self::new_from_bare_query(scx, query)
987    }
988
989    pub fn new_from_bare_query(
990        scx: &'a StatementContext,
991        query: String,
992    ) -> Result<(ShowSelect<'a>, ResolvedIds), PlanError> {
993        let stmts = parse::parse(&query)
994            .map_err(|e| internal_err!("failed to parse generated SHOW query: {}", e))?;
995        let stmt = match stmts.into_element().ast {
996            Statement::Select(select) => select,
997            _ => bail_internal!("generated SHOW query was not a SELECT statement"),
998        };
999        let (mut stmt, new_resolved_ids) = names::resolve(scx.catalog, stmt)?;
1000        transform_ast::transform(scx, &mut stmt)?;
1001        Ok((ShowSelect { scx, stmt }, new_resolved_ids))
1002    }
1003
1004    /// Computes the shape of this `ShowSelect`.
1005    pub fn describe(self) -> Result<StatementDesc, PlanError> {
1006        dml::describe_select(self.scx, self.stmt)
1007    }
1008
1009    /// Converts this `ShowSelect` into a [`Plan`].
1010    pub fn plan(self) -> Result<Plan, PlanError> {
1011        dml::plan_select(self.scx, self.stmt, &Params::empty(), None)
1012    }
1013
1014    /// Converts this `ShowSelect` into a [`(HirRelationExpr, Scope)`].
1015    pub fn plan_hir(self, qcx: &QueryContext) -> Result<(HirRelationExpr, Scope), PlanError> {
1016        query::plan_nested_query(&mut qcx.clone(), &self.stmt.query)
1017    }
1018}
1019
1020pub struct ShowColumnsSelect<'a> {
1021    id: CatalogItemId,
1022    new_resolved_ids: ResolvedIds,
1023    show_select: ShowSelect<'a>,
1024}
1025
1026impl<'a> ShowColumnsSelect<'a> {
1027    pub fn describe(self) -> Result<StatementDesc, PlanError> {
1028        self.show_select.describe()
1029    }
1030
1031    pub fn plan(self) -> Result<Plan, PlanError> {
1032        let select_plan = self.show_select.plan()?;
1033        match select_plan {
1034            Plan::Select(select_plan) => Ok(Plan::ShowColumns(ShowColumnsPlan {
1035                id: self.id,
1036                select_plan,
1037                new_resolved_ids: self.new_resolved_ids,
1038            })),
1039            _ => {
1040                tracing::error!(
1041                    "SHOW COLUMNS produced a non select plan. plan: {:?}",
1042                    select_plan
1043                );
1044                Err(PlanError::Unstructured(
1045                    "SHOW COLUMNS produced an unexpected plan. Please file a bug.".to_string(),
1046                ))
1047            }
1048        }
1049    }
1050
1051    pub fn plan_hir(self, qcx: &QueryContext) -> Result<(HirRelationExpr, Scope), PlanError> {
1052        self.show_select.plan_hir(qcx)
1053    }
1054}
1055
1056/// Convert a SQL statement into a form that could be used as input, as well as
1057/// is more amenable to human consumption.
1058///
1059/// Note that the bits we omit here (e.g. the internal `AS OF` of a materialized
1060/// view, or the `DETAILS` of a `CREATE TABLE ... FROM SOURCE`) are not
1061/// user-typeable, but remain accessible for debugging in the raw `create_sql`
1062/// (and `redacted_create_sql`) column of catalog builtins like
1063/// `mz_catalog.mz_materialized_views` and `mz_catalog.mz_tables`. They are not
1064/// in the `definition` column, which is parsed back out of `create_sql` and only
1065/// retains the inner query.
1066fn humanize_sql_for_show_create(
1067    catalog: &dyn SessionCatalog,
1068    id: CatalogItemId,
1069    sql: &str,
1070    redacted: bool,
1071) -> Result<String, PlanError> {
1072    use mz_sql_parser::ast::{
1073        CreateSourceConnection, MySqlConfigOptionName, PgConfigOptionName, TableFromSourceColumns,
1074        TableFromSourceOptionName,
1075    };
1076
1077    let parsed = parse::parse(sql)?.into_element().ast;
1078    let (mut resolved, _) = names::resolve(catalog, parsed)?;
1079
1080    // Simplify names.
1081    let mut simplifier = NameSimplifier { catalog };
1082    simplifier.visit_statement_mut(&mut resolved);
1083
1084    match &mut resolved {
1085        // Strip internal `AS OF` syntax.
1086        Statement::CreateMaterializedView(stmt) => stmt.as_of = None,
1087        // Strip the internal `DETAILS` option, which is not user-typeable and
1088        // does not roundtrip.
1089        Statement::CreateTableFromSource(stmt) => {
1090            stmt.with_options.retain_mut(|o| match o.name {
1091                TableFromSourceOptionName::TextColumns => true,
1092                TableFromSourceOptionName::ExcludeColumns => true,
1093                // Drop details, which does not roundtrip.
1094                TableFromSourceOptionName::Details => false,
1095                TableFromSourceOptionName::PartitionBy => true,
1096                TableFromSourceOptionName::RetainHistory => true,
1097            });
1098            // The `Defined` column list and constraints are populated during
1099            // purification (from the upstream schema), and `CREATE TABLE ... FROM
1100            // SOURCE` rejects them as input. Omit them so the statement
1101            // roundtrips; purification re-derives them from the source on replay,
1102            // and the schema-affecting `TEXT COLUMNS` / `EXCLUDE COLUMNS` options
1103            // are retained above so the re-derived schema matches. A user-typed
1104            // `Named` column list is left intact, since it does roundtrip.
1105            if matches!(stmt.columns, TableFromSourceColumns::Defined(_)) {
1106                stmt.columns = TableFromSourceColumns::NotSpecified;
1107            }
1108            // Constraints are never valid input here (purification populates them
1109            // alongside `Defined` columns), so always drop them.
1110            stmt.constraints = Vec::new();
1111        }
1112        // `CREATE SOURCE` statements should roundtrip. However, sources and
1113        // their subsources have a complex relationship, so we need to do a lot
1114        // of work to reconstruct the statement for multi-output sources.
1115        //
1116        // For instance, `DROP SOURCE` statements can leave dangling references
1117        // to subsources that must be filtered out here, that, due to catalog
1118        // transaction limitations, can only be cleaned up when a top-level
1119        // source is altered.
1120        Statement::CreateSource(stmt) => {
1121            // Collect all current subsource references.
1122            let mut curr_references: BTreeMap<UnresolvedItemName, Vec<UnresolvedItemName>> =
1123                catalog
1124                    .get_item(&id)
1125                    .used_by()
1126                    .into_iter()
1127                    .filter_map(|subsource| {
1128                        let item = catalog.get_item(subsource);
1129                        item.subsource_details().map(|(_id, reference, _details)| {
1130                            let name = item.name();
1131                            let subsource_name = catalog.resolve_full_name(name);
1132                            let subsource_name = UnresolvedItemName::from(subsource_name);
1133                            (reference.clone(), subsource_name)
1134                        })
1135                    })
1136                    .fold(BTreeMap::new(), |mut map, (reference, subsource_name)| {
1137                        map.entry(reference)
1138                            .or_insert_with(Vec::new)
1139                            .push(subsource_name);
1140                        map
1141                    });
1142
1143            match &mut stmt.connection {
1144                CreateSourceConnection::Postgres { options, .. } => {
1145                    options.retain_mut(|o| {
1146                        match o.name {
1147                            // Dropping a subsource does not remove any `TEXT
1148                            // COLUMNS` values that refer to the table it
1149                            // ingests, which we'll handle below.
1150                            PgConfigOptionName::TextColumns => {}
1151                            // Drop details, which does not roundtrip.
1152                            PgConfigOptionName::Details => return false,
1153                            _ => return true,
1154                        };
1155                        match &mut o.value {
1156                            Some(WithOptionValue::Sequence(text_cols)) => {
1157                                text_cols.retain(|v| match v {
1158                                    WithOptionValue::UnresolvedItemName(n) => {
1159                                        let mut name = n.clone();
1160                                        // Remove the column reference.
1161                                        name.0.truncate(3);
1162                                        curr_references.contains_key(&name)
1163                                    }
1164                                    _ => unreachable!(
1165                                        "TEXT COLUMNS must be sequence of unresolved item names"
1166                                    ),
1167                                });
1168                                !text_cols.is_empty()
1169                            }
1170                            _ => unreachable!(
1171                                "TEXT COLUMNS must be sequence of unresolved item names"
1172                            ),
1173                        }
1174                    });
1175                }
1176                CreateSourceConnection::SqlServer { options, .. } => {
1177                    // TODO(sql_server2): TEXT and EXCLUDE columns are represented by
1178                    // `schema.table.column` whereas our external table references are
1179                    // `database.schema.table`. We handle the mismatch here but should
1180                    // probably fully qualify our TEXT and EXCLUDE column references.
1181                    let adjusted_references: BTreeSet<_> = curr_references
1182                        .keys()
1183                        .map(|name| {
1184                            if name.0.len() == 3 {
1185                                // Strip the database component of the name.
1186                                let adjusted_name = name.0[1..].to_vec();
1187                                UnresolvedItemName(adjusted_name)
1188                            } else {
1189                                name.clone()
1190                            }
1191                        })
1192                        .collect();
1193
1194                    options.retain_mut(|o| {
1195                        match o.name {
1196                            // Dropping a subsource does not remove any `TEXT COLUMNS`
1197                            // values that refer to the table it ingests, which we'll
1198                            // handle below.
1199                            SqlServerConfigOptionName::TextColumns
1200                            | SqlServerConfigOptionName::ExcludeColumns => {}
1201                            // Drop details, which does not roundtrip.
1202                            SqlServerConfigOptionName::Details => return false,
1203                        };
1204
1205                        match &mut o.value {
1206                            Some(WithOptionValue::Sequence(seq_unresolved_item_names)) => {
1207                                seq_unresolved_item_names.retain(|v| match v {
1208                                    WithOptionValue::UnresolvedItemName(n) => {
1209                                        let mut name = n.clone();
1210                                        // Remove column reference.
1211                                        name.0.truncate(2);
1212                                        adjusted_references.contains(&name)
1213                                    }
1214                                    _ => unreachable!(
1215                                        "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1216                                    ),
1217                                });
1218                                !seq_unresolved_item_names.is_empty()
1219                            }
1220                            _ => unreachable!(
1221                                "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1222                            ),
1223                        }
1224                    });
1225                }
1226                CreateSourceConnection::MySql { options, .. } => {
1227                    options.retain_mut(|o| {
1228                        match o.name {
1229                            // Dropping a subsource does not remove any `TEXT
1230                            // COLUMNS` values that refer to the table it
1231                            // ingests, which we'll handle below.
1232                            MySqlConfigOptionName::TextColumns
1233                            | MySqlConfigOptionName::ExcludeColumns => {}
1234                            // Drop details, which does not roundtrip.
1235                            MySqlConfigOptionName::Details => return false,
1236                        };
1237
1238                        match &mut o.value {
1239                            Some(WithOptionValue::Sequence(seq_unresolved_item_names)) => {
1240                                seq_unresolved_item_names.retain(|v| match v {
1241                                    WithOptionValue::UnresolvedItemName(n) => {
1242                                        let mut name = n.clone();
1243                                        // Remove column reference.
1244                                        name.0.truncate(2);
1245                                        curr_references.contains_key(&name)
1246                                    }
1247                                    _ => unreachable!(
1248                                        "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1249                                    ),
1250                                });
1251                                !seq_unresolved_item_names.is_empty()
1252                            }
1253                            _ => unreachable!(
1254                                "TEXT COLUMNS + EXCLUDE COLUMNS must be sequence of unresolved item names"
1255                            ),
1256                        }
1257                    });
1258                }
1259                CreateSourceConnection::LoadGenerator { .. } if !curr_references.is_empty() => {
1260                    // Load generator sources with any references only support
1261                    // `FOR ALL TABLES`. However, this would change if database-issues#7911
1262                    // landed.
1263                    curr_references.clear();
1264                    stmt.external_references = Some(ExternalReferences::All);
1265                }
1266                CreateSourceConnection::Kafka { .. }
1267                | CreateSourceConnection::LoadGenerator { .. } => {}
1268            }
1269
1270            // If this source has any references, reconstruct them.
1271            if !curr_references.is_empty() {
1272                let mut subsources: Vec<_> = curr_references
1273                    .into_iter()
1274                    .flat_map(|(reference, names)| {
1275                        names.into_iter().map(move |name| ExternalReferenceExport {
1276                            reference: reference.clone(),
1277                            alias: Some(name),
1278                        })
1279                    })
1280                    .collect();
1281                subsources.sort();
1282                stmt.external_references = Some(ExternalReferences::SubsetTables(subsources));
1283            }
1284        }
1285        Statement::CreateSubsource(stmt) => {
1286            stmt.with_options.retain_mut(|o| {
1287                match o.name {
1288                    CreateSubsourceOptionName::TextColumns => true,
1289                    CreateSubsourceOptionName::RetainHistory => true,
1290                    CreateSubsourceOptionName::ExcludeColumns => true,
1291                    // Drop details, which does not roundtrip.
1292                    CreateSubsourceOptionName::Details => false,
1293                    CreateSubsourceOptionName::ExternalReference => true,
1294                    CreateSubsourceOptionName::Progress => true,
1295                }
1296            });
1297        }
1298        Statement::CreateSink(stmt) => {
1299            stmt.with_options.retain_mut(|o| {
1300                match o.name {
1301                    CreateSinkOptionName::CommitInterval => true,
1302                    CreateSinkOptionName::PartitionStrategy => true,
1303                    CreateSinkOptionName::Snapshot => true,
1304                    // Drop version, which does not roundtrip.
1305                    CreateSinkOptionName::Version => false,
1306                }
1307            });
1308        }
1309        _ => (),
1310    }
1311
1312    Ok(mz_sql_pretty::to_pretty(
1313        &resolved,
1314        PrettyConfig {
1315            width: mz_sql_pretty::DEFAULT_WIDTH,
1316            format_mode: if redacted {
1317                FormatMode::SimpleRedacted
1318            } else {
1319                FormatMode::Simple
1320            },
1321        },
1322    ))
1323}