1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55 UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61 CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62 VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
67use mz_sql::catalog::{
68 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
69 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
70 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
71 TypeReference,
72};
73use mz_sql::names::{
74 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81 Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91 ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use mz_transform::notice::OptimizerNotice;
94use serde::Serialize;
95use timely::progress::Antichain;
96use tokio::sync::mpsc;
97use tracing::{debug, warn};
98
99use crate::AdapterError;
101use crate::catalog::{Catalog, ConnCatalog};
102use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
103use crate::optimize::{self, Optimize, OptimizerCatalog};
104use crate::session::Session;
105
/// The in-memory state of the catalog: every database, schema, item, cluster,
/// role, and network policy known to the adapter, plus associated
/// configuration.
///
/// Maps are stored as [`imbl::OrdMap`] so that cloning the whole state (e.g.
/// for a transaction-local snapshot, see `for_session`) is cheap and
/// structurally shared.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // Name -> ID indexes; the ID-keyed maps below hold the authoritative data.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    // Temporary (connection-scoped) items are omitted from serialization.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    // Maps each collection's GlobalId back to the item that owns it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    // Schemas that exist outside of any database (mz_catalog, pg_catalog, ...).
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    pub(super) system_privileges: Arc<PrivilegeMap>,
    pub(super) comments: Arc<CommentsMap>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: Arc<StorageMetadata>,
    pub(super) mock_authentication_nonce: Option<String>,

    // Optimizer notices, indexed by the collection they depend on.
    #[serde(skip)]
    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,

    // One temporary schema per active connection, holding its TEMP items.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    // Read-only environment/deployment configuration.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    // Host name used when constructing webhook URLs; see `try_get_webhook_url`.
    pub(super) http_host_name: Option<String>,

    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
182
/// An in-memory cache of optimized local expressions, keyed by [`GlobalId`].
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is active: reads are served and new results are collected.
    Open {
        /// Expressions available from the cache that have not yet been taken
        /// via `remove_cached_expression`.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Newly produced expressions that were not in the cache; retrievable
        /// via `into_uncached_exprs`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// Caching is disabled; every operation is a no-op.
    Closed,
}
199
impl LocalExpressionCache {
    /// Creates an open cache seeded with the given previously-cached
    /// expressions and no uncached expressions.
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    /// Removes and returns the cached expression for `id`, if one exists.
    /// Always `None` when the cache is [`LocalExpressionCache::Closed`].
    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    /// Returns a previously removed cached expression for `id` back to the
    /// cache (e.g. when it ended up not being used). No-op when closed.
    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    /// Records a newly optimized expression for `id` in the uncached set.
    /// No-op when closed.
    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                // Careful: if an entry already exists for `id` and it DIFFERS
                // from the new expression, the entry is removed and NOT
                // re-inserted — presumably because two different optimization
                // results for the same id mean the result is not safe to
                // cache (non-deterministic). Equal or absent entries are
                // (re-)inserted as usual.
                let prev = uncached_exprs.remove(&id);
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    /// Consumes the cache and returns the accumulated uncached expressions.
    /// Empty when the cache was closed.
    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}
272
273fn skip_temp_items<S>(
274 entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
275 serializer: S,
276) -> Result<S::Ok, S::Error>
277where
278 S: serde::Serializer,
279{
280 mz_ore::serde::map_key_to_string(
281 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
282 serializer,
283 )
284}
285
286impl CatalogState {
    /// Returns an empty [`CatalogState`] suitable for tests: all maps empty,
    /// test-only defaults for configuration (dummy build info, zero clock,
    /// in-memory secrets controller, test license key).
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            notices_by_dep_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                // NOW_ZERO makes time deterministic in tests.
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Arc::new(SystemVars::default()),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Arc::new(DefaultPrivileges::default()),
            system_privileges: Arc::new(PrivilegeMap::default()),
            comments: Arc::new(CommentsMap::default()),
            source_references: Default::default(),
            storage_metadata: Arc::new(StorageMetadata::default()),
            license_key: ValidatedLicenseKey::for_tests(),
            mock_authentication_nonce: Default::default(),
        }
    }
339
340 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
341 let search_path = self.resolve_search_path(session);
342 let database = self
343 .database_by_name
344 .get(session.vars().database())
345 .map(|id| id.clone());
346 let state = match session.transaction().catalog_state() {
347 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
348 None => Cow::Borrowed(self),
349 };
350 ConnCatalog {
351 state,
352 unresolvable_ids: BTreeSet::new(),
353 conn_id: session.conn_id().clone(),
354 cluster: session.vars().cluster().into(),
355 database,
356 search_path,
357 role_id: session.current_role_id().clone(),
358 prepared_statements: Some(session.prepared_statements()),
359 portals: Some(session.portals()),
360 notices_tx: session.retain_notice_transmitter(),
361 }
362 }
363
    /// Returns a [`ConnCatalog`] for the given role without a user session:
    /// system connection ID, the configured default cluster, the default
    /// database (if it resolves), an empty search path, and no prepared
    /// statements or portals.
    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        // There is no session to receive notices; the receiver is dropped
        // immediately, so notices sent here go nowhere.
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            portals: None,
            notices_tx,
        }
    }
386
    /// Returns a [`ConnCatalog`] running with the privileges of the system
    /// role (`MZ_SYSTEM_ROLE_ID`).
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }
390
391 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
396 struct I<'a> {
397 queue: VecDeque<CatalogItemId>,
398 seen: BTreeSet<CatalogItemId>,
399 this: &'a CatalogState,
400 }
401 impl<'a> Iterator for I<'a> {
402 type Item = CatalogItemId;
403 fn next(&mut self) -> Option<Self::Item> {
404 if let Some(next) = self.queue.pop_front() {
405 for child in self.this.get_entry(&next).item().uses() {
406 if !self.seen.contains(&child) {
407 self.queue.push_back(child);
408 self.seen.insert(child);
409 }
410 }
411 Some(next)
412 } else {
413 None
414 }
415 }
416 }
417
418 I {
419 queue: [id].into_iter().collect(),
420 seen: [id].into_iter().collect(),
421 this: self,
422 }
423 }
424
    /// Returns the IDs of all introspection sources (logs) that `id`
    /// transitively depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }
