1use std::cell::RefCell;
15use std::collections::{BTreeMap, BTreeSet};
16use std::sync::{Arc, Mutex};
17
18use mz_repr::namespaces::is_system_schema;
19use mz_repr::{
20 CatalogItemId, ColumnIndex, RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType,
21};
22use mz_sql_parser::ast::{
23 ColumnDef, ColumnName, CreateMaterializedViewStatement, RawItemName, ShowStatement,
24 StatementKind, TableConstraint, UnresolvedDatabaseName, UnresolvedSchemaName,
25};
26use mz_storage_types::connections::Connection;
27
28use crate::ast::{Ident, Statement, UnresolvedItemName};
29use crate::catalog::{
30 CatalogCluster, CatalogCollectionItem, CatalogDatabase, CatalogItem, CatalogItemType,
31 CatalogSchema, ObjectType, SessionCatalog, SystemObjectType,
32};
33use crate::names::{
34 self, Aug, DatabaseId, FullItemName, ItemQualifiers, ObjectId, PartialItemName,
35 QualifiedItemName, RawDatabaseSpecifier, ResolvedColumnReference, ResolvedDataType,
36 ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName, SchemaSpecifier,
37 SystemObjectId,
38};
39use crate::normalize;
40use crate::plan::error::PlanError;
41use crate::plan::{Params, Plan, PlanContext, PlanKind, query};
42use crate::session::vars::FeatureFlag;
43
44mod acl;
45pub(crate) mod ddl;
46mod dml;
47mod raise;
48mod scl;
49pub(crate) mod show;
50mod tcl;
51mod validate;
52
53use crate::session::vars;
54pub(crate) use ddl::PgConfigOptionExtracted;
55use mz_controller_types::ClusterId;
56use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_USER_OID};
57use mz_repr::role_id::RoleId;
58
59#[derive(Debug, Clone, Eq, PartialEq)]
61pub struct StatementDesc {
62 pub relation_desc: Option<RelationDesc>,
65 pub param_types: Vec<SqlScalarType>,
67 pub is_copy: bool,
69}
70
71impl StatementDesc {
72 pub fn new(relation_desc: Option<RelationDesc>) -> Self {
73 StatementDesc {
74 relation_desc,
75 param_types: vec![],
76 is_copy: false,
77 }
78 }
79
80 pub fn arity(&self) -> usize {
83 self.relation_desc
84 .as_ref()
85 .map(|desc| desc.typ().column_types.len())
86 .unwrap_or(0)
87 }
88
89 fn with_params(mut self, param_types: Vec<SqlScalarType>) -> Self {
90 self.param_types = param_types;
91 self
92 }
93
94 fn with_is_copy(mut self) -> Self {
95 self.is_copy = true;
96 self
97 }
98}
99
100pub fn describe(
104 pcx: &PlanContext,
105 catalog: &dyn SessionCatalog,
106 stmt: Statement<Aug>,
107 param_types_in: &[Option<SqlScalarType>],
108) -> Result<StatementDesc, PlanError> {
109 let mut param_types = BTreeMap::new();
110 for (i, ty) in param_types_in.iter().enumerate() {
111 if let Some(ty) = ty {
112 param_types.insert(i + 1, ty.clone());
113 }
114 }
115
116 let scx = StatementContext {
117 pcx: Some(pcx),
118 catalog,
119 param_types: RefCell::new(param_types),
120 ambiguous_columns: RefCell::new(false),
121 sql_impl_resolved_ids: Arc::new(Mutex::new(ResolvedIds::empty())),
122 };
123
124 let desc = match stmt {
125 Statement::AlterCluster(stmt) => ddl::describe_alter_cluster_set_options(&scx, stmt)?,
127 Statement::AlterConnection(stmt) => ddl::describe_alter_connection(&scx, stmt)?,
128 Statement::AlterIndex(stmt) => ddl::describe_alter_index_options(&scx, stmt)?,
129 Statement::AlterMaterializedViewApplyReplacement(stmt) => {
130 ddl::describe_alter_materialized_view_apply_replacement(&scx, stmt)?
131 }
132 Statement::AlterObjectRename(stmt) => ddl::describe_alter_object_rename(&scx, stmt)?,
133 Statement::AlterObjectSwap(stmt) => ddl::describe_alter_object_swap(&scx, stmt)?,
134 Statement::AlterRetainHistory(stmt) => ddl::describe_alter_retain_history(&scx, stmt)?,
135 Statement::AlterRole(stmt) => ddl::describe_alter_role(&scx, stmt)?,
136 Statement::AlterSecret(stmt) => ddl::describe_alter_secret_options(&scx, stmt)?,
137 Statement::AlterSetCluster(stmt) => ddl::describe_alter_set_cluster(&scx, stmt)?,
138 Statement::AlterSink(stmt) => ddl::describe_alter_sink(&scx, stmt)?,
139 Statement::AlterSource(stmt) => ddl::describe_alter_source(&scx, stmt)?,
140 Statement::AlterSystemSet(stmt) => ddl::describe_alter_system_set(&scx, stmt)?,
141 Statement::AlterSystemReset(stmt) => ddl::describe_alter_system_reset(&scx, stmt)?,
142 Statement::AlterSystemResetAll(stmt) => ddl::describe_alter_system_reset_all(&scx, stmt)?,
143 Statement::AlterTableAddColumn(stmt) => ddl::describe_alter_table_add_column(&scx, stmt)?,
144 Statement::AlterNetworkPolicy(stmt) => ddl::describe_alter_network_policy(&scx, stmt)?,
145 Statement::Comment(stmt) => ddl::describe_comment(&scx, stmt)?,
146 Statement::CreateCluster(stmt) => ddl::describe_create_cluster(&scx, stmt)?,
147 Statement::CreateClusterReplica(stmt) => ddl::describe_create_cluster_replica(&scx, stmt)?,
148 Statement::CreateConnection(stmt) => ddl::describe_create_connection(&scx, stmt)?,
149 Statement::CreateDatabase(stmt) => ddl::describe_create_database(&scx, stmt)?,
150 Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
151 Statement::CreateRole(stmt) => ddl::describe_create_role(&scx, stmt)?,
152 Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
153 Statement::CreateSecret(stmt) => ddl::describe_create_secret(&scx, stmt)?,
154 Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
155 Statement::CreateWebhookSource(stmt) => ddl::describe_create_webhook_source(&scx, stmt)?,
156 Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
157 Statement::CreateSubsource(stmt) => ddl::describe_create_subsource(&scx, stmt)?,
158 Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
159 Statement::CreateTableFromSource(stmt) => {
160 ddl::describe_create_table_from_source(&scx, stmt)?
161 }
162 Statement::CreateType(stmt) => ddl::describe_create_type(&scx, stmt)?,
163 Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
164 Statement::CreateMaterializedView(stmt) => {
165 ddl::describe_create_materialized_view(&scx, stmt)?
166 }
167 Statement::CreateNetworkPolicy(stmt) => ddl::describe_create_network_policy(&scx, stmt)?,
168 Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?,
169 Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?,
170
171 Statement::AlterOwner(stmt) => acl::describe_alter_owner(&scx, stmt)?,
173 Statement::GrantRole(stmt) => acl::describe_grant_role(&scx, stmt)?,
174 Statement::RevokeRole(stmt) => acl::describe_revoke_role(&scx, stmt)?,
175 Statement::GrantPrivileges(stmt) => acl::describe_grant_privileges(&scx, stmt)?,
176 Statement::RevokePrivileges(stmt) => acl::describe_revoke_privileges(&scx, stmt)?,
177 Statement::AlterDefaultPrivileges(stmt) => {
178 acl::describe_alter_default_privileges(&scx, stmt)?
179 }
180 Statement::ReassignOwned(stmt) => acl::describe_reassign_owned(&scx, stmt)?,
181
182 Statement::Show(ShowStatement::ShowColumns(stmt)) => {
184 show::show_columns(&scx, stmt)?.describe()?
185 }
186 Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
187 show::describe_show_create_connection(&scx, stmt)?
188 }
189 Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
190 show::describe_show_create_cluster(&scx, stmt)?
191 }
192 Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
193 show::describe_show_create_index(&scx, stmt)?
194 }
195 Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
196 show::describe_show_create_sink(&scx, stmt)?
197 }
198 Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
199 show::describe_show_create_source(&scx, stmt)?
200 }
201 Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
202 show::describe_show_create_table(&scx, stmt)?
203 }
204 Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
205 show::describe_show_create_view(&scx, stmt)?
206 }
207 Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
208 show::describe_show_create_materialized_view(&scx, stmt)?
209 }
210 Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
211 show::describe_show_create_type(&scx, stmt)?
212 }
213 Statement::Show(ShowStatement::ShowObjects(stmt)) => {
214 show::show_objects(&scx, stmt)?.describe()?
215 }
216
217 Statement::Close(stmt) => scl::describe_close(&scx, stmt)?,
219 Statement::Deallocate(stmt) => scl::describe_deallocate(&scx, stmt)?,
220 Statement::Declare(stmt) => scl::describe_declare(&scx, stmt, param_types_in)?,
221 Statement::Discard(stmt) => scl::describe_discard(&scx, stmt)?,
222 Statement::Execute(stmt) => scl::describe_execute(&scx, stmt)?,
223 Statement::Fetch(stmt) => scl::describe_fetch(&scx, stmt)?,
224 Statement::Prepare(stmt) => scl::describe_prepare(&scx, stmt)?,
225 Statement::ResetVariable(stmt) => scl::describe_reset_variable(&scx, stmt)?,
226 Statement::SetVariable(stmt) => scl::describe_set_variable(&scx, stmt)?,
227 Statement::Show(ShowStatement::ShowVariable(stmt)) => {
228 scl::describe_show_variable(&scx, stmt)?
229 }
230
231 Statement::Copy(stmt) => dml::describe_copy(&scx, stmt)?,
233 Statement::Delete(stmt) => dml::describe_delete(&scx, stmt)?,
234 Statement::ExplainPlan(stmt) => dml::describe_explain_plan(&scx, stmt)?,
235 Statement::ExplainPushdown(stmt) => dml::describe_explain_pushdown(&scx, stmt)?,
236 Statement::ExplainAnalyzeObject(stmt) => dml::describe_explain_analyze_object(&scx, stmt)?,
237 Statement::ExplainAnalyzeCluster(stmt) => {
238 dml::describe_explain_analyze_cluster(&scx, stmt)?
239 }
240 Statement::ExplainTimestamp(stmt) => dml::describe_explain_timestamp(&scx, stmt)?,
241 Statement::ExplainSinkSchema(stmt) => dml::describe_explain_schema(&scx, stmt)?,
242 Statement::Insert(stmt) => dml::describe_insert(&scx, stmt)?,
243 Statement::Select(stmt) => dml::describe_select(&scx, stmt)?,
244 Statement::Subscribe(stmt) => dml::describe_subscribe(&scx, stmt)?,
245 Statement::Update(stmt) => dml::describe_update(&scx, stmt)?,
246
247 Statement::Commit(stmt) => tcl::describe_commit(&scx, stmt)?,
249 Statement::Rollback(stmt) => tcl::describe_rollback(&scx, stmt)?,
250 Statement::SetTransaction(stmt) => tcl::describe_set_transaction(&scx, stmt)?,
251 Statement::StartTransaction(stmt) => tcl::describe_start_transaction(&scx, stmt)?,
252
253 Statement::Raise(stmt) => raise::describe_raise(&scx, stmt)?,
255 Statement::Show(ShowStatement::InspectShard(stmt)) => {
256 scl::describe_inspect_shard(&scx, stmt)?
257 }
258 Statement::ValidateConnection(stmt) => validate::describe_validate_connection(&scx, stmt)?,
259 Statement::ExecuteUnitTest(_) => {
260 return Err(PlanError::Unsupported {
261 feature: "EXECUTE UNIT TEST statement".to_string(),
262 discussion_no: None,
263 });
264 }
265 };
266
267 let desc = desc.with_params(scx.finalize_param_types()?);
268 Ok(desc)
269}
270
271#[mz_ore::instrument(level = "debug")]
288pub fn plan(
289 pcx: Option<&PlanContext>,
290 catalog: &dyn SessionCatalog,
291 stmt: Statement<Aug>,
292 params: &Params,
293 resolved_ids: &ResolvedIds,
294) -> Result<(Plan, ResolvedIds), PlanError> {
295 let param_types = params
296 .expected_types
300 .iter()
301 .enumerate()
302 .map(|(i, ty)| (i + 1, ty.clone()))
303 .collect();
304
305 let kind: StatementKind = (&stmt).into();
306 let permitted_plans = Plan::generated_from(&kind);
307
308 let scx = &mut StatementContext {
309 pcx,
310 catalog,
311 param_types: RefCell::new(param_types),
312 ambiguous_columns: RefCell::new(false),
313 sql_impl_resolved_ids: Arc::new(Mutex::new(ResolvedIds::empty())),
314 };
315
316 if resolved_ids
317 .items()
318 .filter_map(|id| catalog.try_get_item(id))
320 .any(|item| {
321 item.func().is_ok()
322 && item.name().qualifiers.schema_spec
323 == SchemaSpecifier::Id(catalog.get_mz_unsafe_schema_id())
324 })
325 {
326 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNSAFE_FUNCTIONS)?;
327 }
328
329 let plan = match stmt {
330 Statement::AlterCluster(stmt) => ddl::plan_alter_cluster(scx, stmt),
332 Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt),
333 Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt),
334 Statement::AlterMaterializedViewApplyReplacement(stmt) => {
335 ddl::plan_alter_materialized_view_apply_replacement(scx, stmt)
336 }
337 Statement::AlterObjectRename(stmt) => ddl::plan_alter_object_rename(scx, stmt),
338 Statement::AlterObjectSwap(stmt) => ddl::plan_alter_object_swap(scx, stmt),
339 Statement::AlterRetainHistory(stmt) => ddl::plan_alter_retain_history(scx, stmt),
340 Statement::AlterRole(stmt) => ddl::plan_alter_role(scx, stmt),
341 Statement::AlterSecret(stmt) => ddl::plan_alter_secret(scx, stmt),
342 Statement::AlterSetCluster(stmt) => ddl::plan_alter_item_set_cluster(scx, stmt),
343 Statement::AlterSink(stmt) => ddl::plan_alter_sink(scx, stmt),
344 Statement::AlterSource(stmt) => ddl::plan_alter_source(scx, stmt),
345 Statement::AlterSystemSet(stmt) => ddl::plan_alter_system_set(scx, stmt),
346 Statement::AlterSystemReset(stmt) => ddl::plan_alter_system_reset(scx, stmt),
347 Statement::AlterSystemResetAll(stmt) => ddl::plan_alter_system_reset_all(scx, stmt),
348 Statement::AlterTableAddColumn(stmt) => ddl::plan_alter_table_add_column(scx, stmt),
349 Statement::AlterNetworkPolicy(stmt) => ddl::plan_alter_network_policy(scx, stmt),
350 Statement::Comment(stmt) => ddl::plan_comment(scx, stmt),
351 Statement::CreateCluster(stmt) => ddl::plan_create_cluster(scx, stmt),
352 Statement::CreateClusterReplica(stmt) => ddl::plan_create_cluster_replica(scx, stmt),
353 Statement::CreateConnection(stmt) => ddl::plan_create_connection(scx, stmt),
354 Statement::CreateDatabase(stmt) => ddl::plan_create_database(scx, stmt),
355 Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
356 Statement::CreateRole(stmt) => ddl::plan_create_role(scx, stmt),
357 Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
358 Statement::CreateSecret(stmt) => ddl::plan_create_secret(scx, stmt),
359 Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
360 Statement::CreateWebhookSource(stmt) => ddl::plan_create_webhook_source(scx, stmt),
361 Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
362 Statement::CreateSubsource(stmt) => ddl::plan_create_subsource(scx, stmt),
363 Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
364 Statement::CreateTableFromSource(stmt) => ddl::plan_create_table_from_source(scx, stmt),
365 Statement::CreateType(stmt) => ddl::plan_create_type(scx, stmt),
366 Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt),
367 Statement::CreateMaterializedView(stmt) => ddl::plan_create_materialized_view(scx, stmt),
368 Statement::CreateNetworkPolicy(stmt) => ddl::plan_create_network_policy(scx, stmt),
369 Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt),
370 Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt),
371
372 Statement::AlterOwner(stmt) => acl::plan_alter_owner(scx, stmt),
374 Statement::GrantRole(stmt) => acl::plan_grant_role(scx, stmt),
375 Statement::RevokeRole(stmt) => acl::plan_revoke_role(scx, stmt),
376 Statement::GrantPrivileges(stmt) => acl::plan_grant_privileges(scx, stmt),
377 Statement::RevokePrivileges(stmt) => acl::plan_revoke_privileges(scx, stmt),
378 Statement::AlterDefaultPrivileges(stmt) => acl::plan_alter_default_privileges(scx, stmt),
379 Statement::ReassignOwned(stmt) => acl::plan_reassign_owned(scx, stmt),
380
381 Statement::Copy(stmt) => dml::plan_copy(scx, stmt),
383 Statement::Delete(stmt) => dml::plan_delete(scx, stmt, params),
384 Statement::ExplainPlan(stmt) => dml::plan_explain_plan(scx, stmt, params),
385 Statement::ExplainPushdown(stmt) => dml::plan_explain_pushdown(scx, stmt, params),
386 Statement::ExplainAnalyzeObject(stmt) => {
387 dml::plan_explain_analyze_object(scx, stmt, params)
388 }
389 Statement::ExplainAnalyzeCluster(stmt) => {
390 dml::plan_explain_analyze_cluster(scx, stmt, params)
391 }
392 Statement::ExplainTimestamp(stmt) => dml::plan_explain_timestamp(scx, stmt),
393 Statement::ExplainSinkSchema(stmt) => dml::plan_explain_schema(scx, stmt),
394 Statement::Insert(stmt) => dml::plan_insert(scx, stmt, params),
395 Statement::Select(stmt) => dml::plan_select(scx, stmt, params, None),
396 Statement::Subscribe(stmt) => dml::plan_subscribe(scx, stmt, params, None),
397 Statement::Update(stmt) => dml::plan_update(scx, stmt, params),
398
399 Statement::Show(ShowStatement::ShowColumns(stmt)) => show::show_columns(scx, stmt)?.plan(),
401 Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
402 show::plan_show_create_connection(scx, stmt).map(Plan::ShowCreate)
403 }
404 Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
405 show::plan_show_create_cluster(scx, stmt).map(Plan::ShowCreate)
406 }
407 Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
408 show::plan_show_create_index(scx, stmt).map(Plan::ShowCreate)
409 }
410 Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
411 show::plan_show_create_sink(scx, stmt).map(Plan::ShowCreate)
412 }
413 Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
414 show::plan_show_create_source(scx, stmt).map(Plan::ShowCreate)
415 }
416 Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
417 show::plan_show_create_table(scx, stmt).map(Plan::ShowCreate)
418 }
419 Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
420 show::plan_show_create_view(scx, stmt).map(Plan::ShowCreate)
421 }
422 Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
423 show::plan_show_create_materialized_view(scx, stmt).map(Plan::ShowCreate)
424 }
425 Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
426 show::plan_show_create_type(scx, stmt).map(Plan::ShowCreate)
427 }
428 Statement::Show(ShowStatement::ShowObjects(stmt)) => show::show_objects(scx, stmt)?.plan(),
429
430 Statement::Close(stmt) => scl::plan_close(scx, stmt),
432 Statement::Deallocate(stmt) => scl::plan_deallocate(scx, stmt),
433 Statement::Declare(stmt) => scl::plan_declare(scx, stmt, params),
434 Statement::Discard(stmt) => scl::plan_discard(scx, stmt),
435 Statement::Execute(stmt) => scl::plan_execute(scx, stmt),
436 Statement::Fetch(stmt) => scl::plan_fetch(scx, stmt),
437 Statement::Prepare(stmt) => scl::plan_prepare(scx, stmt),
438 Statement::ResetVariable(stmt) => scl::plan_reset_variable(scx, stmt),
439 Statement::SetVariable(stmt) => scl::plan_set_variable(scx, stmt),
440 Statement::Show(ShowStatement::ShowVariable(stmt)) => scl::plan_show_variable(scx, stmt),
441
442 Statement::Commit(stmt) => tcl::plan_commit(scx, stmt),
444 Statement::Rollback(stmt) => tcl::plan_rollback(scx, stmt),
445 Statement::SetTransaction(stmt) => tcl::plan_set_transaction(scx, stmt),
446 Statement::StartTransaction(stmt) => tcl::plan_start_transaction(scx, stmt),
447
448 Statement::Raise(stmt) => raise::plan_raise(scx, stmt),
450 Statement::Show(ShowStatement::InspectShard(stmt)) => scl::plan_inspect_shard(scx, stmt),
451 Statement::ValidateConnection(stmt) => validate::plan_validate_connection(scx, stmt),
452 Statement::ExecuteUnitTest(_) => {
453 return Err(PlanError::Unsupported {
454 feature: "EXECUTE UNIT TEST statement".to_string(),
455 discussion_no: None,
456 });
457 }
458 };
459
460 if let Ok(plan) = &plan {
461 mz_ore::soft_assert_no_log!(
462 permitted_plans.contains(&PlanKind::from(plan)),
463 "plan {:?}, permitted plans {:?}",
464 plan,
465 permitted_plans
466 );
467 }
468
469 let sql_impl_ids = scx
475 .sql_impl_resolved_ids
476 .lock()
477 .expect("planning is single-threaded")
478 .clone();
479 plan.map(|p| (p, sql_impl_ids))
480}
481
/// Public entry point for planning the rows of a `COPY FROM`.
///
/// Forwards every argument unchanged to `query::plan_copy_from_rows` and
/// returns its result as-is.
pub fn plan_copy_from(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    target_id: CatalogItemId,
    target_name: String,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<super::HirRelationExpr, PlanError> {
    query::plan_copy_from_rows(pcx, catalog, target_id, target_name, columns, rows)
}
492
493impl PartialEq<ObjectType> for CatalogItemType {
500 fn eq(&self, other: &ObjectType) -> bool {
501 match (self, other) {
502 (CatalogItemType::Source, ObjectType::Source)
503 | (CatalogItemType::Table, ObjectType::Table)
504 | (CatalogItemType::Sink, ObjectType::Sink)
505 | (CatalogItemType::View, ObjectType::View)
506 | (CatalogItemType::MaterializedView, ObjectType::MaterializedView)
507 | (CatalogItemType::Index, ObjectType::Index)
508 | (CatalogItemType::Type, ObjectType::Type)
509 | (CatalogItemType::Secret, ObjectType::Secret)
510 | (CatalogItemType::Connection, ObjectType::Connection) => true,
511 (_, _) => false,
512 }
513 }
514}
515
516impl PartialEq<CatalogItemType> for ObjectType {
517 fn eq(&self, other: &CatalogItemType) -> bool {
518 other == self
519 }
520}
521
522#[derive(Debug, Clone)]
524pub struct StatementContext<'a> {
525 pcx: Option<&'a PlanContext>,
530 pub catalog: &'a dyn SessionCatalog,
531 pub param_types: RefCell<BTreeMap<usize, SqlScalarType>>,
534 pub ambiguous_columns: RefCell<bool>,
537 pub sql_impl_resolved_ids: Arc<Mutex<ResolvedIds>>,
547}
548
549impl<'a> StatementContext<'a> {
550 pub fn new(
551 pcx: Option<&'a PlanContext>,
552 catalog: &'a dyn SessionCatalog,
553 ) -> StatementContext<'a> {
554 StatementContext {
555 pcx,
556 catalog,
557 param_types: Default::default(),
558 ambiguous_columns: RefCell::new(false),
559 sql_impl_resolved_ids: Arc::new(Mutex::new(ResolvedIds::empty())),
560 }
561 }
562
563 pub fn current_schemas(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
565 self.catalog.search_path()
566 }
567
568 pub fn current_schema(&self) -> Option<&(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
571 self.current_schemas().into_iter().next()
572 }
573
574 pub fn pcx(&self) -> Result<&PlanContext, PlanError> {
575 self.pcx.ok_or_else(|| sql_err!("no plan context"))
576 }
577
578 pub fn allocate_full_name(&self, name: PartialItemName) -> Result<FullItemName, PlanError> {
579 let (database, schema): (RawDatabaseSpecifier, String) = match (name.database, name.schema)
580 {
581 (None, None) => {
582 let Some((database, schema)) = self.current_schema() else {
583 return Err(PlanError::InvalidSchemaName);
584 };
585 let schema = self.get_schema(database, schema);
586 let database = match schema.database() {
587 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
588 ResolvedDatabaseSpecifier::Id(id) => {
589 RawDatabaseSpecifier::Name(self.catalog.get_database(id).name().to_string())
590 }
591 };
592 (database, schema.name().schema.clone())
593 }
594 (None, Some(schema)) => {
595 if is_system_schema(&schema) {
596 (RawDatabaseSpecifier::Ambient, schema)
597 } else {
598 match self.catalog.active_database_name() {
599 Some(name) => (RawDatabaseSpecifier::Name(name.to_string()), schema),
600 None => {
601 sql_bail!(
602 "no database specified for non-system schema and no active database"
603 )
604 }
605 }
606 }
607 }
608 (Some(_database), None) => {
609 sql_bail!("unreachable: specified the database but no schema")
612 }
613 (Some(database), Some(schema)) => (RawDatabaseSpecifier::Name(database), schema),
614 };
615 let item = name.item;
616 Ok(FullItemName {
617 database,
618 schema,
619 item,
620 })
621 }
622
623 pub fn allocate_qualified_name(
624 &self,
625 name: PartialItemName,
626 ) -> Result<QualifiedItemName, PlanError> {
627 let full_name = self.allocate_full_name(name)?;
628 let database_spec = match full_name.database {
629 RawDatabaseSpecifier::Ambient => ResolvedDatabaseSpecifier::Ambient,
630 RawDatabaseSpecifier::Name(name) => ResolvedDatabaseSpecifier::Id(
631 self.resolve_database(&UnresolvedDatabaseName(Ident::new(name)?))?
632 .id(),
633 ),
634 };
635 let schema_spec = self
636 .resolve_schema_in_database(&database_spec, &Ident::new(full_name.schema)?)?
637 .id()
638 .clone();
639 Ok(QualifiedItemName {
640 qualifiers: ItemQualifiers {
641 database_spec,
642 schema_spec,
643 },
644 item: full_name.item,
645 })
646 }
647
648 pub fn allocate_temporary_full_name(&self, name: PartialItemName) -> FullItemName {
649 FullItemName {
650 database: RawDatabaseSpecifier::Ambient,
651 schema: name
652 .schema
653 .unwrap_or_else(|| mz_repr::namespaces::MZ_TEMP_SCHEMA.to_owned()),
654 item: name.item,
655 }
656 }
657
658 pub fn allocate_temporary_qualified_name(
659 &self,
660 name: PartialItemName,
661 ) -> Result<QualifiedItemName, PlanError> {
662 if let Some(schema_name) = name.schema {
667 if schema_name != mz_repr::namespaces::MZ_TEMP_SCHEMA {
668 return Err(PlanError::InvalidTemporarySchema);
669 }
670 }
671
672 Ok(QualifiedItemName {
673 qualifiers: ItemQualifiers {
674 database_spec: ResolvedDatabaseSpecifier::Ambient,
675 schema_spec: SchemaSpecifier::Temporary,
676 },
677 item: name.item,
678 })
679 }
680
681 pub fn allocate_resolved_item_name(
684 &self,
685 id: CatalogItemId,
686 name: UnresolvedItemName,
687 ) -> Result<ResolvedItemName, PlanError> {
688 let partial = normalize::unresolved_item_name(name)?;
689 let qualified = self.allocate_qualified_name(partial.clone())?;
690 let full_name = self.allocate_full_name(partial)?;
691 Ok(ResolvedItemName::Item {
692 id,
693 qualifiers: qualified.qualifiers,
694 full_name,
695 print_id: true,
696 version: RelationVersionSelector::Latest,
697 })
698 }
699
700 pub fn active_database(&self) -> Option<&DatabaseId> {
701 self.catalog.active_database()
702 }
703
704 pub fn resolve_optional_schema(
705 &self,
706 schema_name: &Option<ResolvedSchemaName>,
707 ) -> Result<SchemaSpecifier, PlanError> {
708 match schema_name {
709 Some(ResolvedSchemaName::Schema { schema_spec, .. }) => Ok(schema_spec.clone()),
710 None => self.resolve_active_schema().map(|spec| spec.clone()),
711 Some(ResolvedSchemaName::Error) => {
712 unreachable!("should have been handled by name resolution")
713 }
714 }
715 }
716
717 pub fn resolve_active_schema(&self) -> Result<&SchemaSpecifier, PlanError> {
718 match self.current_schema() {
719 Some((_db, schema)) => Ok(schema),
720 None => Err(PlanError::InvalidSchemaName),
721 }
722 }
723
724 pub fn get_cluster(&self, id: &ClusterId) -> &dyn CatalogCluster<'_> {
725 self.catalog.get_cluster(*id)
726 }
727
728 pub fn resolve_database(
729 &self,
730 name: &UnresolvedDatabaseName,
731 ) -> Result<&dyn CatalogDatabase, PlanError> {
732 let name = normalize::ident_ref(&name.0);
733 Ok(self.catalog.resolve_database(name)?)
734 }
735
736 pub fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase {
737 self.catalog.get_database(id)
738 }
739
740 pub fn resolve_schema_in_database(
741 &self,
742 database_spec: &ResolvedDatabaseSpecifier,
743 schema: &Ident,
744 ) -> Result<&dyn CatalogSchema, PlanError> {
745 let schema = normalize::ident_ref(schema);
746 Ok(self
747 .catalog
748 .resolve_schema_in_database(database_spec, schema)?)
749 }
750
751 pub fn resolve_schema(
752 &self,
753 name: UnresolvedSchemaName,
754 ) -> Result<&dyn CatalogSchema, PlanError> {
755 let name = normalize::unresolved_schema_name(name)?;
756 Ok(self
757 .catalog
758 .resolve_schema(name.database.as_deref(), &name.schema)?)
759 }
760
761 pub fn get_schema(
762 &self,
763 database_spec: &ResolvedDatabaseSpecifier,
764 schema_spec: &SchemaSpecifier,
765 ) -> &dyn CatalogSchema {
766 self.catalog.get_schema(database_spec, schema_spec)
767 }
768
769 pub fn resolve_item(&self, name: RawItemName) -> Result<&dyn CatalogItem, PlanError> {
770 match name {
771 RawItemName::Name(name) => {
772 let name = normalize::unresolved_item_name(name)?;
773 Ok(self.catalog.resolve_item(&name)?)
774 }
775 RawItemName::Id(id, _, _) => {
776 let gid = id.parse()?;
777 Ok(self.catalog.get_item(&gid))
778 }
779 }
780 }
781
782 pub fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem {
783 self.catalog.get_item(id)
784 }
785
786 pub fn get_item_by_resolved_name(
787 &self,
788 name: &ResolvedItemName,
789 ) -> Result<Box<dyn CatalogCollectionItem + '_>, PlanError> {
790 match name {
791 ResolvedItemName::Item { id, version, .. } => {
792 Ok(self.get_item(id).at_version(*version))
793 }
794 ResolvedItemName::Cte { .. } => sql_bail!("non-user item"),
795 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
796 }
797 }
798
799 pub fn get_column_by_resolved_name(
800 &self,
801 name: &ColumnName<Aug>,
802 ) -> Result<(Box<dyn CatalogCollectionItem + '_>, usize), PlanError> {
803 match (&name.relation, &name.column) {
804 (
805 ResolvedItemName::Item { id, version, .. },
806 ResolvedColumnReference::Column { index, .. },
807 ) => {
808 let item = self.get_item(id).at_version(*version);
809 Ok((item, *index))
810 }
811 _ => unreachable!(
812 "get_column_by_resolved_name errors should have been caught in name resolution"
813 ),
814 }
815 }
816
817 pub fn resolve_function(
818 &self,
819 name: UnresolvedItemName,
820 ) -> Result<&dyn CatalogItem, PlanError> {
821 let name = normalize::unresolved_item_name(name)?;
822 Ok(self.catalog.resolve_function(&name)?)
823 }
824
825 pub fn resolve_cluster(
826 &self,
827 name: Option<&Ident>,
828 ) -> Result<&dyn CatalogCluster<'_>, PlanError> {
829 let name = name.map(|name| name.as_str());
830 Ok(self.catalog.resolve_cluster(name)?)
831 }
832
833 pub fn resolve_type(&self, mut ty: mz_pgrepr::Type) -> Result<ResolvedDataType, PlanError> {
834 match &mut ty {
840 mz_pgrepr::Type::Interval { constraints } => *constraints = None,
841 mz_pgrepr::Type::Time { precision } => *precision = None,
842 mz_pgrepr::Type::TimeTz { precision } => *precision = None,
843 mz_pgrepr::Type::Timestamp { precision } => *precision = None,
844 mz_pgrepr::Type::TimestampTz { precision } => *precision = None,
845 _ => (),
846 }
847 let mut ty = if ty.oid() >= FIRST_USER_OID {
853 sql_bail!("internal error, unexpected user type: {ty:?} ");
854 } else if ty.oid() < FIRST_MATERIALIZE_OID {
855 format!("pg_catalog.{}", ty)
856 } else {
857 format!("mz_catalog.{}", ty)
859 };
860 if ty == "pg_catalog.json" {
864 ty = "pg_catalog.jsonb".into();
865 }
866 let data_type = mz_sql_parser::parser::parse_data_type(&ty)?;
867 let (data_type, _) = names::resolve(self.catalog, data_type)?;
868 Ok(data_type)
869 }
870
871 pub fn get_object_type(&self, id: &ObjectId) -> ObjectType {
872 self.catalog.get_object_type(id)
873 }
874
875 pub fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType {
876 match id {
877 SystemObjectId::Object(id) => SystemObjectType::Object(self.get_object_type(id)),
878 SystemObjectId::System => SystemObjectType::System,
879 }
880 }
881
882 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
884 flag.require(self.catalog.system_vars())?;
885 Ok(())
886 }
887
888 pub fn is_feature_flag_enabled(&self, flag: &'static FeatureFlag) -> bool {
890 self.require_feature_flag(flag).is_ok()
891 }
892
893 pub fn finalize_param_types(self) -> Result<Vec<SqlScalarType>, PlanError> {
894 let param_types = self.param_types.into_inner();
895 let mut out = vec![];
896 for (i, (n, typ)) in param_types.into_iter().enumerate() {
897 if n != i + 1 {
898 sql_bail!("unable to infer type for parameter ${}", i + 1);
899 }
900 out.push(typ);
901 }
902 Ok(out)
903 }
904
905 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
908 self.catalog.humanize_sql_scalar_type(typ, postgres_compat)
909 }
910
911 pub fn humanize_column_type(&self, typ: &SqlColumnType, postgres_compat: bool) -> String {
914 self.catalog.humanize_sql_column_type(typ, postgres_compat)
915 }
916
917 pub fn relation_desc_into_table_defs(
918 &self,
919 desc: &RelationDesc,
920 ) -> Result<(Vec<ColumnDef<Aug>>, Vec<TableConstraint<Aug>>), PlanError> {
921 let mut columns = vec![];
922 let mut null_cols = BTreeSet::new();
923 for (column_name, column_type) in desc.iter() {
924 let name = Ident::new(column_name.as_str().to_owned())?;
925
926 let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
927 let data_type = self.resolve_type(ty)?;
928
929 let options = if !column_type.nullable {
930 null_cols.insert(columns.len());
931 vec![mz_sql_parser::ast::ColumnOptionDef {
932 name: None,
933 option: mz_sql_parser::ast::ColumnOption::NotNull,
934 }]
935 } else {
936 vec![]
937 };
938
939 columns.push(ColumnDef {
940 name,
941 data_type,
942 collation: None,
943 options,
944 });
945 }
946
947 let mut table_constraints = vec![];
948 for key in desc.typ().keys.iter() {
949 let mut col_names = vec![];
950 for col_idx in key {
951 if !null_cols.contains(col_idx) {
952 sql_bail!(
955 "[internal error] key columns must be NOT NULL when generating table constraints"
956 );
957 }
958 col_names.push(columns[*col_idx].name.clone());
959 }
960 table_constraints.push(TableConstraint::Unique {
961 name: None,
962 columns: col_names,
963 is_primary: false,
964 nulls_not_distinct: false,
965 });
966 }
967
968 Ok((columns, table_constraints))
969 }
970
971 pub fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
972 self.catalog.get_owner_id(id)
973 }
974
975 pub fn humanize_resolved_name(
976 &self,
977 name: &ResolvedItemName,
978 ) -> Result<PartialItemName, PlanError> {
979 let item = self.get_item_by_resolved_name(name)?;
980 Ok(self.catalog.minimal_qualification(item.name()))
981 }
982
983 pub fn dangerous_resolve_name(&self, name: Vec<&str>) -> ResolvedItemName {
986 tracing::trace!("dangerous_resolve_name {:?}", name);
987 let name: Vec<_> = name.into_iter().map(Ident::new_unchecked).collect();
989 let name = UnresolvedItemName::qualified(&name);
990 let entry = match self.resolve_item(RawItemName::Name(name.clone())) {
991 Ok(entry) => entry,
992 Err(_) => self
993 .resolve_function(name.clone())
994 .expect("name referred to an existing object"),
995 };
996
997 let partial = normalize::unresolved_item_name(name).unwrap();
998 let full_name = self.allocate_full_name(partial).unwrap();
999
1000 ResolvedItemName::Item {
1001 id: entry.id(),
1002 qualifiers: entry.name().qualifiers.clone(),
1003 full_name,
1004 print_id: true,
1005 version: RelationVersionSelector::Latest,
1006 }
1007 }
1008}
1009
1010pub fn resolve_cluster_for_materialized_view<'a>(
1011 catalog: &'a dyn SessionCatalog,
1012 stmt: &CreateMaterializedViewStatement<Aug>,
1013) -> Result<ClusterId, PlanError> {
1014 Ok(match &stmt.in_cluster {
1015 None => catalog.resolve_cluster(None)?.id(),
1016 Some(in_cluster) => in_cluster.id,
1017 })
1018}
1019
/// A coarse classification of SQL statements by the kind of work they perform.
#[derive(Debug, Clone, Copy)]
pub enum StatementClassification {
    /// Access control statements, such as granting or revoking roles and privileges.
    ACL,
    /// Data definition statements, such as creating, altering, or dropping objects.
    DDL,
    /// Data manipulation statements, such as selecting, inserting, or explaining.
    DML,
    /// Statements that fit none of the other classifications.
    Other,
    /// Session control statements, such as setting variables or managing prepared statements.
    SCL,
    /// `SHOW` statements that inspect objects.
    Show,
    /// Transaction control statements, such as starting or committing a transaction.
    TCL,
}

