Skip to main content

mz_sql/plan/
statement.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Statement planning.
11//!
12//! This module houses the entry points for planning a SQL statement.
13
14use std::cell::RefCell;
15use std::collections::{BTreeMap, BTreeSet};
16use std::sync::{Arc, Mutex};
17
18use mz_repr::namespaces::is_system_schema;
19use mz_repr::{
20    CatalogItemId, ColumnIndex, RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType,
21};
22use mz_sql_parser::ast::{
23    ColumnDef, ColumnName, CreateMaterializedViewStatement, RawItemName, ShowStatement,
24    StatementKind, TableConstraint, UnresolvedDatabaseName, UnresolvedSchemaName,
25};
26use mz_storage_types::connections::Connection;
27
28use crate::ast::{Ident, Statement, UnresolvedItemName};
29use crate::catalog::{
30    CatalogCluster, CatalogCollectionItem, CatalogDatabase, CatalogItem, CatalogItemType,
31    CatalogSchema, ObjectType, SessionCatalog, SystemObjectType,
32};
33use crate::names::{
34    self, Aug, DatabaseId, FullItemName, ItemQualifiers, ObjectId, PartialItemName,
35    QualifiedItemName, RawDatabaseSpecifier, ResolvedColumnReference, ResolvedDataType,
36    ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName, SchemaSpecifier,
37    SystemObjectId,
38};
39use crate::normalize;
40use crate::plan::error::PlanError;
41use crate::plan::{Params, Plan, PlanContext, PlanKind, query};
42use crate::session::vars::FeatureFlag;
43
44mod acl;
45pub(crate) mod ddl;
46mod dml;
47mod raise;
48mod scl;
49pub(crate) mod show;
50mod tcl;
51mod validate;
52
53use crate::session::vars;
54pub(crate) use ddl::PgConfigOptionExtracted;
55use mz_controller_types::ClusterId;
56use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_USER_OID};
57use mz_repr::role_id::RoleId;
58
59/// Describes the output of a SQL statement.
60#[derive(Debug, Clone, Eq, PartialEq)]
61pub struct StatementDesc {
62    /// The shape of the rows produced by the statement, if the statement
63    /// produces rows.
64    pub relation_desc: Option<RelationDesc>,
65    /// The determined types of the parameters in the statement, if any.
66    pub param_types: Vec<SqlScalarType>,
67    /// Whether the statement is a `COPY` statement.
68    pub is_copy: bool,
69}
70
71impl StatementDesc {
72    pub fn new(relation_desc: Option<RelationDesc>) -> Self {
73        StatementDesc {
74            relation_desc,
75            param_types: vec![],
76            is_copy: false,
77        }
78    }
79
80    /// Reports the number of columns in the statement's result set, or zero if
81    /// the statement does not return rows.
82    pub fn arity(&self) -> usize {
83        self.relation_desc
84            .as_ref()
85            .map(|desc| desc.typ().column_types.len())
86            .unwrap_or(0)
87    }
88
89    fn with_params(mut self, param_types: Vec<SqlScalarType>) -> Self {
90        self.param_types = param_types;
91        self
92    }
93
94    fn with_is_copy(mut self) -> Self {
95        self.is_copy = true;
96        self
97    }
98}
99
100/// Creates a description of the purified statement `stmt`.
101///
102/// See the documentation of [`StatementDesc`] for details.
103pub fn describe(
104    pcx: &PlanContext,
105    catalog: &dyn SessionCatalog,
106    stmt: Statement<Aug>,
107    param_types_in: &[Option<SqlScalarType>],
108) -> Result<StatementDesc, PlanError> {
109    let mut param_types = BTreeMap::new();
110    for (i, ty) in param_types_in.iter().enumerate() {
111        if let Some(ty) = ty {
112            param_types.insert(i + 1, ty.clone());
113        }
114    }
115
116    let scx = StatementContext {
117        pcx: Some(pcx),
118        catalog,
119        param_types: RefCell::new(param_types),
120        ambiguous_columns: RefCell::new(false),
121        sql_impl_resolved_ids: Arc::new(Mutex::new(ResolvedIds::empty())),
122    };
123
124    let desc = match stmt {
125        // DDL statements.
126        Statement::AlterCluster(stmt) => ddl::describe_alter_cluster_set_options(&scx, stmt)?,
127        Statement::AlterConnection(stmt) => ddl::describe_alter_connection(&scx, stmt)?,
128        Statement::AlterIndex(stmt) => ddl::describe_alter_index_options(&scx, stmt)?,
129        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
130            ddl::describe_alter_materialized_view_apply_replacement(&scx, stmt)?
131        }
132        Statement::AlterObjectRename(stmt) => ddl::describe_alter_object_rename(&scx, stmt)?,
133        Statement::AlterObjectSwap(stmt) => ddl::describe_alter_object_swap(&scx, stmt)?,
134        Statement::AlterRetainHistory(stmt) => ddl::describe_alter_retain_history(&scx, stmt)?,
135        Statement::AlterRole(stmt) => ddl::describe_alter_role(&scx, stmt)?,
136        Statement::AlterSecret(stmt) => ddl::describe_alter_secret_options(&scx, stmt)?,
137        Statement::AlterSetCluster(stmt) => ddl::describe_alter_set_cluster(&scx, stmt)?,
138        Statement::AlterSink(stmt) => ddl::describe_alter_sink(&scx, stmt)?,
139        Statement::AlterSource(stmt) => ddl::describe_alter_source(&scx, stmt)?,
140        Statement::AlterSystemSet(stmt) => ddl::describe_alter_system_set(&scx, stmt)?,
141        Statement::AlterSystemReset(stmt) => ddl::describe_alter_system_reset(&scx, stmt)?,
142        Statement::AlterSystemResetAll(stmt) => ddl::describe_alter_system_reset_all(&scx, stmt)?,
143        Statement::AlterTableAddColumn(stmt) => ddl::describe_alter_table_add_column(&scx, stmt)?,
144        Statement::AlterNetworkPolicy(stmt) => ddl::describe_alter_network_policy(&scx, stmt)?,
145        Statement::Comment(stmt) => ddl::describe_comment(&scx, stmt)?,
146        Statement::CreateCluster(stmt) => ddl::describe_create_cluster(&scx, stmt)?,
147        Statement::CreateClusterReplica(stmt) => ddl::describe_create_cluster_replica(&scx, stmt)?,
148        Statement::CreateConnection(stmt) => ddl::describe_create_connection(&scx, stmt)?,
149        Statement::CreateDatabase(stmt) => ddl::describe_create_database(&scx, stmt)?,
150        Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
151        Statement::CreateRole(stmt) => ddl::describe_create_role(&scx, stmt)?,
152        Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
153        Statement::CreateSecret(stmt) => ddl::describe_create_secret(&scx, stmt)?,
154        Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
155        Statement::CreateWebhookSource(stmt) => ddl::describe_create_webhook_source(&scx, stmt)?,
156        Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
157        Statement::CreateSubsource(stmt) => ddl::describe_create_subsource(&scx, stmt)?,
158        Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
159        Statement::CreateTableFromSource(stmt) => {
160            ddl::describe_create_table_from_source(&scx, stmt)?
161        }
162        Statement::CreateType(stmt) => ddl::describe_create_type(&scx, stmt)?,
163        Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
164        Statement::CreateMaterializedView(stmt) => {
165            ddl::describe_create_materialized_view(&scx, stmt)?
166        }
167        Statement::CreateNetworkPolicy(stmt) => ddl::describe_create_network_policy(&scx, stmt)?,
168        Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?,
169        Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?,
170
171        // `ACL` statements.
172        Statement::AlterOwner(stmt) => acl::describe_alter_owner(&scx, stmt)?,
173        Statement::GrantRole(stmt) => acl::describe_grant_role(&scx, stmt)?,
174        Statement::RevokeRole(stmt) => acl::describe_revoke_role(&scx, stmt)?,
175        Statement::GrantPrivileges(stmt) => acl::describe_grant_privileges(&scx, stmt)?,
176        Statement::RevokePrivileges(stmt) => acl::describe_revoke_privileges(&scx, stmt)?,
177        Statement::AlterDefaultPrivileges(stmt) => {
178            acl::describe_alter_default_privileges(&scx, stmt)?
179        }
180        Statement::ReassignOwned(stmt) => acl::describe_reassign_owned(&scx, stmt)?,
181
182        // `SHOW` statements.
183        Statement::Show(ShowStatement::ShowColumns(stmt)) => {
184            show::show_columns(&scx, stmt)?.describe()?
185        }
186        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
187            show::describe_show_create_connection(&scx, stmt)?
188        }
189        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
190            show::describe_show_create_cluster(&scx, stmt)?
191        }
192        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
193            show::describe_show_create_index(&scx, stmt)?
194        }
195        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
196            show::describe_show_create_sink(&scx, stmt)?
197        }
198        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
199            show::describe_show_create_source(&scx, stmt)?
200        }
201        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
202            show::describe_show_create_table(&scx, stmt)?
203        }
204        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
205            show::describe_show_create_view(&scx, stmt)?
206        }
207        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
208            show::describe_show_create_materialized_view(&scx, stmt)?
209        }
210        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
211            show::describe_show_create_type(&scx, stmt)?
212        }
213        Statement::Show(ShowStatement::ShowObjects(stmt)) => {
214            show::show_objects(&scx, stmt)?.describe()?
215        }
216
217        // SCL statements.
218        Statement::Close(stmt) => scl::describe_close(&scx, stmt)?,
219        Statement::Deallocate(stmt) => scl::describe_deallocate(&scx, stmt)?,
220        Statement::Declare(stmt) => scl::describe_declare(&scx, stmt, param_types_in)?,
221        Statement::Discard(stmt) => scl::describe_discard(&scx, stmt)?,
222        Statement::Execute(stmt) => scl::describe_execute(&scx, stmt)?,
223        Statement::Fetch(stmt) => scl::describe_fetch(&scx, stmt)?,
224        Statement::Prepare(stmt) => scl::describe_prepare(&scx, stmt)?,
225        Statement::ResetVariable(stmt) => scl::describe_reset_variable(&scx, stmt)?,
226        Statement::SetVariable(stmt) => scl::describe_set_variable(&scx, stmt)?,
227        Statement::Show(ShowStatement::ShowVariable(stmt)) => {
228            scl::describe_show_variable(&scx, stmt)?
229        }
230
231        // DML statements.
232        Statement::Copy(stmt) => dml::describe_copy(&scx, stmt)?,
233        Statement::Delete(stmt) => dml::describe_delete(&scx, stmt)?,
234        Statement::ExplainPlan(stmt) => dml::describe_explain_plan(&scx, stmt)?,
235        Statement::ExplainPushdown(stmt) => dml::describe_explain_pushdown(&scx, stmt)?,
236        Statement::ExplainAnalyzeObject(stmt) => dml::describe_explain_analyze_object(&scx, stmt)?,
237        Statement::ExplainAnalyzeCluster(stmt) => {
238            dml::describe_explain_analyze_cluster(&scx, stmt)?
239        }
240        Statement::ExplainTimestamp(stmt) => dml::describe_explain_timestamp(&scx, stmt)?,
241        Statement::ExplainSinkSchema(stmt) => dml::describe_explain_schema(&scx, stmt)?,
242        Statement::Insert(stmt) => dml::describe_insert(&scx, stmt)?,
243        Statement::Select(stmt) => dml::describe_select(&scx, stmt)?,
244        Statement::Subscribe(stmt) => dml::describe_subscribe(&scx, stmt)?,
245        Statement::Update(stmt) => dml::describe_update(&scx, stmt)?,
246
247        // TCL statements.
248        Statement::Commit(stmt) => tcl::describe_commit(&scx, stmt)?,
249        Statement::Rollback(stmt) => tcl::describe_rollback(&scx, stmt)?,
250        Statement::SetTransaction(stmt) => tcl::describe_set_transaction(&scx, stmt)?,
251        Statement::StartTransaction(stmt) => tcl::describe_start_transaction(&scx, stmt)?,
252
253        // Other statements.
254        Statement::Raise(stmt) => raise::describe_raise(&scx, stmt)?,
255        Statement::Show(ShowStatement::InspectShard(stmt)) => {
256            scl::describe_inspect_shard(&scx, stmt)?
257        }
258        Statement::ValidateConnection(stmt) => validate::describe_validate_connection(&scx, stmt)?,
259        Statement::ExecuteUnitTest(_) => {
260            return Err(PlanError::Unsupported {
261                feature: "EXECUTE UNIT TEST statement".to_string(),
262                discussion_no: None,
263            });
264        }
265    };
266
267    let desc = desc.with_params(scx.finalize_param_types()?);
268    Ok(desc)
269}
270
271/// Produces a [`Plan`] from the purified statement `stmt`.
272///
273/// Planning is a pure, synchronous function and so requires that the provided
274/// `stmt` does does not depend on any external state. Statements that rely on
275/// external state must remove that state prior to calling this function via
276/// [`crate::pure::purify_statement`] or
277/// [`crate::pure::purify_create_materialized_view_options`].
278///
279/// The returned plan is tied to the state of the provided catalog. If the state
280/// of the catalog changes after planning, the validity of the plan is not
281/// guaranteed.
282///
283/// Note that if you want to do something else asynchronously (e.g. validating
284/// connections), these might want to take different code paths than
285/// `purify_statement`. Feel free to rationalize this by thinking of those
286/// statements as not necessarily depending on external state.
287#[mz_ore::instrument(level = "debug")]
288pub fn plan(
289    pcx: Option<&PlanContext>,
290    catalog: &dyn SessionCatalog,
291    stmt: Statement<Aug>,
292    params: &Params,
293    resolved_ids: &ResolvedIds,
294) -> Result<(Plan, ResolvedIds), PlanError> {
295    let param_types = params
296        // We need the `expected_types` here, not the `actual_types`! This is because
297        // `expected_types` is how the parameter expression (e.g. `$1`) looks "from the outside":
298        // `bind_parameters` will insert a cast from the actual type to the expected type.
299        .expected_types
300        .iter()
301        .enumerate()
302        .map(|(i, ty)| (i + 1, ty.clone()))
303        .collect();
304
305    let kind: StatementKind = (&stmt).into();
306    let permitted_plans = Plan::generated_from(&kind);
307
308    let scx = &mut StatementContext {
309        pcx,
310        catalog,
311        param_types: RefCell::new(param_types),
312        ambiguous_columns: RefCell::new(false),
313        sql_impl_resolved_ids: Arc::new(Mutex::new(ResolvedIds::empty())),
314    };
315
316    if resolved_ids
317        .items()
318        // Filter out items that may not have been created yet, such as sub-sources.
319        .filter_map(|id| catalog.try_get_item(id))
320        .any(|item| {
321            item.func().is_ok()
322                && item.name().qualifiers.schema_spec
323                    == SchemaSpecifier::Id(catalog.get_mz_unsafe_schema_id())
324        })
325    {
326        scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNSAFE_FUNCTIONS)?;
327    }
328
329    let plan = match stmt {
330        // DDL statements.
331        Statement::AlterCluster(stmt) => ddl::plan_alter_cluster(scx, stmt),
332        Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt),
333        Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt),
334        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
335            ddl::plan_alter_materialized_view_apply_replacement(scx, stmt)
336        }
337        Statement::AlterObjectRename(stmt) => ddl::plan_alter_object_rename(scx, stmt),
338        Statement::AlterObjectSwap(stmt) => ddl::plan_alter_object_swap(scx, stmt),
339        Statement::AlterRetainHistory(stmt) => ddl::plan_alter_retain_history(scx, stmt),
340        Statement::AlterRole(stmt) => ddl::plan_alter_role(scx, stmt),
341        Statement::AlterSecret(stmt) => ddl::plan_alter_secret(scx, stmt),
342        Statement::AlterSetCluster(stmt) => ddl::plan_alter_item_set_cluster(scx, stmt),
343        Statement::AlterSink(stmt) => ddl::plan_alter_sink(scx, stmt),
344        Statement::AlterSource(stmt) => ddl::plan_alter_source(scx, stmt),
345        Statement::AlterSystemSet(stmt) => ddl::plan_alter_system_set(scx, stmt),
346        Statement::AlterSystemReset(stmt) => ddl::plan_alter_system_reset(scx, stmt),
347        Statement::AlterSystemResetAll(stmt) => ddl::plan_alter_system_reset_all(scx, stmt),
348        Statement::AlterTableAddColumn(stmt) => ddl::plan_alter_table_add_column(scx, stmt),
349        Statement::AlterNetworkPolicy(stmt) => ddl::plan_alter_network_policy(scx, stmt),
350        Statement::Comment(stmt) => ddl::plan_comment(scx, stmt),
351        Statement::CreateCluster(stmt) => ddl::plan_create_cluster(scx, stmt),
352        Statement::CreateClusterReplica(stmt) => ddl::plan_create_cluster_replica(scx, stmt),
353        Statement::CreateConnection(stmt) => ddl::plan_create_connection(scx, stmt),
354        Statement::CreateDatabase(stmt) => ddl::plan_create_database(scx, stmt),
355        Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
356        Statement::CreateRole(stmt) => ddl::plan_create_role(scx, stmt),
357        Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
358        Statement::CreateSecret(stmt) => ddl::plan_create_secret(scx, stmt),
359        Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
360        Statement::CreateWebhookSource(stmt) => ddl::plan_create_webhook_source(scx, stmt),
361        Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
362        Statement::CreateSubsource(stmt) => ddl::plan_create_subsource(scx, stmt),
363        Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
364        Statement::CreateTableFromSource(stmt) => ddl::plan_create_table_from_source(scx, stmt),
365        Statement::CreateType(stmt) => ddl::plan_create_type(scx, stmt),
366        Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt),
367        Statement::CreateMaterializedView(stmt) => ddl::plan_create_materialized_view(scx, stmt),
368        Statement::CreateNetworkPolicy(stmt) => ddl::plan_create_network_policy(scx, stmt),
369        Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt),
370        Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt),
371
372        // `ACL` statements.
373        Statement::AlterOwner(stmt) => acl::plan_alter_owner(scx, stmt),
374        Statement::GrantRole(stmt) => acl::plan_grant_role(scx, stmt),
375        Statement::RevokeRole(stmt) => acl::plan_revoke_role(scx, stmt),
376        Statement::GrantPrivileges(stmt) => acl::plan_grant_privileges(scx, stmt),
377        Statement::RevokePrivileges(stmt) => acl::plan_revoke_privileges(scx, stmt),
378        Statement::AlterDefaultPrivileges(stmt) => acl::plan_alter_default_privileges(scx, stmt),
379        Statement::ReassignOwned(stmt) => acl::plan_reassign_owned(scx, stmt),
380
381        // DML statements.
382        Statement::Copy(stmt) => dml::plan_copy(scx, stmt),
383        Statement::Delete(stmt) => dml::plan_delete(scx, stmt, params),
384        Statement::ExplainPlan(stmt) => dml::plan_explain_plan(scx, stmt, params),
385        Statement::ExplainPushdown(stmt) => dml::plan_explain_pushdown(scx, stmt, params),
386        Statement::ExplainAnalyzeObject(stmt) => {
387            dml::plan_explain_analyze_object(scx, stmt, params)
388        }
389        Statement::ExplainAnalyzeCluster(stmt) => {
390            dml::plan_explain_analyze_cluster(scx, stmt, params)
391        }
392        Statement::ExplainTimestamp(stmt) => dml::plan_explain_timestamp(scx, stmt),
393        Statement::ExplainSinkSchema(stmt) => dml::plan_explain_schema(scx, stmt),
394        Statement::Insert(stmt) => dml::plan_insert(scx, stmt, params),
395        Statement::Select(stmt) => dml::plan_select(scx, stmt, params, None),
396        Statement::Subscribe(stmt) => dml::plan_subscribe(scx, stmt, params, None),
397        Statement::Update(stmt) => dml::plan_update(scx, stmt, params),
398
399        // `SHOW` statements.
400        Statement::Show(ShowStatement::ShowColumns(stmt)) => show::show_columns(scx, stmt)?.plan(),
401        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
402            show::plan_show_create_connection(scx, stmt).map(Plan::ShowCreate)
403        }
404        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
405            show::plan_show_create_cluster(scx, stmt).map(Plan::ShowCreate)
406        }
407        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
408            show::plan_show_create_index(scx, stmt).map(Plan::ShowCreate)
409        }
410        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
411            show::plan_show_create_sink(scx, stmt).map(Plan::ShowCreate)
412        }
413        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
414            show::plan_show_create_source(scx, stmt).map(Plan::ShowCreate)
415        }
416        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
417            show::plan_show_create_table(scx, stmt).map(Plan::ShowCreate)
418        }
419        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
420            show::plan_show_create_view(scx, stmt).map(Plan::ShowCreate)
421        }
422        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
423            show::plan_show_create_materialized_view(scx, stmt).map(Plan::ShowCreate)
424        }
425        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
426            show::plan_show_create_type(scx, stmt).map(Plan::ShowCreate)
427        }
428        Statement::Show(ShowStatement::ShowObjects(stmt)) => show::show_objects(scx, stmt)?.plan(),
429
430        // SCL statements.
431        Statement::Close(stmt) => scl::plan_close(scx, stmt),
432        Statement::Deallocate(stmt) => scl::plan_deallocate(scx, stmt),
433        Statement::Declare(stmt) => scl::plan_declare(scx, stmt, params),
434        Statement::Discard(stmt) => scl::plan_discard(scx, stmt),
435        Statement::Execute(stmt) => scl::plan_execute(scx, stmt),
436        Statement::Fetch(stmt) => scl::plan_fetch(scx, stmt),
437        Statement::Prepare(stmt) => scl::plan_prepare(scx, stmt),
438        Statement::ResetVariable(stmt) => scl::plan_reset_variable(scx, stmt),
439        Statement::SetVariable(stmt) => scl::plan_set_variable(scx, stmt),
440        Statement::Show(ShowStatement::ShowVariable(stmt)) => scl::plan_show_variable(scx, stmt),
441
442        // TCL statements.
443        Statement::Commit(stmt) => tcl::plan_commit(scx, stmt),
444        Statement::Rollback(stmt) => tcl::plan_rollback(scx, stmt),
445        Statement::SetTransaction(stmt) => tcl::plan_set_transaction(scx, stmt),
446        Statement::StartTransaction(stmt) => tcl::plan_start_transaction(scx, stmt),
447
448        // Other statements.
449        Statement::Raise(stmt) => raise::plan_raise(scx, stmt),
450        Statement::Show(ShowStatement::InspectShard(stmt)) => scl::plan_inspect_shard(scx, stmt),
451        Statement::ValidateConnection(stmt) => validate::plan_validate_connection(scx, stmt),
452        Statement::ExecuteUnitTest(_) => {
453            return Err(PlanError::Unsupported {
454                feature: "EXECUTE UNIT TEST statement".to_string(),
455                discussion_no: None,
456            });
457        }
458    };
459
460    if let Ok(plan) = &plan {
461        mz_ore::soft_assert_no_log!(
462            permitted_plans.contains(&PlanKind::from(plan)),
463            "plan {:?}, permitted plans {:?}",
464            plan,
465            permitted_plans
466        );
467    }
468
469    // Return the plan along with any resolved IDs accumulated from sql_impl
470    // function bodies. These are kept separate from the main resolved_ids
471    // because they are implementation details of the functions, not real
472    // dependencies of the statement. They should only be used for the
473    // restrict_to_user_objects RBAC check.
474    let sql_impl_ids = scx
475        .sql_impl_resolved_ids
476        .lock()
477        .expect("planning is single-threaded")
478        .clone();
479    plan.map(|p| (p, sql_impl_ids))
480}
481
482pub fn plan_copy_from(
483    pcx: &PlanContext,
484    catalog: &dyn SessionCatalog,
485    target_id: CatalogItemId,
486    target_name: String,
487    columns: Vec<ColumnIndex>,
488    rows: Vec<mz_repr::Row>,
489) -> Result<super::HirRelationExpr, PlanError> {
490    query::plan_copy_from_rows(pcx, catalog, target_id, target_name, columns, rows)
491}
492
493/// Whether a SQL object type can be interpreted as matching the type of the given catalog item.
494/// For example, if `v` is a view, `DROP SOURCE v` should not work, since Source and View
495/// are non-matching types.
496///
497/// For now tables are treated as a special kind of source in Materialize, so just
498/// allow `TABLE` to refer to either.
499impl PartialEq<ObjectType> for CatalogItemType {
500    fn eq(&self, other: &ObjectType) -> bool {
501        match (self, other) {
502            (CatalogItemType::Source, ObjectType::Source)
503            | (CatalogItemType::Table, ObjectType::Table)
504            | (CatalogItemType::Sink, ObjectType::Sink)
505            | (CatalogItemType::View, ObjectType::View)
506            | (CatalogItemType::MaterializedView, ObjectType::MaterializedView)
507            | (CatalogItemType::Index, ObjectType::Index)
508            | (CatalogItemType::Type, ObjectType::Type)
509            | (CatalogItemType::Secret, ObjectType::Secret)
510            | (CatalogItemType::Connection, ObjectType::Connection) => true,
511            (_, _) => false,
512        }
513    }
514}
515
516impl PartialEq<CatalogItemType> for ObjectType {
517    fn eq(&self, other: &CatalogItemType) -> bool {
518        other == self
519    }
520}
521
522/// Immutable state that applies to the planning of an entire `Statement`.
523#[derive(Debug, Clone)]
524pub struct StatementContext<'a> {
525    /// The optional PlanContext, which will be present for statements that execute
526    /// within the OneShot QueryLifetime and None otherwise (views). This is an
527    /// awkward field and should probably be relocated to a place that fits our
528    /// execution model more closely.
529    pcx: Option<&'a PlanContext>,
530    pub catalog: &'a dyn SessionCatalog,
531    /// The types of the parameters in the query. This is filled in as planning
532    /// occurs.
533    pub param_types: RefCell<BTreeMap<usize, SqlScalarType>>,
534    /// Whether the statement contains an expression that can make the exact column list
535    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`. This is filled in as planning occurs.
536    pub ambiguous_columns: RefCell<bool>,
537    /// Accumulates resolved IDs from SQL-implemented function bodies (`sql_impl_func`,
538    /// `sql_impl_table_func`). These are kept separate from the statement's main
539    /// `resolved_ids` because they are implementation details of the functions, not
540    /// real dependencies of the statement. They are only used for the
541    /// `restrict_to_user_objects` RBAC check.
542    ///
543    /// Uses `Arc<Mutex<_>>` so that cloned `StatementContext`s (as in `sql_impl`)
544    /// share the same underlying storage. `Arc` (vs `Rc`) is needed because
545    /// `StatementContext` must be `Send`.
546    pub sql_impl_resolved_ids: Arc<Mutex<ResolvedIds>>,
547}
548
549impl<'a> StatementContext<'a> {
550    pub fn new(
551        pcx: Option<&'a PlanContext>,
552        catalog: &'a dyn SessionCatalog,
553    ) -> StatementContext<'a> {
554        StatementContext {
555            pcx,
556            catalog,
557            param_types: Default::default(),
558            ambiguous_columns: RefCell::new(false),
559            sql_impl_resolved_ids: Arc::new(Mutex::new(ResolvedIds::empty())),
560        }
561    }
562
563    /// Returns the schemas in order of search_path that exist in the catalog.
564    pub fn current_schemas(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
565        self.catalog.search_path()
566    }
567
568    /// Returns the first schema from the search_path that exist in the catalog,
569    /// or None if there are none.
570    pub fn current_schema(&self) -> Option<&(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
571        self.current_schemas().into_iter().next()
572    }
573
574    pub fn pcx(&self) -> Result<&PlanContext, PlanError> {
575        self.pcx.ok_or_else(|| sql_err!("no plan context"))
576    }
577
578    pub fn allocate_full_name(&self, name: PartialItemName) -> Result<FullItemName, PlanError> {
579        let (database, schema): (RawDatabaseSpecifier, String) = match (name.database, name.schema)
580        {
581            (None, None) => {
582                let Some((database, schema)) = self.current_schema() else {
583                    return Err(PlanError::InvalidSchemaName);
584                };
585                let schema = self.get_schema(database, schema);
586                let database = match schema.database() {
587                    ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
588                    ResolvedDatabaseSpecifier::Id(id) => {
589                        RawDatabaseSpecifier::Name(self.catalog.get_database(id).name().to_string())
590                    }
591                };
592                (database, schema.name().schema.clone())
593            }
594            (None, Some(schema)) => {
595                if is_system_schema(&schema) {
596                    (RawDatabaseSpecifier::Ambient, schema)
597                } else {
598                    match self.catalog.active_database_name() {
599                        Some(name) => (RawDatabaseSpecifier::Name(name.to_string()), schema),
600                        None => {
601                            sql_bail!(
602                                "no database specified for non-system schema and no active database"
603                            )
604                        }
605                    }
606                }
607            }
608            (Some(_database), None) => {
609                // This shouldn't be possible. Refactor the datastructure to
610                // make it not exist.
611                sql_bail!("unreachable: specified the database but no schema")
612            }
613            (Some(database), Some(schema)) => (RawDatabaseSpecifier::Name(database), schema),
614        };
615        let item = name.item;
616        Ok(FullItemName {
617            database,
618            schema,
619            item,
620        })
621    }
622
623    pub fn allocate_qualified_name(
624        &self,
625        name: PartialItemName,
626    ) -> Result<QualifiedItemName, PlanError> {
627        let full_name = self.allocate_full_name(name)?;
628        let database_spec = match full_name.database {
629            RawDatabaseSpecifier::Ambient => ResolvedDatabaseSpecifier::Ambient,
630            RawDatabaseSpecifier::Name(name) => ResolvedDatabaseSpecifier::Id(
631                self.resolve_database(&UnresolvedDatabaseName(Ident::new(name)?))?
632                    .id(),
633            ),
634        };
635        let schema_spec = self
636            .resolve_schema_in_database(&database_spec, &Ident::new(full_name.schema)?)?
637            .id()
638            .clone();
639        Ok(QualifiedItemName {
640            qualifiers: ItemQualifiers {
641                database_spec,
642                schema_spec,
643            },
644            item: full_name.item,
645        })
646    }
647
648    pub fn allocate_temporary_full_name(&self, name: PartialItemName) -> FullItemName {
649        FullItemName {
650            database: RawDatabaseSpecifier::Ambient,
651            schema: name
652                .schema
653                .unwrap_or_else(|| mz_repr::namespaces::MZ_TEMP_SCHEMA.to_owned()),
654            item: name.item,
655        }
656    }
657
658    pub fn allocate_temporary_qualified_name(
659        &self,
660        name: PartialItemName,
661    ) -> Result<QualifiedItemName, PlanError> {
662        // Compare against the MZ_TEMP_SCHEMA constant directly instead of calling
663        // get_schema(), because with lazy temporary schema creation, the temp
664        // schema may not exist yet. (This is similar to what `allocate_temporary_full_name` was
665        // doing already before making temporary schemas lazy.)
666        if let Some(schema_name) = name.schema {
667            if schema_name != mz_repr::namespaces::MZ_TEMP_SCHEMA {
668                return Err(PlanError::InvalidTemporarySchema);
669            }
670        }
671
672        Ok(QualifiedItemName {
673            qualifiers: ItemQualifiers {
674                database_spec: ResolvedDatabaseSpecifier::Ambient,
675                schema_spec: SchemaSpecifier::Temporary,
676            },
677            item: name.item,
678        })
679    }
680
681    // Creates a `ResolvedItemName::Item` from a `GlobalId` and an
682    // `UnresolvedItemName`.
683    pub fn allocate_resolved_item_name(
684        &self,
685        id: CatalogItemId,
686        name: UnresolvedItemName,
687    ) -> Result<ResolvedItemName, PlanError> {
688        let partial = normalize::unresolved_item_name(name)?;
689        let qualified = self.allocate_qualified_name(partial.clone())?;
690        let full_name = self.allocate_full_name(partial)?;
691        Ok(ResolvedItemName::Item {
692            id,
693            qualifiers: qualified.qualifiers,
694            full_name,
695            print_id: true,
696            version: RelationVersionSelector::Latest,
697        })
698    }
699
700    pub fn active_database(&self) -> Option<&DatabaseId> {
701        self.catalog.active_database()
702    }
703
704    pub fn resolve_optional_schema(
705        &self,
706        schema_name: &Option<ResolvedSchemaName>,
707    ) -> Result<SchemaSpecifier, PlanError> {
708        match schema_name {
709            Some(ResolvedSchemaName::Schema { schema_spec, .. }) => Ok(schema_spec.clone()),
710            None => self.resolve_active_schema().map(|spec| spec.clone()),
711            Some(ResolvedSchemaName::Error) => {
712                unreachable!("should have been handled by name resolution")
713            }
714        }
715    }
716
717    pub fn resolve_active_schema(&self) -> Result<&SchemaSpecifier, PlanError> {
718        match self.current_schema() {
719            Some((_db, schema)) => Ok(schema),
720            None => Err(PlanError::InvalidSchemaName),
721        }
722    }
723
724    pub fn get_cluster(&self, id: &ClusterId) -> &dyn CatalogCluster<'_> {
725        self.catalog.get_cluster(*id)
726    }
727
728    pub fn resolve_database(
729        &self,
730        name: &UnresolvedDatabaseName,
731    ) -> Result<&dyn CatalogDatabase, PlanError> {
732        let name = normalize::ident_ref(&name.0);
733        Ok(self.catalog.resolve_database(name)?)
734    }
735
736    pub fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase {
737        self.catalog.get_database(id)
738    }
739
740    pub fn resolve_schema_in_database(
741        &self,
742        database_spec: &ResolvedDatabaseSpecifier,
743        schema: &Ident,
744    ) -> Result<&dyn CatalogSchema, PlanError> {
745        let schema = normalize::ident_ref(schema);
746        Ok(self
747            .catalog
748            .resolve_schema_in_database(database_spec, schema)?)
749    }
750
751    pub fn resolve_schema(
752        &self,
753        name: UnresolvedSchemaName,
754    ) -> Result<&dyn CatalogSchema, PlanError> {
755        let name = normalize::unresolved_schema_name(name)?;
756        Ok(self
757            .catalog
758            .resolve_schema(name.database.as_deref(), &name.schema)?)
759    }
760
761    pub fn get_schema(
762        &self,
763        database_spec: &ResolvedDatabaseSpecifier,
764        schema_spec: &SchemaSpecifier,
765    ) -> &dyn CatalogSchema {
766        self.catalog.get_schema(database_spec, schema_spec)
767    }
768
769    pub fn resolve_item(&self, name: RawItemName) -> Result<&dyn CatalogItem, PlanError> {
770        match name {
771            RawItemName::Name(name) => {
772                let name = normalize::unresolved_item_name(name)?;
773                Ok(self.catalog.resolve_item(&name)?)
774            }
775            RawItemName::Id(id, _, _) => {
776                let gid = id.parse()?;
777                Ok(self.catalog.get_item(&gid))
778            }
779        }
780    }
781
782    pub fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem {
783        self.catalog.get_item(id)
784    }
785
786    pub fn get_item_by_resolved_name(
787        &self,
788        name: &ResolvedItemName,
789    ) -> Result<Box<dyn CatalogCollectionItem + '_>, PlanError> {
790        match name {
791            ResolvedItemName::Item { id, version, .. } => {
792                Ok(self.get_item(id).at_version(*version))
793            }
794            ResolvedItemName::Cte { .. } => sql_bail!("non-user item"),
795            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
796        }
797    }
798
799    pub fn get_column_by_resolved_name(
800        &self,
801        name: &ColumnName<Aug>,
802    ) -> Result<(Box<dyn CatalogCollectionItem + '_>, usize), PlanError> {
803        match (&name.relation, &name.column) {
804            (
805                ResolvedItemName::Item { id, version, .. },
806                ResolvedColumnReference::Column { index, .. },
807            ) => {
808                let item = self.get_item(id).at_version(*version);
809                Ok((item, *index))
810            }
811            _ => unreachable!(
812                "get_column_by_resolved_name errors should have been caught in name resolution"
813            ),
814        }
815    }
816
817    pub fn resolve_function(
818        &self,
819        name: UnresolvedItemName,
820    ) -> Result<&dyn CatalogItem, PlanError> {
821        let name = normalize::unresolved_item_name(name)?;
822        Ok(self.catalog.resolve_function(&name)?)
823    }
824
825    pub fn resolve_cluster(
826        &self,
827        name: Option<&Ident>,
828    ) -> Result<&dyn CatalogCluster<'_>, PlanError> {
829        let name = name.map(|name| name.as_str());
830        Ok(self.catalog.resolve_cluster(name)?)
831    }
832
833    pub fn resolve_type(&self, mut ty: mz_pgrepr::Type) -> Result<ResolvedDataType, PlanError> {
834        // Ignore precision constraints on date/time types until we support
835        // it. This should be safe enough because our types are wide enough
836        // to support the maximum possible precision.
837        //
838        // See: https://github.com/MaterializeInc/database-issues/issues/3179
839        match &mut ty {
840            mz_pgrepr::Type::Interval { constraints } => *constraints = None,
841            mz_pgrepr::Type::Time { precision } => *precision = None,
842            mz_pgrepr::Type::TimeTz { precision } => *precision = None,
843            mz_pgrepr::Type::Timestamp { precision } => *precision = None,
844            mz_pgrepr::Type::TimestampTz { precision } => *precision = None,
845            _ => (),
846        }
847        // NOTE(benesch): this *looks* gross, but it is
848        // safe enough. The `fmt::Display`
849        // representation on `pgrepr::Type` promises to
850        // produce an unqualified type name that does
851        // not require quoting.
852        let mut ty = if ty.oid() >= FIRST_USER_OID {
853            sql_bail!("internal error, unexpected user type: {ty:?} ");
854        } else if ty.oid() < FIRST_MATERIALIZE_OID {
855            format!("pg_catalog.{}", ty)
856        } else {
857            // This relies on all non-PG types existing in `mz_catalog`, which is annoying.
858            format!("mz_catalog.{}", ty)
859        };
860        // TODO(benesch): converting `json` to `jsonb`
861        // is wrong. We ought to support the `json` type
862        // directly.
863        if ty == "pg_catalog.json" {
864            ty = "pg_catalog.jsonb".into();
865        }
866        let data_type = mz_sql_parser::parser::parse_data_type(&ty)?;
867        let (data_type, _) = names::resolve(self.catalog, data_type)?;
868        Ok(data_type)
869    }
870
871    pub fn get_object_type(&self, id: &ObjectId) -> ObjectType {
872        self.catalog.get_object_type(id)
873    }
874
875    pub fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType {
876        match id {
877            SystemObjectId::Object(id) => SystemObjectType::Object(self.get_object_type(id)),
878            SystemObjectId::System => SystemObjectType::System,
879        }
880    }
881
882    /// Returns an error if the named `FeatureFlag` is not set to `on`.
883    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
884        flag.require(self.catalog.system_vars())?;
885        Ok(())
886    }
887
888    /// Returns true if the named [`FeatureFlag`] is set to `on`, returns false otherwise.
889    pub fn is_feature_flag_enabled(&self, flag: &'static FeatureFlag) -> bool {
890        self.require_feature_flag(flag).is_ok()
891    }
892
893    pub fn finalize_param_types(self) -> Result<Vec<SqlScalarType>, PlanError> {
894        let param_types = self.param_types.into_inner();
895        let mut out = vec![];
896        for (i, (n, typ)) in param_types.into_iter().enumerate() {
897            if n != i + 1 {
898                sql_bail!("unable to infer type for parameter ${}", i + 1);
899            }
900            out.push(typ);
901        }
902        Ok(out)
903    }
904
905    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
906    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
907    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
908        self.catalog.humanize_sql_scalar_type(typ, postgres_compat)
909    }
910
911    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
912    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
913    pub fn humanize_column_type(&self, typ: &SqlColumnType, postgres_compat: bool) -> String {
914        self.catalog.humanize_sql_column_type(typ, postgres_compat)
915    }
916
917    pub fn relation_desc_into_table_defs(
918        &self,
919        desc: &RelationDesc,
920    ) -> Result<(Vec<ColumnDef<Aug>>, Vec<TableConstraint<Aug>>), PlanError> {
921        let mut columns = vec![];
922        let mut null_cols = BTreeSet::new();
923        for (column_name, column_type) in desc.iter() {
924            let name = Ident::new(column_name.as_str().to_owned())?;
925
926            let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
927            let data_type = self.resolve_type(ty)?;
928
929            let options = if !column_type.nullable {
930                null_cols.insert(columns.len());
931                vec![mz_sql_parser::ast::ColumnOptionDef {
932                    name: None,
933                    option: mz_sql_parser::ast::ColumnOption::NotNull,
934                }]
935            } else {
936                vec![]
937            };
938
939            columns.push(ColumnDef {
940                name,
941                data_type,
942                collation: None,
943                options,
944            });
945        }
946
947        let mut table_constraints = vec![];
948        for key in desc.typ().keys.iter() {
949            let mut col_names = vec![];
950            for col_idx in key {
951                if !null_cols.contains(col_idx) {
952                    // Note that alternatively we could support NULL values in keys with `NULLS NOT
953                    // DISTINCT` semantics, which treats `NULL` as a distinct value.
954                    sql_bail!(
955                        "[internal error] key columns must be NOT NULL when generating table constraints"
956                    );
957                }
958                col_names.push(columns[*col_idx].name.clone());
959            }
960            table_constraints.push(TableConstraint::Unique {
961                name: None,
962                columns: col_names,
963                is_primary: false,
964                nulls_not_distinct: false,
965            });
966        }
967
968        Ok((columns, table_constraints))
969    }
970
971    pub fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
972        self.catalog.get_owner_id(id)
973    }
974
975    pub fn humanize_resolved_name(
976        &self,
977        name: &ResolvedItemName,
978    ) -> Result<PartialItemName, PlanError> {
979        let item = self.get_item_by_resolved_name(name)?;
980        Ok(self.catalog.minimal_qualification(item.name()))
981    }
982
983    /// WARNING! This style of name resolution assumes the referred-to objects exists (i.e. panics
984    /// if objects do not exist) so should never be used to handle user input.
985    pub fn dangerous_resolve_name(&self, name: Vec<&str>) -> ResolvedItemName {
986        tracing::trace!("dangerous_resolve_name {:?}", name);
987        // Note: Using unchecked here is okay because this function is already dangerous.
988        let name: Vec<_> = name.into_iter().map(Ident::new_unchecked).collect();
989        let name = UnresolvedItemName::qualified(&name);
990        let entry = match self.resolve_item(RawItemName::Name(name.clone())) {
991            Ok(entry) => entry,
992            Err(_) => self
993                .resolve_function(name.clone())
994                .expect("name referred to an existing object"),
995        };
996
997        let partial = normalize::unresolved_item_name(name).unwrap();
998        let full_name = self.allocate_full_name(partial).unwrap();
999
1000        ResolvedItemName::Item {
1001            id: entry.id(),
1002            qualifiers: entry.name().qualifiers.clone(),
1003            full_name,
1004            print_id: true,
1005            version: RelationVersionSelector::Latest,
1006        }
1007    }
1008}
1009
1010pub fn resolve_cluster_for_materialized_view<'a>(
1011    catalog: &'a dyn SessionCatalog,
1012    stmt: &CreateMaterializedViewStatement<Aug>,
1013) -> Result<ClusterId, PlanError> {
1014    Ok(match &stmt.in_cluster {
1015        None => catalog.resolve_cluster(None)?.id(),
1016        Some(in_cluster) => in_cluster.id,
1017    })
1018}
1019
1020/// Statement classification as documented by [`plan`].
1021#[derive(Debug, Clone, Copy)]
1022pub enum StatementClassification {
1023    ACL,
1024    DDL,
1025    DML,
1026    Other,
1027    SCL,
1028    Show,
1029    TCL,
1030}
1031
1032impl StatementClassification {
1033    pub fn is_ddl(&self) -> bool {
1034        matches!(self, StatementClassification::DDL)
1035    }
1036}
1037
1038impl<T: mz_sql_parser::ast::AstInfo> From<&Statement<T>> for StatementClassification {
1039    fn from(value: &Statement<T>) -> Self {
1040        use StatementClassification::*;
1041
1042        match value {
1043            // DDL statements.
1044            Statement::AlterCluster(_) => DDL,
1045            Statement::AlterConnection(_) => DDL,
1046            Statement::AlterIndex(_) => DDL,
1047            Statement::AlterMaterializedViewApplyReplacement(_) => DDL,
1048            Statement::AlterObjectRename(_) => DDL,
1049            Statement::AlterObjectSwap(_) => DDL,
1050            Statement::AlterNetworkPolicy(_) => DDL,
1051            Statement::AlterRetainHistory(_) => DDL,
1052            Statement::AlterRole(_) => DDL,
1053            Statement::AlterSecret(_) => DDL,
1054            Statement::AlterSetCluster(_) => DDL,
1055            Statement::AlterSink(_) => DDL,
1056            Statement::AlterSource(_) => DDL,
1057            Statement::AlterSystemSet(_) => DDL,
1058            Statement::AlterSystemReset(_) => DDL,
1059            Statement::AlterSystemResetAll(_) => DDL,
1060            Statement::AlterTableAddColumn(_) => DDL,
1061            Statement::Comment(_) => DDL,
1062            Statement::CreateCluster(_) => DDL,
1063            Statement::CreateClusterReplica(_) => DDL,
1064            Statement::CreateConnection(_) => DDL,
1065            Statement::CreateDatabase(_) => DDL,
1066            Statement::CreateIndex(_) => DDL,
1067            Statement::CreateRole(_) => DDL,
1068            Statement::CreateSchema(_) => DDL,
1069            Statement::CreateSecret(_) => DDL,
1070            Statement::CreateSink(_) => DDL,
1071            Statement::CreateWebhookSource(_) => DDL,
1072            Statement::CreateSource(_) => DDL,
1073            Statement::CreateSubsource(_) => DDL,
1074            Statement::CreateTable(_) => DDL,
1075            Statement::CreateTableFromSource(_) => DDL,
1076            Statement::CreateType(_) => DDL,
1077            Statement::CreateView(_) => DDL,
1078            Statement::CreateMaterializedView(_) => DDL,
1079            Statement::CreateNetworkPolicy(_) => DDL,
1080            Statement::DropObjects(_) => DDL,
1081            Statement::DropOwned(_) => DDL,
1082
1083            // `ACL` statements.
1084            Statement::AlterOwner(_) => ACL,
1085            Statement::GrantRole(_) => ACL,
1086            Statement::RevokeRole(_) => ACL,
1087            Statement::GrantPrivileges(_) => ACL,
1088            Statement::RevokePrivileges(_) => ACL,
1089            Statement::AlterDefaultPrivileges(_) => ACL,
1090            Statement::ReassignOwned(_) => ACL,
1091
1092            // DML statements.
1093            Statement::Copy(_) => DML,
1094            Statement::Delete(_) => DML,
1095            Statement::ExplainPlan(_) => DML,
1096            Statement::ExplainPushdown(_) => DML,
1097            Statement::ExplainAnalyzeObject(_) => DML,
1098            Statement::ExplainAnalyzeCluster(_) => DML,
1099            Statement::ExplainTimestamp(_) => DML,
1100            Statement::ExplainSinkSchema(_) => DML,
1101            Statement::Insert(_) => DML,
1102            Statement::Select(_) => DML,
1103            Statement::Subscribe(_) => DML,
1104            Statement::Update(_) => DML,
1105
1106            // `SHOW` statements.
1107            Statement::Show(ShowStatement::ShowColumns(_)) => Show,
1108            Statement::Show(ShowStatement::ShowCreateConnection(_)) => Show,
1109            Statement::Show(ShowStatement::ShowCreateCluster(_)) => Show,
1110            Statement::Show(ShowStatement::ShowCreateIndex(_)) => Show,
1111            Statement::Show(ShowStatement::ShowCreateSink(_)) => Show,
1112            Statement::Show(ShowStatement::ShowCreateSource(_)) => Show,
1113            Statement::Show(ShowStatement::ShowCreateTable(_)) => Show,
1114            Statement::Show(ShowStatement::ShowCreateView(_)) => Show,
1115            Statement::Show(ShowStatement::ShowCreateMaterializedView(_)) => Show,
1116            Statement::Show(ShowStatement::ShowCreateType(_)) => Show,
1117            Statement::Show(ShowStatement::ShowObjects(_)) => Show,
1118
1119            // SCL statements.
1120            Statement::Close(_) => SCL,
1121            Statement::Deallocate(_) => SCL,
1122            Statement::Declare(_) => SCL,
1123            Statement::Discard(_) => SCL,
1124            Statement::Execute(_) => SCL,
1125            Statement::Fetch(_) => SCL,
1126            Statement::Prepare(_) => SCL,
1127            Statement::ResetVariable(_) => SCL,
1128            Statement::SetVariable(_) => SCL,
1129            Statement::Show(ShowStatement::ShowVariable(_)) => SCL,
1130
1131            // TCL statements.
1132            Statement::Commit(_) => TCL,
1133            Statement::Rollback(_) => TCL,
1134            Statement::SetTransaction(_) => TCL,
1135            Statement::StartTransaction(_) => TCL,
1136
1137            // Other statements.
1138            Statement::Raise(_) => Other,
1139            Statement::Show(ShowStatement::InspectShard(_)) => Other,
1140            Statement::ValidateConnection(_) => Other,
1141            Statement::ExecuteUnitTest(_) => Other,
1142        }
1143    }
1144}