Skip to main content

mz_sql/plan/
statement.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Statement planning.
11//!
12//! This module houses the entry points for planning a SQL statement.
13
14use std::cell::RefCell;
15use std::collections::{BTreeMap, BTreeSet};
16
17use mz_repr::namespaces::is_system_schema;
18use mz_repr::{
19    CatalogItemId, ColumnIndex, RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType,
20};
21use mz_sql_parser::ast::{
22    ColumnDef, ColumnName, CreateMaterializedViewStatement, RawItemName, ShowStatement,
23    StatementKind, TableConstraint, UnresolvedDatabaseName, UnresolvedSchemaName,
24};
25use mz_storage_types::connections::Connection;
26
27use crate::ast::{Ident, Statement, UnresolvedItemName};
28use crate::catalog::{
29    CatalogCluster, CatalogCollectionItem, CatalogDatabase, CatalogItem, CatalogItemType,
30    CatalogSchema, ObjectType, SessionCatalog, SystemObjectType,
31};
32use crate::names::{
33    self, Aug, DatabaseId, FullItemName, ItemQualifiers, ObjectId, PartialItemName,
34    QualifiedItemName, RawDatabaseSpecifier, ResolvedColumnReference, ResolvedDataType,
35    ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName, SchemaSpecifier,
36    SystemObjectId,
37};
38use crate::normalize;
39use crate::plan::error::PlanError;
40use crate::plan::{Params, Plan, PlanContext, PlanKind, query};
41use crate::session::vars::FeatureFlag;
42
43mod acl;
44pub(crate) mod ddl;
45mod dml;
46mod raise;
47mod scl;
48pub(crate) mod show;
49mod tcl;
50mod validate;
51
52use crate::session::vars;
53pub(crate) use ddl::PgConfigOptionExtracted;
54use mz_controller_types::ClusterId;
55use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_USER_OID};
56use mz_repr::role_id::RoleId;
57
58/// Describes the output of a SQL statement.
59#[derive(Debug, Clone, Eq, PartialEq)]
60pub struct StatementDesc {
61    /// The shape of the rows produced by the statement, if the statement
62    /// produces rows.
63    pub relation_desc: Option<RelationDesc>,
64    /// The determined types of the parameters in the statement, if any.
65    pub param_types: Vec<SqlScalarType>,
66    /// Whether the statement is a `COPY` statement.
67    pub is_copy: bool,
68}
69
70impl StatementDesc {
71    pub fn new(relation_desc: Option<RelationDesc>) -> Self {
72        StatementDesc {
73            relation_desc,
74            param_types: vec![],
75            is_copy: false,
76        }
77    }
78
79    /// Reports the number of columns in the statement's result set, or zero if
80    /// the statement does not return rows.
81    pub fn arity(&self) -> usize {
82        self.relation_desc
83            .as_ref()
84            .map(|desc| desc.typ().column_types.len())
85            .unwrap_or(0)
86    }
87
88    fn with_params(mut self, param_types: Vec<SqlScalarType>) -> Self {
89        self.param_types = param_types;
90        self
91    }
92
93    fn with_is_copy(mut self) -> Self {
94        self.is_copy = true;
95        self
96    }
97}
98
99/// Creates a description of the purified statement `stmt`.
100///
101/// See the documentation of [`StatementDesc`] for details.
102pub fn describe(
103    pcx: &PlanContext,
104    catalog: &dyn SessionCatalog,
105    stmt: Statement<Aug>,
106    param_types_in: &[Option<SqlScalarType>],
107) -> Result<StatementDesc, PlanError> {
108    let mut param_types = BTreeMap::new();
109    for (i, ty) in param_types_in.iter().enumerate() {
110        if let Some(ty) = ty {
111            param_types.insert(i + 1, ty.clone());
112        }
113    }
114
115    let scx = StatementContext {
116        pcx: Some(pcx),
117        catalog,
118        param_types: RefCell::new(param_types),
119        ambiguous_columns: RefCell::new(false),
120    };
121
122    let desc = match stmt {
123        // DDL statements.
124        Statement::AlterCluster(stmt) => ddl::describe_alter_cluster_set_options(&scx, stmt)?,
125        Statement::AlterConnection(stmt) => ddl::describe_alter_connection(&scx, stmt)?,
126        Statement::AlterIndex(stmt) => ddl::describe_alter_index_options(&scx, stmt)?,
127        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
128            ddl::describe_alter_materialized_view_apply_replacement(&scx, stmt)?
129        }
130        Statement::AlterObjectRename(stmt) => ddl::describe_alter_object_rename(&scx, stmt)?,
131        Statement::AlterObjectSwap(stmt) => ddl::describe_alter_object_swap(&scx, stmt)?,
132        Statement::AlterRetainHistory(stmt) => ddl::describe_alter_retain_history(&scx, stmt)?,
133        Statement::AlterRole(stmt) => ddl::describe_alter_role(&scx, stmt)?,
134        Statement::AlterSecret(stmt) => ddl::describe_alter_secret_options(&scx, stmt)?,
135        Statement::AlterSetCluster(stmt) => ddl::describe_alter_set_cluster(&scx, stmt)?,
136        Statement::AlterSink(stmt) => ddl::describe_alter_sink(&scx, stmt)?,
137        Statement::AlterSource(stmt) => ddl::describe_alter_source(&scx, stmt)?,
138        Statement::AlterSystemSet(stmt) => ddl::describe_alter_system_set(&scx, stmt)?,
139        Statement::AlterSystemReset(stmt) => ddl::describe_alter_system_reset(&scx, stmt)?,
140        Statement::AlterSystemResetAll(stmt) => ddl::describe_alter_system_reset_all(&scx, stmt)?,
141        Statement::AlterTableAddColumn(stmt) => ddl::describe_alter_table_add_column(&scx, stmt)?,
142        Statement::AlterNetworkPolicy(stmt) => ddl::describe_alter_network_policy(&scx, stmt)?,
143        Statement::Comment(stmt) => ddl::describe_comment(&scx, stmt)?,
144        Statement::CreateCluster(stmt) => ddl::describe_create_cluster(&scx, stmt)?,
145        Statement::CreateClusterReplica(stmt) => ddl::describe_create_cluster_replica(&scx, stmt)?,
146        Statement::CreateConnection(stmt) => ddl::describe_create_connection(&scx, stmt)?,
147        Statement::CreateDatabase(stmt) => ddl::describe_create_database(&scx, stmt)?,
148        Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
149        Statement::CreateRole(stmt) => ddl::describe_create_role(&scx, stmt)?,
150        Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
151        Statement::CreateSecret(stmt) => ddl::describe_create_secret(&scx, stmt)?,
152        Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
153        Statement::CreateWebhookSource(stmt) => ddl::describe_create_webhook_source(&scx, stmt)?,
154        Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
155        Statement::CreateSubsource(stmt) => ddl::describe_create_subsource(&scx, stmt)?,
156        Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
157        Statement::CreateTableFromSource(stmt) => {
158            ddl::describe_create_table_from_source(&scx, stmt)?
159        }
160        Statement::CreateType(stmt) => ddl::describe_create_type(&scx, stmt)?,
161        Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
162        Statement::CreateMaterializedView(stmt) => {
163            ddl::describe_create_materialized_view(&scx, stmt)?
164        }
165        Statement::CreateNetworkPolicy(stmt) => ddl::describe_create_network_policy(&scx, stmt)?,
166        Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?,
167        Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?,
168
169        // `ACL` statements.
170        Statement::AlterOwner(stmt) => acl::describe_alter_owner(&scx, stmt)?,
171        Statement::GrantRole(stmt) => acl::describe_grant_role(&scx, stmt)?,
172        Statement::RevokeRole(stmt) => acl::describe_revoke_role(&scx, stmt)?,
173        Statement::GrantPrivileges(stmt) => acl::describe_grant_privileges(&scx, stmt)?,
174        Statement::RevokePrivileges(stmt) => acl::describe_revoke_privileges(&scx, stmt)?,
175        Statement::AlterDefaultPrivileges(stmt) => {
176            acl::describe_alter_default_privileges(&scx, stmt)?
177        }
178        Statement::ReassignOwned(stmt) => acl::describe_reassign_owned(&scx, stmt)?,
179
180        // `SHOW` statements.
181        Statement::Show(ShowStatement::ShowColumns(stmt)) => {
182            show::show_columns(&scx, stmt)?.describe()?
183        }
184        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
185            show::describe_show_create_connection(&scx, stmt)?
186        }
187        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
188            show::describe_show_create_cluster(&scx, stmt)?
189        }
190        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
191            show::describe_show_create_index(&scx, stmt)?
192        }
193        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
194            show::describe_show_create_sink(&scx, stmt)?
195        }
196        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
197            show::describe_show_create_source(&scx, stmt)?
198        }
199        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
200            show::describe_show_create_table(&scx, stmt)?
201        }
202        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
203            show::describe_show_create_view(&scx, stmt)?
204        }
205        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
206            show::describe_show_create_materialized_view(&scx, stmt)?
207        }
208        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
209            show::describe_show_create_type(&scx, stmt)?
210        }
211        Statement::Show(ShowStatement::ShowObjects(stmt)) => {
212            show::show_objects(&scx, stmt)?.describe()?
213        }
214
215        // SCL statements.
216        Statement::Close(stmt) => scl::describe_close(&scx, stmt)?,
217        Statement::Deallocate(stmt) => scl::describe_deallocate(&scx, stmt)?,
218        Statement::Declare(stmt) => scl::describe_declare(&scx, stmt, param_types_in)?,
219        Statement::Discard(stmt) => scl::describe_discard(&scx, stmt)?,
220        Statement::Execute(stmt) => scl::describe_execute(&scx, stmt)?,
221        Statement::Fetch(stmt) => scl::describe_fetch(&scx, stmt)?,
222        Statement::Prepare(stmt) => scl::describe_prepare(&scx, stmt)?,
223        Statement::ResetVariable(stmt) => scl::describe_reset_variable(&scx, stmt)?,
224        Statement::SetVariable(stmt) => scl::describe_set_variable(&scx, stmt)?,
225        Statement::Show(ShowStatement::ShowVariable(stmt)) => {
226            scl::describe_show_variable(&scx, stmt)?
227        }
228
229        // DML statements.
230        Statement::Copy(stmt) => dml::describe_copy(&scx, stmt)?,
231        Statement::Delete(stmt) => dml::describe_delete(&scx, stmt)?,
232        Statement::ExplainPlan(stmt) => dml::describe_explain_plan(&scx, stmt)?,
233        Statement::ExplainPushdown(stmt) => dml::describe_explain_pushdown(&scx, stmt)?,
234        Statement::ExplainAnalyzeObject(stmt) => dml::describe_explain_analyze_object(&scx, stmt)?,
235        Statement::ExplainAnalyzeCluster(stmt) => {
236            dml::describe_explain_analyze_cluster(&scx, stmt)?
237        }
238        Statement::ExplainTimestamp(stmt) => dml::describe_explain_timestamp(&scx, stmt)?,
239        Statement::ExplainSinkSchema(stmt) => dml::describe_explain_schema(&scx, stmt)?,
240        Statement::Insert(stmt) => dml::describe_insert(&scx, stmt)?,
241        Statement::Select(stmt) => dml::describe_select(&scx, stmt)?,
242        Statement::Subscribe(stmt) => dml::describe_subscribe(&scx, stmt)?,
243        Statement::Update(stmt) => dml::describe_update(&scx, stmt)?,
244
245        // TCL statements.
246        Statement::Commit(stmt) => tcl::describe_commit(&scx, stmt)?,
247        Statement::Rollback(stmt) => tcl::describe_rollback(&scx, stmt)?,
248        Statement::SetTransaction(stmt) => tcl::describe_set_transaction(&scx, stmt)?,
249        Statement::StartTransaction(stmt) => tcl::describe_start_transaction(&scx, stmt)?,
250
251        // Other statements.
252        Statement::Raise(stmt) => raise::describe_raise(&scx, stmt)?,
253        Statement::Show(ShowStatement::InspectShard(stmt)) => {
254            scl::describe_inspect_shard(&scx, stmt)?
255        }
256        Statement::ValidateConnection(stmt) => validate::describe_validate_connection(&scx, stmt)?,
257    };
258
259    let desc = desc.with_params(scx.finalize_param_types()?);
260    Ok(desc)
261}
262
263/// Produces a [`Plan`] from the purified statement `stmt`.
264///
265/// Planning is a pure, synchronous function and so requires that the provided
266/// `stmt` does does not depend on any external state. Statements that rely on
267/// external state must remove that state prior to calling this function via
268/// [`crate::pure::purify_statement`] or
269/// [`crate::pure::purify_create_materialized_view_options`].
270///
271/// The returned plan is tied to the state of the provided catalog. If the state
272/// of the catalog changes after planning, the validity of the plan is not
273/// guaranteed.
274///
275/// Note that if you want to do something else asynchronously (e.g. validating
276/// connections), these might want to take different code paths than
277/// `purify_statement`. Feel free to rationalize this by thinking of those
278/// statements as not necessarily depending on external state.
279#[mz_ore::instrument(level = "debug")]
280pub fn plan(
281    pcx: Option<&PlanContext>,
282    catalog: &dyn SessionCatalog,
283    stmt: Statement<Aug>,
284    params: &Params,
285    resolved_ids: &ResolvedIds,
286) -> Result<Plan, PlanError> {
287    let param_types = params
288        // We need the `expected_types` here, not the `actual_types`! This is because
289        // `expected_types` is how the parameter expression (e.g. `$1`) looks "from the outside":
290        // `bind_parameters` will insert a cast from the actual type to the expected type.
291        .expected_types
292        .iter()
293        .enumerate()
294        .map(|(i, ty)| (i + 1, ty.clone()))
295        .collect();
296
297    let kind: StatementKind = (&stmt).into();
298    let permitted_plans = Plan::generated_from(&kind);
299
300    let scx = &mut StatementContext {
301        pcx,
302        catalog,
303        param_types: RefCell::new(param_types),
304        ambiguous_columns: RefCell::new(false),
305    };
306
307    if resolved_ids
308        .items()
309        // Filter out items that may not have been created yet, such as sub-sources.
310        .filter_map(|id| catalog.try_get_item(id))
311        .any(|item| {
312            item.func().is_ok()
313                && item.name().qualifiers.schema_spec
314                    == SchemaSpecifier::Id(catalog.get_mz_unsafe_schema_id())
315        })
316    {
317        scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNSAFE_FUNCTIONS)?;
318    }
319
320    let plan = match stmt {
321        // DDL statements.
322        Statement::AlterCluster(stmt) => ddl::plan_alter_cluster(scx, stmt),
323        Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt),
324        Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt),
325        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
326            ddl::plan_alter_materialized_view_apply_replacement(scx, stmt)
327        }
328        Statement::AlterObjectRename(stmt) => ddl::plan_alter_object_rename(scx, stmt),
329        Statement::AlterObjectSwap(stmt) => ddl::plan_alter_object_swap(scx, stmt),
330        Statement::AlterRetainHistory(stmt) => ddl::plan_alter_retain_history(scx, stmt),
331        Statement::AlterRole(stmt) => ddl::plan_alter_role(scx, stmt),
332        Statement::AlterSecret(stmt) => ddl::plan_alter_secret(scx, stmt),
333        Statement::AlterSetCluster(stmt) => ddl::plan_alter_item_set_cluster(scx, stmt),
334        Statement::AlterSink(stmt) => ddl::plan_alter_sink(scx, stmt),
335        Statement::AlterSource(stmt) => ddl::plan_alter_source(scx, stmt),
336        Statement::AlterSystemSet(stmt) => ddl::plan_alter_system_set(scx, stmt),
337        Statement::AlterSystemReset(stmt) => ddl::plan_alter_system_reset(scx, stmt),
338        Statement::AlterSystemResetAll(stmt) => ddl::plan_alter_system_reset_all(scx, stmt),
339        Statement::AlterTableAddColumn(stmt) => ddl::plan_alter_table_add_column(scx, stmt),
340        Statement::AlterNetworkPolicy(stmt) => ddl::plan_alter_network_policy(scx, stmt),
341        Statement::Comment(stmt) => ddl::plan_comment(scx, stmt),
342        Statement::CreateCluster(stmt) => ddl::plan_create_cluster(scx, stmt),
343        Statement::CreateClusterReplica(stmt) => ddl::plan_create_cluster_replica(scx, stmt),
344        Statement::CreateConnection(stmt) => ddl::plan_create_connection(scx, stmt),
345        Statement::CreateDatabase(stmt) => ddl::plan_create_database(scx, stmt),
346        Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
347        Statement::CreateRole(stmt) => ddl::plan_create_role(scx, stmt),
348        Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
349        Statement::CreateSecret(stmt) => ddl::plan_create_secret(scx, stmt),
350        Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
351        Statement::CreateWebhookSource(stmt) => ddl::plan_create_webhook_source(scx, stmt),
352        Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
353        Statement::CreateSubsource(stmt) => ddl::plan_create_subsource(scx, stmt),
354        Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
355        Statement::CreateTableFromSource(stmt) => ddl::plan_create_table_from_source(scx, stmt),
356        Statement::CreateType(stmt) => ddl::plan_create_type(scx, stmt),
357        Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt),
358        Statement::CreateMaterializedView(stmt) => ddl::plan_create_materialized_view(scx, stmt),
359        Statement::CreateNetworkPolicy(stmt) => ddl::plan_create_network_policy(scx, stmt),
360        Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt),
361        Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt),
362
363        // `ACL` statements.
364        Statement::AlterOwner(stmt) => acl::plan_alter_owner(scx, stmt),
365        Statement::GrantRole(stmt) => acl::plan_grant_role(scx, stmt),
366        Statement::RevokeRole(stmt) => acl::plan_revoke_role(scx, stmt),
367        Statement::GrantPrivileges(stmt) => acl::plan_grant_privileges(scx, stmt),
368        Statement::RevokePrivileges(stmt) => acl::plan_revoke_privileges(scx, stmt),
369        Statement::AlterDefaultPrivileges(stmt) => acl::plan_alter_default_privileges(scx, stmt),
370        Statement::ReassignOwned(stmt) => acl::plan_reassign_owned(scx, stmt),
371
372        // DML statements.
373        Statement::Copy(stmt) => dml::plan_copy(scx, stmt),
374        Statement::Delete(stmt) => dml::plan_delete(scx, stmt, params),
375        Statement::ExplainPlan(stmt) => dml::plan_explain_plan(scx, stmt, params),
376        Statement::ExplainPushdown(stmt) => dml::plan_explain_pushdown(scx, stmt, params),
377        Statement::ExplainAnalyzeObject(stmt) => {
378            dml::plan_explain_analyze_object(scx, stmt, params)
379        }
380        Statement::ExplainAnalyzeCluster(stmt) => {
381            dml::plan_explain_analyze_cluster(scx, stmt, params)
382        }
383        Statement::ExplainTimestamp(stmt) => dml::plan_explain_timestamp(scx, stmt),
384        Statement::ExplainSinkSchema(stmt) => dml::plan_explain_schema(scx, stmt),
385        Statement::Insert(stmt) => dml::plan_insert(scx, stmt, params),
386        Statement::Select(stmt) => dml::plan_select(scx, stmt, params, None),
387        Statement::Subscribe(stmt) => dml::plan_subscribe(scx, stmt, params, None),
388        Statement::Update(stmt) => dml::plan_update(scx, stmt, params),
389
390        // `SHOW` statements.
391        Statement::Show(ShowStatement::ShowColumns(stmt)) => show::show_columns(scx, stmt)?.plan(),
392        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
393            show::plan_show_create_connection(scx, stmt).map(Plan::ShowCreate)
394        }
395        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
396            show::plan_show_create_cluster(scx, stmt).map(Plan::ShowCreate)
397        }
398        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
399            show::plan_show_create_index(scx, stmt).map(Plan::ShowCreate)
400        }
401        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
402            show::plan_show_create_sink(scx, stmt).map(Plan::ShowCreate)
403        }
404        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
405            show::plan_show_create_source(scx, stmt).map(Plan::ShowCreate)
406        }
407        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
408            show::plan_show_create_table(scx, stmt).map(Plan::ShowCreate)
409        }
410        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
411            show::plan_show_create_view(scx, stmt).map(Plan::ShowCreate)
412        }
413        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
414            show::plan_show_create_materialized_view(scx, stmt).map(Plan::ShowCreate)
415        }
416        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
417            show::plan_show_create_type(scx, stmt).map(Plan::ShowCreate)
418        }
419        Statement::Show(ShowStatement::ShowObjects(stmt)) => show::show_objects(scx, stmt)?.plan(),
420
421        // SCL statements.
422        Statement::Close(stmt) => scl::plan_close(scx, stmt),
423        Statement::Deallocate(stmt) => scl::plan_deallocate(scx, stmt),
424        Statement::Declare(stmt) => scl::plan_declare(scx, stmt, params),
425        Statement::Discard(stmt) => scl::plan_discard(scx, stmt),
426        Statement::Execute(stmt) => scl::plan_execute(scx, stmt),
427        Statement::Fetch(stmt) => scl::plan_fetch(scx, stmt),
428        Statement::Prepare(stmt) => scl::plan_prepare(scx, stmt),
429        Statement::ResetVariable(stmt) => scl::plan_reset_variable(scx, stmt),
430        Statement::SetVariable(stmt) => scl::plan_set_variable(scx, stmt),
431        Statement::Show(ShowStatement::ShowVariable(stmt)) => scl::plan_show_variable(scx, stmt),
432
433        // TCL statements.
434        Statement::Commit(stmt) => tcl::plan_commit(scx, stmt),
435        Statement::Rollback(stmt) => tcl::plan_rollback(scx, stmt),
436        Statement::SetTransaction(stmt) => tcl::plan_set_transaction(scx, stmt),
437        Statement::StartTransaction(stmt) => tcl::plan_start_transaction(scx, stmt),
438
439        // Other statements.
440        Statement::Raise(stmt) => raise::plan_raise(scx, stmt),
441        Statement::Show(ShowStatement::InspectShard(stmt)) => scl::plan_inspect_shard(scx, stmt),
442        Statement::ValidateConnection(stmt) => validate::plan_validate_connection(scx, stmt),
443    };
444
445    if let Ok(plan) = &plan {
446        mz_ore::soft_assert_no_log!(
447            permitted_plans.contains(&PlanKind::from(plan)),
448            "plan {:?}, permitted plans {:?}",
449            plan,
450            permitted_plans
451        );
452    }
453
454    plan
455}
456
457pub fn plan_copy_from(
458    pcx: &PlanContext,
459    catalog: &dyn SessionCatalog,
460    target_id: CatalogItemId,
461    target_name: String,
462    columns: Vec<ColumnIndex>,
463    rows: Vec<mz_repr::Row>,
464) -> Result<super::HirRelationExpr, PlanError> {
465    query::plan_copy_from_rows(pcx, catalog, target_id, target_name, columns, rows)
466}
467
468/// Whether a SQL object type can be interpreted as matching the type of the given catalog item.
469/// For example, if `v` is a view, `DROP SOURCE v` should not work, since Source and View
470/// are non-matching types.
471///
472/// For now tables are treated as a special kind of source in Materialize, so just
473/// allow `TABLE` to refer to either.
474impl PartialEq<ObjectType> for CatalogItemType {
475    fn eq(&self, other: &ObjectType) -> bool {
476        match (self, other) {
477            (CatalogItemType::Source, ObjectType::Source)
478            | (CatalogItemType::Table, ObjectType::Table)
479            | (CatalogItemType::Sink, ObjectType::Sink)
480            | (CatalogItemType::View, ObjectType::View)
481            | (CatalogItemType::MaterializedView, ObjectType::MaterializedView)
482            | (CatalogItemType::Index, ObjectType::Index)
483            | (CatalogItemType::Type, ObjectType::Type)
484            | (CatalogItemType::Secret, ObjectType::Secret)
485            | (CatalogItemType::Connection, ObjectType::Connection) => true,
486            (_, _) => false,
487        }
488    }
489}
490
491impl PartialEq<CatalogItemType> for ObjectType {
492    fn eq(&self, other: &CatalogItemType) -> bool {
493        other == self
494    }
495}
496
497/// Immutable state that applies to the planning of an entire `Statement`.
498#[derive(Debug, Clone)]
499pub struct StatementContext<'a> {
500    /// The optional PlanContext, which will be present for statements that execute
501    /// within the OneShot QueryLifetime and None otherwise (views). This is an
502    /// awkward field and should probably be relocated to a place that fits our
503    /// execution model more closely.
504    pcx: Option<&'a PlanContext>,
505    pub catalog: &'a dyn SessionCatalog,
506    /// The types of the parameters in the query. This is filled in as planning
507    /// occurs.
508    pub param_types: RefCell<BTreeMap<usize, SqlScalarType>>,
509    /// Whether the statement contains an expression that can make the exact column list
510    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`. This is filled in as planning occurs.
511    pub ambiguous_columns: RefCell<bool>,
512}
513
514impl<'a> StatementContext<'a> {
515    pub fn new(
516        pcx: Option<&'a PlanContext>,
517        catalog: &'a dyn SessionCatalog,
518    ) -> StatementContext<'a> {
519        StatementContext {
520            pcx,
521            catalog,
522            param_types: Default::default(),
523            ambiguous_columns: RefCell::new(false),
524        }
525    }
526
527    /// Returns the schemas in order of search_path that exist in the catalog.
528    pub fn current_schemas(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
529        self.catalog.search_path()
530    }
531
532    /// Returns the first schema from the search_path that exist in the catalog,
533    /// or None if there are none.
534    pub fn current_schema(&self) -> Option<&(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
535        self.current_schemas().into_iter().next()
536    }
537
538    pub fn pcx(&self) -> Result<&PlanContext, PlanError> {
539        self.pcx.ok_or_else(|| sql_err!("no plan context"))
540    }
541
542    pub fn allocate_full_name(&self, name: PartialItemName) -> Result<FullItemName, PlanError> {
543        let (database, schema): (RawDatabaseSpecifier, String) = match (name.database, name.schema)
544        {
545            (None, None) => {
546                let Some((database, schema)) = self.current_schema() else {
547                    return Err(PlanError::InvalidSchemaName);
548                };
549                let schema = self.get_schema(database, schema);
550                let database = match schema.database() {
551                    ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
552                    ResolvedDatabaseSpecifier::Id(id) => {
553                        RawDatabaseSpecifier::Name(self.catalog.get_database(id).name().to_string())
554                    }
555                };
556                (database, schema.name().schema.clone())
557            }
558            (None, Some(schema)) => {
559                if is_system_schema(&schema) {
560                    (RawDatabaseSpecifier::Ambient, schema)
561                } else {
562                    match self.catalog.active_database_name() {
563                        Some(name) => (RawDatabaseSpecifier::Name(name.to_string()), schema),
564                        None => {
565                            sql_bail!(
566                                "no database specified for non-system schema and no active database"
567                            )
568                        }
569                    }
570                }
571            }
572            (Some(_database), None) => {
573                // This shouldn't be possible. Refactor the datastructure to
574                // make it not exist.
575                sql_bail!("unreachable: specified the database but no schema")
576            }
577            (Some(database), Some(schema)) => (RawDatabaseSpecifier::Name(database), schema),
578        };
579        let item = name.item;
580        Ok(FullItemName {
581            database,
582            schema,
583            item,
584        })
585    }
586
587    pub fn allocate_qualified_name(
588        &self,
589        name: PartialItemName,
590    ) -> Result<QualifiedItemName, PlanError> {
591        let full_name = self.allocate_full_name(name)?;
592        let database_spec = match full_name.database {
593            RawDatabaseSpecifier::Ambient => ResolvedDatabaseSpecifier::Ambient,
594            RawDatabaseSpecifier::Name(name) => ResolvedDatabaseSpecifier::Id(
595                self.resolve_database(&UnresolvedDatabaseName(Ident::new(name)?))?
596                    .id(),
597            ),
598        };
599        let schema_spec = self
600            .resolve_schema_in_database(&database_spec, &Ident::new(full_name.schema)?)?
601            .id()
602            .clone();
603        Ok(QualifiedItemName {
604            qualifiers: ItemQualifiers {
605                database_spec,
606                schema_spec,
607            },
608            item: full_name.item,
609        })
610    }
611
612    pub fn allocate_temporary_full_name(&self, name: PartialItemName) -> FullItemName {
613        FullItemName {
614            database: RawDatabaseSpecifier::Ambient,
615            schema: name
616                .schema
617                .unwrap_or_else(|| mz_repr::namespaces::MZ_TEMP_SCHEMA.to_owned()),
618            item: name.item,
619        }
620    }
621
622    pub fn allocate_temporary_qualified_name(
623        &self,
624        name: PartialItemName,
625    ) -> Result<QualifiedItemName, PlanError> {
626        // Compare against the MZ_TEMP_SCHEMA constant directly instead of calling
627        // get_schema(), because with lazy temporary schema creation, the temp
628        // schema may not exist yet. (This is similar to what `allocate_temporary_full_name` was
629        // doing already before making temporary schemas lazy.)
630        if let Some(schema_name) = name.schema {
631            if schema_name != mz_repr::namespaces::MZ_TEMP_SCHEMA {
632                return Err(PlanError::InvalidTemporarySchema);
633            }
634        }
635
636        Ok(QualifiedItemName {
637            qualifiers: ItemQualifiers {
638                database_spec: ResolvedDatabaseSpecifier::Ambient,
639                schema_spec: SchemaSpecifier::Temporary,
640            },
641            item: name.item,
642        })
643    }
644
645    // Creates a `ResolvedItemName::Item` from a `GlobalId` and an
646    // `UnresolvedItemName`.
647    pub fn allocate_resolved_item_name(
648        &self,
649        id: CatalogItemId,
650        name: UnresolvedItemName,
651    ) -> Result<ResolvedItemName, PlanError> {
652        let partial = normalize::unresolved_item_name(name)?;
653        let qualified = self.allocate_qualified_name(partial.clone())?;
654        let full_name = self.allocate_full_name(partial)?;
655        Ok(ResolvedItemName::Item {
656            id,
657            qualifiers: qualified.qualifiers,
658            full_name,
659            print_id: true,
660            version: RelationVersionSelector::Latest,
661        })
662    }
663
664    pub fn active_database(&self) -> Option<&DatabaseId> {
665        self.catalog.active_database()
666    }
667
668    pub fn resolve_optional_schema(
669        &self,
670        schema_name: &Option<ResolvedSchemaName>,
671    ) -> Result<SchemaSpecifier, PlanError> {
672        match schema_name {
673            Some(ResolvedSchemaName::Schema { schema_spec, .. }) => Ok(schema_spec.clone()),
674            None => self.resolve_active_schema().map(|spec| spec.clone()),
675            Some(ResolvedSchemaName::Error) => {
676                unreachable!("should have been handled by name resolution")
677            }
678        }
679    }
680
681    pub fn resolve_active_schema(&self) -> Result<&SchemaSpecifier, PlanError> {
682        match self.current_schema() {
683            Some((_db, schema)) => Ok(schema),
684            None => Err(PlanError::InvalidSchemaName),
685        }
686    }
687
688    pub fn get_cluster(&self, id: &ClusterId) -> &dyn CatalogCluster<'_> {
689        self.catalog.get_cluster(*id)
690    }
691
692    pub fn resolve_database(
693        &self,
694        name: &UnresolvedDatabaseName,
695    ) -> Result<&dyn CatalogDatabase, PlanError> {
696        let name = normalize::ident_ref(&name.0);
697        Ok(self.catalog.resolve_database(name)?)
698    }
699
700    pub fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase {
701        self.catalog.get_database(id)
702    }
703
704    pub fn resolve_schema_in_database(
705        &self,
706        database_spec: &ResolvedDatabaseSpecifier,
707        schema: &Ident,
708    ) -> Result<&dyn CatalogSchema, PlanError> {
709        let schema = normalize::ident_ref(schema);
710        Ok(self
711            .catalog
712            .resolve_schema_in_database(database_spec, schema)?)
713    }
714
715    pub fn resolve_schema(
716        &self,
717        name: UnresolvedSchemaName,
718    ) -> Result<&dyn CatalogSchema, PlanError> {
719        let name = normalize::unresolved_schema_name(name)?;
720        Ok(self
721            .catalog
722            .resolve_schema(name.database.as_deref(), &name.schema)?)
723    }
724
725    pub fn get_schema(
726        &self,
727        database_spec: &ResolvedDatabaseSpecifier,
728        schema_spec: &SchemaSpecifier,
729    ) -> &dyn CatalogSchema {
730        self.catalog.get_schema(database_spec, schema_spec)
731    }
732
733    pub fn resolve_item(&self, name: RawItemName) -> Result<&dyn CatalogItem, PlanError> {
734        match name {
735            RawItemName::Name(name) => {
736                let name = normalize::unresolved_item_name(name)?;
737                Ok(self.catalog.resolve_item(&name)?)
738            }
739            RawItemName::Id(id, _, _) => {
740                let gid = id.parse()?;
741                Ok(self.catalog.get_item(&gid))
742            }
743        }
744    }
745
746    pub fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem {
747        self.catalog.get_item(id)
748    }
749
750    pub fn get_item_by_resolved_name(
751        &self,
752        name: &ResolvedItemName,
753    ) -> Result<Box<dyn CatalogCollectionItem + '_>, PlanError> {
754        match name {
755            ResolvedItemName::Item { id, version, .. } => {
756                Ok(self.get_item(id).at_version(*version))
757            }
758            ResolvedItemName::Cte { .. } => sql_bail!("non-user item"),
759            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
760        }
761    }
762
763    pub fn get_column_by_resolved_name(
764        &self,
765        name: &ColumnName<Aug>,
766    ) -> Result<(Box<dyn CatalogCollectionItem + '_>, usize), PlanError> {
767        match (&name.relation, &name.column) {
768            (
769                ResolvedItemName::Item { id, version, .. },
770                ResolvedColumnReference::Column { index, .. },
771            ) => {
772                let item = self.get_item(id).at_version(*version);
773                Ok((item, *index))
774            }
775            _ => unreachable!(
776                "get_column_by_resolved_name errors should have been caught in name resolution"
777            ),
778        }
779    }
780
781    pub fn resolve_function(
782        &self,
783        name: UnresolvedItemName,
784    ) -> Result<&dyn CatalogItem, PlanError> {
785        let name = normalize::unresolved_item_name(name)?;
786        Ok(self.catalog.resolve_function(&name)?)
787    }
788
789    pub fn resolve_cluster(
790        &self,
791        name: Option<&Ident>,
792    ) -> Result<&dyn CatalogCluster<'_>, PlanError> {
793        let name = name.map(|name| name.as_str());
794        Ok(self.catalog.resolve_cluster(name)?)
795    }
796
797    pub fn resolve_type(&self, mut ty: mz_pgrepr::Type) -> Result<ResolvedDataType, PlanError> {
798        // Ignore precision constraints on date/time types until we support
799        // it. This should be safe enough because our types are wide enough
800        // to support the maximum possible precision.
801        //
802        // See: https://github.com/MaterializeInc/database-issues/issues/3179
803        match &mut ty {
804            mz_pgrepr::Type::Interval { constraints } => *constraints = None,
805            mz_pgrepr::Type::Time { precision } => *precision = None,
806            mz_pgrepr::Type::TimeTz { precision } => *precision = None,
807            mz_pgrepr::Type::Timestamp { precision } => *precision = None,
808            mz_pgrepr::Type::TimestampTz { precision } => *precision = None,
809            _ => (),
810        }
811        // NOTE(benesch): this *looks* gross, but it is
812        // safe enough. The `fmt::Display`
813        // representation on `pgrepr::Type` promises to
814        // produce an unqualified type name that does
815        // not require quoting.
816        let mut ty = if ty.oid() >= FIRST_USER_OID {
817            sql_bail!("internal error, unexpected user type: {ty:?} ");
818        } else if ty.oid() < FIRST_MATERIALIZE_OID {
819            format!("pg_catalog.{}", ty)
820        } else {
821            // This relies on all non-PG types existing in `mz_catalog`, which is annoying.
822            format!("mz_catalog.{}", ty)
823        };
824        // TODO(benesch): converting `json` to `jsonb`
825        // is wrong. We ought to support the `json` type
826        // directly.
827        if ty == "pg_catalog.json" {
828            ty = "pg_catalog.jsonb".into();
829        }
830        let data_type = mz_sql_parser::parser::parse_data_type(&ty)?;
831        let (data_type, _) = names::resolve(self.catalog, data_type)?;
832        Ok(data_type)
833    }
834
835    pub fn get_object_type(&self, id: &ObjectId) -> ObjectType {
836        self.catalog.get_object_type(id)
837    }
838
839    pub fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType {
840        match id {
841            SystemObjectId::Object(id) => SystemObjectType::Object(self.get_object_type(id)),
842            SystemObjectId::System => SystemObjectType::System,
843        }
844    }
845
846    /// Returns an error if the named `FeatureFlag` is not set to `on`.
847    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
848        flag.require(self.catalog.system_vars())?;
849        Ok(())
850    }
851
852    /// Returns true if the named [`FeatureFlag`] is set to `on`, returns false otherwise.
853    pub fn is_feature_flag_enabled(&self, flag: &'static FeatureFlag) -> bool {
854        self.require_feature_flag(flag).is_ok()
855    }
856
857    pub fn finalize_param_types(self) -> Result<Vec<SqlScalarType>, PlanError> {
858        let param_types = self.param_types.into_inner();
859        let mut out = vec![];
860        for (i, (n, typ)) in param_types.into_iter().enumerate() {
861            if n != i + 1 {
862                sql_bail!("unable to infer type for parameter ${}", i + 1);
863            }
864            out.push(typ);
865        }
866        Ok(out)
867    }
868
869    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
870    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
871    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
872        self.catalog.humanize_sql_scalar_type(typ, postgres_compat)
873    }
874
875    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
876    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
877    pub fn humanize_column_type(&self, typ: &SqlColumnType, postgres_compat: bool) -> String {
878        self.catalog.humanize_sql_column_type(typ, postgres_compat)
879    }
880
881    pub fn relation_desc_into_table_defs(
882        &self,
883        desc: &RelationDesc,
884    ) -> Result<(Vec<ColumnDef<Aug>>, Vec<TableConstraint<Aug>>), PlanError> {
885        let mut columns = vec![];
886        let mut null_cols = BTreeSet::new();
887        for (column_name, column_type) in desc.iter() {
888            let name = Ident::new(column_name.as_str().to_owned())?;
889
890            let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
891            let data_type = self.resolve_type(ty)?;
892
893            let options = if !column_type.nullable {
894                null_cols.insert(columns.len());
895                vec![mz_sql_parser::ast::ColumnOptionDef {
896                    name: None,
897                    option: mz_sql_parser::ast::ColumnOption::NotNull,
898                }]
899            } else {
900                vec![]
901            };
902
903            columns.push(ColumnDef {
904                name,
905                data_type,
906                collation: None,
907                options,
908            });
909        }
910
911        let mut table_constraints = vec![];
912        for key in desc.typ().keys.iter() {
913            let mut col_names = vec![];
914            for col_idx in key {
915                if !null_cols.contains(col_idx) {
916                    // Note that alternatively we could support NULL values in keys with `NULLS NOT
917                    // DISTINCT` semantics, which treats `NULL` as a distinct value.
918                    sql_bail!(
919                        "[internal error] key columns must be NOT NULL when generating table constraints"
920                    );
921                }
922                col_names.push(columns[*col_idx].name.clone());
923            }
924            table_constraints.push(TableConstraint::Unique {
925                name: None,
926                columns: col_names,
927                is_primary: false,
928                nulls_not_distinct: false,
929            });
930        }
931
932        Ok((columns, table_constraints))
933    }
934
935    pub fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
936        self.catalog.get_owner_id(id)
937    }
938
939    pub fn humanize_resolved_name(
940        &self,
941        name: &ResolvedItemName,
942    ) -> Result<PartialItemName, PlanError> {
943        let item = self.get_item_by_resolved_name(name)?;
944        Ok(self.catalog.minimal_qualification(item.name()))
945    }
946
947    /// WARNING! This style of name resolution assumes the referred-to objects exists (i.e. panics
948    /// if objects do not exist) so should never be used to handle user input.
949    pub fn dangerous_resolve_name(&self, name: Vec<&str>) -> ResolvedItemName {
950        tracing::trace!("dangerous_resolve_name {:?}", name);
951        // Note: Using unchecked here is okay because this function is already dangerous.
952        let name: Vec<_> = name.into_iter().map(Ident::new_unchecked).collect();
953        let name = UnresolvedItemName::qualified(&name);
954        let entry = match self.resolve_item(RawItemName::Name(name.clone())) {
955            Ok(entry) => entry,
956            Err(_) => self
957                .resolve_function(name.clone())
958                .expect("name referred to an existing object"),
959        };
960
961        let partial = normalize::unresolved_item_name(name).unwrap();
962        let full_name = self.allocate_full_name(partial).unwrap();
963
964        ResolvedItemName::Item {
965            id: entry.id(),
966            qualifiers: entry.name().qualifiers.clone(),
967            full_name,
968            print_id: true,
969            version: RelationVersionSelector::Latest,
970        }
971    }
972}
973
974pub fn resolve_cluster_for_materialized_view<'a>(
975    catalog: &'a dyn SessionCatalog,
976    stmt: &CreateMaterializedViewStatement<Aug>,
977) -> Result<ClusterId, PlanError> {
978    Ok(match &stmt.in_cluster {
979        None => catalog.resolve_cluster(None)?.id(),
980        Some(in_cluster) => in_cluster.id,
981    })
982}
983
/// Statement classification as documented by [`plan`].
#[derive(Clone, Copy, Debug)]
pub enum StatementClassification {
    /// Ownership, role and privilege statements (e.g. `GRANT`, `REVOKE`).
    ACL,
    /// Statements that create, alter, or drop objects.
    DDL,
    /// Queries and data-modifying statements, including `EXPLAIN` variants.
    DML,
    /// Statements that fit none of the other classes.
    Other,
    /// Session-level statements (cursors, prepared statements, variables).
    SCL,
    /// `SHOW` statements, except `SHOW <variable>`, which is [`Self::SCL`].
    Show,
    /// Transaction control statements.
    TCL,
}

