1use std::cell::RefCell;
15use std::collections::{BTreeMap, BTreeSet};
16
17use mz_repr::namespaces::is_system_schema;
18use mz_repr::{
19 CatalogItemId, ColumnIndex, RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType,
20};
21use mz_sql_parser::ast::{
22 ColumnDef, ColumnName, CreateMaterializedViewStatement, RawItemName, ShowStatement,
23 StatementKind, TableConstraint, UnresolvedDatabaseName, UnresolvedSchemaName,
24};
25use mz_storage_types::connections::Connection;
26
27use crate::ast::{Ident, Statement, UnresolvedItemName};
28use crate::catalog::{
29 CatalogCluster, CatalogCollectionItem, CatalogDatabase, CatalogItem, CatalogItemType,
30 CatalogSchema, ObjectType, SessionCatalog, SystemObjectType,
31};
32use crate::names::{
33 self, Aug, DatabaseId, FullItemName, ItemQualifiers, ObjectId, PartialItemName,
34 QualifiedItemName, RawDatabaseSpecifier, ResolvedColumnReference, ResolvedDataType,
35 ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName, SchemaSpecifier,
36 SystemObjectId,
37};
38use crate::normalize;
39use crate::plan::error::PlanError;
40use crate::plan::{Params, Plan, PlanContext, PlanKind, query};
41use crate::session::vars::FeatureFlag;
42
43mod acl;
44pub(crate) mod ddl;
45mod dml;
46mod raise;
47mod scl;
48pub(crate) mod show;
49mod tcl;
50mod validate;
51
52use crate::session::vars;
53pub(crate) use ddl::PgConfigOptionExtracted;
54use mz_controller_types::ClusterId;
55use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_USER_OID};
56use mz_repr::role_id::RoleId;
57
/// Describes the output of a SQL statement: its result relation (if any) and
/// the types of its parameters.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct StatementDesc {
    /// The description of the relation the statement produces, or `None` when
    /// there is none. `arity` reports zero columns in the `None` case.
    pub relation_desc: Option<RelationDesc>,
    /// The scalar types of the statement's parameters. Empty after `new`;
    /// replaced wholesale by `with_params`.
    pub param_types: Vec<SqlScalarType>,
    /// `false` after `new`; switched to `true` by `with_is_copy`.
    /// NOTE(review): presumably marks `COPY` statements — confirm against callers.
    pub is_copy: bool,
}
69
70impl StatementDesc {
71 pub fn new(relation_desc: Option<RelationDesc>) -> Self {
72 StatementDesc {
73 relation_desc,
74 param_types: vec![],
75 is_copy: false,
76 }
77 }
78
79 pub fn arity(&self) -> usize {
82 self.relation_desc
83 .as_ref()
84 .map(|desc| desc.typ().column_types.len())
85 .unwrap_or(0)
86 }
87
88 fn with_params(mut self, param_types: Vec<SqlScalarType>) -> Self {
89 self.param_types = param_types;
90 self
91 }
92
93 fn with_is_copy(mut self) -> Self {
94 self.is_copy = true;
95 self
96 }
97}
98
/// Produces a [`StatementDesc`] for `stmt` by dispatching to the describe
/// routine that matches the statement's variant.
///
/// `param_types_in` carries the parameter types the caller already knows:
/// entry `i` (zero-based) seeds parameter number `i + 1`, and `None` entries
/// are left out of the initial parameter map.
///
/// # Errors
///
/// Propagates any error from the statement-specific describe routine, and
/// any error from `finalize_param_types` when the collected parameter
/// numbers are not contiguous starting at one.
pub fn describe(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    stmt: Statement<Aug>,
    param_types_in: &[Option<SqlScalarType>],
) -> Result<StatementDesc, PlanError> {
    // Seed the parameter map with the known types, keyed by one-based
    // parameter number.
    let mut param_types = BTreeMap::new();
    for (i, ty) in param_types_in.iter().enumerate() {
        if let Some(ty) = ty {
            param_types.insert(i + 1, ty.clone());
        }
    }

    let scx = StatementContext {
        pcx: Some(pcx),
        catalog,
        param_types: RefCell::new(param_types),
        ambiguous_columns: RefCell::new(false),
    };

    let desc = match stmt {
        // Statements described by the `ddl` module.
        Statement::AlterCluster(stmt) => ddl::describe_alter_cluster_set_options(&scx, stmt)?,
        Statement::AlterConnection(stmt) => ddl::describe_alter_connection(&scx, stmt)?,
        Statement::AlterIndex(stmt) => ddl::describe_alter_index_options(&scx, stmt)?,
        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
            ddl::describe_alter_materialized_view_apply_replacement(&scx, stmt)?
        }
        Statement::AlterObjectRename(stmt) => ddl::describe_alter_object_rename(&scx, stmt)?,
        Statement::AlterObjectSwap(stmt) => ddl::describe_alter_object_swap(&scx, stmt)?,
        Statement::AlterRetainHistory(stmt) => ddl::describe_alter_retain_history(&scx, stmt)?,
        Statement::AlterRole(stmt) => ddl::describe_alter_role(&scx, stmt)?,
        Statement::AlterSecret(stmt) => ddl::describe_alter_secret_options(&scx, stmt)?,
        Statement::AlterSetCluster(stmt) => ddl::describe_alter_set_cluster(&scx, stmt)?,
        Statement::AlterSink(stmt) => ddl::describe_alter_sink(&scx, stmt)?,
        Statement::AlterSource(stmt) => ddl::describe_alter_source(&scx, stmt)?,
        Statement::AlterSystemSet(stmt) => ddl::describe_alter_system_set(&scx, stmt)?,
        Statement::AlterSystemReset(stmt) => ddl::describe_alter_system_reset(&scx, stmt)?,
        Statement::AlterSystemResetAll(stmt) => ddl::describe_alter_system_reset_all(&scx, stmt)?,
        Statement::AlterTableAddColumn(stmt) => ddl::describe_alter_table_add_column(&scx, stmt)?,
        Statement::AlterNetworkPolicy(stmt) => ddl::describe_alter_network_policy(&scx, stmt)?,
        Statement::Comment(stmt) => ddl::describe_comment(&scx, stmt)?,
        Statement::CreateCluster(stmt) => ddl::describe_create_cluster(&scx, stmt)?,
        Statement::CreateClusterReplica(stmt) => ddl::describe_create_cluster_replica(&scx, stmt)?,
        Statement::CreateConnection(stmt) => ddl::describe_create_connection(&scx, stmt)?,
        Statement::CreateDatabase(stmt) => ddl::describe_create_database(&scx, stmt)?,
        Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
        Statement::CreateRole(stmt) => ddl::describe_create_role(&scx, stmt)?,
        Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
        Statement::CreateSecret(stmt) => ddl::describe_create_secret(&scx, stmt)?,
        Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
        Statement::CreateWebhookSource(stmt) => ddl::describe_create_webhook_source(&scx, stmt)?,
        Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
        Statement::CreateSubsource(stmt) => ddl::describe_create_subsource(&scx, stmt)?,
        Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
        Statement::CreateTableFromSource(stmt) => {
            ddl::describe_create_table_from_source(&scx, stmt)?
        }
        Statement::CreateType(stmt) => ddl::describe_create_type(&scx, stmt)?,
        Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
        Statement::CreateMaterializedView(stmt) => {
            ddl::describe_create_materialized_view(&scx, stmt)?
        }
        Statement::CreateNetworkPolicy(stmt) => ddl::describe_create_network_policy(&scx, stmt)?,
        Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?,
        Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?,

        // Statements described by the `acl` module.
        Statement::AlterOwner(stmt) => acl::describe_alter_owner(&scx, stmt)?,
        Statement::GrantRole(stmt) => acl::describe_grant_role(&scx, stmt)?,
        Statement::RevokeRole(stmt) => acl::describe_revoke_role(&scx, stmt)?,
        Statement::GrantPrivileges(stmt) => acl::describe_grant_privileges(&scx, stmt)?,
        Statement::RevokePrivileges(stmt) => acl::describe_revoke_privileges(&scx, stmt)?,
        Statement::AlterDefaultPrivileges(stmt) => {
            acl::describe_alter_default_privileges(&scx, stmt)?
        }
        Statement::ReassignOwned(stmt) => acl::describe_reassign_owned(&scx, stmt)?,

        // `SHOW` statements described by the `show` module.
        Statement::Show(ShowStatement::ShowColumns(stmt)) => {
            show::show_columns(&scx, stmt)?.describe()?
        }
        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
            show::describe_show_create_connection(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
            show::describe_show_create_cluster(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
            show::describe_show_create_index(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
            show::describe_show_create_sink(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
            show::describe_show_create_source(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
            show::describe_show_create_table(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
            show::describe_show_create_view(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
            show::describe_show_create_materialized_view(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
            show::describe_show_create_type(&scx, stmt)?
        }
        Statement::Show(ShowStatement::ShowObjects(stmt)) => {
            show::show_objects(&scx, stmt)?.describe()?
        }

        // Statements described by the `scl` module. `Declare` additionally
        // receives the caller-supplied parameter types.
        Statement::Close(stmt) => scl::describe_close(&scx, stmt)?,
        Statement::Deallocate(stmt) => scl::describe_deallocate(&scx, stmt)?,
        Statement::Declare(stmt) => scl::describe_declare(&scx, stmt, param_types_in)?,
        Statement::Discard(stmt) => scl::describe_discard(&scx, stmt)?,
        Statement::Execute(stmt) => scl::describe_execute(&scx, stmt)?,
        Statement::Fetch(stmt) => scl::describe_fetch(&scx, stmt)?,
        Statement::Prepare(stmt) => scl::describe_prepare(&scx, stmt)?,
        Statement::ResetVariable(stmt) => scl::describe_reset_variable(&scx, stmt)?,
        Statement::SetVariable(stmt) => scl::describe_set_variable(&scx, stmt)?,
        Statement::Show(ShowStatement::ShowVariable(stmt)) => {
            scl::describe_show_variable(&scx, stmt)?
        }

        // Statements described by the `dml` module.
        Statement::Copy(stmt) => dml::describe_copy(&scx, stmt)?,
        Statement::Delete(stmt) => dml::describe_delete(&scx, stmt)?,
        Statement::ExplainPlan(stmt) => dml::describe_explain_plan(&scx, stmt)?,
        Statement::ExplainPushdown(stmt) => dml::describe_explain_pushdown(&scx, stmt)?,
        Statement::ExplainAnalyzeObject(stmt) => dml::describe_explain_analyze_object(&scx, stmt)?,
        Statement::ExplainAnalyzeCluster(stmt) => {
            dml::describe_explain_analyze_cluster(&scx, stmt)?
        }
        Statement::ExplainTimestamp(stmt) => dml::describe_explain_timestamp(&scx, stmt)?,
        Statement::ExplainSinkSchema(stmt) => dml::describe_explain_schema(&scx, stmt)?,
        Statement::Insert(stmt) => dml::describe_insert(&scx, stmt)?,
        Statement::Select(stmt) => dml::describe_select(&scx, stmt)?,
        Statement::Subscribe(stmt) => dml::describe_subscribe(&scx, stmt)?,
        Statement::Update(stmt) => dml::describe_update(&scx, stmt)?,

        // Statements described by the `tcl` module.
        Statement::Commit(stmt) => tcl::describe_commit(&scx, stmt)?,
        Statement::Rollback(stmt) => tcl::describe_rollback(&scx, stmt)?,
        Statement::SetTransaction(stmt) => tcl::describe_set_transaction(&scx, stmt)?,
        Statement::StartTransaction(stmt) => tcl::describe_start_transaction(&scx, stmt)?,

        // Remaining statements.
        Statement::Raise(stmt) => raise::describe_raise(&scx, stmt)?,
        Statement::Show(ShowStatement::InspectShard(stmt)) => {
            scl::describe_inspect_shard(&scx, stmt)?
        }
        Statement::ValidateConnection(stmt) => validate::describe_validate_connection(&scx, stmt)?,
    };

    // Attach the parameter types accumulated in the context. This consumes
    // `scx` and fails if the parameter numbers have a gap.
    let desc = desc.with_params(scx.finalize_param_types()?);
    Ok(desc)
}
262
/// Produces a [`Plan`] for `stmt` by dispatching to the planning routine that
/// matches the statement's variant.
///
/// Parameter number `i + 1` is seeded with `params.expected_types[i]`.
///
/// If any ID in `resolved_ids` refers to a catalog item that is a function
/// in the schema returned by `get_mz_unsafe_schema_id`, planning first
/// requires the `UNSAFE_ENABLE_UNSAFE_FUNCTIONS` feature flag.
///
/// # Errors
///
/// Returns the feature-flag error described above, or whatever error the
/// statement-specific planning routine reports.
#[mz_ore::instrument(level = "debug")]
pub fn plan(
    pcx: Option<&PlanContext>,
    catalog: &dyn SessionCatalog,
    stmt: Statement<Aug>,
    params: &Params,
    resolved_ids: &ResolvedIds,
) -> Result<Plan, PlanError> {
    // Key the expected parameter types by one-based parameter number.
    let param_types = params
        .expected_types
        .iter()
        .enumerate()
        .map(|(i, ty)| (i + 1, ty.clone()))
        .collect();

    // Record which plan kinds this statement kind may produce; checked by the
    // soft assertion after planning.
    let kind: StatementKind = (&stmt).into();
    let permitted_plans = Plan::generated_from(&kind);

    let scx = &mut StatementContext {
        pcx,
        catalog,
        param_types: RefCell::new(param_types),
        ambiguous_columns: RefCell::new(false),
    };

    // IDs that the catalog cannot find are skipped via `try_get_item`.
    if resolved_ids
        .items()
        .filter_map(|id| catalog.try_get_item(id))
        .any(|item| {
            item.func().is_ok()
                && item.name().qualifiers.schema_spec
                    == SchemaSpecifier::Id(catalog.get_mz_unsafe_schema_id())
        })
    {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNSAFE_FUNCTIONS)?;
    }

    let plan = match stmt {
        // Statements planned by the `ddl` module.
        Statement::AlterCluster(stmt) => ddl::plan_alter_cluster(scx, stmt),
        Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt),
        Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt),
        Statement::AlterMaterializedViewApplyReplacement(stmt) => {
            ddl::plan_alter_materialized_view_apply_replacement(scx, stmt)
        }
        Statement::AlterObjectRename(stmt) => ddl::plan_alter_object_rename(scx, stmt),
        Statement::AlterObjectSwap(stmt) => ddl::plan_alter_object_swap(scx, stmt),
        Statement::AlterRetainHistory(stmt) => ddl::plan_alter_retain_history(scx, stmt),
        Statement::AlterRole(stmt) => ddl::plan_alter_role(scx, stmt),
        Statement::AlterSecret(stmt) => ddl::plan_alter_secret(scx, stmt),
        Statement::AlterSetCluster(stmt) => ddl::plan_alter_item_set_cluster(scx, stmt),
        Statement::AlterSink(stmt) => ddl::plan_alter_sink(scx, stmt),
        Statement::AlterSource(stmt) => ddl::plan_alter_source(scx, stmt),
        Statement::AlterSystemSet(stmt) => ddl::plan_alter_system_set(scx, stmt),
        Statement::AlterSystemReset(stmt) => ddl::plan_alter_system_reset(scx, stmt),
        Statement::AlterSystemResetAll(stmt) => ddl::plan_alter_system_reset_all(scx, stmt),
        Statement::AlterTableAddColumn(stmt) => ddl::plan_alter_table_add_column(scx, stmt),
        Statement::AlterNetworkPolicy(stmt) => ddl::plan_alter_network_policy(scx, stmt),
        Statement::Comment(stmt) => ddl::plan_comment(scx, stmt),
        Statement::CreateCluster(stmt) => ddl::plan_create_cluster(scx, stmt),
        Statement::CreateClusterReplica(stmt) => ddl::plan_create_cluster_replica(scx, stmt),
        Statement::CreateConnection(stmt) => ddl::plan_create_connection(scx, stmt),
        Statement::CreateDatabase(stmt) => ddl::plan_create_database(scx, stmt),
        Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
        Statement::CreateRole(stmt) => ddl::plan_create_role(scx, stmt),
        Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
        Statement::CreateSecret(stmt) => ddl::plan_create_secret(scx, stmt),
        Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
        Statement::CreateWebhookSource(stmt) => ddl::plan_create_webhook_source(scx, stmt),
        Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
        Statement::CreateSubsource(stmt) => ddl::plan_create_subsource(scx, stmt),
        Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
        Statement::CreateTableFromSource(stmt) => ddl::plan_create_table_from_source(scx, stmt),
        Statement::CreateType(stmt) => ddl::plan_create_type(scx, stmt),
        Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt),
        Statement::CreateMaterializedView(stmt) => ddl::plan_create_materialized_view(scx, stmt),
        Statement::CreateNetworkPolicy(stmt) => ddl::plan_create_network_policy(scx, stmt),
        Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt),
        Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt),

        // Statements planned by the `acl` module.
        Statement::AlterOwner(stmt) => acl::plan_alter_owner(scx, stmt),
        Statement::GrantRole(stmt) => acl::plan_grant_role(scx, stmt),
        Statement::RevokeRole(stmt) => acl::plan_revoke_role(scx, stmt),
        Statement::GrantPrivileges(stmt) => acl::plan_grant_privileges(scx, stmt),
        Statement::RevokePrivileges(stmt) => acl::plan_revoke_privileges(scx, stmt),
        Statement::AlterDefaultPrivileges(stmt) => acl::plan_alter_default_privileges(scx, stmt),
        Statement::ReassignOwned(stmt) => acl::plan_reassign_owned(scx, stmt),

        // Statements planned by the `dml` module.
        Statement::Copy(stmt) => dml::plan_copy(scx, stmt),
        Statement::Delete(stmt) => dml::plan_delete(scx, stmt, params),
        Statement::ExplainPlan(stmt) => dml::plan_explain_plan(scx, stmt, params),
        Statement::ExplainPushdown(stmt) => dml::plan_explain_pushdown(scx, stmt, params),
        Statement::ExplainAnalyzeObject(stmt) => {
            dml::plan_explain_analyze_object(scx, stmt, params)
        }
        Statement::ExplainAnalyzeCluster(stmt) => {
            dml::plan_explain_analyze_cluster(scx, stmt, params)
        }
        Statement::ExplainTimestamp(stmt) => dml::plan_explain_timestamp(scx, stmt),
        Statement::ExplainSinkSchema(stmt) => dml::plan_explain_schema(scx, stmt),
        Statement::Insert(stmt) => dml::plan_insert(scx, stmt, params),
        Statement::Select(stmt) => dml::plan_select(scx, stmt, params, None),
        Statement::Subscribe(stmt) => dml::plan_subscribe(scx, stmt, params, None),
        Statement::Update(stmt) => dml::plan_update(scx, stmt, params),

        // `SHOW` statements planned by the `show` module.
        Statement::Show(ShowStatement::ShowColumns(stmt)) => show::show_columns(scx, stmt)?.plan(),
        Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
            show::plan_show_create_connection(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
            show::plan_show_create_cluster(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
            show::plan_show_create_index(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
            show::plan_show_create_sink(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
            show::plan_show_create_source(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
            show::plan_show_create_table(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
            show::plan_show_create_view(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
            show::plan_show_create_materialized_view(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
            show::plan_show_create_type(scx, stmt).map(Plan::ShowCreate)
        }
        Statement::Show(ShowStatement::ShowObjects(stmt)) => show::show_objects(scx, stmt)?.plan(),

        // Statements planned by the `scl` module.
        Statement::Close(stmt) => scl::plan_close(scx, stmt),
        Statement::Deallocate(stmt) => scl::plan_deallocate(scx, stmt),
        Statement::Declare(stmt) => scl::plan_declare(scx, stmt, params),
        Statement::Discard(stmt) => scl::plan_discard(scx, stmt),
        Statement::Execute(stmt) => scl::plan_execute(scx, stmt),
        Statement::Fetch(stmt) => scl::plan_fetch(scx, stmt),
        Statement::Prepare(stmt) => scl::plan_prepare(scx, stmt),
        Statement::ResetVariable(stmt) => scl::plan_reset_variable(scx, stmt),
        Statement::SetVariable(stmt) => scl::plan_set_variable(scx, stmt),
        Statement::Show(ShowStatement::ShowVariable(stmt)) => scl::plan_show_variable(scx, stmt),

        // Statements planned by the `tcl` module.
        Statement::Commit(stmt) => tcl::plan_commit(scx, stmt),
        Statement::Rollback(stmt) => tcl::plan_rollback(scx, stmt),
        Statement::SetTransaction(stmt) => tcl::plan_set_transaction(scx, stmt),
        Statement::StartTransaction(stmt) => tcl::plan_start_transaction(scx, stmt),

        // Remaining statements.
        Statement::Raise(stmt) => raise::plan_raise(scx, stmt),
        Statement::Show(ShowStatement::InspectShard(stmt)) => scl::plan_inspect_shard(scx, stmt),
        Statement::ValidateConnection(stmt) => validate::plan_validate_connection(scx, stmt),
    };

    // Sanity check: a successful plan must be one of the kinds declared for
    // this statement kind.
    if let Ok(plan) = &plan {
        mz_ore::soft_assert_no_log!(
            permitted_plans.contains(&PlanKind::from(plan)),
            "plan {:?}, permitted plans {:?}",
            plan,
            permitted_plans
        );
    }

    plan
}
456
/// Forwards all arguments unchanged to `query::plan_copy_from_rows` and
/// returns its result.
///
/// NOTE(review): presumably plans loading `rows` into the listed `columns` of
/// the item identified by `target_id` — confirm against `plan_copy_from_rows`.
pub fn plan_copy_from(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    target_id: CatalogItemId,
    target_name: String,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<super::HirRelationExpr, PlanError> {
    query::plan_copy_from_rows(pcx, catalog, target_id, target_name, columns, rows)
}
467
468impl PartialEq<ObjectType> for CatalogItemType {
475 fn eq(&self, other: &ObjectType) -> bool {
476 match (self, other) {
477 (CatalogItemType::Source, ObjectType::Source)
478 | (CatalogItemType::Table, ObjectType::Table)
479 | (CatalogItemType::Sink, ObjectType::Sink)
480 | (CatalogItemType::View, ObjectType::View)
481 | (CatalogItemType::MaterializedView, ObjectType::MaterializedView)
482 | (CatalogItemType::Index, ObjectType::Index)
483 | (CatalogItemType::Type, ObjectType::Type)
484 | (CatalogItemType::Secret, ObjectType::Secret)
485 | (CatalogItemType::Connection, ObjectType::Connection) => true,
486 (_, _) => false,
487 }
488 }
489}
490
491impl PartialEq<CatalogItemType> for ObjectType {
492 fn eq(&self, other: &CatalogItemType) -> bool {
493 other == self
494 }
495}
496
/// Shared state handed to the per-statement describe and planning routines.
#[derive(Debug, Clone)]
pub struct StatementContext<'a> {
    /// The plan context, if one was supplied. Read through `pcx()`, which
    /// reports an error when this is `None`.
    pcx: Option<&'a PlanContext>,
    /// The session catalog used for every name and object lookup.
    pub catalog: &'a dyn SessionCatalog,
    /// Parameter types keyed by one-based parameter number.
    /// `finalize_param_types` requires the keys to be contiguous from one.
    pub param_types: RefCell<BTreeMap<usize, SqlScalarType>>,
    /// Initialized to `false` everywhere a context is built in this file.
    /// NOTE(review): presumably set during query planning when an ambiguous
    /// column reference is seen — confirm against its writers.
    pub ambiguous_columns: RefCell<bool>,
}
513
514impl<'a> StatementContext<'a> {
515 pub fn new(
516 pcx: Option<&'a PlanContext>,
517 catalog: &'a dyn SessionCatalog,
518 ) -> StatementContext<'a> {
519 StatementContext {
520 pcx,
521 catalog,
522 param_types: Default::default(),
523 ambiguous_columns: RefCell::new(false),
524 }
525 }
526
/// Returns the catalog's search path as `(database, schema)` specifier
/// pairs.
pub fn current_schemas(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
    self.catalog.search_path()
}
531
532 pub fn current_schema(&self) -> Option<&(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
535 self.current_schemas().into_iter().next()
536 }
537
538 pub fn pcx(&self) -> Result<&PlanContext, PlanError> {
539 self.pcx.ok_or_else(|| sql_err!("no plan context"))
540 }
541
542 pub fn allocate_full_name(&self, name: PartialItemName) -> Result<FullItemName, PlanError> {
543 let (database, schema): (RawDatabaseSpecifier, String) = match (name.database, name.schema)
544 {
545 (None, None) => {
546 let Some((database, schema)) = self.current_schema() else {
547 return Err(PlanError::InvalidSchemaName);
548 };
549 let schema = self.get_schema(database, schema);
550 let database = match schema.database() {
551 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
552 ResolvedDatabaseSpecifier::Id(id) => {
553 RawDatabaseSpecifier::Name(self.catalog.get_database(id).name().to_string())
554 }
555 };
556 (database, schema.name().schema.clone())
557 }
558 (None, Some(schema)) => {
559 if is_system_schema(&schema) {
560 (RawDatabaseSpecifier::Ambient, schema)
561 } else {
562 match self.catalog.active_database_name() {
563 Some(name) => (RawDatabaseSpecifier::Name(name.to_string()), schema),
564 None => {
565 sql_bail!(
566 "no database specified for non-system schema and no active database"
567 )
568 }
569 }
570 }
571 }
572 (Some(_database), None) => {
573 sql_bail!("unreachable: specified the database but no schema")
576 }
577 (Some(database), Some(schema)) => (RawDatabaseSpecifier::Name(database), schema),
578 };
579 let item = name.item;
580 Ok(FullItemName {
581 database,
582 schema,
583 item,
584 })
585 }
586
587 pub fn allocate_qualified_name(
588 &self,
589 name: PartialItemName,
590 ) -> Result<QualifiedItemName, PlanError> {
591 let full_name = self.allocate_full_name(name)?;
592 let database_spec = match full_name.database {
593 RawDatabaseSpecifier::Ambient => ResolvedDatabaseSpecifier::Ambient,
594 RawDatabaseSpecifier::Name(name) => ResolvedDatabaseSpecifier::Id(
595 self.resolve_database(&UnresolvedDatabaseName(Ident::new(name)?))?
596 .id(),
597 ),
598 };
599 let schema_spec = self
600 .resolve_schema_in_database(&database_spec, &Ident::new(full_name.schema)?)?
601 .id()
602 .clone();
603 Ok(QualifiedItemName {
604 qualifiers: ItemQualifiers {
605 database_spec,
606 schema_spec,
607 },
608 item: full_name.item,
609 })
610 }
611
612 pub fn allocate_temporary_full_name(&self, name: PartialItemName) -> FullItemName {
613 FullItemName {
614 database: RawDatabaseSpecifier::Ambient,
615 schema: name
616 .schema
617 .unwrap_or_else(|| mz_repr::namespaces::MZ_TEMP_SCHEMA.to_owned()),
618 item: name.item,
619 }
620 }
621
622 pub fn allocate_temporary_qualified_name(
623 &self,
624 name: PartialItemName,
625 ) -> Result<QualifiedItemName, PlanError> {
626 if let Some(schema_name) = name.schema {
631 if schema_name != mz_repr::namespaces::MZ_TEMP_SCHEMA {
632 return Err(PlanError::InvalidTemporarySchema);
633 }
634 }
635
636 Ok(QualifiedItemName {
637 qualifiers: ItemQualifiers {
638 database_spec: ResolvedDatabaseSpecifier::Ambient,
639 schema_spec: SchemaSpecifier::Temporary,
640 },
641 item: name.item,
642 })
643 }
644
645 pub fn allocate_resolved_item_name(
648 &self,
649 id: CatalogItemId,
650 name: UnresolvedItemName,
651 ) -> Result<ResolvedItemName, PlanError> {
652 let partial = normalize::unresolved_item_name(name)?;
653 let qualified = self.allocate_qualified_name(partial.clone())?;
654 let full_name = self.allocate_full_name(partial)?;
655 Ok(ResolvedItemName::Item {
656 id,
657 qualifiers: qualified.qualifiers,
658 full_name,
659 print_id: true,
660 version: RelationVersionSelector::Latest,
661 })
662 }
663
/// Returns the ID of the catalog's active database, or `None` when the
/// catalog reports none.
pub fn active_database(&self) -> Option<&DatabaseId> {
    self.catalog.active_database()
}
667
668 pub fn resolve_optional_schema(
669 &self,
670 schema_name: &Option<ResolvedSchemaName>,
671 ) -> Result<SchemaSpecifier, PlanError> {
672 match schema_name {
673 Some(ResolvedSchemaName::Schema { schema_spec, .. }) => Ok(schema_spec.clone()),
674 None => self.resolve_active_schema().map(|spec| spec.clone()),
675 Some(ResolvedSchemaName::Error) => {
676 unreachable!("should have been handled by name resolution")
677 }
678 }
679 }
680
681 pub fn resolve_active_schema(&self) -> Result<&SchemaSpecifier, PlanError> {
682 match self.current_schema() {
683 Some((_db, schema)) => Ok(schema),
684 None => Err(PlanError::InvalidSchemaName),
685 }
686 }
687
/// Looks up the cluster with the given ID in the catalog.
pub fn get_cluster(&self, id: &ClusterId) -> &dyn CatalogCluster<'_> {
    self.catalog.get_cluster(*id)
}
691
692 pub fn resolve_database(
693 &self,
694 name: &UnresolvedDatabaseName,
695 ) -> Result<&dyn CatalogDatabase, PlanError> {
696 let name = normalize::ident_ref(&name.0);
697 Ok(self.catalog.resolve_database(name)?)
698 }
699
/// Looks up the database with the given ID in the catalog.
pub fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase {
    self.catalog.get_database(id)
}
703
704 pub fn resolve_schema_in_database(
705 &self,
706 database_spec: &ResolvedDatabaseSpecifier,
707 schema: &Ident,
708 ) -> Result<&dyn CatalogSchema, PlanError> {
709 let schema = normalize::ident_ref(schema);
710 Ok(self
711 .catalog
712 .resolve_schema_in_database(database_spec, schema)?)
713 }
714
715 pub fn resolve_schema(
716 &self,
717 name: UnresolvedSchemaName,
718 ) -> Result<&dyn CatalogSchema, PlanError> {
719 let name = normalize::unresolved_schema_name(name)?;
720 Ok(self
721 .catalog
722 .resolve_schema(name.database.as_deref(), &name.schema)?)
723 }
724
/// Looks up the schema identified by the given database and schema
/// specifiers in the catalog.
pub fn get_schema(
    &self,
    database_spec: &ResolvedDatabaseSpecifier,
    schema_spec: &SchemaSpecifier,
) -> &dyn CatalogSchema {
    self.catalog.get_schema(database_spec, schema_spec)
}
732
733 pub fn resolve_item(&self, name: RawItemName) -> Result<&dyn CatalogItem, PlanError> {
734 match name {
735 RawItemName::Name(name) => {
736 let name = normalize::unresolved_item_name(name)?;
737 Ok(self.catalog.resolve_item(&name)?)
738 }
739 RawItemName::Id(id, _, _) => {
740 let gid = id.parse()?;
741 Ok(self.catalog.get_item(&gid))
742 }
743 }
744 }
745
/// Looks up the catalog item with the given ID.
pub fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem {
    self.catalog.get_item(id)
}
749
750 pub fn get_item_by_resolved_name(
751 &self,
752 name: &ResolvedItemName,
753 ) -> Result<Box<dyn CatalogCollectionItem + '_>, PlanError> {
754 match name {
755 ResolvedItemName::Item { id, version, .. } => {
756 Ok(self.get_item(id).at_version(*version))
757 }
758 ResolvedItemName::Cte { .. } => sql_bail!("non-user item"),
759 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
760 }
761 }
762
763 pub fn get_column_by_resolved_name(
764 &self,
765 name: &ColumnName<Aug>,
766 ) -> Result<(Box<dyn CatalogCollectionItem + '_>, usize), PlanError> {
767 match (&name.relation, &name.column) {
768 (
769 ResolvedItemName::Item { id, version, .. },
770 ResolvedColumnReference::Column { index, .. },
771 ) => {
772 let item = self.get_item(id).at_version(*version);
773 Ok((item, *index))
774 }
775 _ => unreachable!(
776 "get_column_by_resolved_name errors should have been caught in name resolution"
777 ),
778 }
779 }
780
781 pub fn resolve_function(
782 &self,
783 name: UnresolvedItemName,
784 ) -> Result<&dyn CatalogItem, PlanError> {
785 let name = normalize::unresolved_item_name(name)?;
786 Ok(self.catalog.resolve_function(&name)?)
787 }
788
789 pub fn resolve_cluster(
790 &self,
791 name: Option<&Ident>,
792 ) -> Result<&dyn CatalogCluster<'_>, PlanError> {
793 let name = name.map(|name| name.as_str());
794 Ok(self.catalog.resolve_cluster(name)?)
795 }
796
797 pub fn resolve_type(&self, mut ty: mz_pgrepr::Type) -> Result<ResolvedDataType, PlanError> {
798 match &mut ty {
804 mz_pgrepr::Type::Interval { constraints } => *constraints = None,
805 mz_pgrepr::Type::Time { precision } => *precision = None,
806 mz_pgrepr::Type::TimeTz { precision } => *precision = None,
807 mz_pgrepr::Type::Timestamp { precision } => *precision = None,
808 mz_pgrepr::Type::TimestampTz { precision } => *precision = None,
809 _ => (),
810 }
811 let mut ty = if ty.oid() >= FIRST_USER_OID {
817 sql_bail!("internal error, unexpected user type: {ty:?} ");
818 } else if ty.oid() < FIRST_MATERIALIZE_OID {
819 format!("pg_catalog.{}", ty)
820 } else {
821 format!("mz_catalog.{}", ty)
823 };
824 if ty == "pg_catalog.json" {
828 ty = "pg_catalog.jsonb".into();
829 }
830 let data_type = mz_sql_parser::parser::parse_data_type(&ty)?;
831 let (data_type, _) = names::resolve(self.catalog, data_type)?;
832 Ok(data_type)
833 }
834
/// Returns the catalog's object type for the given object ID.
pub fn get_object_type(&self, id: &ObjectId) -> ObjectType {
    self.catalog.get_object_type(id)
}
838
/// Returns the type of a system object ID: the wrapped object's type for
/// `SystemObjectId::Object`, and `SystemObjectType::System` for
/// `SystemObjectId::System`.
pub fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType {
    match id {
        SystemObjectId::Object(id) => SystemObjectType::Object(self.get_object_type(id)),
        SystemObjectId::System => SystemObjectType::System,
    }
}
845
846 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
848 flag.require(self.catalog.system_vars())?;
849 Ok(())
850 }
851
/// Returns `true` when `require_feature_flag` succeeds for `flag`, and
/// `false` when it reports an error.
pub fn is_feature_flag_enabled(&self, flag: &'static FeatureFlag) -> bool {
    self.require_feature_flag(flag).is_ok()
}
856
857 pub fn finalize_param_types(self) -> Result<Vec<SqlScalarType>, PlanError> {
858 let param_types = self.param_types.into_inner();
859 let mut out = vec![];
860 for (i, (n, typ)) in param_types.into_iter().enumerate() {
861 if n != i + 1 {
862 sql_bail!("unable to infer type for parameter ${}", i + 1);
863 }
864 out.push(typ);
865 }
866 Ok(out)
867 }
868
/// Returns the catalog's human-readable rendering of a scalar type.
/// `postgres_compat` is forwarded to the catalog unchanged.
pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
    self.catalog.humanize_sql_scalar_type(typ, postgres_compat)
}
874
/// Returns the catalog's human-readable rendering of a column type.
/// `postgres_compat` is forwarded to the catalog unchanged.
pub fn humanize_column_type(&self, typ: &SqlColumnType, postgres_compat: bool) -> String {
    self.catalog.humanize_sql_column_type(typ, postgres_compat)
}
880
881 pub fn relation_desc_into_table_defs(
882 &self,
883 desc: &RelationDesc,
884 ) -> Result<(Vec<ColumnDef<Aug>>, Vec<TableConstraint<Aug>>), PlanError> {
885 let mut columns = vec![];
886 let mut null_cols = BTreeSet::new();
887 for (column_name, column_type) in desc.iter() {
888 let name = Ident::new(column_name.as_str().to_owned())?;
889
890 let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
891 let data_type = self.resolve_type(ty)?;
892
893 let options = if !column_type.nullable {
894 null_cols.insert(columns.len());
895 vec![mz_sql_parser::ast::ColumnOptionDef {
896 name: None,
897 option: mz_sql_parser::ast::ColumnOption::NotNull,
898 }]
899 } else {
900 vec![]
901 };
902
903 columns.push(ColumnDef {
904 name,
905 data_type,
906 collation: None,
907 options,
908 });
909 }
910
911 let mut table_constraints = vec![];
912 for key in desc.typ().keys.iter() {
913 let mut col_names = vec![];
914 for col_idx in key {
915 if !null_cols.contains(col_idx) {
916 sql_bail!(
919 "[internal error] key columns must be NOT NULL when generating table constraints"
920 );
921 }
922 col_names.push(columns[*col_idx].name.clone());
923 }
924 table_constraints.push(TableConstraint::Unique {
925 name: None,
926 columns: col_names,
927 is_primary: false,
928 nulls_not_distinct: false,
929 });
930 }
931
932 Ok((columns, table_constraints))
933 }
934
/// Returns the role that owns the given object according to the catalog, or
/// `None` when the catalog reports no owner.
pub fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
    self.catalog.get_owner_id(id)
}
938
939 pub fn humanize_resolved_name(
940 &self,
941 name: &ResolvedItemName,
942 ) -> Result<PartialItemName, PlanError> {
943 let item = self.get_item_by_resolved_name(name)?;
944 Ok(self.catalog.minimal_qualification(item.name()))
945 }
946
947 pub fn dangerous_resolve_name(&self, name: Vec<&str>) -> ResolvedItemName {
950 tracing::trace!("dangerous_resolve_name {:?}", name);
951 let name: Vec<_> = name.into_iter().map(Ident::new_unchecked).collect();
953 let name = UnresolvedItemName::qualified(&name);
954 let entry = match self.resolve_item(RawItemName::Name(name.clone())) {
955 Ok(entry) => entry,
956 Err(_) => self
957 .resolve_function(name.clone())
958 .expect("name referred to an existing object"),
959 };
960
961 let partial = normalize::unresolved_item_name(name).unwrap();
962 let full_name = self.allocate_full_name(partial).unwrap();
963
964 ResolvedItemName::Item {
965 id: entry.id(),
966 qualifiers: entry.name().qualifiers.clone(),
967 full_name,
968 print_id: true,
969 version: RelationVersionSelector::Latest,
970 }
971 }
972}
973
974pub fn resolve_cluster_for_materialized_view<'a>(
975 catalog: &'a dyn SessionCatalog,
976 stmt: &CreateMaterializedViewStatement<Aug>,
977) -> Result<ClusterId, PlanError> {
978 Ok(match &stmt.in_cluster {
979 None => catalog.resolve_cluster(None)?.id(),
980 Some(in_cluster) => in_cluster.id,
981 })
982}
983
/// Coarse category assigned to a SQL statement.
///
/// The mapping from concrete statements to categories is defined by the
/// `From<&Statement<T>>` implementation for this type.
#[derive(Clone, Copy, Debug)]
pub enum StatementClassification {
    /// Ownership, role-membership and privilege statements (e.g. grants and revokes).
    ACL,
    /// Statements that create, alter, drop or comment on objects.
    DDL,
    /// Statements that read or modify data, including `EXPLAIN` variants.
    DML,
    /// Statements that fit none of the other categories.
    Other,
    /// Session-level statements such as cursors, prepared statements and variables.
    SCL,
    /// `SHOW` statements other than showing a variable or inspecting a shard.
    Show,
    /// Transaction statements (start, commit, rollback, set transaction).
    TCL,
}
995
996impl StatementClassification {
997 pub fn is_ddl(&self) -> bool {
998 matches!(self, StatementClassification::DDL)
999 }
1000}
1001
1002impl<T: mz_sql_parser::ast::AstInfo> From<&Statement<T>> for StatementClassification {
1003 fn from(value: &Statement<T>) -> Self {
1004 use StatementClassification::*;
1005
1006 match value {
1007 Statement::AlterCluster(_) => DDL,
1009 Statement::AlterConnection(_) => DDL,
1010 Statement::AlterIndex(_) => DDL,
1011 Statement::AlterMaterializedViewApplyReplacement(_) => DDL,
1012 Statement::AlterObjectRename(_) => DDL,
1013 Statement::AlterObjectSwap(_) => DDL,
1014 Statement::AlterNetworkPolicy(_) => DDL,
1015 Statement::AlterRetainHistory(_) => DDL,
1016 Statement::AlterRole(_) => DDL,
1017 Statement::AlterSecret(_) => DDL,
1018 Statement::AlterSetCluster(_) => DDL,
1019 Statement::AlterSink(_) => DDL,
1020 Statement::AlterSource(_) => DDL,
1021 Statement::AlterSystemSet(_) => DDL,
1022 Statement::AlterSystemReset(_) => DDL,
1023 Statement::AlterSystemResetAll(_) => DDL,
1024 Statement::AlterTableAddColumn(_) => DDL,
1025 Statement::Comment(_) => DDL,
1026 Statement::CreateCluster(_) => DDL,
1027 Statement::CreateClusterReplica(_) => DDL,
1028 Statement::CreateConnection(_) => DDL,
1029 Statement::CreateDatabase(_) => DDL,
1030 Statement::CreateIndex(_) => DDL,
1031 Statement::CreateRole(_) => DDL,
1032 Statement::CreateSchema(_) => DDL,
1033 Statement::CreateSecret(_) => DDL,
1034 Statement::CreateSink(_) => DDL,
1035 Statement::CreateWebhookSource(_) => DDL,
1036 Statement::CreateSource(_) => DDL,
1037 Statement::CreateSubsource(_) => DDL,
1038 Statement::CreateTable(_) => DDL,
1039 Statement::CreateTableFromSource(_) => DDL,
1040 Statement::CreateType(_) => DDL,
1041 Statement::CreateView(_) => DDL,
1042 Statement::CreateMaterializedView(_) => DDL,
1043 Statement::CreateNetworkPolicy(_) => DDL,
1044 Statement::DropObjects(_) => DDL,
1045 Statement::DropOwned(_) => DDL,
1046
1047 Statement::AlterOwner(_) => ACL,
1049 Statement::GrantRole(_) => ACL,
1050 Statement::RevokeRole(_) => ACL,
1051 Statement::GrantPrivileges(_) => ACL,
1052 Statement::RevokePrivileges(_) => ACL,
1053 Statement::AlterDefaultPrivileges(_) => ACL,
1054 Statement::ReassignOwned(_) => ACL,
1055
1056 Statement::Copy(_) => DML,
1058 Statement::Delete(_) => DML,
1059 Statement::ExplainPlan(_) => DML,
1060 Statement::ExplainPushdown(_) => DML,
1061 Statement::ExplainAnalyzeObject(_) => DML,
1062 Statement::ExplainAnalyzeCluster(_) => DML,
1063 Statement::ExplainTimestamp(_) => DML,
1064 Statement::ExplainSinkSchema(_) => DML,
1065 Statement::Insert(_) => DML,
1066 Statement::Select(_) => DML,
1067 Statement::Subscribe(_) => DML,
1068 Statement::Update(_) => DML,
1069
1070 Statement::Show(ShowStatement::ShowColumns(_)) => Show,
1072 Statement::Show(ShowStatement::ShowCreateConnection(_)) => Show,
1073 Statement::Show(ShowStatement::ShowCreateCluster(_)) => Show,
1074 Statement::Show(ShowStatement::ShowCreateIndex(_)) => Show,
1075 Statement::Show(ShowStatement::ShowCreateSink(_)) => Show,
1076 Statement::Show(ShowStatement::ShowCreateSource(_)) => Show,
1077 Statement::Show(ShowStatement::ShowCreateTable(_)) => Show,
1078 Statement::Show(ShowStatement::ShowCreateView(_)) => Show,
1079 Statement::Show(ShowStatement::ShowCreateMaterializedView(_)) => Show,
1080 Statement::Show(ShowStatement::ShowCreateType(_)) => Show,
1081 Statement::Show(ShowStatement::ShowObjects(_)) => Show,
1082
1083 Statement::Close(_) => SCL,
1085 Statement::Deallocate(_) => SCL,
1086 Statement::Declare(_) => SCL,
1087 Statement::Discard(_) => SCL,
1088 Statement::Execute(_) => SCL,
1089 Statement::Fetch(_) => SCL,
1090 Statement::Prepare(_) => SCL,
1091 Statement::ResetVariable(_) => SCL,
1092 Statement::SetVariable(_) => SCL,
1093 Statement::Show(ShowStatement::ShowVariable(_)) => SCL,
1094
1095 Statement::Commit(_) => TCL,
1097 Statement::Rollback(_) => TCL,
1098 Statement::SetTransaction(_) => TCL,
1099 Statement::StartTransaction(_) => TCL,
1100
1101 Statement::Raise(_) => Other,
1103 Statement::Show(ShowStatement::InspectShard(_)) => Other,
1104 Statement::ValidateConnection(_) => Other,
1105 }
1106 }
1107}