impl StatementClassification {
    /// Returns `true` only for [`StatementClassification::DDL`].
    pub fn is_ddl(&self) -> bool {
        match self {
            Self::DDL => true,
            Self::ACL | Self::DML | Self::Other | Self::SCL | Self::Show | Self::TCL => false,
        }
    }
}
1037
1038impl<T: mz_sql_parser::ast::AstInfo> From<&Statement<T>> for StatementClassification {
1039 fn from(value: &Statement<T>) -> Self {
1040 use StatementClassification::*;
1041
1042 match value {
1043 Statement::AlterCluster(_) => DDL,
1045 Statement::AlterConnection(_) => DDL,
1046 Statement::AlterIndex(_) => DDL,
1047 Statement::AlterMaterializedViewApplyReplacement(_) => DDL,
1048 Statement::AlterObjectRename(_) => DDL,
1049 Statement::AlterObjectSwap(_) => DDL,
1050 Statement::AlterNetworkPolicy(_) => DDL,
1051 Statement::AlterRetainHistory(_) => DDL,
1052 Statement::AlterRole(_) => DDL,
1053 Statement::AlterSecret(_) => DDL,
1054 Statement::AlterSetCluster(_) => DDL,
1055 Statement::AlterSink(_) => DDL,
1056 Statement::AlterSource(_) => DDL,
1057 Statement::AlterSystemSet(_) => DDL,
1058 Statement::AlterSystemReset(_) => DDL,
1059 Statement::AlterSystemResetAll(_) => DDL,
1060 Statement::AlterTableAddColumn(_) => DDL,
1061 Statement::Comment(_) => DDL,
1062 Statement::CreateCluster(_) => DDL,
1063 Statement::CreateClusterReplica(_) => DDL,
1064 Statement::CreateConnection(_) => DDL,
1065 Statement::CreateDatabase(_) => DDL,
1066 Statement::CreateIndex(_) => DDL,
1067 Statement::CreateRole(_) => DDL,
1068 Statement::CreateSchema(_) => DDL,
1069 Statement::CreateSecret(_) => DDL,
1070 Statement::CreateSink(_) => DDL,
1071 Statement::CreateWebhookSource(_) => DDL,
1072 Statement::CreateSource(_) => DDL,
1073 Statement::CreateSubsource(_) => DDL,
1074 Statement::CreateTable(_) => DDL,
1075 Statement::CreateTableFromSource(_) => DDL,
1076 Statement::CreateType(_) => DDL,
1077 Statement::CreateView(_) => DDL,
1078 Statement::CreateMaterializedView(_) => DDL,
1079 Statement::CreateNetworkPolicy(_) => DDL,
1080 Statement::DropObjects(_) => DDL,
1081 Statement::DropOwned(_) => DDL,
1082
1083 Statement::AlterOwner(_) => ACL,
1085 Statement::GrantRole(_) => ACL,
1086 Statement::RevokeRole(_) => ACL,
1087 Statement::GrantPrivileges(_) => ACL,
1088 Statement::RevokePrivileges(_) => ACL,
1089 Statement::AlterDefaultPrivileges(_) => ACL,
1090 Statement::ReassignOwned(_) => ACL,
1091
1092 Statement::Copy(_) => DML,
1094 Statement::Delete(_) => DML,
1095 Statement::ExplainPlan(_) => DML,
1096 Statement::ExplainPushdown(_) => DML,
1097 Statement::ExplainAnalyzeObject(_) => DML,
1098 Statement::ExplainAnalyzeCluster(_) => DML,
1099 Statement::ExplainTimestamp(_) => DML,
1100 Statement::ExplainSinkSchema(_) => DML,
1101 Statement::Insert(_) => DML,
1102 Statement::Select(_) => DML,
1103 Statement::Subscribe(_) => DML,
1104 Statement::Update(_) => DML,
1105
1106 Statement::Show(ShowStatement::ShowColumns(_)) => Show,
1108 Statement::Show(ShowStatement::ShowCreateConnection(_)) => Show,
1109 Statement::Show(ShowStatement::ShowCreateCluster(_)) => Show,
1110 Statement::Show(ShowStatement::ShowCreateIndex(_)) => Show,
1111 Statement::Show(ShowStatement::ShowCreateSink(_)) => Show,
1112 Statement::Show(ShowStatement::ShowCreateSource(_)) => Show,
1113 Statement::Show(ShowStatement::ShowCreateTable(_)) => Show,
1114 Statement::Show(ShowStatement::ShowCreateView(_)) => Show,
1115 Statement::Show(ShowStatement::ShowCreateMaterializedView(_)) => Show,
1116 Statement::Show(ShowStatement::ShowCreateType(_)) => Show,
1117 Statement::Show(ShowStatement::ShowObjects(_)) => Show,
1118
1119 Statement::Close(_) => SCL,
1121 Statement::Deallocate(_) => SCL,
1122 Statement::Declare(_) => SCL,
1123 Statement::Discard(_) => SCL,
1124 Statement::Execute(_) => SCL,
1125 Statement::Fetch(_) => SCL,
1126 Statement::Prepare(_) => SCL,
1127 Statement::ResetVariable(_) => SCL,
1128 Statement::SetVariable(_) => SCL,
1129 Statement::Show(ShowStatement::ShowVariable(_)) => SCL,
1130
1131 Statement::Commit(_) => TCL,
1133 Statement::Rollback(_) => TCL,
1134 Statement::SetTransaction(_) => TCL,
1135 Statement::StartTransaction(_) => TCL,
1136
1137 Statement::Raise(_) => Other,
1139 Statement::Show(ShowStatement::InspectShard(_)) => Other,
1140 Statement::ValidateConnection(_) => Other,
1141 Statement::ExecuteUnitTest(_) => Other,
1142 }
1143 }
1144}