impl StatementClassification {
    /// Returns `true` only for [`StatementClassification::DDL`].
    pub fn is_ddl(&self) -> bool {
        matches!(*self, Self::DDL)
    }
}
1001
1002impl<T: mz_sql_parser::ast::AstInfo> From<&Statement<T>> for StatementClassification {
1003    fn from(value: &Statement<T>) -> Self {
1004        use StatementClassification::*;
1005
1006        match value {
1007            // DDL statements.
1008            Statement::AlterCluster(_) => DDL,
1009            Statement::AlterConnection(_) => DDL,
1010            Statement::AlterIndex(_) => DDL,
1011            Statement::AlterMaterializedViewApplyReplacement(_) => DDL,
1012            Statement::AlterObjectRename(_) => DDL,
1013            Statement::AlterObjectSwap(_) => DDL,
1014            Statement::AlterNetworkPolicy(_) => DDL,
1015            Statement::AlterRetainHistory(_) => DDL,
1016            Statement::AlterRole(_) => DDL,
1017            Statement::AlterSecret(_) => DDL,
1018            Statement::AlterSetCluster(_) => DDL,
1019            Statement::AlterSink(_) => DDL,
1020            Statement::AlterSource(_) => DDL,
1021            Statement::AlterSystemSet(_) => DDL,
1022            Statement::AlterSystemReset(_) => DDL,
1023            Statement::AlterSystemResetAll(_) => DDL,
1024            Statement::AlterTableAddColumn(_) => DDL,
1025            Statement::Comment(_) => DDL,
1026            Statement::CreateCluster(_) => DDL,
1027            Statement::CreateClusterReplica(_) => DDL,
1028            Statement::CreateConnection(_) => DDL,
1029            Statement::CreateDatabase(_) => DDL,
1030            Statement::CreateIndex(_) => DDL,
1031            Statement::CreateRole(_) => DDL,
1032            Statement::CreateSchema(_) => DDL,
1033            Statement::CreateSecret(_) => DDL,
1034            Statement::CreateSink(_) => DDL,
1035            Statement::CreateWebhookSource(_) => DDL,
1036            Statement::CreateSource(_) => DDL,
1037            Statement::CreateSubsource(_) => DDL,
1038            Statement::CreateTable(_) => DDL,
1039            Statement::CreateTableFromSource(_) => DDL,
1040            Statement::CreateType(_) => DDL,
1041            Statement::CreateView(_) => DDL,
1042            Statement::CreateMaterializedView(_) => DDL,
1043            Statement::CreateNetworkPolicy(_) => DDL,
1044            Statement::DropObjects(_) => DDL,
1045            Statement::DropOwned(_) => DDL,
1046
1047            // `ACL` statements.
1048            Statement::AlterOwner(_) => ACL,
1049            Statement::GrantRole(_) => ACL,
1050            Statement::RevokeRole(_) => ACL,
1051            Statement::GrantPrivileges(_) => ACL,
1052            Statement::RevokePrivileges(_) => ACL,
1053            Statement::AlterDefaultPrivileges(_) => ACL,
1054            Statement::ReassignOwned(_) => ACL,
1055
1056            // DML statements.
1057            Statement::Copy(_) => DML,
1058            Statement::Delete(_) => DML,
1059            Statement::ExplainPlan(_) => DML,
1060            Statement::ExplainPushdown(_) => DML,
1061            Statement::ExplainAnalyzeObject(_) => DML,
1062            Statement::ExplainAnalyzeCluster(_) => DML,
1063            Statement::ExplainTimestamp(_) => DML,
1064            Statement::ExplainSinkSchema(_) => DML,
1065            Statement::Insert(_) => DML,
1066            Statement::Select(_) => DML,
1067            Statement::Subscribe(_) => DML,
1068            Statement::Update(_) => DML,
1069
1070            // `SHOW` statements.
1071            Statement::Show(ShowStatement::ShowColumns(_)) => Show,
1072            Statement::Show(ShowStatement::ShowCreateConnection(_)) => Show,
1073            Statement::Show(ShowStatement::ShowCreateCluster(_)) => Show,
1074            Statement::Show(ShowStatement::ShowCreateIndex(_)) => Show,
1075            Statement::Show(ShowStatement::ShowCreateSink(_)) => Show,
1076            Statement::Show(ShowStatement::ShowCreateSource(_)) => Show,
1077            Statement::Show(ShowStatement::ShowCreateTable(_)) => Show,
1078            Statement::Show(ShowStatement::ShowCreateView(_)) => Show,
1079            Statement::Show(ShowStatement::ShowCreateMaterializedView(_)) => Show,
1080            Statement::Show(ShowStatement::ShowCreateType(_)) => Show,
1081            Statement::Show(ShowStatement::ShowObjects(_)) => Show,
1082
1083            // SCL statements.
1084            Statement::Close(_) => SCL,
1085            Statement::Deallocate(_) => SCL,
1086            Statement::Declare(_) => SCL,
1087            Statement::Discard(_) => SCL,
1088            Statement::Execute(_) => SCL,
1089            Statement::Fetch(_) => SCL,
1090            Statement::Prepare(_) => SCL,
1091            Statement::ResetVariable(_) => SCL,
1092            Statement::SetVariable(_) => SCL,
1093            Statement::Show(ShowStatement::ShowVariable(_)) => SCL,
1094
1095            // TCL statements.
1096            Statement::Commit(_) => TCL,
1097            Statement::Rollback(_) => TCL,
1098            Statement::SetTransaction(_) => TCL,
1099            Statement::StartTransaction(_) => TCL,
1100
1101            // Other statements.
1102            Statement::Raise(_) => Other,
1103            Statement::Show(ShowStatement::InspectShard(_)) => Other,
1104            Statement::ValidateConnection(_) => Other,
1105        }
1106    }
1107}