mz_sql/plan/
statement.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Statement planning.
11//!
12//! This module houses the entry points for planning a SQL statement.
13
14use std::cell::RefCell;
15use std::collections::{BTreeMap, BTreeSet};
16
17use mz_repr::namespaces::is_system_schema;
18use mz_repr::{
19    CatalogItemId, ColumnIndex, RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType,
20};
21use mz_sql_parser::ast::{
22    ColumnDef, ColumnName, ConnectionDefaultAwsPrivatelink, CreateMaterializedViewStatement,
23    RawItemName, ShowStatement, StatementKind, TableConstraint, UnresolvedDatabaseName,
24    UnresolvedSchemaName,
25};
26use mz_storage_types::connections::inline::ReferencedConnection;
27use mz_storage_types::connections::{AwsPrivatelink, Connection, SshTunnel, Tunnel};
28
29use crate::ast::{Ident, Statement, UnresolvedItemName};
30use crate::catalog::{
31    CatalogCluster, CatalogCollectionItem, CatalogDatabase, CatalogItem, CatalogItemType,
32    CatalogSchema, ObjectType, SessionCatalog, SystemObjectType,
33};
34use crate::names::{
35    self, Aug, DatabaseId, FullItemName, ItemQualifiers, ObjectId, PartialItemName,
36    QualifiedItemName, RawDatabaseSpecifier, ResolvedColumnReference, ResolvedDataType,
37    ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName, SchemaSpecifier,
38    SystemObjectId,
39};
40use crate::normalize;
41use crate::plan::error::PlanError;
42use crate::plan::{Params, Plan, PlanContext, PlanKind, query, with_options};
43use crate::session::vars::FeatureFlag;
44
45mod acl;
46pub(crate) mod ddl;
47mod dml;
48mod raise;
49mod scl;
50pub(crate) mod show;
51mod tcl;
52mod validate;
53
54use crate::session::vars;
55pub(crate) use ddl::PgConfigOptionExtracted;
56use mz_controller_types::ClusterId;
57use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_USER_OID};
58use mz_repr::role_id::RoleId;
59
60/// Describes the output of a SQL statement.
61#[derive(Debug, Clone, Eq, PartialEq)]
62pub struct StatementDesc {
63    /// The shape of the rows produced by the statement, if the statement
64    /// produces rows.
65    pub relation_desc: Option<RelationDesc>,
66    /// The determined types of the parameters in the statement, if any.
67    pub param_types: Vec<SqlScalarType>,
68    /// Whether the statement is a `COPY` statement.
69    pub is_copy: bool,
70}
71
72impl StatementDesc {
73    pub fn new(relation_desc: Option<RelationDesc>) -> Self {
74        StatementDesc {
75            relation_desc,
76            param_types: vec![],
77            is_copy: false,
78        }
79    }
80
81    /// Reports the number of columns in the statement's result set, or zero if
82    /// the statement does not return rows.
83    pub fn arity(&self) -> usize {
84        self.relation_desc
85            .as_ref()
86            .map(|desc| desc.typ().column_types.len())
87            .unwrap_or(0)
88    }
89
90    fn with_params(mut self, param_types: Vec<SqlScalarType>) -> Self {
91        self.param_types = param_types;
92        self
93    }
94
95    fn with_is_copy(mut self) -> Self {
96        self.is_copy = true;
97        self
98    }
99}
100
101/// Creates a description of the purified statement `stmt`.
102///
103/// See the documentation of [`StatementDesc`] for details.
104pub fn describe(
105    pcx: &PlanContext,
106    catalog: &dyn SessionCatalog,
107    stmt: Statement<Aug>,
108    param_types_in: &[Option<SqlScalarType>],
109) -> Result<StatementDesc, PlanError> {
110    let mut param_types = BTreeMap::new();
111    for (i, ty) in param_types_in.iter().enumerate() {
112        if let Some(ty) = ty {
113            param_types.insert(i + 1, ty.clone());
114        }
115    }
116
117    let scx = StatementContext {
118        pcx: Some(pcx),
119        catalog,
120        param_types: RefCell::new(param_types),
121        ambiguous_columns: RefCell::new(false),
122    };
123
124    let desc = match stmt {
125        // DDL statements.
126        Statement::AlterCluster(stmt) => ddl::describe_alter_cluster_set_options(&scx, stmt)?,
127        Statement::AlterConnection(stmt) => ddl::describe_alter_connection(&scx, stmt)?,
128        Statement::AlterIndex(stmt) => ddl::describe_alter_index_options(&scx, stmt)?,
129        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
130            ddl::describe_alter_materialized_view_apply_replacement(&scx, stmt)?
131        }
132        Statement::AlterObjectRename(stmt) => ddl::describe_alter_object_rename(&scx, stmt)?,
133        Statement::AlterObjectSwap(stmt) => ddl::describe_alter_object_swap(&scx, stmt)?,
134        Statement::AlterRetainHistory(stmt) => ddl::describe_alter_retain_history(&scx, stmt)?,
135        Statement::AlterRole(stmt) => ddl::describe_alter_role(&scx, stmt)?,
136        Statement::AlterSecret(stmt) => ddl::describe_alter_secret_options(&scx, stmt)?,
137        Statement::AlterSetCluster(stmt) => ddl::describe_alter_set_cluster(&scx, stmt)?,
138        Statement::AlterSink(stmt) => ddl::describe_alter_sink(&scx, stmt)?,
139        Statement::AlterSource(stmt) => ddl::describe_alter_source(&scx, stmt)?,
140        Statement::AlterSystemSet(stmt) => ddl::describe_alter_system_set(&scx, stmt)?,
141        Statement::AlterSystemReset(stmt) => ddl::describe_alter_system_reset(&scx, stmt)?,
142        Statement::AlterSystemResetAll(stmt) => ddl::describe_alter_system_reset_all(&scx, stmt)?,
143        Statement::AlterTableAddColumn(stmt) => ddl::describe_alter_table_add_column(&scx, stmt)?,
144        Statement::AlterNetworkPolicy(stmt) => ddl::describe_alter_network_policy(&scx, stmt)?,
145        Statement::Comment(stmt) => ddl::describe_comment(&scx, stmt)?,
146        Statement::CreateCluster(stmt) => ddl::describe_create_cluster(&scx, stmt)?,
147        Statement::CreateClusterReplica(stmt) => ddl::describe_create_cluster_replica(&scx, stmt)?,
148        Statement::CreateConnection(stmt) => ddl::describe_create_connection(&scx, stmt)?,
149        Statement::CreateDatabase(stmt) => ddl::describe_create_database(&scx, stmt)?,
150        Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
151        Statement::CreateRole(stmt) => ddl::describe_create_role(&scx, stmt)?,
152        Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
153        Statement::CreateSecret(stmt) => ddl::describe_create_secret(&scx, stmt)?,
154        Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
155        Statement::CreateWebhookSource(stmt) => ddl::describe_create_webhook_source(&scx, stmt)?,
156        Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
157        Statement::CreateSubsource(stmt) => ddl::describe_create_subsource(&scx, stmt)?,
158        Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
159        Statement::CreateTableFromSource(stmt) => {
160            ddl::describe_create_table_from_source(&scx, stmt)?
161        }
162        Statement::CreateType(stmt) => ddl::describe_create_type(&scx, stmt)?,
163        Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
164        Statement::CreateMaterializedView(stmt) => {
165            ddl::describe_create_materialized_view(&scx, stmt)?
166        }
167        Statement::CreateContinualTask(stmt) => ddl::describe_create_continual_task(&scx, stmt)?,
168        Statement::CreateNetworkPolicy(stmt) => ddl::describe_create_network_policy(&scx, stmt)?,
169        Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?,
170        Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?,
171
172        // `ACL` statements.
173        Statement::AlterOwner(stmt) => acl::describe_alter_owner(&scx, stmt)?,
174        Statement::GrantRole(stmt) => acl::describe_grant_role(&scx, stmt)?,
175        Statement::RevokeRole(stmt) => acl::describe_revoke_role(&scx, stmt)?,
176        Statement::GrantPrivileges(stmt) => acl::describe_grant_privileges(&scx, stmt)?,
177        Statement::RevokePrivileges(stmt) => acl::describe_revoke_privileges(&scx, stmt)?,
178        Statement::AlterDefaultPrivileges(stmt) => {
179            acl::describe_alter_default_privileges(&scx, stmt)?
180        }
181        Statement::ReassignOwned(stmt) => acl::describe_reassign_owned(&scx, stmt)?,
182
183        // `SHOW` statements.
184        Statement::Show(ShowStatement::ShowColumns(stmt)) => {
185            show::show_columns(&scx, stmt)?.describe()?
186        }
187        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
188            show::describe_show_create_connection(&scx, stmt)?
189        }
190        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
191            show::describe_show_create_cluster(&scx, stmt)?
192        }
193        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
194            show::describe_show_create_index(&scx, stmt)?
195        }
196        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
197            show::describe_show_create_sink(&scx, stmt)?
198        }
199        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
200            show::describe_show_create_source(&scx, stmt)?
201        }
202        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
203            show::describe_show_create_table(&scx, stmt)?
204        }
205        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
206            show::describe_show_create_view(&scx, stmt)?
207        }
208        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
209            show::describe_show_create_materialized_view(&scx, stmt)?
210        }
211        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
212            show::describe_show_create_type(&scx, stmt)?
213        }
214        Statement::Show(ShowStatement::ShowObjects(stmt)) => {
215            show::show_objects(&scx, stmt)?.describe()?
216        }
217
218        // SCL statements.
219        Statement::Close(stmt) => scl::describe_close(&scx, stmt)?,
220        Statement::Deallocate(stmt) => scl::describe_deallocate(&scx, stmt)?,
221        Statement::Declare(stmt) => scl::describe_declare(&scx, stmt, param_types_in)?,
222        Statement::Discard(stmt) => scl::describe_discard(&scx, stmt)?,
223        Statement::Execute(stmt) => scl::describe_execute(&scx, stmt)?,
224        Statement::Fetch(stmt) => scl::describe_fetch(&scx, stmt)?,
225        Statement::Prepare(stmt) => scl::describe_prepare(&scx, stmt)?,
226        Statement::ResetVariable(stmt) => scl::describe_reset_variable(&scx, stmt)?,
227        Statement::SetVariable(stmt) => scl::describe_set_variable(&scx, stmt)?,
228        Statement::Show(ShowStatement::ShowVariable(stmt)) => {
229            scl::describe_show_variable(&scx, stmt)?
230        }
231
232        // DML statements.
233        Statement::Copy(stmt) => dml::describe_copy(&scx, stmt)?,
234        Statement::Delete(stmt) => dml::describe_delete(&scx, stmt)?,
235        Statement::ExplainPlan(stmt) => dml::describe_explain_plan(&scx, stmt)?,
236        Statement::ExplainPushdown(stmt) => dml::describe_explain_pushdown(&scx, stmt)?,
237        Statement::ExplainAnalyzeObject(stmt) => dml::describe_explain_analyze_object(&scx, stmt)?,
238        Statement::ExplainAnalyzeCluster(stmt) => {
239            dml::describe_explain_analyze_cluster(&scx, stmt)?
240        }
241        Statement::ExplainTimestamp(stmt) => dml::describe_explain_timestamp(&scx, stmt)?,
242        Statement::ExplainSinkSchema(stmt) => dml::describe_explain_schema(&scx, stmt)?,
243        Statement::Insert(stmt) => dml::describe_insert(&scx, stmt)?,
244        Statement::Select(stmt) => dml::describe_select(&scx, stmt)?,
245        Statement::Subscribe(stmt) => dml::describe_subscribe(&scx, stmt)?,
246        Statement::Update(stmt) => dml::describe_update(&scx, stmt)?,
247
248        // TCL statements.
249        Statement::Commit(stmt) => tcl::describe_commit(&scx, stmt)?,
250        Statement::Rollback(stmt) => tcl::describe_rollback(&scx, stmt)?,
251        Statement::SetTransaction(stmt) => tcl::describe_set_transaction(&scx, stmt)?,
252        Statement::StartTransaction(stmt) => tcl::describe_start_transaction(&scx, stmt)?,
253
254        // Other statements.
255        Statement::Raise(stmt) => raise::describe_raise(&scx, stmt)?,
256        Statement::Show(ShowStatement::InspectShard(stmt)) => {
257            scl::describe_inspect_shard(&scx, stmt)?
258        }
259        Statement::ValidateConnection(stmt) => validate::describe_validate_connection(&scx, stmt)?,
260    };
261
262    let desc = desc.with_params(scx.finalize_param_types()?);
263    Ok(desc)
264}
265
266/// Produces a [`Plan`] from the purified statement `stmt`.
267///
268/// Planning is a pure, synchronous function and so requires that the provided
269/// `stmt` does does not depend on any external state. Statements that rely on
270/// external state must remove that state prior to calling this function via
271/// [`crate::pure::purify_statement`] or
272/// [`crate::pure::purify_create_materialized_view_options`].
273///
274/// The returned plan is tied to the state of the provided catalog. If the state
275/// of the catalog changes after planning, the validity of the plan is not
276/// guaranteed.
277///
278/// Note that if you want to do something else asynchronously (e.g. validating
279/// connections), these might want to take different code paths than
280/// `purify_statement`. Feel free to rationalize this by thinking of those
281/// statements as not necessarily depending on external state.
282#[mz_ore::instrument(level = "debug")]
283pub fn plan(
284    pcx: Option<&PlanContext>,
285    catalog: &dyn SessionCatalog,
286    stmt: Statement<Aug>,
287    params: &Params,
288    resolved_ids: &ResolvedIds,
289) -> Result<Plan, PlanError> {
290    let param_types = params
291        // We need the `expected_types` here, not the `actual_types`! This is because
292        // `expected_types` is how the parameter expression (e.g. `$1`) looks "from the outside":
293        // `bind_parameters` will insert a cast from the actual type to the expected type.
294        .expected_types
295        .iter()
296        .enumerate()
297        .map(|(i, ty)| (i + 1, ty.clone()))
298        .collect();
299
300    let kind: StatementKind = (&stmt).into();
301    let permitted_plans = Plan::generated_from(&kind);
302
303    let scx = &mut StatementContext {
304        pcx,
305        catalog,
306        param_types: RefCell::new(param_types),
307        ambiguous_columns: RefCell::new(false),
308    };
309
310    if resolved_ids
311        .items()
312        // Filter out items that may not have been created yet, such as sub-sources.
313        .filter_map(|id| catalog.try_get_item(id))
314        .any(|item| {
315            item.func().is_ok()
316                && item.name().qualifiers.schema_spec
317                    == SchemaSpecifier::Id(catalog.get_mz_unsafe_schema_id())
318        })
319    {
320        scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNSAFE_FUNCTIONS)?;
321    }
322
323    let plan = match stmt {
324        // DDL statements.
325        Statement::AlterCluster(stmt) => ddl::plan_alter_cluster(scx, stmt),
326        Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt),
327        Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt),
328        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
329            ddl::plan_alter_materialized_view_apply_replacement(scx, stmt)
330        }
331        Statement::AlterObjectRename(stmt) => ddl::plan_alter_object_rename(scx, stmt),
332        Statement::AlterObjectSwap(stmt) => ddl::plan_alter_object_swap(scx, stmt),
333        Statement::AlterRetainHistory(stmt) => ddl::plan_alter_retain_history(scx, stmt),
334        Statement::AlterRole(stmt) => ddl::plan_alter_role(scx, stmt),
335        Statement::AlterSecret(stmt) => ddl::plan_alter_secret(scx, stmt),
336        Statement::AlterSetCluster(stmt) => ddl::plan_alter_item_set_cluster(scx, stmt),
337        Statement::AlterSink(stmt) => ddl::plan_alter_sink(scx, stmt),
338        Statement::AlterSource(stmt) => ddl::plan_alter_source(scx, stmt),
339        Statement::AlterSystemSet(stmt) => ddl::plan_alter_system_set(scx, stmt),
340        Statement::AlterSystemReset(stmt) => ddl::plan_alter_system_reset(scx, stmt),
341        Statement::AlterSystemResetAll(stmt) => ddl::plan_alter_system_reset_all(scx, stmt),
342        Statement::AlterTableAddColumn(stmt) => ddl::plan_alter_table_add_column(scx, stmt),
343        Statement::AlterNetworkPolicy(stmt) => ddl::plan_alter_network_policy(scx, stmt),
344        Statement::Comment(stmt) => ddl::plan_comment(scx, stmt),
345        Statement::CreateCluster(stmt) => ddl::plan_create_cluster(scx, stmt),
346        Statement::CreateClusterReplica(stmt) => ddl::plan_create_cluster_replica(scx, stmt),
347        Statement::CreateConnection(stmt) => ddl::plan_create_connection(scx, stmt),
348        Statement::CreateDatabase(stmt) => ddl::plan_create_database(scx, stmt),
349        Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
350        Statement::CreateRole(stmt) => ddl::plan_create_role(scx, stmt),
351        Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
352        Statement::CreateSecret(stmt) => ddl::plan_create_secret(scx, stmt),
353        Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
354        Statement::CreateWebhookSource(stmt) => ddl::plan_create_webhook_source(scx, stmt),
355        Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
356        Statement::CreateSubsource(stmt) => ddl::plan_create_subsource(scx, stmt),
357        Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
358        Statement::CreateTableFromSource(stmt) => ddl::plan_create_table_from_source(scx, stmt),
359        Statement::CreateType(stmt) => ddl::plan_create_type(scx, stmt),
360        Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt),
361        Statement::CreateMaterializedView(stmt) => ddl::plan_create_materialized_view(scx, stmt),
362        Statement::CreateContinualTask(stmt) => ddl::plan_create_continual_task(scx, stmt),
363        Statement::CreateNetworkPolicy(stmt) => ddl::plan_create_network_policy(scx, stmt),
364        Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt),
365        Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt),
366
367        // `ACL` statements.
368        Statement::AlterOwner(stmt) => acl::plan_alter_owner(scx, stmt),
369        Statement::GrantRole(stmt) => acl::plan_grant_role(scx, stmt),
370        Statement::RevokeRole(stmt) => acl::plan_revoke_role(scx, stmt),
371        Statement::GrantPrivileges(stmt) => acl::plan_grant_privileges(scx, stmt),
372        Statement::RevokePrivileges(stmt) => acl::plan_revoke_privileges(scx, stmt),
373        Statement::AlterDefaultPrivileges(stmt) => acl::plan_alter_default_privileges(scx, stmt),
374        Statement::ReassignOwned(stmt) => acl::plan_reassign_owned(scx, stmt),
375
376        // DML statements.
377        Statement::Copy(stmt) => dml::plan_copy(scx, stmt),
378        Statement::Delete(stmt) => dml::plan_delete(scx, stmt, params),
379        Statement::ExplainPlan(stmt) => dml::plan_explain_plan(scx, stmt, params),
380        Statement::ExplainPushdown(stmt) => dml::plan_explain_pushdown(scx, stmt, params),
381        Statement::ExplainAnalyzeObject(stmt) => {
382            dml::plan_explain_analyze_object(scx, stmt, params)
383        }
384        Statement::ExplainAnalyzeCluster(stmt) => {
385            dml::plan_explain_analyze_cluster(scx, stmt, params)
386        }
387        Statement::ExplainTimestamp(stmt) => dml::plan_explain_timestamp(scx, stmt),
388        Statement::ExplainSinkSchema(stmt) => dml::plan_explain_schema(scx, stmt),
389        Statement::Insert(stmt) => dml::plan_insert(scx, stmt, params),
390        Statement::Select(stmt) => dml::plan_select(scx, stmt, params, None),
391        Statement::Subscribe(stmt) => dml::plan_subscribe(scx, stmt, params, None),
392        Statement::Update(stmt) => dml::plan_update(scx, stmt, params),
393
394        // `SHOW` statements.
395        Statement::Show(ShowStatement::ShowColumns(stmt)) => show::show_columns(scx, stmt)?.plan(),
396        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
397            show::plan_show_create_connection(scx, stmt).map(Plan::ShowCreate)
398        }
399        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
400            show::plan_show_create_cluster(scx, stmt).map(Plan::ShowCreate)
401        }
402        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
403            show::plan_show_create_index(scx, stmt).map(Plan::ShowCreate)
404        }
405        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
406            show::plan_show_create_sink(scx, stmt).map(Plan::ShowCreate)
407        }
408        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
409            show::plan_show_create_source(scx, stmt).map(Plan::ShowCreate)
410        }
411        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
412            show::plan_show_create_table(scx, stmt).map(Plan::ShowCreate)
413        }
414        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
415            show::plan_show_create_view(scx, stmt).map(Plan::ShowCreate)
416        }
417        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
418            show::plan_show_create_materialized_view(scx, stmt).map(Plan::ShowCreate)
419        }
420        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
421            show::plan_show_create_type(scx, stmt).map(Plan::ShowCreate)
422        }
423        Statement::Show(ShowStatement::ShowObjects(stmt)) => show::show_objects(scx, stmt)?.plan(),
424
425        // SCL statements.
426        Statement::Close(stmt) => scl::plan_close(scx, stmt),
427        Statement::Deallocate(stmt) => scl::plan_deallocate(scx, stmt),
428        Statement::Declare(stmt) => scl::plan_declare(scx, stmt, params),
429        Statement::Discard(stmt) => scl::plan_discard(scx, stmt),
430        Statement::Execute(stmt) => scl::plan_execute(scx, stmt),
431        Statement::Fetch(stmt) => scl::plan_fetch(scx, stmt),
432        Statement::Prepare(stmt) => scl::plan_prepare(scx, stmt),
433        Statement::ResetVariable(stmt) => scl::plan_reset_variable(scx, stmt),
434        Statement::SetVariable(stmt) => scl::plan_set_variable(scx, stmt),
435        Statement::Show(ShowStatement::ShowVariable(stmt)) => scl::plan_show_variable(scx, stmt),
436
437        // TCL statements.
438        Statement::Commit(stmt) => tcl::plan_commit(scx, stmt),
439        Statement::Rollback(stmt) => tcl::plan_rollback(scx, stmt),
440        Statement::SetTransaction(stmt) => tcl::plan_set_transaction(scx, stmt),
441        Statement::StartTransaction(stmt) => tcl::plan_start_transaction(scx, stmt),
442
443        // Other statements.
444        Statement::Raise(stmt) => raise::plan_raise(scx, stmt),
445        Statement::Show(ShowStatement::InspectShard(stmt)) => scl::plan_inspect_shard(scx, stmt),
446        Statement::ValidateConnection(stmt) => validate::plan_validate_connection(scx, stmt),
447    };
448
449    if let Ok(plan) = &plan {
450        mz_ore::soft_assert_no_log!(
451            permitted_plans.contains(&PlanKind::from(plan)),
452            "plan {:?}, permitted plans {:?}",
453            plan,
454            permitted_plans
455        );
456    }
457
458    plan
459}
460
461pub fn plan_copy_from(
462    pcx: &PlanContext,
463    catalog: &dyn SessionCatalog,
464    target_id: CatalogItemId,
465    target_name: String,
466    columns: Vec<ColumnIndex>,
467    rows: Vec<mz_repr::Row>,
468) -> Result<super::HirRelationExpr, PlanError> {
469    query::plan_copy_from_rows(pcx, catalog, target_id, target_name, columns, rows)
470}
471
472/// Whether a SQL object type can be interpreted as matching the type of the given catalog item.
473/// For example, if `v` is a view, `DROP SOURCE v` should not work, since Source and View
474/// are non-matching types.
475///
476/// For now tables are treated as a special kind of source in Materialize, so just
477/// allow `TABLE` to refer to either.
478impl PartialEq<ObjectType> for CatalogItemType {
479    fn eq(&self, other: &ObjectType) -> bool {
480        match (self, other) {
481            (CatalogItemType::Source, ObjectType::Source)
482            | (CatalogItemType::Table, ObjectType::Table)
483            | (CatalogItemType::Sink, ObjectType::Sink)
484            | (CatalogItemType::View, ObjectType::View)
485            | (CatalogItemType::MaterializedView, ObjectType::MaterializedView)
486            | (CatalogItemType::Index, ObjectType::Index)
487            | (CatalogItemType::Type, ObjectType::Type)
488            | (CatalogItemType::Secret, ObjectType::Secret)
489            | (CatalogItemType::Connection, ObjectType::Connection) => true,
490            (_, _) => false,
491        }
492    }
493}
494
495impl PartialEq<CatalogItemType> for ObjectType {
496    fn eq(&self, other: &CatalogItemType) -> bool {
497        other == self
498    }
499}
500
501/// Immutable state that applies to the planning of an entire `Statement`.
502#[derive(Debug, Clone)]
503pub struct StatementContext<'a> {
504    /// The optional PlanContext, which will be present for statements that execute
505    /// within the OneShot QueryLifetime and None otherwise (views). This is an
506    /// awkward field and should probably be relocated to a place that fits our
507    /// execution model more closely.
508    pcx: Option<&'a PlanContext>,
509    pub catalog: &'a dyn SessionCatalog,
510    /// The types of the parameters in the query. This is filled in as planning
511    /// occurs.
512    pub param_types: RefCell<BTreeMap<usize, SqlScalarType>>,
513    /// Whether the statement contains an expression that can make the exact column list
514    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`. This is filled in as planning occurs.
515    pub ambiguous_columns: RefCell<bool>,
516}
517
518impl<'a> StatementContext<'a> {
519    pub fn new(
520        pcx: Option<&'a PlanContext>,
521        catalog: &'a dyn SessionCatalog,
522    ) -> StatementContext<'a> {
523        StatementContext {
524            pcx,
525            catalog,
526            param_types: Default::default(),
527            ambiguous_columns: RefCell::new(false),
528        }
529    }
530
531    /// Returns the schemas in order of search_path that exist in the catalog.
532    pub fn current_schemas(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
533        self.catalog.search_path()
534    }
535
536    /// Returns the first schema from the search_path that exist in the catalog,
537    /// or None if there are none.
538    pub fn current_schema(&self) -> Option<&(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
539        self.current_schemas().into_iter().next()
540    }
541
542    pub fn pcx(&self) -> Result<&PlanContext, PlanError> {
543        self.pcx.ok_or_else(|| sql_err!("no plan context"))
544    }
545
546    pub fn allocate_full_name(&self, name: PartialItemName) -> Result<FullItemName, PlanError> {
547        let (database, schema): (RawDatabaseSpecifier, String) = match (name.database, name.schema)
548        {
549            (None, None) => {
550                let Some((database, schema)) = self.current_schema() else {
551                    return Err(PlanError::InvalidSchemaName);
552                };
553                let schema = self.get_schema(database, schema);
554                let database = match schema.database() {
555                    ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
556                    ResolvedDatabaseSpecifier::Id(id) => {
557                        RawDatabaseSpecifier::Name(self.catalog.get_database(id).name().to_string())
558                    }
559                };
560                (database, schema.name().schema.clone())
561            }
562            (None, Some(schema)) => {
563                if is_system_schema(&schema) {
564                    (RawDatabaseSpecifier::Ambient, schema)
565                } else {
566                    match self.catalog.active_database_name() {
567                        Some(name) => (RawDatabaseSpecifier::Name(name.to_string()), schema),
568                        None => {
569                            sql_bail!(
570                                "no database specified for non-system schema and no active database"
571                            )
572                        }
573                    }
574                }
575            }
576            (Some(_database), None) => {
577                // This shouldn't be possible. Refactor the datastructure to
578                // make it not exist.
579                sql_bail!("unreachable: specified the database but no schema")
580            }
581            (Some(database), Some(schema)) => (RawDatabaseSpecifier::Name(database), schema),
582        };
583        let item = name.item;
584        Ok(FullItemName {
585            database,
586            schema,
587            item,
588        })
589    }
590
591    pub fn allocate_qualified_name(
592        &self,
593        name: PartialItemName,
594    ) -> Result<QualifiedItemName, PlanError> {
595        let full_name = self.allocate_full_name(name)?;
596        let database_spec = match full_name.database {
597            RawDatabaseSpecifier::Ambient => ResolvedDatabaseSpecifier::Ambient,
598            RawDatabaseSpecifier::Name(name) => ResolvedDatabaseSpecifier::Id(
599                self.resolve_database(&UnresolvedDatabaseName(Ident::new(name)?))?
600                    .id(),
601            ),
602        };
603        let schema_spec = self
604            .resolve_schema_in_database(&database_spec, &Ident::new(full_name.schema)?)?
605            .id()
606            .clone();
607        Ok(QualifiedItemName {
608            qualifiers: ItemQualifiers {
609                database_spec,
610                schema_spec,
611            },
612            item: full_name.item,
613        })
614    }
615
616    pub fn allocate_temporary_full_name(&self, name: PartialItemName) -> FullItemName {
617        FullItemName {
618            database: RawDatabaseSpecifier::Ambient,
619            schema: name
620                .schema
621                .unwrap_or_else(|| mz_repr::namespaces::MZ_TEMP_SCHEMA.to_owned()),
622            item: name.item,
623        }
624    }
625
626    pub fn allocate_temporary_qualified_name(
627        &self,
628        name: PartialItemName,
629    ) -> Result<QualifiedItemName, PlanError> {
630        // Compare against the MZ_TEMP_SCHEMA constant directly instead of calling
631        // get_schema(), because with lazy temporary schema creation, the temp
632        // schema may not exist yet. (This is similar to what `allocate_temporary_full_name` was
633        // doing already before making temporary schemas lazy.)
634        if let Some(schema_name) = name.schema {
635            if schema_name != mz_repr::namespaces::MZ_TEMP_SCHEMA {
636                return Err(PlanError::InvalidTemporarySchema);
637            }
638        }
639
640        Ok(QualifiedItemName {
641            qualifiers: ItemQualifiers {
642                database_spec: ResolvedDatabaseSpecifier::Ambient,
643                schema_spec: SchemaSpecifier::Temporary,
644            },
645            item: name.item,
646        })
647    }
648
649    // Creates a `ResolvedItemName::Item` from a `GlobalId` and an
650    // `UnresolvedItemName`.
651    pub fn allocate_resolved_item_name(
652        &self,
653        id: CatalogItemId,
654        name: UnresolvedItemName,
655    ) -> Result<ResolvedItemName, PlanError> {
656        let partial = normalize::unresolved_item_name(name)?;
657        let qualified = self.allocate_qualified_name(partial.clone())?;
658        let full_name = self.allocate_full_name(partial)?;
659        Ok(ResolvedItemName::Item {
660            id,
661            qualifiers: qualified.qualifiers,
662            full_name,
663            print_id: true,
664            version: RelationVersionSelector::Latest,
665        })
666    }
667
    /// Returns the ID of the active database as reported by the catalog, or `None` if the
    /// catalog reports no active database.
    pub fn active_database(&self) -> Option<&DatabaseId> {
        self.catalog.active_database()
    }
671
672    pub fn resolve_optional_schema(
673        &self,
674        schema_name: &Option<ResolvedSchemaName>,
675    ) -> Result<SchemaSpecifier, PlanError> {
676        match schema_name {
677            Some(ResolvedSchemaName::Schema { schema_spec, .. }) => Ok(schema_spec.clone()),
678            None => self.resolve_active_schema().map(|spec| spec.clone()),
679            Some(ResolvedSchemaName::Error) => {
680                unreachable!("should have been handled by name resolution")
681            }
682        }
683    }
684
685    pub fn resolve_active_schema(&self) -> Result<&SchemaSpecifier, PlanError> {
686        match self.current_schema() {
687            Some((_db, schema)) => Ok(schema),
688            None => Err(PlanError::InvalidSchemaName),
689        }
690    }
691
    /// Looks up the cluster with the given ID in the catalog.
    pub fn get_cluster(&self, id: &ClusterId) -> &dyn CatalogCluster<'_> {
        self.catalog.get_cluster(*id)
    }
695
696    pub fn resolve_database(
697        &self,
698        name: &UnresolvedDatabaseName,
699    ) -> Result<&dyn CatalogDatabase, PlanError> {
700        let name = normalize::ident_ref(&name.0);
701        Ok(self.catalog.resolve_database(name)?)
702    }
703
    /// Looks up the database with the given ID in the catalog.
    pub fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase {
        self.catalog.get_database(id)
    }
707
708    pub fn resolve_schema_in_database(
709        &self,
710        database_spec: &ResolvedDatabaseSpecifier,
711        schema: &Ident,
712    ) -> Result<&dyn CatalogSchema, PlanError> {
713        let schema = normalize::ident_ref(schema);
714        Ok(self
715            .catalog
716            .resolve_schema_in_database(database_spec, schema)?)
717    }
718
719    pub fn resolve_schema(
720        &self,
721        name: UnresolvedSchemaName,
722    ) -> Result<&dyn CatalogSchema, PlanError> {
723        let name = normalize::unresolved_schema_name(name)?;
724        Ok(self
725            .catalog
726            .resolve_schema(name.database.as_deref(), &name.schema)?)
727    }
728
    /// Looks up the schema identified by `database_spec` and `schema_spec` in the catalog.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema {
        self.catalog.get_schema(database_spec, schema_spec)
    }
736
737    pub fn resolve_item(&self, name: RawItemName) -> Result<&dyn CatalogItem, PlanError> {
738        match name {
739            RawItemName::Name(name) => {
740                let name = normalize::unresolved_item_name(name)?;
741                Ok(self.catalog.resolve_item(&name)?)
742            }
743            RawItemName::Id(id, _, _) => {
744                let gid = id.parse()?;
745                Ok(self.catalog.get_item(&gid))
746            }
747        }
748    }
749
    /// Looks up the item with the given ID in the catalog.
    pub fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem {
        self.catalog.get_item(id)
    }
753
754    pub fn get_item_by_resolved_name(
755        &self,
756        name: &ResolvedItemName,
757    ) -> Result<Box<dyn CatalogCollectionItem + '_>, PlanError> {
758        match name {
759            ResolvedItemName::Item { id, version, .. } => {
760                Ok(self.get_item(id).at_version(*version))
761            }
762            ResolvedItemName::Cte { .. } => sql_bail!("non-user item"),
763            ResolvedItemName::ContinualTask { .. } => sql_bail!("non-user item"),
764            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
765        }
766    }
767
768    pub fn get_column_by_resolved_name(
769        &self,
770        name: &ColumnName<Aug>,
771    ) -> Result<(Box<dyn CatalogCollectionItem + '_>, usize), PlanError> {
772        match (&name.relation, &name.column) {
773            (
774                ResolvedItemName::Item { id, version, .. },
775                ResolvedColumnReference::Column { index, .. },
776            ) => {
777                let item = self.get_item(id).at_version(*version);
778                Ok((item, *index))
779            }
780            _ => unreachable!(
781                "get_column_by_resolved_name errors should have been caught in name resolution"
782            ),
783        }
784    }
785
786    pub fn resolve_function(
787        &self,
788        name: UnresolvedItemName,
789    ) -> Result<&dyn CatalogItem, PlanError> {
790        let name = normalize::unresolved_item_name(name)?;
791        Ok(self.catalog.resolve_function(&name)?)
792    }
793
794    pub fn resolve_cluster(
795        &self,
796        name: Option<&Ident>,
797    ) -> Result<&dyn CatalogCluster<'_>, PlanError> {
798        let name = name.map(|name| name.as_str());
799        Ok(self.catalog.resolve_cluster(name)?)
800    }
801
802    pub fn resolve_type(&self, mut ty: mz_pgrepr::Type) -> Result<ResolvedDataType, PlanError> {
803        // Ignore precision constraints on date/time types until we support
804        // it. This should be safe enough because our types are wide enough
805        // to support the maximum possible precision.
806        //
807        // See: https://github.com/MaterializeInc/database-issues/issues/3179
808        match &mut ty {
809            mz_pgrepr::Type::Interval { constraints } => *constraints = None,
810            mz_pgrepr::Type::Time { precision } => *precision = None,
811            mz_pgrepr::Type::TimeTz { precision } => *precision = None,
812            mz_pgrepr::Type::Timestamp { precision } => *precision = None,
813            mz_pgrepr::Type::TimestampTz { precision } => *precision = None,
814            _ => (),
815        }
816        // NOTE(benesch): this *looks* gross, but it is
817        // safe enough. The `fmt::Display`
818        // representation on `pgrepr::Type` promises to
819        // produce an unqualified type name that does
820        // not require quoting.
821        let mut ty = if ty.oid() >= FIRST_USER_OID {
822            sql_bail!("internal error, unexpected user type: {ty:?} ");
823        } else if ty.oid() < FIRST_MATERIALIZE_OID {
824            format!("pg_catalog.{}", ty)
825        } else {
826            // This relies on all non-PG types existing in `mz_catalog`, which is annoying.
827            format!("mz_catalog.{}", ty)
828        };
829        // TODO(benesch): converting `json` to `jsonb`
830        // is wrong. We ought to support the `json` type
831        // directly.
832        if ty == "pg_catalog.json" {
833            ty = "pg_catalog.jsonb".into();
834        }
835        let data_type = mz_sql_parser::parser::parse_data_type(&ty)?;
836        let (data_type, _) = names::resolve(self.catalog, data_type)?;
837        Ok(data_type)
838    }
839
    /// Returns the [`ObjectType`] the catalog reports for the object with the given ID.
    pub fn get_object_type(&self, id: &ObjectId) -> ObjectType {
        self.catalog.get_object_type(id)
    }
843
844    pub fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType {
845        match id {
846            SystemObjectId::Object(id) => SystemObjectType::Object(self.get_object_type(id)),
847            SystemObjectId::System => SystemObjectType::System,
848        }
849    }
850
851    /// Returns an error if the named `FeatureFlag` is not set to `on`.
852    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
853        flag.require(self.catalog.system_vars())?;
854        Ok(())
855    }
856
    /// Returns true if the named [`FeatureFlag`] is set to `on`, returns false otherwise.
    ///
    /// This is the boolean form of `require_feature_flag`: any error from that check is
    /// reported as `false`.
    pub fn is_feature_flag_enabled(&self, flag: &'static FeatureFlag) -> bool {
        self.require_feature_flag(flag).is_ok()
    }
861
862    pub fn finalize_param_types(self) -> Result<Vec<SqlScalarType>, PlanError> {
863        let param_types = self.param_types.into_inner();
864        let mut out = vec![];
865        for (i, (n, typ)) in param_types.into_iter().enumerate() {
866            if n != i + 1 {
867                sql_bail!("unable to infer type for parameter ${}", i + 1);
868            }
869            out.push(typ);
870        }
871        Ok(out)
872    }
873
    /// Returns the catalog's human-readable rendering of the scalar type `typ`.
    ///
    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        self.catalog.humanize_scalar_type(typ, postgres_compat)
    }
879
    /// Returns the catalog's human-readable rendering of the column type `typ`.
    ///
    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
    pub fn humanize_column_type(&self, typ: &SqlColumnType, postgres_compat: bool) -> String {
        self.catalog.humanize_column_type(typ, postgres_compat)
    }
885
886    pub(crate) fn build_tunnel_definition(
887        &self,
888        ssh_tunnel: Option<with_options::Object>,
889        aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
890    ) -> Result<Tunnel<ReferencedConnection>, PlanError> {
891        match (ssh_tunnel, aws_privatelink) {
892            (None, None) => Ok(Tunnel::Direct),
893            (Some(ssh_tunnel), None) => {
894                let id = CatalogItemId::from(ssh_tunnel);
895                let ssh_tunnel = self.catalog.get_item(&id);
896                match ssh_tunnel.connection()? {
897                    Connection::Ssh(_connection) => Ok(Tunnel::Ssh(SshTunnel {
898                        connection_id: id,
899                        connection: id,
900                    })),
901                    _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
902                }
903            }
904            (None, Some(aws_privatelink)) => {
905                let id = aws_privatelink.connection.item_id().clone();
906                let entry = self.catalog.get_item(&id);
907                match entry.connection()? {
908                    Connection::AwsPrivatelink(_) => Ok(Tunnel::AwsPrivatelink(AwsPrivatelink {
909                        connection_id: id,
910                        // By default we do not specify an availability zone for the tunnel.
911                        availability_zone: None,
912                        // We always use the port as specified by the top-level connection.
913                        port: aws_privatelink.port,
914                    })),
915                    _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
916                }
917            }
918            (Some(_), Some(_)) => {
919                sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
920            }
921        }
922    }
923
924    pub fn relation_desc_into_table_defs(
925        &self,
926        desc: &RelationDesc,
927    ) -> Result<(Vec<ColumnDef<Aug>>, Vec<TableConstraint<Aug>>), PlanError> {
928        let mut columns = vec![];
929        let mut null_cols = BTreeSet::new();
930        for (column_name, column_type) in desc.iter() {
931            let name = Ident::new(column_name.as_str().to_owned())?;
932
933            let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
934            let data_type = self.resolve_type(ty)?;
935
936            let options = if !column_type.nullable {
937                null_cols.insert(columns.len());
938                vec![mz_sql_parser::ast::ColumnOptionDef {
939                    name: None,
940                    option: mz_sql_parser::ast::ColumnOption::NotNull,
941                }]
942            } else {
943                vec![]
944            };
945
946            columns.push(ColumnDef {
947                name,
948                data_type,
949                collation: None,
950                options,
951            });
952        }
953
954        let mut table_constraints = vec![];
955        for key in desc.typ().keys.iter() {
956            let mut col_names = vec![];
957            for col_idx in key {
958                if !null_cols.contains(col_idx) {
959                    // Note that alternatively we could support NULL values in keys with `NULLS NOT
960                    // DISTINCT` semantics, which treats `NULL` as a distinct value.
961                    sql_bail!(
962                        "[internal error] key columns must be NOT NULL when generating table constraints"
963                    );
964                }
965                col_names.push(columns[*col_idx].name.clone());
966            }
967            table_constraints.push(TableConstraint::Unique {
968                name: None,
969                columns: col_names,
970                is_primary: false,
971                nulls_not_distinct: false,
972            });
973        }
974
975        Ok((columns, table_constraints))
976    }
977
    /// Returns the owner role the catalog reports for the object with the given ID, if any.
    pub fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
        self.catalog.get_owner_id(id)
    }