432
    /// Recursive helper for [`CatalogState::introspection_dependencies`]:
    /// appends to `out` every log that `id` transitively references.
    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            // Logs are the introspection sources themselves.
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                // Recurse into every item these objects reference.
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            // Sinks and indexes have a single upstream collection; follow it.
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            // Remaining item types are not traversed.
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }
460
    /// Returns the IDs of all objects that depend on any of `object_ids`,
    /// including the objects themselves.
    ///
    /// Dependents of an object appear before the object itself in the result.
    /// `seen` is updated in place so overlapping inputs are reported once;
    /// `conn_id` is needed to resolve temporary schemas.
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                // Roles have no dependents of their own; report the role once.
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }
508
509 fn cluster_dependents(
516 &self,
517 cluster_id: ClusterId,
518 seen: &mut BTreeSet<ObjectId>,
519 ) -> Vec<ObjectId> {
520 let mut dependents = Vec::new();
521 let object_id = ObjectId::Cluster(cluster_id);
522 if !seen.contains(&object_id) {
523 seen.insert(object_id.clone());
524 let cluster = self.get_cluster(cluster_id);
525 for item_id in cluster.bound_objects() {
526 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
527 }
528 for replica_id in cluster.replica_ids().values() {
529 dependents.extend_from_slice(&self.cluster_replica_dependents(
530 cluster_id,
531 *replica_id,
532 seen,
533 ));
534 }
535 dependents.push(object_id);
536 }
537 dependents
538 }
539
540 pub(super) fn cluster_replica_dependents(
547 &self,
548 cluster_id: ClusterId,
549 replica_id: ReplicaId,
550 seen: &mut BTreeSet<ObjectId>,
551 ) -> Vec<ObjectId> {
552 let mut dependents = Vec::new();
553 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
554 if !seen.contains(&object_id) {
555 seen.insert(object_id.clone());
556 dependents.push(object_id);
557 }
558 dependents
559 }
560
    /// Returns all objects that depend on the given database — the dependents
    /// of every schema it contains — followed by the database itself.
    /// `seen` is consulted and updated so each object is reported once.
    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            // The database itself comes last, after everything inside it.
            dependents.push(object_id);
        }
        dependents
    }
590
    /// Returns all objects that depend on the given schema — the dependents
    /// of every item it contains — followed by the schema itself.
    /// `seen` is consulted and updated so each object is reported once;
    /// `conn_id` resolves temporary schemas.
    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            // The schema itself comes last, after everything inside it.
            dependents.push(object_id)
        }
        dependents
    }
616
    /// Returns the IDs of everything that transitively depends on `item_id`
    /// (via `used_by`), followed by `item_id` itself, followed by the
    /// dependents of its progress collection (if any).
    /// `seen` is consulted and updated so each object is reported once.
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            // NOTE: the progress collection is deliberately reported *after*
            // the item itself, unlike the `used_by` dependents above.
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }
646
647 pub(super) fn network_policy_dependents(
654 &self,
655 network_policy_id: NetworkPolicyId,
656 _seen: &mut BTreeSet<ObjectId>,
657 ) -> Vec<ObjectId> {
658 let object_id = ObjectId::NetworkPolicy(network_policy_id);
659 vec![object_id]
663 }
664
    /// Reports whether the item `id` lives in a stable (non-unstable) schema.
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }
672
673 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
674 if self.system_config().unsafe_enable_unstable_dependencies() {
675 return Ok(());
676 }
677
678 let unstable_dependencies: Vec<_> = item
679 .references()
680 .items()
681 .filter(|id| !self.is_stable(**id))
682 .map(|id| self.get_entry(id).name().item.clone())
683 .collect();
684
685 if unstable_dependencies.is_empty() || item.is_temporary() {
689 Ok(())
690 } else {
691 let object_type = item.typ().to_string();
692 Err(Error {
693 kind: ErrorKind::UnstableDependency {
694 object_type,
695 unstable_dependencies,
696 },
697 })
698 }
699 }
700
    /// Converts the ID-based [`QualifiedItemName`] into the name-based
    /// [`FullItemName`].
    ///
    /// `conn_id` is used to resolve temporary schemas; when `None`, the
    /// system connection is assumed.
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        let schema = match &name.qualifiers.schema_spec {
            // All temporary schemas render under the fixed temp schema name.
            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
            SchemaSpecifier::Id(_) => self
                .get_schema(
                    &name.qualifiers.database_spec,
                    &name.qualifiers.schema_spec,
                    conn_id,
                )
                .name()
                .schema
                .clone(),
        };
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }
734
735 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
736 let database = match &name.database {
737 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
738 ResolvedDatabaseSpecifier::Id(id) => {
739 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
740 }
741 };
742 FullSchemaName {
743 database,
744 schema: name.schema.clone(),
745 }
746 }
747
    /// Returns the [`CatalogEntry`] for `id`.
    ///
    /// Panics if `id` is not in the catalog (catalog out of sync).
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }
753
754 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
755 let item_id = self
756 .entry_by_global_id
757 .get(id)
758 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
759
760 let entry = self.get_entry(item_id).clone();
761 let version = match entry.item() {
762 CatalogItem::Table(table) => {
763 let (version, _) = table
764 .collections
765 .iter()
766 .find(|(_verison, gid)| *gid == id)
767 .expect("version to exist");
768 RelationVersionSelector::Specific(*version)
769 }
770 _ => RelationVersionSelector::Latest,
771 };
772 CatalogCollectionEntry { entry, version }
773 }
774
    /// Returns an iterator over every catalog entry, keyed by item ID.
    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }
778
    /// Returns the IDs of all items in connection `conn`'s temporary schema,
    /// or an empty iterator when the connection has no temporary schema.
    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        self.temporary_schemas
            .get(conn)
            .into_iter()
            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
    }
786
    /// Reports whether a temporary schema exists for connection `conn`.
    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }
