1use std::cell::RefCell;
15use std::collections::{BTreeMap, BTreeSet};
16
17use mz_repr::namespaces::is_system_schema;
18use mz_repr::{
19 CatalogItemId, ColumnIndex, RelationDesc, RelationVersionSelector, SqlColumnType, SqlScalarType,
20};
21use mz_sql_parser::ast::{
22 ColumnDef, ColumnName, ConnectionDefaultAwsPrivatelink, CreateMaterializedViewStatement,
23 RawItemName, ShowStatement, StatementKind, TableConstraint, UnresolvedDatabaseName,
24 UnresolvedSchemaName,
25};
26use mz_storage_types::connections::inline::ReferencedConnection;
27use mz_storage_types::connections::{AwsPrivatelink, Connection, SshTunnel, Tunnel};
28
29use crate::ast::{Ident, Statement, UnresolvedItemName};
30use crate::catalog::{
31 CatalogCluster, CatalogCollectionItem, CatalogDatabase, CatalogItem, CatalogItemType,
32 CatalogSchema, ObjectType, SessionCatalog, SystemObjectType,
33};
34use crate::names::{
35 self, Aug, DatabaseId, FullItemName, ItemQualifiers, ObjectId, PartialItemName,
36 QualifiedItemName, RawDatabaseSpecifier, ResolvedColumnReference, ResolvedDataType,
37 ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, ResolvedSchemaName, SchemaSpecifier,
38 SystemObjectId,
39};
40use crate::normalize;
41use crate::plan::error::PlanError;
42use crate::plan::{Params, Plan, PlanContext, PlanKind, query, with_options};
43use crate::session::vars::FeatureFlag;
44
45mod acl;
46pub(crate) mod ddl;
47mod dml;
48mod raise;
49mod scl;
50pub(crate) mod show;
51mod tcl;
52mod validate;
53
54use crate::session::vars;
55pub(crate) use ddl::PgConfigOptionExtracted;
56use mz_controller_types::ClusterId;
57use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_USER_OID};
58use mz_repr::role_id::RoleId;
59
60#[derive(Debug, Clone, Eq, PartialEq)]
62pub struct StatementDesc {
63 pub relation_desc: Option<RelationDesc>,
66 pub param_types: Vec<SqlScalarType>,
68 pub is_copy: bool,
70}
71
72impl StatementDesc {
73 pub fn new(relation_desc: Option<RelationDesc>) -> Self {
74 StatementDesc {
75 relation_desc,
76 param_types: vec![],
77 is_copy: false,
78 }
79 }
80
81 pub fn arity(&self) -> usize {
84 self.relation_desc
85 .as_ref()
86 .map(|desc| desc.typ().column_types.len())
87 .unwrap_or(0)
88 }
89
90 fn with_params(mut self, param_types: Vec<SqlScalarType>) -> Self {
91 self.param_types = param_types;
92 self
93 }
94
95 fn with_is_copy(mut self) -> Self {
96 self.is_copy = true;
97 self
98 }
99}
100
101pub fn describe(
105 pcx: &PlanContext,
106 catalog: &dyn SessionCatalog,
107 stmt: Statement<Aug>,
108 param_types_in: &[Option<SqlScalarType>],
109) -> Result<StatementDesc, PlanError> {
110 let mut param_types = BTreeMap::new();
111 for (i, ty) in param_types_in.iter().enumerate() {
112 if let Some(ty) = ty {
113 param_types.insert(i + 1, ty.clone());
114 }
115 }
116
117 let scx = StatementContext {
118 pcx: Some(pcx),
119 catalog,
120 param_types: RefCell::new(param_types),
121 ambiguous_columns: RefCell::new(false),
122 };
123
124 let desc = match stmt {
125 Statement::AlterCluster(stmt) => ddl::describe_alter_cluster_set_options(&scx, stmt)?,
127 Statement::AlterConnection(stmt) => ddl::describe_alter_connection(&scx, stmt)?,
128 Statement::AlterIndex(stmt) => ddl::describe_alter_index_options(&scx, stmt)?,
129 Statement::AlterMaterializedViewApplyReplacement(stmt) => {
130 ddl::describe_alter_materialized_view_apply_replacement(&scx, stmt)?
131 }
132 Statement::AlterObjectRename(stmt) => ddl::describe_alter_object_rename(&scx, stmt)?,
133 Statement::AlterObjectSwap(stmt) => ddl::describe_alter_object_swap(&scx, stmt)?,
134 Statement::AlterRetainHistory(stmt) => ddl::describe_alter_retain_history(&scx, stmt)?,
135 Statement::AlterRole(stmt) => ddl::describe_alter_role(&scx, stmt)?,
136 Statement::AlterSecret(stmt) => ddl::describe_alter_secret_options(&scx, stmt)?,
137 Statement::AlterSetCluster(stmt) => ddl::describe_alter_set_cluster(&scx, stmt)?,
138 Statement::AlterSink(stmt) => ddl::describe_alter_sink(&scx, stmt)?,
139 Statement::AlterSource(stmt) => ddl::describe_alter_source(&scx, stmt)?,
140 Statement::AlterSystemSet(stmt) => ddl::describe_alter_system_set(&scx, stmt)?,
141 Statement::AlterSystemReset(stmt) => ddl::describe_alter_system_reset(&scx, stmt)?,
142 Statement::AlterSystemResetAll(stmt) => ddl::describe_alter_system_reset_all(&scx, stmt)?,
143 Statement::AlterTableAddColumn(stmt) => ddl::describe_alter_table_add_column(&scx, stmt)?,
144 Statement::AlterNetworkPolicy(stmt) => ddl::describe_alter_network_policy(&scx, stmt)?,
145 Statement::Comment(stmt) => ddl::describe_comment(&scx, stmt)?,
146 Statement::CreateCluster(stmt) => ddl::describe_create_cluster(&scx, stmt)?,
147 Statement::CreateClusterReplica(stmt) => ddl::describe_create_cluster_replica(&scx, stmt)?,
148 Statement::CreateConnection(stmt) => ddl::describe_create_connection(&scx, stmt)?,
149 Statement::CreateDatabase(stmt) => ddl::describe_create_database(&scx, stmt)?,
150 Statement::CreateIndex(stmt) => ddl::describe_create_index(&scx, stmt)?,
151 Statement::CreateRole(stmt) => ddl::describe_create_role(&scx, stmt)?,
152 Statement::CreateSchema(stmt) => ddl::describe_create_schema(&scx, stmt)?,
153 Statement::CreateSecret(stmt) => ddl::describe_create_secret(&scx, stmt)?,
154 Statement::CreateSink(stmt) => ddl::describe_create_sink(&scx, stmt)?,
155 Statement::CreateWebhookSource(stmt) => ddl::describe_create_webhook_source(&scx, stmt)?,
156 Statement::CreateSource(stmt) => ddl::describe_create_source(&scx, stmt)?,
157 Statement::CreateSubsource(stmt) => ddl::describe_create_subsource(&scx, stmt)?,
158 Statement::CreateTable(stmt) => ddl::describe_create_table(&scx, stmt)?,
159 Statement::CreateTableFromSource(stmt) => {
160 ddl::describe_create_table_from_source(&scx, stmt)?
161 }
162 Statement::CreateType(stmt) => ddl::describe_create_type(&scx, stmt)?,
163 Statement::CreateView(stmt) => ddl::describe_create_view(&scx, stmt)?,
164 Statement::CreateMaterializedView(stmt) => {
165 ddl::describe_create_materialized_view(&scx, stmt)?
166 }
167 Statement::CreateContinualTask(stmt) => ddl::describe_create_continual_task(&scx, stmt)?,
168 Statement::CreateNetworkPolicy(stmt) => ddl::describe_create_network_policy(&scx, stmt)?,
169 Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?,
170 Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?,
171
172 Statement::AlterOwner(stmt) => acl::describe_alter_owner(&scx, stmt)?,
174 Statement::GrantRole(stmt) => acl::describe_grant_role(&scx, stmt)?,
175 Statement::RevokeRole(stmt) => acl::describe_revoke_role(&scx, stmt)?,
176 Statement::GrantPrivileges(stmt) => acl::describe_grant_privileges(&scx, stmt)?,
177 Statement::RevokePrivileges(stmt) => acl::describe_revoke_privileges(&scx, stmt)?,
178 Statement::AlterDefaultPrivileges(stmt) => {
179 acl::describe_alter_default_privileges(&scx, stmt)?
180 }
181 Statement::ReassignOwned(stmt) => acl::describe_reassign_owned(&scx, stmt)?,
182
183 Statement::Show(ShowStatement::ShowColumns(stmt)) => {
185 show::show_columns(&scx, stmt)?.describe()?
186 }
187 Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
188 show::describe_show_create_connection(&scx, stmt)?
189 }
190 Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
191 show::describe_show_create_cluster(&scx, stmt)?
192 }
193 Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
194 show::describe_show_create_index(&scx, stmt)?
195 }
196 Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
197 show::describe_show_create_sink(&scx, stmt)?
198 }
199 Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
200 show::describe_show_create_source(&scx, stmt)?
201 }
202 Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
203 show::describe_show_create_table(&scx, stmt)?
204 }
205 Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
206 show::describe_show_create_view(&scx, stmt)?
207 }
208 Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
209 show::describe_show_create_materialized_view(&scx, stmt)?
210 }
211 Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
212 show::describe_show_create_type(&scx, stmt)?
213 }
214 Statement::Show(ShowStatement::ShowObjects(stmt)) => {
215 show::show_objects(&scx, stmt)?.describe()?
216 }
217
218 Statement::Close(stmt) => scl::describe_close(&scx, stmt)?,
220 Statement::Deallocate(stmt) => scl::describe_deallocate(&scx, stmt)?,
221 Statement::Declare(stmt) => scl::describe_declare(&scx, stmt, param_types_in)?,
222 Statement::Discard(stmt) => scl::describe_discard(&scx, stmt)?,
223 Statement::Execute(stmt) => scl::describe_execute(&scx, stmt)?,
224 Statement::Fetch(stmt) => scl::describe_fetch(&scx, stmt)?,
225 Statement::Prepare(stmt) => scl::describe_prepare(&scx, stmt)?,
226 Statement::ResetVariable(stmt) => scl::describe_reset_variable(&scx, stmt)?,
227 Statement::SetVariable(stmt) => scl::describe_set_variable(&scx, stmt)?,
228 Statement::Show(ShowStatement::ShowVariable(stmt)) => {
229 scl::describe_show_variable(&scx, stmt)?
230 }
231
232 Statement::Copy(stmt) => dml::describe_copy(&scx, stmt)?,
234 Statement::Delete(stmt) => dml::describe_delete(&scx, stmt)?,
235 Statement::ExplainPlan(stmt) => dml::describe_explain_plan(&scx, stmt)?,
236 Statement::ExplainPushdown(stmt) => dml::describe_explain_pushdown(&scx, stmt)?,
237 Statement::ExplainAnalyzeObject(stmt) => dml::describe_explain_analyze_object(&scx, stmt)?,
238 Statement::ExplainAnalyzeCluster(stmt) => {
239 dml::describe_explain_analyze_cluster(&scx, stmt)?
240 }
241 Statement::ExplainTimestamp(stmt) => dml::describe_explain_timestamp(&scx, stmt)?,
242 Statement::ExplainSinkSchema(stmt) => dml::describe_explain_schema(&scx, stmt)?,
243 Statement::Insert(stmt) => dml::describe_insert(&scx, stmt)?,
244 Statement::Select(stmt) => dml::describe_select(&scx, stmt)?,
245 Statement::Subscribe(stmt) => dml::describe_subscribe(&scx, stmt)?,
246 Statement::Update(stmt) => dml::describe_update(&scx, stmt)?,
247
248 Statement::Commit(stmt) => tcl::describe_commit(&scx, stmt)?,
250 Statement::Rollback(stmt) => tcl::describe_rollback(&scx, stmt)?,
251 Statement::SetTransaction(stmt) => tcl::describe_set_transaction(&scx, stmt)?,
252 Statement::StartTransaction(stmt) => tcl::describe_start_transaction(&scx, stmt)?,
253
254 Statement::Raise(stmt) => raise::describe_raise(&scx, stmt)?,
256 Statement::Show(ShowStatement::InspectShard(stmt)) => {
257 scl::describe_inspect_shard(&scx, stmt)?
258 }
259 Statement::ValidateConnection(stmt) => validate::describe_validate_connection(&scx, stmt)?,
260 };
261
262 let desc = desc.with_params(scx.finalize_param_types()?);
263 Ok(desc)
264}
265
266#[mz_ore::instrument(level = "debug")]
283pub fn plan(
284 pcx: Option<&PlanContext>,
285 catalog: &dyn SessionCatalog,
286 stmt: Statement<Aug>,
287 params: &Params,
288 resolved_ids: &ResolvedIds,
289) -> Result<Plan, PlanError> {
290 let param_types = params
291 .expected_types
295 .iter()
296 .enumerate()
297 .map(|(i, ty)| (i + 1, ty.clone()))
298 .collect();
299
300 let kind: StatementKind = (&stmt).into();
301 let permitted_plans = Plan::generated_from(&kind);
302
303 let scx = &mut StatementContext {
304 pcx,
305 catalog,
306 param_types: RefCell::new(param_types),
307 ambiguous_columns: RefCell::new(false),
308 };
309
310 if resolved_ids
311 .items()
312 .filter_map(|id| catalog.try_get_item(id))
314 .any(|item| {
315 item.func().is_ok()
316 && item.name().qualifiers.schema_spec
317 == SchemaSpecifier::Id(catalog.get_mz_unsafe_schema_id())
318 })
319 {
320 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNSAFE_FUNCTIONS)?;
321 }
322
323 let plan = match stmt {
324 Statement::AlterCluster(stmt) => ddl::plan_alter_cluster(scx, stmt),
326 Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt),
327 Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt),
328 Statement::AlterMaterializedViewApplyReplacement(stmt) => {
329 ddl::plan_alter_materialized_view_apply_replacement(scx, stmt)
330 }
331 Statement::AlterObjectRename(stmt) => ddl::plan_alter_object_rename(scx, stmt),
332 Statement::AlterObjectSwap(stmt) => ddl::plan_alter_object_swap(scx, stmt),
333 Statement::AlterRetainHistory(stmt) => ddl::plan_alter_retain_history(scx, stmt),
334 Statement::AlterRole(stmt) => ddl::plan_alter_role(scx, stmt),
335 Statement::AlterSecret(stmt) => ddl::plan_alter_secret(scx, stmt),
336 Statement::AlterSetCluster(stmt) => ddl::plan_alter_item_set_cluster(scx, stmt),
337 Statement::AlterSink(stmt) => ddl::plan_alter_sink(scx, stmt),
338 Statement::AlterSource(stmt) => ddl::plan_alter_source(scx, stmt),
339 Statement::AlterSystemSet(stmt) => ddl::plan_alter_system_set(scx, stmt),
340 Statement::AlterSystemReset(stmt) => ddl::plan_alter_system_reset(scx, stmt),
341 Statement::AlterSystemResetAll(stmt) => ddl::plan_alter_system_reset_all(scx, stmt),
342 Statement::AlterTableAddColumn(stmt) => ddl::plan_alter_table_add_column(scx, stmt),
343 Statement::AlterNetworkPolicy(stmt) => ddl::plan_alter_network_policy(scx, stmt),
344 Statement::Comment(stmt) => ddl::plan_comment(scx, stmt),
345 Statement::CreateCluster(stmt) => ddl::plan_create_cluster(scx, stmt),
346 Statement::CreateClusterReplica(stmt) => ddl::plan_create_cluster_replica(scx, stmt),
347 Statement::CreateConnection(stmt) => ddl::plan_create_connection(scx, stmt),
348 Statement::CreateDatabase(stmt) => ddl::plan_create_database(scx, stmt),
349 Statement::CreateIndex(stmt) => ddl::plan_create_index(scx, stmt),
350 Statement::CreateRole(stmt) => ddl::plan_create_role(scx, stmt),
351 Statement::CreateSchema(stmt) => ddl::plan_create_schema(scx, stmt),
352 Statement::CreateSecret(stmt) => ddl::plan_create_secret(scx, stmt),
353 Statement::CreateSink(stmt) => ddl::plan_create_sink(scx, stmt),
354 Statement::CreateWebhookSource(stmt) => ddl::plan_create_webhook_source(scx, stmt),
355 Statement::CreateSource(stmt) => ddl::plan_create_source(scx, stmt),
356 Statement::CreateSubsource(stmt) => ddl::plan_create_subsource(scx, stmt),
357 Statement::CreateTable(stmt) => ddl::plan_create_table(scx, stmt),
358 Statement::CreateTableFromSource(stmt) => ddl::plan_create_table_from_source(scx, stmt),
359 Statement::CreateType(stmt) => ddl::plan_create_type(scx, stmt),
360 Statement::CreateView(stmt) => ddl::plan_create_view(scx, stmt),
361 Statement::CreateMaterializedView(stmt) => ddl::plan_create_materialized_view(scx, stmt),
362 Statement::CreateContinualTask(stmt) => ddl::plan_create_continual_task(scx, stmt),
363 Statement::CreateNetworkPolicy(stmt) => ddl::plan_create_network_policy(scx, stmt),
364 Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt),
365 Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt),
366
367 Statement::AlterOwner(stmt) => acl::plan_alter_owner(scx, stmt),
369 Statement::GrantRole(stmt) => acl::plan_grant_role(scx, stmt),
370 Statement::RevokeRole(stmt) => acl::plan_revoke_role(scx, stmt),
371 Statement::GrantPrivileges(stmt) => acl::plan_grant_privileges(scx, stmt),
372 Statement::RevokePrivileges(stmt) => acl::plan_revoke_privileges(scx, stmt),
373 Statement::AlterDefaultPrivileges(stmt) => acl::plan_alter_default_privileges(scx, stmt),
374 Statement::ReassignOwned(stmt) => acl::plan_reassign_owned(scx, stmt),
375
376 Statement::Copy(stmt) => dml::plan_copy(scx, stmt),
378 Statement::Delete(stmt) => dml::plan_delete(scx, stmt, params),
379 Statement::ExplainPlan(stmt) => dml::plan_explain_plan(scx, stmt, params),
380 Statement::ExplainPushdown(stmt) => dml::plan_explain_pushdown(scx, stmt, params),
381 Statement::ExplainAnalyzeObject(stmt) => {
382 dml::plan_explain_analyze_object(scx, stmt, params)
383 }
384 Statement::ExplainAnalyzeCluster(stmt) => {
385 dml::plan_explain_analyze_cluster(scx, stmt, params)
386 }
387 Statement::ExplainTimestamp(stmt) => dml::plan_explain_timestamp(scx, stmt),
388 Statement::ExplainSinkSchema(stmt) => dml::plan_explain_schema(scx, stmt),
389 Statement::Insert(stmt) => dml::plan_insert(scx, stmt, params),
390 Statement::Select(stmt) => dml::plan_select(scx, stmt, params, None),
391 Statement::Subscribe(stmt) => dml::plan_subscribe(scx, stmt, params, None),
392 Statement::Update(stmt) => dml::plan_update(scx, stmt, params),
393
394 Statement::Show(ShowStatement::ShowColumns(stmt)) => show::show_columns(scx, stmt)?.plan(),
396 Statement::Show(ShowStatement::ShowCreateConnection(stmt)) => {
397 show::plan_show_create_connection(scx, stmt).map(Plan::ShowCreate)
398 }
399 Statement::Show(ShowStatement::ShowCreateCluster(stmt)) => {
400 show::plan_show_create_cluster(scx, stmt).map(Plan::ShowCreate)
401 }
402 Statement::Show(ShowStatement::ShowCreateIndex(stmt)) => {
403 show::plan_show_create_index(scx, stmt).map(Plan::ShowCreate)
404 }
405 Statement::Show(ShowStatement::ShowCreateSink(stmt)) => {
406 show::plan_show_create_sink(scx, stmt).map(Plan::ShowCreate)
407 }
408 Statement::Show(ShowStatement::ShowCreateSource(stmt)) => {
409 show::plan_show_create_source(scx, stmt).map(Plan::ShowCreate)
410 }
411 Statement::Show(ShowStatement::ShowCreateTable(stmt)) => {
412 show::plan_show_create_table(scx, stmt).map(Plan::ShowCreate)
413 }
414 Statement::Show(ShowStatement::ShowCreateView(stmt)) => {
415 show::plan_show_create_view(scx, stmt).map(Plan::ShowCreate)
416 }
417 Statement::Show(ShowStatement::ShowCreateMaterializedView(stmt)) => {
418 show::plan_show_create_materialized_view(scx, stmt).map(Plan::ShowCreate)
419 }
420 Statement::Show(ShowStatement::ShowCreateType(stmt)) => {
421 show::plan_show_create_type(scx, stmt).map(Plan::ShowCreate)
422 }
423 Statement::Show(ShowStatement::ShowObjects(stmt)) => show::show_objects(scx, stmt)?.plan(),
424
425 Statement::Close(stmt) => scl::plan_close(scx, stmt),
427 Statement::Deallocate(stmt) => scl::plan_deallocate(scx, stmt),
428 Statement::Declare(stmt) => scl::plan_declare(scx, stmt, params),
429 Statement::Discard(stmt) => scl::plan_discard(scx, stmt),
430 Statement::Execute(stmt) => scl::plan_execute(scx, stmt),
431 Statement::Fetch(stmt) => scl::plan_fetch(scx, stmt),
432 Statement::Prepare(stmt) => scl::plan_prepare(scx, stmt),
433 Statement::ResetVariable(stmt) => scl::plan_reset_variable(scx, stmt),
434 Statement::SetVariable(stmt) => scl::plan_set_variable(scx, stmt),
435 Statement::Show(ShowStatement::ShowVariable(stmt)) => scl::plan_show_variable(scx, stmt),
436
437 Statement::Commit(stmt) => tcl::plan_commit(scx, stmt),
439 Statement::Rollback(stmt) => tcl::plan_rollback(scx, stmt),
440 Statement::SetTransaction(stmt) => tcl::plan_set_transaction(scx, stmt),
441 Statement::StartTransaction(stmt) => tcl::plan_start_transaction(scx, stmt),
442
443 Statement::Raise(stmt) => raise::plan_raise(scx, stmt),
445 Statement::Show(ShowStatement::InspectShard(stmt)) => scl::plan_inspect_shard(scx, stmt),
446 Statement::ValidateConnection(stmt) => validate::plan_validate_connection(scx, stmt),
447 };
448
449 if let Ok(plan) = &plan {
450 mz_ore::soft_assert_no_log!(
451 permitted_plans.contains(&PlanKind::from(plan)),
452 "plan {:?}, permitted plans {:?}",
453 plan,
454 permitted_plans
455 );
456 }
457
458 plan
459}
460
461pub fn plan_copy_from(
462 pcx: &PlanContext,
463 catalog: &dyn SessionCatalog,
464 target_id: CatalogItemId,
465 target_name: String,
466 columns: Vec<ColumnIndex>,
467 rows: Vec<mz_repr::Row>,
468) -> Result<super::HirRelationExpr, PlanError> {
469 query::plan_copy_from_rows(pcx, catalog, target_id, target_name, columns, rows)
470}
471
472impl PartialEq<ObjectType> for CatalogItemType {
479 fn eq(&self, other: &ObjectType) -> bool {
480 match (self, other) {
481 (CatalogItemType::Source, ObjectType::Source)
482 | (CatalogItemType::Table, ObjectType::Table)
483 | (CatalogItemType::Sink, ObjectType::Sink)
484 | (CatalogItemType::View, ObjectType::View)
485 | (CatalogItemType::MaterializedView, ObjectType::MaterializedView)
486 | (CatalogItemType::Index, ObjectType::Index)
487 | (CatalogItemType::Type, ObjectType::Type)
488 | (CatalogItemType::Secret, ObjectType::Secret)
489 | (CatalogItemType::Connection, ObjectType::Connection) => true,
490 (_, _) => false,
491 }
492 }
493}
494
495impl PartialEq<CatalogItemType> for ObjectType {
496 fn eq(&self, other: &CatalogItemType) -> bool {
497 other == self
498 }
499}
500
501#[derive(Debug, Clone)]
503pub struct StatementContext<'a> {
504 pcx: Option<&'a PlanContext>,
509 pub catalog: &'a dyn SessionCatalog,
510 pub param_types: RefCell<BTreeMap<usize, SqlScalarType>>,
513 pub ambiguous_columns: RefCell<bool>,
516}
517
518impl<'a> StatementContext<'a> {
519 pub fn new(
520 pcx: Option<&'a PlanContext>,
521 catalog: &'a dyn SessionCatalog,
522 ) -> StatementContext<'a> {
523 StatementContext {
524 pcx,
525 catalog,
526 param_types: Default::default(),
527 ambiguous_columns: RefCell::new(false),
528 }
529 }
530
531 pub fn current_schemas(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
533 self.catalog.search_path()
534 }
535
536 pub fn current_schema(&self) -> Option<&(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
539 self.current_schemas().into_iter().next()
540 }
541
542 pub fn pcx(&self) -> Result<&PlanContext, PlanError> {
543 self.pcx.ok_or_else(|| sql_err!("no plan context"))
544 }
545
546 pub fn allocate_full_name(&self, name: PartialItemName) -> Result<FullItemName, PlanError> {
547 let (database, schema): (RawDatabaseSpecifier, String) = match (name.database, name.schema)
548 {
549 (None, None) => {
550 let Some((database, schema)) = self.current_schema() else {
551 return Err(PlanError::InvalidSchemaName);
552 };
553 let schema = self.get_schema(database, schema);
554 let database = match schema.database() {
555 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
556 ResolvedDatabaseSpecifier::Id(id) => {
557 RawDatabaseSpecifier::Name(self.catalog.get_database(id).name().to_string())
558 }
559 };
560 (database, schema.name().schema.clone())
561 }
562 (None, Some(schema)) => {
563 if is_system_schema(&schema) {
564 (RawDatabaseSpecifier::Ambient, schema)
565 } else {
566 match self.catalog.active_database_name() {
567 Some(name) => (RawDatabaseSpecifier::Name(name.to_string()), schema),
568 None => {
569 sql_bail!(
570 "no database specified for non-system schema and no active database"
571 )
572 }
573 }
574 }
575 }
576 (Some(_database), None) => {
577 sql_bail!("unreachable: specified the database but no schema")
580 }
581 (Some(database), Some(schema)) => (RawDatabaseSpecifier::Name(database), schema),
582 };
583 let item = name.item;
584 Ok(FullItemName {
585 database,
586 schema,
587 item,
588 })
589 }
590
591 pub fn allocate_qualified_name(
592 &self,
593 name: PartialItemName,
594 ) -> Result<QualifiedItemName, PlanError> {
595 let full_name = self.allocate_full_name(name)?;
596 let database_spec = match full_name.database {
597 RawDatabaseSpecifier::Ambient => ResolvedDatabaseSpecifier::Ambient,
598 RawDatabaseSpecifier::Name(name) => ResolvedDatabaseSpecifier::Id(
599 self.resolve_database(&UnresolvedDatabaseName(Ident::new(name)?))?
600 .id(),
601 ),
602 };
603 let schema_spec = self
604 .resolve_schema_in_database(&database_spec, &Ident::new(full_name.schema)?)?
605 .id()
606 .clone();
607 Ok(QualifiedItemName {
608 qualifiers: ItemQualifiers {
609 database_spec,
610 schema_spec,
611 },
612 item: full_name.item,
613 })
614 }
615
616 pub fn allocate_temporary_full_name(&self, name: PartialItemName) -> FullItemName {
617 FullItemName {
618 database: RawDatabaseSpecifier::Ambient,
619 schema: name
620 .schema
621 .unwrap_or_else(|| mz_repr::namespaces::MZ_TEMP_SCHEMA.to_owned()),
622 item: name.item,
623 }
624 }
625
626 pub fn allocate_temporary_qualified_name(
627 &self,
628 name: PartialItemName,
629 ) -> Result<QualifiedItemName, PlanError> {
630 if let Some(schema_name) = name.schema {
635 if schema_name != mz_repr::namespaces::MZ_TEMP_SCHEMA {
636 return Err(PlanError::InvalidTemporarySchema);
637 }
638 }
639
640 Ok(QualifiedItemName {
641 qualifiers: ItemQualifiers {
642 database_spec: ResolvedDatabaseSpecifier::Ambient,
643 schema_spec: SchemaSpecifier::Temporary,
644 },
645 item: name.item,
646 })
647 }
648
649 pub fn allocate_resolved_item_name(
652 &self,
653 id: CatalogItemId,
654 name: UnresolvedItemName,
655 ) -> Result<ResolvedItemName, PlanError> {
656 let partial = normalize::unresolved_item_name(name)?;
657 let qualified = self.allocate_qualified_name(partial.clone())?;
658 let full_name = self.allocate_full_name(partial)?;
659 Ok(ResolvedItemName::Item {
660 id,
661 qualifiers: qualified.qualifiers,
662 full_name,
663 print_id: true,
664 version: RelationVersionSelector::Latest,
665 })
666 }
667
668 pub fn active_database(&self) -> Option<&DatabaseId> {
669 self.catalog.active_database()
670 }
671
672 pub fn resolve_optional_schema(
673 &self,
674 schema_name: &Option<ResolvedSchemaName>,
675 ) -> Result<SchemaSpecifier, PlanError> {
676 match schema_name {
677 Some(ResolvedSchemaName::Schema { schema_spec, .. }) => Ok(schema_spec.clone()),
678 None => self.resolve_active_schema().map(|spec| spec.clone()),
679 Some(ResolvedSchemaName::Error) => {
680 unreachable!("should have been handled by name resolution")
681 }
682 }
683 }
684
685 pub fn resolve_active_schema(&self) -> Result<&SchemaSpecifier, PlanError> {
686 match self.current_schema() {
687 Some((_db, schema)) => Ok(schema),
688 None => Err(PlanError::InvalidSchemaName),
689 }
690 }
691
692 pub fn get_cluster(&self, id: &ClusterId) -> &dyn CatalogCluster<'_> {
693 self.catalog.get_cluster(*id)
694 }
695
696 pub fn resolve_database(
697 &self,
698 name: &UnresolvedDatabaseName,
699 ) -> Result<&dyn CatalogDatabase, PlanError> {
700 let name = normalize::ident_ref(&name.0);
701 Ok(self.catalog.resolve_database(name)?)
702 }
703
704 pub fn get_database(&self, id: &DatabaseId) -> &dyn CatalogDatabase {
705 self.catalog.get_database(id)
706 }
707
708 pub fn resolve_schema_in_database(
709 &self,
710 database_spec: &ResolvedDatabaseSpecifier,
711 schema: &Ident,
712 ) -> Result<&dyn CatalogSchema, PlanError> {
713 let schema = normalize::ident_ref(schema);
714 Ok(self
715 .catalog
716 .resolve_schema_in_database(database_spec, schema)?)
717 }
718
719 pub fn resolve_schema(
720 &self,
721 name: UnresolvedSchemaName,
722 ) -> Result<&dyn CatalogSchema, PlanError> {
723 let name = normalize::unresolved_schema_name(name)?;
724 Ok(self
725 .catalog
726 .resolve_schema(name.database.as_deref(), &name.schema)?)
727 }
728
729 pub fn get_schema(
730 &self,
731 database_spec: &ResolvedDatabaseSpecifier,
732 schema_spec: &SchemaSpecifier,
733 ) -> &dyn CatalogSchema {
734 self.catalog.get_schema(database_spec, schema_spec)
735 }
736
737 pub fn resolve_item(&self, name: RawItemName) -> Result<&dyn CatalogItem, PlanError> {
738 match name {
739 RawItemName::Name(name) => {
740 let name = normalize::unresolved_item_name(name)?;
741 Ok(self.catalog.resolve_item(&name)?)
742 }
743 RawItemName::Id(id, _, _) => {
744 let gid = id.parse()?;
745 Ok(self.catalog.get_item(&gid))
746 }
747 }
748 }
749
750 pub fn get_item(&self, id: &CatalogItemId) -> &dyn CatalogItem {
751 self.catalog.get_item(id)
752 }
753
754 pub fn get_item_by_resolved_name(
755 &self,
756 name: &ResolvedItemName,
757 ) -> Result<Box<dyn CatalogCollectionItem + '_>, PlanError> {
758 match name {
759 ResolvedItemName::Item { id, version, .. } => {
760 Ok(self.get_item(id).at_version(*version))
761 }
762 ResolvedItemName::Cte { .. } => sql_bail!("non-user item"),
763 ResolvedItemName::ContinualTask { .. } => sql_bail!("non-user item"),
764 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
765 }
766 }
767
768 pub fn get_column_by_resolved_name(
769 &self,
770 name: &ColumnName<Aug>,
771 ) -> Result<(Box<dyn CatalogCollectionItem + '_>, usize), PlanError> {
772 match (&name.relation, &name.column) {
773 (
774 ResolvedItemName::Item { id, version, .. },
775 ResolvedColumnReference::Column { index, .. },
776 ) => {
777 let item = self.get_item(id).at_version(*version);
778 Ok((item, *index))
779 }
780 _ => unreachable!(
781 "get_column_by_resolved_name errors should have been caught in name resolution"
782 ),
783 }
784 }
785
786 pub fn resolve_function(
787 &self,
788 name: UnresolvedItemName,
789 ) -> Result<&dyn CatalogItem, PlanError> {
790 let name = normalize::unresolved_item_name(name)?;
791 Ok(self.catalog.resolve_function(&name)?)
792 }
793
794 pub fn resolve_cluster(
795 &self,
796 name: Option<&Ident>,
797 ) -> Result<&dyn CatalogCluster<'_>, PlanError> {
798 let name = name.map(|name| name.as_str());
799 Ok(self.catalog.resolve_cluster(name)?)
800 }
801
802 pub fn resolve_type(&self, mut ty: mz_pgrepr::Type) -> Result<ResolvedDataType, PlanError> {
803 match &mut ty {
809 mz_pgrepr::Type::Interval { constraints } => *constraints = None,
810 mz_pgrepr::Type::Time { precision } => *precision = None,
811 mz_pgrepr::Type::TimeTz { precision } => *precision = None,
812 mz_pgrepr::Type::Timestamp { precision } => *precision = None,
813 mz_pgrepr::Type::TimestampTz { precision } => *precision = None,
814 _ => (),
815 }
816 let mut ty = if ty.oid() >= FIRST_USER_OID {
822 sql_bail!("internal error, unexpected user type: {ty:?} ");
823 } else if ty.oid() < FIRST_MATERIALIZE_OID {
824 format!("pg_catalog.{}", ty)
825 } else {
826 format!("mz_catalog.{}", ty)
828 };
829 if ty == "pg_catalog.json" {
833 ty = "pg_catalog.jsonb".into();
834 }
835 let data_type = mz_sql_parser::parser::parse_data_type(&ty)?;
836 let (data_type, _) = names::resolve(self.catalog, data_type)?;
837 Ok(data_type)
838 }
839
840 pub fn get_object_type(&self, id: &ObjectId) -> ObjectType {
841 self.catalog.get_object_type(id)
842 }
843
844 pub fn get_system_object_type(&self, id: &SystemObjectId) -> SystemObjectType {
845 match id {
846 SystemObjectId::Object(id) => SystemObjectType::Object(self.get_object_type(id)),
847 SystemObjectId::System => SystemObjectType::System,
848 }
849 }
850
851 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
853 flag.require(self.catalog.system_vars())?;
854 Ok(())
855 }
856
857 pub fn is_feature_flag_enabled(&self, flag: &'static FeatureFlag) -> bool {
859 self.require_feature_flag(flag).is_ok()
860 }
861
862 pub fn finalize_param_types(self) -> Result<Vec<SqlScalarType>, PlanError> {
863 let param_types = self.param_types.into_inner();
864 let mut out = vec![];
865 for (i, (n, typ)) in param_types.into_iter().enumerate() {
866 if n != i + 1 {
867 sql_bail!("unable to infer type for parameter ${}", i + 1);
868 }
869 out.push(typ);
870 }
871 Ok(out)
872 }
873
874 pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
877 self.catalog.humanize_scalar_type(typ, postgres_compat)
878 }
879
880 pub fn humanize_column_type(&self, typ: &SqlColumnType, postgres_compat: bool) -> String {
883 self.catalog.humanize_column_type(typ, postgres_compat)
884 }
885
886 pub(crate) fn build_tunnel_definition(
887 &self,
888 ssh_tunnel: Option<with_options::Object>,
889 aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
890 ) -> Result<Tunnel<ReferencedConnection>, PlanError> {
891 match (ssh_tunnel, aws_privatelink) {
892 (None, None) => Ok(Tunnel::Direct),
893 (Some(ssh_tunnel), None) => {
894 let id = CatalogItemId::from(ssh_tunnel);
895 let ssh_tunnel = self.catalog.get_item(&id);
896 match ssh_tunnel.connection()? {
897 Connection::Ssh(_connection) => Ok(Tunnel::Ssh(SshTunnel {
898 connection_id: id,
899 connection: id,
900 })),
901 _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
902 }
903 }
904 (None, Some(aws_privatelink)) => {
905 let id = aws_privatelink.connection.item_id().clone();
906 let entry = self.catalog.get_item(&id);
907 match entry.connection()? {
908 Connection::AwsPrivatelink(_) => Ok(Tunnel::AwsPrivatelink(AwsPrivatelink {
909 connection_id: id,
910 availability_zone: None,
912 port: aws_privatelink.port,
914 })),
915 _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
916 }
917 }
918 (Some(_), Some(_)) => {
919 sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
920 }
921 }
922 }
923
924 pub fn relation_desc_into_table_defs(
925 &self,
926 desc: &RelationDesc,
927 ) -> Result<(Vec<ColumnDef<Aug>>, Vec<TableConstraint<Aug>>), PlanError> {
928 let mut columns = vec![];
929 let mut null_cols = BTreeSet::new();
930 for (column_name, column_type) in desc.iter() {
931 let name = Ident::new(column_name.as_str().to_owned())?;
932
933 let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
934 let data_type = self.resolve_type(ty)?;
935
936 let options = if !column_type.nullable {
937 null_cols.insert(columns.len());
938 vec![mz_sql_parser::ast::ColumnOptionDef {
939 name: None,
940 option: mz_sql_parser::ast::ColumnOption::NotNull,
941 }]
942 } else {
943 vec![]
944 };
945
946 columns.push(ColumnDef {
947 name,
948 data_type,
949 collation: None,
950 options,
951 });
952 }
953
954 let mut table_constraints = vec![];
955 for key in desc.typ().keys.iter() {
956 let mut col_names = vec![];
957 for col_idx in key {
958 if !null_cols.contains(col_idx) {
959 sql_bail!(
962 "[internal error] key columns must be NOT NULL when generating table constraints"
963 );
964 }
965 col_names.push(columns[*col_idx].name.clone());
966 }
967 table_constraints.push(TableConstraint::Unique {
968 name: None,
969 columns: col_names,
970 is_primary: false,
971 nulls_not_distinct: false,
972 });
973 }
974
975 Ok((columns, table_constraints))
976 }
977
978 pub fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
979 self.catalog.get_owner_id(id)
980 }
981
982 pub fn humanize_resolved_name(
983 &self,
984 name: &ResolvedItemName,
985 ) -> Result<PartialItemName, PlanError> {
986 let item = self.get_item_by_resolved_name(name)?;
987 Ok(self.catalog.minimal_qualification(item.name()))
988 }
989
990 pub fn dangerous_resolve_name(&self, name: Vec<&str>) -> ResolvedItemName {
993 tracing::trace!("dangerous_resolve_name {:?}", name);
994 let name: Vec<_> = name.into_iter().map(Ident::new_unchecked).collect();
996 let name = UnresolvedItemName::qualified(&name);
997 let entry = match self.resolve_item(RawItemName::Name(name.clone())) {
998 Ok(entry) => entry,
999 Err(_) => self
1000 .resolve_function(name.clone())
1001 .expect("name referred to an existing object"),
1002 };
1003
1004 let partial = normalize::unresolved_item_name(name).unwrap();
1005 let full_name = self.allocate_full_name(partial).unwrap();
1006
1007 ResolvedItemName::Item {
1008 id: entry.id(),
1009 qualifiers: entry.name().qualifiers.clone(),
1010 full_name,
1011 print_id: true,
1012 version: RelationVersionSelector::Latest,
1013 }
1014 }
1015}
1016
1017pub fn resolve_cluster_for_materialized_view<'a>(
1018 catalog: &'a dyn SessionCatalog,
1019 stmt: &CreateMaterializedViewStatement<Aug>,
1020) -> Result<ClusterId, PlanError> {
1021 Ok(match &stmt.in_cluster {
1022 None => catalog.resolve_cluster(None)?.id(),
1023 Some(in_cluster) => in_cluster.id,
1024 })
1025}
1026
1027#[derive(Debug, Clone, Copy)]
1029pub enum StatementClassification {
1030 ACL,
1031 DDL,
1032 DML,
1033 Other,
1034 SCL,
1035 Show,
1036 TCL,
1037}
1038
1039impl StatementClassification {
1040 pub fn is_ddl(&self) -> bool {
1041 matches!(self, StatementClassification::DDL)
1042 }
1043}
1044
1045impl<T: mz_sql_parser::ast::AstInfo> From<&Statement<T>> for StatementClassification {
1046 fn from(value: &Statement<T>) -> Self {
1047 use StatementClassification::*;
1048
1049 match value {
1050 Statement::AlterCluster(_) => DDL,
1052 Statement::AlterConnection(_) => DDL,
1053 Statement::AlterIndex(_) => DDL,
1054 Statement::AlterMaterializedViewApplyReplacement(_) => DDL,
1055 Statement::AlterObjectRename(_) => DDL,
1056 Statement::AlterObjectSwap(_) => DDL,
1057 Statement::AlterNetworkPolicy(_) => DDL,
1058 Statement::AlterRetainHistory(_) => DDL,
1059 Statement::AlterRole(_) => DDL,
1060 Statement::AlterSecret(_) => DDL,
1061 Statement::AlterSetCluster(_) => DDL,
1062 Statement::AlterSink(_) => DDL,
1063 Statement::AlterSource(_) => DDL,
1064 Statement::AlterSystemSet(_) => DDL,
1065 Statement::AlterSystemReset(_) => DDL,
1066 Statement::AlterSystemResetAll(_) => DDL,
1067 Statement::AlterTableAddColumn(_) => DDL,
1068 Statement::Comment(_) => DDL,
1069 Statement::CreateCluster(_) => DDL,
1070 Statement::CreateClusterReplica(_) => DDL,
1071 Statement::CreateConnection(_) => DDL,
1072 Statement::CreateContinualTask(_) => DDL,
1073 Statement::CreateDatabase(_) => DDL,
1074 Statement::CreateIndex(_) => DDL,
1075 Statement::CreateRole(_) => DDL,
1076 Statement::CreateSchema(_) => DDL,
1077 Statement::CreateSecret(_) => DDL,
1078 Statement::CreateSink(_) => DDL,
1079 Statement::CreateWebhookSource(_) => DDL,
1080 Statement::CreateSource(_) => DDL,
1081 Statement::CreateSubsource(_) => DDL,
1082 Statement::CreateTable(_) => DDL,
1083 Statement::CreateTableFromSource(_) => DDL,
1084 Statement::CreateType(_) => DDL,
1085 Statement::CreateView(_) => DDL,
1086 Statement::CreateMaterializedView(_) => DDL,
1087 Statement::CreateNetworkPolicy(_) => DDL,
1088 Statement::DropObjects(_) => DDL,
1089 Statement::DropOwned(_) => DDL,
1090
1091 Statement::AlterOwner(_) => ACL,
1093 Statement::GrantRole(_) => ACL,
1094 Statement::RevokeRole(_) => ACL,
1095 Statement::GrantPrivileges(_) => ACL,
1096 Statement::RevokePrivileges(_) => ACL,
1097 Statement::AlterDefaultPrivileges(_) => ACL,
1098 Statement::ReassignOwned(_) => ACL,
1099
1100 Statement::Copy(_) => DML,
1102 Statement::Delete(_) => DML,
1103 Statement::ExplainPlan(_) => DML,
1104 Statement::ExplainPushdown(_) => DML,
1105 Statement::ExplainAnalyzeObject(_) => DML,
1106 Statement::ExplainAnalyzeCluster(_) => DML,
1107 Statement::ExplainTimestamp(_) => DML,
1108 Statement::ExplainSinkSchema(_) => DML,
1109 Statement::Insert(_) => DML,
1110 Statement::Select(_) => DML,
1111 Statement::Subscribe(_) => DML,
1112 Statement::Update(_) => DML,
1113
1114 Statement::Show(ShowStatement::ShowColumns(_)) => Show,
1116 Statement::Show(ShowStatement::ShowCreateConnection(_)) => Show,
1117 Statement::Show(ShowStatement::ShowCreateCluster(_)) => Show,
1118 Statement::Show(ShowStatement::ShowCreateIndex(_)) => Show,
1119 Statement::Show(ShowStatement::ShowCreateSink(_)) => Show,
1120 Statement::Show(ShowStatement::ShowCreateSource(_)) => Show,
1121 Statement::Show(ShowStatement::ShowCreateTable(_)) => Show,
1122 Statement::Show(ShowStatement::ShowCreateView(_)) => Show,
1123 Statement::Show(ShowStatement::ShowCreateMaterializedView(_)) => Show,
1124 Statement::Show(ShowStatement::ShowCreateType(_)) => Show,
1125 Statement::Show(ShowStatement::ShowObjects(_)) => Show,
1126
1127 Statement::Close(_) => SCL,
1129 Statement::Deallocate(_) => SCL,
1130 Statement::Declare(_) => SCL,
1131 Statement::Discard(_) => SCL,
1132 Statement::Execute(_) => SCL,
1133 Statement::Fetch(_) => SCL,
1134 Statement::Prepare(_) => SCL,
1135 Statement::ResetVariable(_) => SCL,
1136 Statement::SetVariable(_) => SCL,
1137 Statement::Show(ShowStatement::ShowVariable(_)) => SCL,
1138
1139 Statement::Commit(_) => TCL,
1141 Statement::Rollback(_) => TCL,
1142 Statement::SetTransaction(_) => TCL,
1143 Statement::StartTransaction(_) => TCL,
1144
1145 Statement::Raise(_) => Other,
1147 Statement::Show(ShowStatement::InspectShard(_)) => Other,
1148 Statement::ValidateConnection(_) => Other,
1149 }
1150 }
1151}