981
982    pub fn humanize_resolved_name(
983        &self,
984        name: &ResolvedItemName,
985    ) -> Result<PartialItemName, PlanError> {
986        let item = self.get_item_by_resolved_name(name)?;
987        Ok(self.catalog.minimal_qualification(item.name()))
988    }
989
990    /// WARNING! This style of name resolution assumes the referred-to objects exists (i.e. panics
991    /// if objects do not exist) so should never be used to handle user input.
992    pub fn dangerous_resolve_name(&self, name: Vec<&str>) -> ResolvedItemName {
993        tracing::trace!("dangerous_resolve_name {:?}", name);
994        // Note: Using unchecked here is okay because this function is already dangerous.
995        let name: Vec<_> = name.into_iter().map(Ident::new_unchecked).collect();
996        let name = UnresolvedItemName::qualified(&name);
997        let entry = match self.resolve_item(RawItemName::Name(name.clone())) {
998            Ok(entry) => entry,
999            Err(_) => self
1000                .resolve_function(name.clone())
1001                .expect("name referred to an existing object"),
1002        };
1003
1004        let partial = normalize::unresolved_item_name(name).unwrap();
1005        let full_name = self.allocate_full_name(partial).unwrap();
1006
1007        ResolvedItemName::Item {
1008            id: entry.id(),
1009            qualifiers: entry.name().qualifiers.clone(),
1010            full_name,
1011            print_id: true,
1012            version: RelationVersionSelector::Latest,
1013        }
1014    }
1015}
1016
1017pub fn resolve_cluster_for_materialized_view<'a>(
1018    catalog: &'a dyn SessionCatalog,
1019    stmt: &CreateMaterializedViewStatement<Aug>,
1020) -> Result<ClusterId, PlanError> {
1021    Ok(match &stmt.in_cluster {
1022        None => catalog.resolve_cluster(None)?.id(),
1023        Some(in_cluster) => in_cluster.id,
1024    })
1025}
1026
/// Statement classification as documented by [`plan`].
#[derive(Debug, Clone, Copy)]
pub enum StatementClassification {
    /// Access control statements, e.g. `GRANT` and `REVOKE`.
    ACL,
    /// Data definition statements, e.g. `CREATE`, `ALTER` and `DROP`.
    DDL,
    /// Data manipulation statements, e.g. `SELECT`, `INSERT` and `EXPLAIN`.
    DML,
    /// Statements that belong to none of the other classes.
    Other,
    /// Session control statements, e.g. `SET`, `PREPARE` and `FETCH`.
    SCL,
    /// `SHOW` statements, except for showing a variable.
    Show,
    /// Transaction control statements, e.g. `COMMIT` and `ROLLBACK`.
    TCL,
}