795
    /// Returns the system [`CatalogEntry`] for the type named `name`.
    ///
    /// Panics unless `name` exists in exactly one system schema — only call
    /// this for types that are uniquely identifiable across system schemas.
    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    // A second hit means the name is ambiguous.
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }
817
818 pub fn get_item_by_name(
819 &self,
820 name: &QualifiedItemName,
821 conn_id: &ConnectionId,
822 ) -> Option<&CatalogEntry> {
823 self.get_schema(
824 &name.qualifiers.database_spec,
825 &name.qualifiers.schema_spec,
826 conn_id,
827 )
828 .items
829 .get(&name.item)
830 .and_then(|id| self.try_get_entry(id))
831 }
832
    /// Looks up the type with the given qualified name, resolving temporary
    /// schemas against `conn_id`. Returns `None` if no such type exists.
    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }
847
848 pub(super) fn find_available_name(
849 &self,
850 mut name: QualifiedItemName,
851 conn_id: &ConnectionId,
852 ) -> QualifiedItemName {
853 let mut i = 0;
854 let orig_item_name = name.item.clone();
855 while self.get_item_by_name(&name, conn_id).is_some() {
856 i += 1;
857 name.item = format!("{}{}", orig_item_name, i);
858 }
859 name
860 }
861
    /// Returns the [`CatalogEntry`] for `id`, or `None` if it doesn't exist.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }
865
866 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
867 let item_id = self.entry_by_global_id.get(id)?;
868 self.try_get_entry(item_id)
869 }
870
    /// Returns the [`RelationDesc`] for the collection `id`, if the item
    /// exists and has one.
    ///
    /// Tables get a desc specific to the version that `id` names; all other
    /// items report the desc of their latest version.
    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            other => other.relation_desc(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }
882
    /// Returns the [`Cluster`] with ID `cluster_id`. Panics if it is unknown.
    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }
887
    /// Returns the [`Cluster`] with ID `cluster_id`, or `None` if unknown.
    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }
891
    /// Returns the [`Role`] with ID `id`, or `None` if unknown.
    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }
895
    /// Returns the [`Role`] with ID `id`. Panics if it is unknown.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }
899
    /// Returns an iterator over the IDs of all roles in the catalog.
    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }
903
    /// Returns the [`Role`] named `role_name`, or `None` if unknown.
    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }
909
    /// Returns the [`RoleAuth`] for role `id`. Panics if absent.
    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
        self.role_auth_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
    }
915
    /// Returns the [`RoleAuth`] for role `id`, or `None` if absent.
    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }
919
    /// Returns the [`NetworkPolicy`] named `policy_name`, or `None` if
    /// unknown.
    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }
928
929 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
930 let mut membership = BTreeSet::new();
931 let mut queue = VecDeque::from(vec![id]);
932 while let Some(cur_id) = queue.pop_front() {
933 if !membership.contains(cur_id) {
934 membership.insert(cur_id.clone());
935 let role = self.get_role(cur_id);
936 soft_assert_no_log!(
937 !role.membership().keys().contains(id),
938 "circular membership exists in the catalog"
939 );
940 queue.extend(role.membership().keys());
941 }
942 }
943 membership.insert(RoleId::Public);
944 membership
945 }
946
    /// Returns the [`NetworkPolicy`] with ID `id`. Panics if it is unknown.
    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }
952
    /// Returns an iterator over the IDs of all network policies.
    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }
956
    /// Best-effort construction of the webhook URL for item `id`, of the form
    /// `https://<host>/api/webhook/<database>/<schema>/<item>`.
    ///
    /// Returns `None` if the item doesn't exist or lives outside a named
    /// database. When `http_host_name` is unset, the literal placeholder
    /// `"HOST"` is used as the host.
    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        // Resolve names with the system connection (conn_id = None).
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or_else(|| "HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        // `push` percent-encodes each path segment as needed.
        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }
984
985 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
993 &mut self,
996 create_sql: &str,
997 force_if_exists_skip: bool,
998 ) -> Result<(Plan, ResolvedIds), AdapterError> {
999 self.with_enable_for_item_parsing(|state| {
1000 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
1001 let pcx = Some(&pcx);
1002 let session_catalog = state.for_system_session();
1003
1004 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1005 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
1006 let plan =
1007 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
1008
1009 Ok((plan, resolved_ids))
1010 })
1011 }
1012
    /// Parses `create_sql`, resolves names against `catalog`, and plans the
    /// resulting statement, returning the plan together with the IDs it
    /// resolved.
    ///
    /// Errors if parsing, name resolution, or planning fails.
    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }
1026
    /// Builds the in-memory [`CatalogItem`] for an item read back from
    /// durable storage by re-planning its `create_sql`.
    ///
    /// Uses no planning context, no retained-metrics flag, and no custom
    /// compaction window; see [`CatalogState::parse_item`] for the general
    /// form.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }
1047
    /// Parses `create_sql` into a [`CatalogItem`], managing the interaction
    /// with `local_expression_cache`: a cached expression for `global_id` is
    /// taken out before parsing, any newly optimized expression is recorded
    /// as uncached on success, and an unused cached expression is returned to
    /// the cache on failure.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                // A fresh optimization happened; remember the result so it
                // can be written back to the durable expression cache.
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                // Parsing failed, so the cached expression went unused;
                // return it to the cache for a future attempt.
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }
1090
    /// Re-hydrates a catalog item from its `create_sql` definition: parses and
    /// plans the SQL against a system session, then converts the resulting
    /// [`Plan`] into an in-memory [`CatalogItem`].
    ///
    /// For views and materialized views the locally optimized MIR expression is
    /// obtained from (in order of preference): the expression cache
    /// (`cached_expr`, used only if its optimizer features match the current
    /// configuration), the `previous_item`'s expressions (used only if the raw
    /// expression is unchanged), or a fresh local optimization run (in which
    /// case the new expression is returned as the second tuple element so the
    /// caller can populate the cache).
    ///
    /// On error, returns the error together with the (possibly still unused)
    /// `cached_expr` so the caller can reinsert it into the cache.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        // Set when a fresh local optimization was performed, so the caller can
        // insert the result into the expression cache.
        let mut uncached_expr = None;

        // Previously computed global plans, carried over to the new item below
        // so they are not needlessly recomputed.
        let previous_plans = previous_item.as_ref().map(|item| {
            (
                item.optimized_plan().cloned(),
                item.physical_plan().cloned(),
                item.dataflow_metainfo().cloned(),
            )
        });

        let mut item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                // All versions of this table's collection, including the root
                // version mapped to `global_id`.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            // Only ingestion-export and webhook data sources
                            // are valid for tables.
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        // For webhook *sources* the cluster comes from
                        // `in_cluster`; a plan-level `cluster_id` is only used
                        // by webhook tables.
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                // NOTE(review): unlike materialized views below, a non-view
                // previous item maps to `None` rather than `unreachable!` —
                // presumably the previous item's type can differ here.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
                    _ => None,
                });

                // Prefer the cache, then the previous item's expressions, and
                // only re-optimize as a last resort.
                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimzed_expr) => optimzed_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        // Hand the freshly optimized expression back to the
                        // caller for caching.
                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(typ, view.column_names),
                    locally_optimized_expr: optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                // All versions of this materialized view's collection,
                // including the root version mapped to `global_id`.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                // Materialized view optimization honors per-cluster feature
                // overrides on top of the system defaults.
                let system_vars = session_catalog.system_vars();
                let overrides = self
                    .get_cluster(materialized_view.cluster_id)
                    .config
                    .features();
                let optimizer_config =
                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => (
                        materialized_view.raw_expr,
                        materialized_view.locally_optimized_expr,
                    ),
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                // Same cache-then-previous-then-optimize cascade as for views.
                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);

                // Apply ASSERT NOT NULL columns to the inferred type.
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);
                let desc = VersionedRelationDesc::new(desc);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    collections,
                    raw_expr,
                    locally_optimized_expr: optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    replacement_target: materialized_view.replacement_target,
                    cluster_id: materialized_view.cluster_id,
                    target_replica: materialized_view.target_replica,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                    // Global plans are restored from `previous_plans` below,
                    // if available.
                    optimized_plan: None,
                    physical_plan: None,
                    dataflow_metainfo: None,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
                        Ok(ct) => ct,
                        Err(err) => return Err((err, cached_expr)),
                    };
                CatalogItem::ContinualTask(ct)
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
                optimized_plan: None,
                physical_plan: None,
                dataflow_metainfo: None,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
                commit_interval: sink.commit_interval,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                // Validate that the type description can be computed before
                // storing the item.
                if let Err(err) = typ.inner.desc(&session_catalog) {
                    return Err((err.into(), cached_expr));
                }
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            // Any other plan kind indicates the stored `create_sql` does not
            // correspond to a creatable catalog item.
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        // Carry over the previous item's global plans (if any) so they need
        // not be recomputed.
        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
            {
                *optimized_plan = prev_optimized;
                *physical_plan = prev_physical;
                *dataflow_metainfo = prev_metainfo;
            }
        }

        Ok((item, uncached_expr))
    }
1542
1543 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1549 let restore = Arc::clone(&self.system_configuration);
1560 Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1561 let res = f(self);
1562 self.system_configuration = restore;
1563 res
1564 }
1565
1566 pub fn get_indexes_on(
1568 &self,
1569 id: GlobalId,
1570 cluster: ClusterId,
1571 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1572 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1573
1574 self.try_get_entry_by_global_id(&id)
1575 .into_iter()
1576 .map(move |e| {
1577 e.used_by()
1578 .iter()
1579 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1580 CatalogItem::Index(index) if index_matches(index) => {
1581 Some((index.global_id(), index))
1582 }
1583 _ => None,
1584 })
1585 })
1586 .flatten()
1587 }
1588
    /// Returns the [`Database`] with the given ID.
    ///
    /// Panics if `database_id` is not present in `database_by_id`.
    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }
1592
1593 pub(super) fn try_get_cluster_replica(
1598 &self,
1599 id: ClusterId,
1600 replica_id: ReplicaId,
1601 ) -> Option<&ClusterReplica> {
1602 self.try_get_cluster(id)
1603 .and_then(|cluster| cluster.replica(replica_id))
1604 }
1605
1606 pub(crate) fn get_cluster_replica(
1610 &self,
1611 cluster_id: ClusterId,
1612 replica_id: ReplicaId,
1613 ) -> &ClusterReplica {
1614 self.try_get_cluster_replica(cluster_id, replica_id)
1615 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1616 }
1617
1618 pub(super) fn resolve_replica_in_cluster(
1619 &self,
1620 cluster_id: &ClusterId,
1621 replica_name: &str,
1622 ) -> Result<&ClusterReplica, SqlCatalogError> {
1623 let cluster = self.get_cluster(*cluster_id);
1624 let replica_id = cluster
1625 .replica_id_by_name_
1626 .get(replica_name)
1627 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1628 Ok(&cluster.replicas_by_id_[replica_id])
1629 }
1630
1631 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1633 Ok(self.system_configuration.get(name)?)
1634 }
1635
1636 pub(super) fn parse_system_configuration(
1640 &self,
1641 name: &str,
1642 value: VarInput,
1643 ) -> Result<String, Error> {
1644 let value = self.system_configuration.parse(name, value)?;
1645 Ok(value.format())
1646 }
1647
1648 pub(super) fn resolve_schema_in_database(
1650 &self,
1651 database_spec: &ResolvedDatabaseSpecifier,
1652 schema_name: &str,
1653 conn_id: &ConnectionId,
1654 ) -> Result<&Schema, SqlCatalogError> {
1655 let schema = match database_spec {
1656 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1657 self.temporary_schemas.get(conn_id)
1658 }
1659 ResolvedDatabaseSpecifier::Ambient => self
1660 .ambient_schemas_by_name
1661 .get(schema_name)
1662 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1663 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1664 db.schemas_by_name
1665 .get(schema_name)
1666 .and_then(|id| db.schemas_by_id.get(id))
1667 }),
1668 };
1669 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1670 }
1671
1672 pub fn try_get_schema(
1677 &self,
1678 database_spec: &ResolvedDatabaseSpecifier,
1679 schema_spec: &SchemaSpecifier,
1680 conn_id: &ConnectionId,
1681 ) -> Option<&Schema> {
1682 match (database_spec, schema_spec) {
1684 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1685 self.temporary_schemas.get(conn_id)
1686 }
1687 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1688 self.ambient_schemas_by_id.get(id)
1689 }
1690 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1691 .database_by_id
1692 .get(database_id)
1693 .and_then(|db| db.schemas_by_id.get(schema_id)),
1694 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1695 unreachable!("temporary schemas are in the ambient database")
1696 }
1697 }
1698 }
1699
    /// Returns the schema identified by `database_spec`/`schema_spec`.
    ///
    /// Panics if the schema does not exist; use [`Self::try_get_schema`] for a
    /// fallible lookup.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.try_get_schema(database_spec, schema_spec, conn_id)
            .expect("schema must exist")
    }