impl StatementClassification {
    /// Returns `true` only for [`StatementClassification::DDL`].
    pub fn is_ddl(&self) -> bool {
        match self {
            Self::DDL => true,
            Self::ACL | Self::DML | Self::Other | Self::SCL | Self::Show | Self::TCL => false,
        }
    }
}
1044
1045impl<T: mz_sql_parser::ast::AstInfo> From<&Statement<T>> for StatementClassification {
1046    fn from(value: &Statement<T>) -> Self {
1047        use StatementClassification::*;
1048
1049        match value {
1050            // DDL statements.
1051            Statement::AlterCluster(_) => DDL,
1052            Statement::AlterConnection(_) => DDL,
1053            Statement::AlterIndex(_) => DDL,
1054            Statement::AlterMaterializedViewApplyReplacement(_) => DDL,
1055            Statement::AlterObjectRename(_) => DDL,
1056            Statement::AlterObjectSwap(_) => DDL,
1057            Statement::AlterNetworkPolicy(_) => DDL,
1058            Statement::AlterRetainHistory(_) => DDL,
1059            Statement::AlterRole(_) => DDL,
1060            Statement::AlterSecret(_) => DDL,
1061            Statement::AlterSetCluster(_) => DDL,
1062            Statement::AlterSink(_) => DDL,
1063            Statement::AlterSource(_) => DDL,
1064            Statement::AlterSystemSet(_) => DDL,
1065            Statement::AlterSystemReset(_) => DDL,
1066            Statement::AlterSystemResetAll(_) => DDL,
1067            Statement::AlterTableAddColumn(_) => DDL,
1068            Statement::Comment(_) => DDL,
1069            Statement::CreateCluster(_) => DDL,
1070            Statement::CreateClusterReplica(_) => DDL,
1071            Statement::CreateConnection(_) => DDL,
1072            Statement::CreateContinualTask(_) => DDL,
1073            Statement::CreateDatabase(_) => DDL,
1074            Statement::CreateIndex(_) => DDL,
1075            Statement::CreateRole(_) => DDL,
1076            Statement::CreateSchema(_) => DDL,
1077            Statement::CreateSecret(_) => DDL,
1078            Statement::CreateSink(_) => DDL,
1079            Statement::CreateWebhookSource(_) => DDL,
1080            Statement::CreateSource(_) => DDL,
1081            Statement::CreateSubsource(_) => DDL,
1082            Statement::CreateTable(_) => DDL,
1083            Statement::CreateTableFromSource(_) => DDL,
1084            Statement::CreateType(_) => DDL,
1085            Statement::CreateView(_) => DDL,
1086            Statement::CreateMaterializedView(_) => DDL,
1087            Statement::CreateNetworkPolicy(_) => DDL,
1088            Statement::DropObjects(_) => DDL,
1089            Statement::DropOwned(_) => DDL,
1090
1091            // `ACL` statements.
1092            Statement::AlterOwner(_) => ACL,
1093            Statement::GrantRole(_) => ACL,
1094            Statement::RevokeRole(_) => ACL,
1095            Statement::GrantPrivileges(_) => ACL,
1096            Statement::RevokePrivileges(_) => ACL,
1097            Statement::AlterDefaultPrivileges(_) => ACL,
1098            Statement::ReassignOwned(_) => ACL,
1099
1100            // DML statements.
1101            Statement::Copy(_) => DML,
1102            Statement::Delete(_) => DML,
1103            Statement::ExplainPlan(_) => DML,
1104            Statement::ExplainPushdown(_) => DML,
1105            Statement::ExplainAnalyzeObject(_) => DML,
1106            Statement::ExplainAnalyzeCluster(_) => DML,
1107            Statement::ExplainTimestamp(_) => DML,
1108            Statement::ExplainSinkSchema(_) => DML,
1109            Statement::Insert(_) => DML,
1110            Statement::Select(_) => DML,
1111            Statement::Subscribe(_) => DML,
1112            Statement::Update(_) => DML,
1113
1114            // `SHOW` statements.
1115            Statement::Show(ShowStatement::ShowColumns(_)) => Show,
1116            Statement::Show(ShowStatement::ShowCreateConnection(_)) => Show,
1117            Statement::Show(ShowStatement::ShowCreateCluster(_)) => Show,
1118            Statement::Show(ShowStatement::ShowCreateIndex(_)) => Show,
1119            Statement::Show(ShowStatement::ShowCreateSink(_)) => Show,
1120            Statement::Show(ShowStatement::ShowCreateSource(_)) => Show,
1121            Statement::Show(ShowStatement::ShowCreateTable(_)) => Show,
1122            Statement::Show(ShowStatement::ShowCreateView(_)) => Show,
1123            Statement::Show(ShowStatement::ShowCreateMaterializedView(_)) => Show,
1124            Statement::Show(ShowStatement::ShowCreateType(_)) => Show,
1125            Statement::Show(ShowStatement::ShowObjects(_)) => Show,
1126
1127            // SCL statements.
1128            Statement::Close(_) => SCL,
1129            Statement::Deallocate(_) => SCL,
1130            Statement::Declare(_) => SCL,
1131            Statement::Discard(_) => SCL,
1132            Statement::Execute(_) => SCL,
1133            Statement::Fetch(_) => SCL,
1134            Statement::Prepare(_) => SCL,
1135            Statement::ResetVariable(_) => SCL,
1136            Statement::SetVariable(_) => SCL,
1137            Statement::Show(ShowStatement::ShowVariable(_)) => SCL,
1138
1139            // TCL statements.
1140            Statement::Commit(_) => TCL,
1141            Statement::Rollback(_) => TCL,
1142            Statement::SetTransaction(_) => TCL,
1143            Statement::StartTransaction(_) => TCL,
1144
1145            // Other statements.
1146            Statement::Raise(_) => Other,
1147            Statement::Show(ShowStatement::InspectShard(_)) => Other,
1148            Statement::ValidateConnection(_) => Other,
1149        }
1150    }
1151}