1710
1711 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1712 self.database_by_id
1713 .values()
1714 .filter_map(|database| database.schemas_by_id.get(schema_id))
1715 .chain(self.ambient_schemas_by_id.values())
1716 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1717 .into_first()
1718 }
1719
1720 pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1721 self.temporary_schemas
1722 .values()
1723 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1724 .into_first()
1725 }
1726
    /// Returns the ID of the `mz_catalog` schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }

    /// Returns the ID of the `mz_catalog_unstable` schema.
    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }

    /// Returns the ID of the `pg_catalog` schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }

    /// Returns the ID of the `information_schema` schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }

    /// Returns the ID of the `mz_internal` schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }

    /// Returns the ID of the `mz_introspection` schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }

    /// Returns the ID of the `mz_unsafe` schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }
1754
1755 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1756 SYSTEM_SCHEMAS
1757 .iter()
1758 .map(|name| self.ambient_schemas_by_name[*name])
1759 }
1760
1761 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1762 self.system_schema_ids().contains(&id)
1763 }
1764
1765 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1766 match spec {
1767 SchemaSpecifier::Temporary => false,
1768 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1769 }
1770 }
1771
1772 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1773 UNSTABLE_SCHEMAS
1774 .iter()
1775 .map(|name| self.ambient_schemas_by_name[*name])
1776 }
1777
1778 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1779 self.unstable_schema_ids().contains(&id)
1780 }
1781
1782 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1783 match spec {
1784 SchemaSpecifier::Temporary => false,
1785 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1786 }
1787 }
1788
1789 pub fn create_temporary_schema(
1792 &mut self,
1793 conn_id: &ConnectionId,
1794 owner_id: RoleId,
1795 ) -> Result<(), Error> {
1796 let oid = INVALID_OID;
1801 self.temporary_schemas.insert(
1802 conn_id.clone(),
1803 Schema {
1804 name: QualifiedSchemaName {
1805 database: ResolvedDatabaseSpecifier::Ambient,
1806 schema: MZ_TEMP_SCHEMA.into(),
1807 },
1808 id: SchemaSpecifier::Temporary,
1809 oid,
1810 items: BTreeMap::new(),
1811 functions: BTreeMap::new(),
1812 types: BTreeMap::new(),
1813 owner_id,
1814 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1815 mz_sql::catalog::ObjectType::Schema,
1816 owner_id,
1817 )]),
1818 },
1819 );
1820 Ok(())
1821 }
1822
1823 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1825 std::iter::empty()
1826 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1827 if schema.id.is_temporary() {
1828 Some(schema.oid)
1829 } else {
1830 None
1831 }
1832 }))
1833 .chain(self.entry_by_id.values().filter_map(|entry| {
1834 if entry.item().is_temporary() {
1835 Some(entry.oid)
1836 } else {
1837 None
1838 }
1839 }))
1840 }
1841
    /// Resolves a builtin table definition to its catalog item ID.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
    }
1848
1849 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1853 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1854 let log = match self.get_entry(&item_id).item() {
1855 CatalogItem::Log(log) => log,
1856 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1857 };
1858 (item_id, log.global_id)
1859 }
1860
    /// Resolves a builtin source definition to its catalog item ID.
    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
    }
1867
    /// Resolves any builtin object definition to its catalog item ID by
    /// looking it up in the appropriate namespace of its ambient schema.
    ///
    /// Types, functions, and other items live in separate per-schema maps, so
    /// the lookup dispatches on the builtin's catalog item type. Panics if the
    /// schema or the named object is missing.
    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
        let schema = &self.ambient_schemas_by_id[schema_id];
        match builtin.catalog_item_type() {
            CatalogItemType::Type => schema.types[builtin.name()],
            CatalogItemType::Func => schema.functions[builtin.name()],
            CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::Sink
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
        }
    }
1888
    /// Converts a builtin type whose element/field types are referenced by
    /// name into one whose references are resolved catalog IDs.
    ///
    /// Composite types (arrays, lists, maps, ranges, records) have each inner
    /// reference resolved via `get_system_type`; scalar types are copied
    /// through unchanged. Panics if a referenced system type does not exist.
    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            // Scalar types carry no references; copy them through unchanged.
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }
1974
    /// Returns the catalog's configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
1978
1979 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1980 match self.database_by_name.get(database_name) {
1981 Some(id) => Ok(&self.database_by_id[id]),
1982 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1983 }
1984 }
1985
1986 pub fn resolve_schema(
1987 &self,
1988 current_database: Option<&DatabaseId>,
1989 database_name: Option<&str>,
1990 schema_name: &str,
1991 conn_id: &ConnectionId,
1992 ) -> Result<&Schema, SqlCatalogError> {
1993 let database_spec = match database_name {
1994 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1999 self.resolve_database(database)?.id().clone(),
2000 )),
2001 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2002 };
2003
2004 if let Some(database_spec) = database_spec {
2006 if let Ok(schema) =
2007 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2008 {
2009 return Ok(schema);
2010 }
2011 }
2012
2013 if let Ok(schema) = self.resolve_schema_in_database(
2015 &ResolvedDatabaseSpecifier::Ambient,
2016 schema_name,
2017 conn_id,
2018 ) {
2019 return Ok(schema);
2020 }
2021
2022 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2023 }
2024
    /// Returns the ID of the ambient schema with the given static name.
    ///
    /// Panics if `name` is not a known ambient schema.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }
2031
2032 pub fn resolve_search_path(
2033 &self,
2034 session: &dyn SessionMetadata,
2035 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2036 let database = self
2037 .database_by_name
2038 .get(session.database())
2039 .map(|id| id.clone());
2040
2041 session
2042 .search_path()
2043 .iter()
2044 .map(|schema| {
2045 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2046 })
2047 .filter_map(|schema| schema.ok())
2048 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2049 .collect()
2050 }
2051
2052 pub fn effective_search_path(
2053 &self,
2054 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2055 include_temp_schema: bool,
2056 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2057 let mut v = Vec::with_capacity(search_path.len() + 3);
2058 let temp_schema = (
2060 ResolvedDatabaseSpecifier::Ambient,
2061 SchemaSpecifier::Temporary,
2062 );
2063 if include_temp_schema && !search_path.contains(&temp_schema) {
2064 v.push(temp_schema);
2065 }
2066 let default_schemas = [
2067 (
2068 ResolvedDatabaseSpecifier::Ambient,
2069 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2070 ),
2071 (
2072 ResolvedDatabaseSpecifier::Ambient,
2073 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2074 ),
2075 ];
2076 for schema in default_schemas.into_iter() {
2077 if !search_path.contains(&schema) {
2078 v.push(schema);
2079 }
2080 }
2081 v.extend_from_slice(search_path);
2082 v
2083 }
2084
2085 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2086 let id = self
2087 .clusters_by_name
2088 .get(name)
2089 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2090 Ok(&self.clusters_by_id[id])
2091 }
2092
    /// Resolves a builtin cluster definition to its in-memory [`Cluster`].
    ///
    /// Panics if the builtin cluster is missing from either the name or the
    /// ID map, since builtin clusters are expected to always exist.
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        let id = self
            .clusters_by_name
            .get(cluster.name)
            .expect("failed to lookup BuiltinCluster by name");
        self.clusters_by_id
            .get(id)
            .expect("failed to lookup BuiltinCluster by ID")
    }
2102
2103 pub fn resolve_cluster_replica(
2104 &self,
2105 cluster_replica_name: &QualifiedReplica,
2106 ) -> Result<&ClusterReplica, SqlCatalogError> {
2107 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2108 let replica_name = cluster_replica_name.replica.as_str();
2109 let replica_id = cluster
2110 .replica_id(replica_name)
2111 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2112 Ok(cluster.replica(replica_id).expect("Must exist"))
2113 }
2114
    /// Resolves a partial item name to a catalog entry, searching the given
    /// namespace (`get_schema_entries` selects items, functions, or types).
    ///
    /// Resolution order: an explicitly named schema wins; otherwise the
    /// connection's temporary schema is consulted first, then each schema in
    /// `search_path`. As a compatibility shim, a lookup that would have
    /// matched in `mz_internal` also consults `mz_catalog_unstable` and
    /// `mz_introspection` (see the `github_27831` debug log). Failure is
    /// reported via `err_gen` so callers can produce the right error variant.
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // Determine which schemas to search.
        let schemas = match &name.schema {
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            // No schema qualification: temporary items shadow the search path.
            None => match self
                .try_get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .and_then(|schema| schema.items.get(&name.item))
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
                continue;
            };

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Compatibility: objects that moved out of `mz_internal` remain
        // reachable through it, with a logged breadcrumb.
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }
2199
    /// Resolves a partial item name against each schema's item namespace,
    /// reporting `UnknownItem` on failure. See [`Self::resolve`].
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.items,
            current_database,
            search_path,
            name,
            conn_id,
            SqlCatalogError::UnknownItem,
        )
    }
2217
    /// Resolves a partial item name against each schema's function namespace,
    /// reporting `UnknownFunction` on failure. See [`Self::resolve`].
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.functions,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownFunction {
                name,
                alternative: None,
            },
        )
    }
2238
    /// Resolves a partial item name against each schema's type namespace,
    /// reporting `UnknownType` on failure. See [`Self::resolve`].
    ///
    /// As a deprecation aid, warns when a user explicitly qualifies with
    /// `pg_catalog` a builtin type that actually lives in another schema;
    /// the lookup still succeeds for now.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // Builtin types that do NOT belong to `pg_catalog`, indexed by name.
        // Computed once and shared across calls.
        static NON_PG_CATALOG_TYPES: LazyLock<
            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
        > = LazyLock::new(|| {
            BUILTINS::types()
                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
                .map(|typ| (typ.name, typ))
                .collect()
        });

        let entry = self.resolve(
            |schema| &schema.types,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownType { name },
        )?;

        // Only warn for real user sessions, not internal system lookups.
        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
                warn!(
                    "user specified an incorrect schema of {} for the type {}, which should be in \
                    the {} schema. This works now due to a bug but will be fixed in a later release.",
                    PG_CATALOG_SCHEMA.quoted(),
                    typ.name.quoted(),
                    typ.schema.quoted(),
                )
            }
        }

        Ok(entry)
    }
2279
2280 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2282 match object_id {
2283 ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2284 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2285 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2286 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2287 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2288 ObjectId::ClusterReplica(cluster_replica_id) => {
2289 CommentObjectId::ClusterReplica(cluster_replica_id)
2290 }
2291 ObjectId::NetworkPolicy(network_policy_id) => {
2292 CommentObjectId::NetworkPolicy(network_policy_id)
2293 }
2294 }
2295 }
2296
2297 pub fn system_config(&self) -> &SystemVars {
2299 &self.system_configuration
2300 }
2301
    /// Returns a mutable reference to the system configuration variables.
    ///
    /// The configuration lives behind an `Arc`; `Arc::make_mut` clones the inner
    /// value first if other handles still share it, so existing readers keep their
    /// old snapshot (copy-on-write semantics).
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        Arc::make_mut(&mut self.system_configuration)
    }
2306
2307 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2317 let mut dump = serde_json::to_value(&self).map_err(|e| {
2319 Error::new(ErrorKind::Unstructured(format!(
2320 "internal error: could not dump catalog: {}",
2323 e
2324 )))
2325 })?;
2326
2327 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2328 dump_obj.insert(
2330 "system_parameter_defaults".into(),
2331 serde_json::json!(self.system_config().defaults()),
2332 );
2333 if let Some(unfinalized_shards) = unfinalized_shards {
2335 dump_obj
2336 .get_mut("storage_metadata")
2337 .expect("known to exist")
2338 .as_object_mut()
2339 .expect("storage_metadata is an object")
2340 .insert(
2341 "unfinalized_shards".into(),
2342 serde_json::json!(unfinalized_shards),
2343 );
2344 }
2345 let temporary_gids: Vec<_> = self
2350 .entry_by_global_id
2351 .iter()
2352 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2353 .map(|(gid, _item_id)| *gid)
2354 .collect();
2355 if !temporary_gids.is_empty() {
2356 let gids = dump_obj
2357 .get_mut("entry_by_global_id")
2358 .expect("known_to_exist")
2359 .as_object_mut()
2360 .expect("entry_by_global_id is an object");
2361 for gid in temporary_gids {
2362 gids.remove(&gid.to_string());
2363 }
2364 }
2365 dump_obj.remove("role_auth_by_id");
2368
2369 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2371 }
2372
2373 pub fn availability_zones(&self) -> &[String] {
2374 &self.availability_zones
2375 }
2376
2377 pub fn concretize_replica_location(
2378 &self,
2379 location: mz_catalog::durable::ReplicaLocation,
2380 allowed_sizes: &Vec<String>,
2381 allowed_availability_zones: Option<&[String]>,
2382 ) -> Result<ReplicaLocation, Error> {
2383 let location = match location {
2384 mz_catalog::durable::ReplicaLocation::Unmanaged {
2385 storagectl_addrs,
2386 computectl_addrs,
2387 } => {
2388 if allowed_availability_zones.is_some() {
2389 return Err(Error {
2390 kind: ErrorKind::Internal(
2391 "tried concretize unmanaged replica with specific availability_zones"
2392 .to_string(),
2393 ),
2394 });
2395 }
2396 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2397 storagectl_addrs,
2398 computectl_addrs,
2399 })
2400 }
2401 mz_catalog::durable::ReplicaLocation::Managed {
2402 size,
2403 availability_zone,
2404 billed_as,
2405 internal,
2406 pending,
2407 } => {
2408 if allowed_availability_zones.is_some() && availability_zone.is_some() {
2409 let message = "tried concretize managed replica with specific availability zones and availability zone";
2410 return Err(Error {
2411 kind: ErrorKind::Internal(message.to_string()),
2412 });
2413 }
2414 self.ensure_valid_replica_size(allowed_sizes, &size)?;
2415 let cluster_replica_sizes = &self.cluster_replica_sizes;
2416
2417 ReplicaLocation::Managed(ManagedReplicaLocation {
2418 allocation: cluster_replica_sizes
2419 .0
2420 .get(&size)
2421 .expect("catalog out of sync")
2422 .clone(),
2423 availability_zones: match (availability_zone, allowed_availability_zones) {
2424 (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2425 (None, Some([])) => ManagedReplicaAvailabilityZones::FromCluster(None),
2426 (None, Some(azs)) => {
2427 ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2428 }
2429 (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2430 },
2431 size,
2432 billed_as,
2433 internal,
2434 pending,
2435 })
2436 }
2437 };
2438 Ok(location)
2439 }
2440
2441 pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2452 let alloc = &self.cluster_replica_sizes.0[size];
2453 !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2454 }
2455
2456 pub(crate) fn ensure_valid_replica_size(
2457 &self,
2458 allowed_sizes: &[String],
2459 size: &String,
2460 ) -> Result<(), Error> {
2461 let cluster_replica_sizes = &self.cluster_replica_sizes;
2462
2463 if !cluster_replica_sizes.0.contains_key(size)
2464 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2465 || cluster_replica_sizes.0[size].disabled
2466 {
2467 let mut entries = cluster_replica_sizes
2468 .enabled_allocations()
2469 .collect::<Vec<_>>();
2470
2471 if !allowed_sizes.is_empty() {
2472 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2473 entries.retain(|(name, _)| allowed_sizes.contains(name));
2474 }
2475
2476 entries.sort_by_key(
2477 |(
2478 _name,
2479 ReplicaAllocation {
2480 scale, cpu_limit, ..
2481 },
2482 )| (scale, cpu_limit),
2483 );
2484
2485 Err(Error {
2486 kind: ErrorKind::InvalidClusterReplicaSize {
2487 size: size.to_owned(),
2488 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2489 },
2490 })
2491 } else {
2492 Ok(())
2493 }
2494 }
2495
2496 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2497 if role_id.is_builtin() {
2498 let role = self.get_role(role_id);
2499 Err(Error::new(ErrorKind::ReservedRoleName(
2500 role.name().to_string(),
2501 )))
2502 } else {
2503 Ok(())
2504 }
2505 }
2506
2507 pub fn ensure_not_reserved_network_policy(
2508 &self,
2509 network_policy_id: &NetworkPolicyId,
2510 ) -> Result<(), Error> {
2511 if network_policy_id.is_builtin() {
2512 let policy = self.get_network_policy(network_policy_id);
2513 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2514 policy.name.clone(),
2515 )))
2516 } else {
2517 Ok(())
2518 }
2519 }
2520
2521 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2522 let is_grantable = !role_id.is_public() && !role_id.is_system();
2523 if is_grantable {
2524 Ok(())
2525 } else {
2526 let role = self.get_role(role_id);
2527 Err(Error::new(ErrorKind::UngrantableRoleName(
2528 role.name().to_string(),
2529 )))
2530 }
2531 }
2532
2533 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2534 if role_id.is_system() {
2535 let role = self.get_role(role_id);
2536 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2537 role.name().to_string(),
2538 )))
2539 } else {
2540 Ok(())
2541 }
2542 }
2543
2544 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2545 if role_id.is_predefined() {
2546 let role = self.get_role(role_id);
2547 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2548 role.name().to_string(),
2549 )))
2550 } else {
2551 Ok(())
2552 }
2553 }
2554
2555 pub(crate) fn add_to_audit_log(
2558 system_configuration: &SystemVars,
2559 oracle_write_ts: mz_repr::Timestamp,
2560 session: Option<&ConnMeta>,
2561 tx: &mut mz_catalog::durable::Transaction,
2562 audit_events: &mut Vec<VersionedEvent>,
2563 event_type: EventType,
2564 object_type: ObjectType,
2565 details: EventDetails,
2566 ) -> Result<(), Error> {
2567 let user = session.map(|session| session.user().name.to_string());
2568
2569 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2572 Some(ts) => ts.into(),
2573 _ => oracle_write_ts.into(),
2574 };
2575 let id = tx.allocate_audit_log_id()?;
2576 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2577 audit_events.push(event.clone());
2578 tx.insert_audit_log_event(event);
2579 Ok(())
2580 }
2581
2582 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2583 match id {
2584 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2585 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2586 self.get_cluster_replica(*cluster_id, *replica_id)
2587 .owner_id(),
2588 ),
2589 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2590 ObjectId::Schema((database_spec, schema_spec)) => Some(
2591 self.get_schema(database_spec, schema_spec, conn_id)
2592 .owner_id(),
2593 ),
2594 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2595 ObjectId::Role(_) => None,
2596 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2597 }
2598 }
2599
2600 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2601 match object_id {
2602 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2603 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2604 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2605 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2606 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2607 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2608 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2609 }
2610 }
2611
2612 pub(super) fn get_system_object_type(
2613 &self,
2614 id: &SystemObjectId,
2615 ) -> mz_sql::catalog::SystemObjectType {
2616 match id {
2617 SystemObjectId::Object(object_id) => {
2618 SystemObjectType::Object(self.get_object_type(object_id))
2619 }
2620 SystemObjectId::System => SystemObjectType::System,
2621 }
2622 }
2623
    /// Returns the [`StorageMetadata`] tracked by this catalog state.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2630
    /// Partitions the given item ids into groups keyed by their effective
    /// [`CompactionWindow`].
    ///
    /// Only sources and source-fed tables (ingestion exports and webhooks)
    /// participate; write-ahead tables and all other item kinds are skipped.
    /// Duplicate ids in the input are processed only once.
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        // Guards against duplicates in the input iterator.
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    // A `None` custom window means "use the default".
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    cws.entry(source_cw).or_default().insert(item_id);
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        // Tables fed by a source (ingestion exports) or by webhooks
                        // participate in source compaction.
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::IngestionExport { .. }
                                | DataSourceDesc::Webhook { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        // Ordinary user-written tables are not source-fed; skip.
                        TableDataSource::TableWrites { .. } => {}
                        // These data-source kinds are never valid for tables.
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::Ingestion { .. }
                                | DataSourceDesc::OldSyntaxIngestion { .. }
                                | DataSourceDesc::Introspection(_)
                                | DataSourceDesc::Progress
                                | DataSourceDesc::Catalog,
                            ..
                        } => {
                            unreachable!(
                                "unexpected DataSourceDesc for table {item_id}: {:?}",
                                table.data_source
                            )
                        }
                    }
                }
                // Items other than sources and tables have no source compaction window.
                _ => {
                    continue;
                }
            }
        }
        cws
    }
2687
    /// Extracts the underlying [`CatalogItemId`] from a comment id, when the comment
    /// is attached to a catalog item; returns `None` for comments on non-item objects
    /// (roles, databases, schemas, clusters, replicas, network policies).
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            // Every item-flavored variant wraps a `CatalogItemId`.
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            // Non-item objects carry their own id types, not `CatalogItemId`.
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }
2709
2710 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2711 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2712 }
2713
    /// Renders the object a comment is attached to as a human-readable name for the
    /// audit log, fully qualified where the object kind has a qualified name.
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            // Item-flavored comments: resolve the item's full name. `conn_id` is
            // threaded through so resolution works for this connection's items
            // (presumably for temporary schemas — see `resolve_full_name`).
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            // Simple named objects render as their bare name.
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            // Replicas render as a qualified `cluster.replica` pair.
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }
2754
2755 pub fn mock_authentication_nonce(&self) -> String {
2756 self.mock_authentication_nonce.clone().unwrap_or_default()
2757 }
2758}
2759
2760impl ConnectionResolver for CatalogState {
2761 fn resolve_connection(
2762 &self,
2763 id: CatalogItemId,
2764 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2765 use mz_storage_types::connections::Connection::*;
2766 match self
2767 .get_entry(&id)
2768 .connection()
2769 .expect("catalog out of sync")
2770 .details
2771 .to_connection()
2772 {
2773 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2774 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2775 Csr(conn) => Csr(conn.into_inline_connection(self)),
2776 Ssh(conn) => Ssh(conn),
2777 Aws(conn) => Aws(conn),
2778 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2779 MySql(conn) => MySql(conn.into_inline_connection(self)),
2780 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2781 IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2782 }
2783 }
2784}
2785
2786impl OptimizerCatalog for CatalogState {
2787 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2788 CatalogState::get_entry_by_global_id(self, id)
2789 }
2790 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2791 CatalogState::get_entry(self, id)
2792 }
2793 fn resolve_full_name(
2794 &self,
2795 name: &QualifiedItemName,
2796 conn_id: Option<&ConnectionId>,
2797 ) -> FullItemName {
2798 CatalogState::resolve_full_name(self, name, conn_id)
2799 }
2800 fn get_indexes_on(
2801 &self,
2802 id: GlobalId,
2803 cluster: ClusterId,
2804 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2805 Box::new(CatalogState::get_indexes_on(self, id, cluster))
2806 }
2807}
2808
impl OptimizerCatalog for Catalog {
    /// Looks up an entry by [`GlobalId`], forwarding to the inner state.
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    /// Looks up an entry by [`CatalogItemId`], forwarding to the inner state.
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    /// Resolves a qualified item name to a full name, forwarding to the inner state.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    /// Returns the indexes on `id` in `cluster`, boxed to satisfy the trait's
    /// object-safe return type.
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}
2834
2835impl Catalog {
2836 pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2837 self
2838 }
2839}