1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation, UnmanagedReplicaLocation,
40};
41use mz_controller_types::{ClusterId, ReplicaId};
42use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
43use mz_license_keys::ValidatedLicenseKey;
44use mz_ore::collections::CollectionExt;
45use mz_ore::now::NOW_ZERO;
46use mz_ore::soft_assert_no_log;
47use mz_ore::str::StrExt;
48use mz_pgrepr::oid::INVALID_OID;
49use mz_repr::adt::mz_acl_item::PrivilegeMap;
50use mz_repr::namespaces::{
51 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
52 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
53 UNSTABLE_SCHEMAS,
54};
55use mz_repr::network_policy_id::NetworkPolicyId;
56use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
57use mz_repr::role_id::RoleId;
58use mz_repr::{
59 CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
60 VersionedRelationDesc,
61};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::ast::Ident;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
67 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
68 TypeReference,
69};
70use mz_sql::catalog::{CatalogConfig, EnvironmentId};
71use mz_sql::names::{
72 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
73 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
74 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
75};
76use mz_sql::plan::{
77 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
78 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
79 Plan, PlanContext,
80};
81use mz_sql::rbac;
82use mz_sql::session::metadata::SessionMetadata;
83use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
84use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
85use mz_sql_parser::ast::QualifiedReplica;
86use mz_storage_client::controller::StorageMetadata;
87use mz_storage_types::connections::ConnectionContext;
88use mz_storage_types::connections::inline::{
89 ConnectionResolver, InlinedConnection, IntoInlineConnection,
90};
91use mz_transform::notice::OptimizerNotice;
92use serde::Serialize;
93use timely::progress::Antichain;
94use tokio::sync::mpsc;
95use tracing::{debug, warn};
96
97use crate::AdapterError;
99use crate::catalog::{Catalog, ConnCatalog};
100use crate::config::ScopedParameters;
101use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
102use crate::optimize::{self, Optimize, OptimizerCatalog};
103use crate::session::Session;
104
/// In-memory catalog state: databases, schemas, items, clusters, roles,
/// network policies, and the configuration the catalog was opened with.
///
/// When serialized, temporary items (entries carrying a connection ID) are
/// omitted from `entry_by_id` via `skip_temp_items`, and every field marked
/// `#[serde(skip)]` is left out entirely.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // Database name -> database ID index.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    // Databases keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    // All catalog entries keyed by item ID; temporary items are skipped when serializing.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    // Maps a collection's `GlobalId` to the item that owns it (a versioned
    // table can own several `GlobalId`s, see `get_entry_by_global_id`).
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    // Name -> ID index for schemas that are not inside a database
    // (the system schemas are looked up here by `get_system_type`).
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    // Cluster name -> cluster ID index.
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    // Role name -> role ID index.
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    // Network policy name -> network policy ID index.
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    // Authentication records keyed by role ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    // System variables; not serialized.
    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    // Scoped system parameters; not serialized.
    #[serde(skip)]
    pub(super) scoped_system_parameters: ScopedParameters,
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    pub(super) system_privileges: Arc<PrivilegeMap>,
    pub(super) comments: Arc<CommentsMap>,
    // Source references keyed by item ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: Arc<StorageMetadata>,
    // NOTE(review): presumably a nonce for mock authentication in tests — confirm.
    pub(super) mock_authentication_nonce: Option<String>,

    // Optimizer notices indexed by `GlobalId` (presumably of a dependency the
    // notice refers to — TODO confirm); not serialized.
    #[serde(skip)]
    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,

    // Per-connection temporary schemas; not serialized.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    // Catalog configuration; not serialized.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Egress address ranges; not serialized.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    // Host name used when building webhook URLs (`try_get_webhook_url`).
    pub(super) http_host_name: Option<String>,

    // Validated license key; not serialized.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
190
/// Cache of locally optimized expressions consulted while parsing catalog items.
///
/// When `Open`, `cached_exprs` holds previously cached expressions, which are
/// taken out via `remove_cached_expression`. `uncached_exprs` collects
/// expressions computed without a cache hit, which are drained by
/// `into_uncached_exprs`. When `Closed`, every operation is a no-op.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache accepts lookups and records new expressions.
    Open {
        /// Expressions available for reuse, keyed by `GlobalId`.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Expressions computed without a cache hit, keyed by `GlobalId`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is disabled: lookups return nothing and inserts are dropped.
    Closed,
}
207
208impl LocalExpressionCache {
209 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
210 Self::Open {
211 cached_exprs,
212 uncached_exprs: BTreeMap::new(),
213 }
214 }
215
216 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
217 match self {
218 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
219 LocalExpressionCache::Closed => None,
220 }
221 }
222
223 pub(super) fn insert_cached_expression(
226 &mut self,
227 id: GlobalId,
228 local_expressions: LocalExpressions,
229 ) {
230 match self {
231 LocalExpressionCache::Open { cached_exprs, .. } => {
232 cached_exprs.insert(id, local_expressions);
233 }
234 LocalExpressionCache::Closed => {}
235 }
236 }
237
238 pub(super) fn insert_uncached_expression(
241 &mut self,
242 id: GlobalId,
243 local_mir: OptimizedMirRelationExpr,
244 optimizer_features: OptimizerFeatures,
245 ) {
246 match self {
247 LocalExpressionCache::Open { uncached_exprs, .. } => {
248 let local_expr = LocalExpressions {
249 local_mir,
250 optimizer_features,
251 };
252 let prev = uncached_exprs.remove(&id);
259 match prev {
260 Some(prev) if prev == local_expr => {
261 uncached_exprs.insert(id, local_expr);
262 }
263 None => {
264 uncached_exprs.insert(id, local_expr);
265 }
266 Some(_) => {}
267 }
268 }
269 LocalExpressionCache::Closed => {}
270 }
271 }
272
273 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
274 match self {
275 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
276 LocalExpressionCache::Closed => BTreeMap::new(),
277 }
278 }
279}
280
281fn skip_temp_items<S>(
282 entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
283 serializer: S,
284) -> Result<S::Ok, S::Error>
285where
286 S: serde::Serializer,
287{
288 mz_ore::serde::map_key_to_string(
289 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
290 serializer,
291 )
292}
293
294impl CatalogState {
295 pub fn empty_test() -> Self {
299 CatalogState {
300 database_by_name: Default::default(),
301 database_by_id: Default::default(),
302 entry_by_id: Default::default(),
303 entry_by_global_id: Default::default(),
304 notices_by_dep_id: Default::default(),
305 ambient_schemas_by_name: Default::default(),
306 ambient_schemas_by_id: Default::default(),
307 temporary_schemas: Default::default(),
308 clusters_by_id: Default::default(),
309 clusters_by_name: Default::default(),
310 network_policies_by_name: Default::default(),
311 roles_by_name: Default::default(),
312 roles_by_id: Default::default(),
313 network_policies_by_id: Default::default(),
314 role_auth_by_id: Default::default(),
315 config: CatalogConfig {
316 start_time: Default::default(),
317 start_instant: Instant::now(),
318 nonce: Default::default(),
319 environment_id: EnvironmentId::for_tests(),
320 session_id: Default::default(),
321 build_info: &DUMMY_BUILD_INFO,
322 now: NOW_ZERO.clone(),
323 connection_context: ConnectionContext::for_tests(Arc::new(
324 InMemorySecretsController::new(),
325 )),
326 helm_chart_version: None,
327 },
328 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
329 availability_zones: Default::default(),
330 system_configuration: Arc::new(SystemVars::default()),
331 scoped_system_parameters: Default::default(),
332 egress_addresses: Default::default(),
333 aws_principal_context: Default::default(),
334 aws_privatelink_availability_zones: Default::default(),
335 http_host_name: Default::default(),
336 default_privileges: Arc::new(DefaultPrivileges::default()),
337 system_privileges: Arc::new(PrivilegeMap::default()),
338 comments: Arc::new(CommentsMap::default()),
339 source_references: Default::default(),
340 storage_metadata: Arc::new(StorageMetadata::default()),
341 license_key: ValidatedLicenseKey::for_tests(),
342 mock_authentication_nonce: Default::default(),
343 }
344 }
345
346 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
347 let search_path = self.resolve_search_path(session);
348 let database = self
349 .database_by_name
350 .get(session.vars().database())
351 .map(|id| id.clone());
352 let state = match session.transaction().catalog_state() {
353 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
354 None => Cow::Borrowed(self),
355 };
356 ConnCatalog {
357 state,
358 unresolvable_ids: BTreeSet::new(),
359 conn_id: session.conn_id().clone(),
360 cluster: session.vars().cluster().into(),
361 database,
362 search_path,
363 role_id: session.current_role_id().clone(),
364 prepared_statements: Some(session.prepared_statements()),
365 portals: Some(session.portals()),
366 notices_tx: session.retain_notice_transmitter(),
367 restrict_to_user_objects: session.vars().restrict_to_user_objects(),
368 }
369 }
370
371 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
372 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
373 let cluster = self.system_configuration.default_cluster();
374
375 ConnCatalog {
376 state: Cow::Borrowed(self),
377 unresolvable_ids: BTreeSet::new(),
378 conn_id: SYSTEM_CONN_ID.clone(),
379 cluster,
380 database: self
381 .resolve_database(DEFAULT_DATABASE_NAME)
382 .ok()
383 .map(|db| db.id()),
384 search_path: Vec::new(),
387 role_id,
388 prepared_statements: None,
389 portals: None,
390 notices_tx,
391 restrict_to_user_objects: false,
392 }
393 }
394
395 pub fn for_system_session(&self) -> ConnCatalog<'_> {
396 self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
397 }
398
399 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
404 struct I<'a> {
405 queue: VecDeque<CatalogItemId>,
406 seen: BTreeSet<CatalogItemId>,
407 this: &'a CatalogState,
408 }
409 impl<'a> Iterator for I<'a> {
410 type Item = CatalogItemId;
411 fn next(&mut self) -> Option<Self::Item> {
412 if let Some(next) = self.queue.pop_front() {
413 for child in self.this.get_entry(&next).item().uses() {
414 if !self.seen.contains(&child) {
415 self.queue.push_back(child);
416 self.seen.insert(child);
417 }
418 }
419 Some(next)
420 } else {
421 None
422 }
423 }
424 }
425
426 I {
427 queue: [id].into_iter().collect(),
428 seen: [id].into_iter().collect(),
429 this: self,
430 }
431 }
432
433 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
436 let mut out = Vec::new();
437 self.introspection_dependencies_inner(id, &mut out);
438 out
439 }
440
441 fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
442 match self.get_entry(&id).item() {
443 CatalogItem::Log(_) => out.push(id),
444 item @ (CatalogItem::View(_)
445 | CatalogItem::MaterializedView(_)
446 | CatalogItem::Connection(_)) => {
447 for item_id in item.references().items() {
449 self.introspection_dependencies_inner(*item_id, out);
450 }
451 }
452 CatalogItem::Sink(sink) => {
453 let from_item_id = self.get_entry_by_global_id(&sink.from).id();
454 self.introspection_dependencies_inner(from_item_id, out)
455 }
456 CatalogItem::Index(idx) => {
457 let on_item_id = self.get_entry_by_global_id(&idx.on).id();
458 self.introspection_dependencies_inner(on_item_id, out)
459 }
460 CatalogItem::Table(_)
461 | CatalogItem::Source(_)
462 | CatalogItem::Type(_)
463 | CatalogItem::Func(_)
464 | CatalogItem::Secret(_) => (),
465 }
466 }
467
468 pub(super) fn object_dependents(
474 &self,
475 object_ids: &Vec<ObjectId>,
476 conn_id: &ConnectionId,
477 seen: &mut BTreeSet<ObjectId>,
478 ) -> Vec<ObjectId> {
479 let mut dependents = Vec::new();
480 for object_id in object_ids {
481 match object_id {
482 ObjectId::Cluster(id) => {
483 dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
484 }
485 ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
486 &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
487 ),
488 ObjectId::Database(id) => {
489 dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
490 }
491 ObjectId::Schema((database_spec, schema_spec)) => {
492 dependents.extend_from_slice(&self.schema_dependents(
493 database_spec.clone(),
494 schema_spec.clone(),
495 conn_id,
496 seen,
497 ));
498 }
499 ObjectId::NetworkPolicy(id) => {
500 dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
501 }
502 id @ ObjectId::Role(_) => {
503 let unseen = seen.insert(id.clone());
504 if unseen {
505 dependents.push(id.clone());
506 }
507 }
508 ObjectId::Item(id) => {
509 dependents.extend_from_slice(&self.item_dependents(*id, seen))
510 }
511 }
512 }
513 dependents
514 }
515
516 fn cluster_dependents(
523 &self,
524 cluster_id: ClusterId,
525 seen: &mut BTreeSet<ObjectId>,
526 ) -> Vec<ObjectId> {
527 let mut dependents = Vec::new();
528 let object_id = ObjectId::Cluster(cluster_id);
529 if !seen.contains(&object_id) {
530 seen.insert(object_id.clone());
531 let cluster = self.get_cluster(cluster_id);
532 for item_id in cluster.bound_objects() {
533 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
534 }
535 for replica_id in cluster.replica_ids().values() {
536 dependents.extend_from_slice(&self.cluster_replica_dependents(
537 cluster_id,
538 *replica_id,
539 seen,
540 ));
541 }
542 dependents.push(object_id);
543 }
544 dependents
545 }
546
547 pub(super) fn cluster_replica_dependents(
554 &self,
555 cluster_id: ClusterId,
556 replica_id: ReplicaId,
557 seen: &mut BTreeSet<ObjectId>,
558 ) -> Vec<ObjectId> {
559 let mut dependents = Vec::new();
560 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
561 if !seen.contains(&object_id) {
562 seen.insert(object_id.clone());
563 let cluster = self.get_cluster(cluster_id);
567 for item_id in cluster.bound_objects() {
568 if let CatalogItem::MaterializedView(mv) = self.get_entry(item_id).item()
569 && mv.target_replica == Some(replica_id)
570 {
571 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
572 }
573 }
574 dependents.push(object_id);
575 }
576 dependents
577 }
578
579 fn database_dependents(
586 &self,
587 database_id: DatabaseId,
588 conn_id: &ConnectionId,
589 seen: &mut BTreeSet<ObjectId>,
590 ) -> Vec<ObjectId> {
591 let mut dependents = Vec::new();
592 let object_id = ObjectId::Database(database_id);
593 if !seen.contains(&object_id) {
594 seen.insert(object_id.clone());
595 let database = self.get_database(&database_id);
596 for schema_id in database.schema_ids().values() {
597 dependents.extend_from_slice(&self.schema_dependents(
598 ResolvedDatabaseSpecifier::Id(database_id),
599 SchemaSpecifier::Id(*schema_id),
600 conn_id,
601 seen,
602 ));
603 }
604 dependents.push(object_id);
605 }
606 dependents
607 }
608
609 fn schema_dependents(
616 &self,
617 database_spec: ResolvedDatabaseSpecifier,
618 schema_spec: SchemaSpecifier,
619 conn_id: &ConnectionId,
620 seen: &mut BTreeSet<ObjectId>,
621 ) -> Vec<ObjectId> {
622 let mut dependents = Vec::new();
623 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
624 if !seen.contains(&object_id) {
625 seen.insert(object_id.clone());
626 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
627 for item_id in schema.item_ids() {
628 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
629 }
630 dependents.push(object_id)
631 }
632 dependents
633 }
634
635 pub(super) fn item_dependents(
642 &self,
643 item_id: CatalogItemId,
644 seen: &mut BTreeSet<ObjectId>,
645 ) -> Vec<ObjectId> {
646 let mut dependents = Vec::new();
647 let object_id = ObjectId::Item(item_id);
648 if !seen.contains(&object_id) {
649 seen.insert(object_id.clone());
650 let entry = self.get_entry(&item_id);
651 for dependent_id in entry.used_by() {
652 dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
653 }
654 dependents.push(object_id);
655 if let Some(progress_id) = entry.progress_id() {
659 dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
660 }
661 }
662 dependents
663 }
664
665 pub(super) fn network_policy_dependents(
672 &self,
673 network_policy_id: NetworkPolicyId,
674 _seen: &mut BTreeSet<ObjectId>,
675 ) -> Vec<ObjectId> {
676 let object_id = ObjectId::NetworkPolicy(network_policy_id);
677 vec![object_id]
681 }
682
683 fn is_stable(&self, id: CatalogItemId) -> bool {
687 let spec = self.get_entry(&id).name().qualifiers.schema_spec;
688 !self.is_unstable_schema_specifier(spec)
689 }
690
691 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
692 if self.system_config().unsafe_enable_unstable_dependencies() {
693 return Ok(());
694 }
695
696 let unstable_dependencies: Vec<_> = item
697 .references()
698 .items()
699 .filter(|id| !self.is_stable(**id))
700 .map(|id| self.get_entry(id).name().item.clone())
701 .collect();
702
703 if unstable_dependencies.is_empty() || item.is_temporary() {
707 Ok(())
708 } else {
709 let object_type = item.typ().to_string();
710 Err(Error {
711 kind: ErrorKind::UnstableDependency {
712 object_type,
713 unstable_dependencies,
714 },
715 })
716 }
717 }
718
719 pub fn resolve_full_name(
720 &self,
721 name: &QualifiedItemName,
722 conn_id: Option<&ConnectionId>,
723 ) -> FullItemName {
724 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
725
726 let database = match &name.qualifiers.database_spec {
727 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
728 ResolvedDatabaseSpecifier::Id(id) => {
729 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
730 }
731 };
732 let schema = match &name.qualifiers.schema_spec {
735 SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
736 SchemaSpecifier::Id(_) => self
737 .get_schema(
738 &name.qualifiers.database_spec,
739 &name.qualifiers.schema_spec,
740 conn_id,
741 )
742 .name()
743 .schema
744 .clone(),
745 };
746 FullItemName {
747 database,
748 schema,
749 item: name.item.clone(),
750 }
751 }
752
753 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
754 let database = match &name.database {
755 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
756 ResolvedDatabaseSpecifier::Id(id) => {
757 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
758 }
759 };
760 FullSchemaName {
761 database,
762 schema: name.schema.clone(),
763 }
764 }
765
766 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
767 self.entry_by_id
768 .get(id)
769 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
770 }
771
772 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
773 let item_id = self
774 .entry_by_global_id
775 .get(id)
776 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
777
778 let entry = self.get_entry(item_id).clone();
779 let version = match entry.item() {
780 CatalogItem::Table(table) => {
781 let (version, _) = table
782 .collections
783 .iter()
784 .find(|(_verison, gid)| *gid == id)
785 .expect("version to exist");
786 RelationVersionSelector::Specific(*version)
787 }
788 _ => RelationVersionSelector::Latest,
789 };
790 CatalogCollectionEntry { entry, version }
791 }
792
793 pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
794 self.entry_by_id.iter()
795 }
796
797 pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
798 self.temporary_schemas
800 .get(conn)
801 .into_iter()
802 .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
803 }
804
805 pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
811 self.temporary_schemas.contains_key(conn)
812 }
813
814 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
820 let mut res = None;
821 for schema_id in self.system_schema_ids() {
822 let schema = &self.ambient_schemas_by_id[&schema_id];
823 if let Some(global_id) = schema.types.get(name) {
824 match res {
825 None => res = Some(self.get_entry(global_id)),
826 Some(_) => panic!(
827 "only call get_system_type on objects uniquely identifiable in one system schema"
828 ),
829 }
830 }
831 }
832
833 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
834 }
835
836 pub fn get_item_by_name(
837 &self,
838 name: &QualifiedItemName,
839 conn_id: &ConnectionId,
840 ) -> Option<&CatalogEntry> {
841 self.get_schema(
842 &name.qualifiers.database_spec,
843 &name.qualifiers.schema_spec,
844 conn_id,
845 )
846 .items
847 .get(&name.item)
848 .and_then(|id| self.try_get_entry(id))
849 }
850
851 pub fn get_type_by_name(
852 &self,
853 name: &QualifiedItemName,
854 conn_id: &ConnectionId,
855 ) -> Option<&CatalogEntry> {
856 self.get_schema(
857 &name.qualifiers.database_spec,
858 &name.qualifiers.schema_spec,
859 conn_id,
860 )
861 .types
862 .get(&name.item)
863 .and_then(|id| self.try_get_entry(id))
864 }
865
866 pub(super) fn find_available_name(
867 &self,
868 mut name: QualifiedItemName,
869 conn_id: &ConnectionId,
870 ) -> QualifiedItemName {
871 let mut i = 0;
872 let orig_item_name = name.item.clone();
873 while self.get_item_by_name(&name, conn_id).is_some() {
874 i += 1;
875 name.item = format!("{}{}", orig_item_name, i);
876 }
877 name
878 }
879
880 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
881 self.entry_by_id.get(id)
882 }
883
884 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
885 let item_id = self.entry_by_global_id.get(id)?;
886 self.try_get_entry(item_id)
887 }
888
889 pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
892 let entry = self.try_get_entry_by_global_id(id)?;
893 let desc = match entry.item() {
894 CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
895 other => other.relation_desc(RelationVersionSelector::Latest)?,
897 };
898 Some(desc)
899 }
900
901 pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
902 self.try_get_cluster(cluster_id)
903 .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
904 }
905
906 pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
907 self.clusters_by_id.get(&cluster_id)
908 }
909
910 pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
911 self.roles_by_id.get(id)
912 }
913
914 pub fn get_role(&self, id: &RoleId) -> &Role {
915 self.roles_by_id.get(id).expect("catalog out of sync")
916 }
917
918 pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
919 self.roles_by_id.keys()
920 }
921
922 pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
923 self.roles_by_name
924 .get(role_name)
925 .map(|id| &self.roles_by_id[id])
926 }
927
928 pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
929 self.role_auth_by_id
930 .get(id)
931 .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
932 }
933
934 pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
935 self.role_auth_by_id.get(id)
936 }
937
938 pub(super) fn try_get_network_policy_by_name(
939 &self,
940 policy_name: &str,
941 ) -> Option<&NetworkPolicy> {
942 self.network_policies_by_name
943 .get(policy_name)
944 .map(|id| &self.network_policies_by_id[id])
945 }
946
947 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
948 let mut membership = BTreeSet::new();
949 let mut queue = VecDeque::from(vec![id]);
950 while let Some(cur_id) = queue.pop_front() {
951 if !membership.contains(cur_id) {
952 membership.insert(cur_id.clone());
953 let role = self.get_role(cur_id);
954 soft_assert_no_log!(
955 !role.membership().keys().contains(id),
956 "circular membership exists in the catalog"
957 );
958 queue.extend(role.membership().keys());
959 }
960 }
961 membership.insert(RoleId::Public);
962 membership
963 }
964
965 pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
966 self.network_policies_by_id
967 .get(id)
968 .expect("catalog out of sync")
969 }
970
971 pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
972 self.network_policies_by_id.keys()
973 }
974
975 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
980 let entry = self.try_get_entry(id)?;
981 let name = self.resolve_full_name(entry.name(), None);
983 let host_name = self
984 .http_host_name
985 .as_ref()
986 .map(|x| x.as_str())
987 .unwrap_or_else(|| "HOST");
988
989 let RawDatabaseSpecifier::Name(database) = name.database else {
990 return None;
991 };
992
993 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
994 url.path_segments_mut()
995 .ok()?
996 .push(&database)
997 .push(&name.schema)
998 .push(&name.item);
999
1000 Some(url)
1001 }
1002
1003 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1011 &mut self,
1014 create_sql: &str,
1015 force_if_exists_skip: bool,
1016 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1017 self.with_enable_for_item_parsing(|state| {
1018 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
1019 let pcx = Some(&pcx);
1020 let session_catalog = state.for_system_session();
1021
1022 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1023 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
1024 let (plan, _sql_impl_ids) =
1025 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
1026
1027 Ok((plan, resolved_ids))
1028 })
1029 }
1030
1031 #[mz_ore::instrument]
1033 pub(crate) fn parse_plan(
1034 create_sql: &str,
1035 pcx: Option<&PlanContext>,
1036 catalog: &ConnCatalog,
1037 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1038 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1039 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1040 let (plan, _sql_impl_ids) =
1041 mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1042
1043 Ok((plan, resolved_ids))
1044 }
1045
1046 pub(crate) fn deserialize_item(
1048 &self,
1049 global_id: GlobalId,
1050 create_sql: &str,
1051 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1052 local_expression_cache: &mut LocalExpressionCache,
1053 previous_item: Option<CatalogItem>,
1054 ) -> Result<CatalogItem, AdapterError> {
1055 self.parse_item(
1056 global_id,
1057 create_sql,
1058 extra_versions,
1059 None,
1060 false,
1061 None,
1062 local_expression_cache,
1063 previous_item,
1064 )
1065 }
1066
1067 #[mz_ore::instrument]
1069 pub(crate) fn parse_item(
1070 &self,
1071 global_id: GlobalId,
1072 create_sql: &str,
1073 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1074 pcx: Option<&PlanContext>,
1075 is_retained_metrics_object: bool,
1076 custom_logical_compaction_window: Option<CompactionWindow>,
1077 local_expression_cache: &mut LocalExpressionCache,
1078 previous_item: Option<CatalogItem>,
1079 ) -> Result<CatalogItem, AdapterError> {
1080 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1081 match self.parse_item_inner(
1082 global_id,
1083 create_sql,
1084 extra_versions,
1085 pcx,
1086 is_retained_metrics_object,
1087 custom_logical_compaction_window,
1088 cached_expr,
1089 previous_item,
1090 ) {
1091 Ok((item, uncached_expr)) => {
1092 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1093 local_expression_cache.insert_uncached_expression(
1094 global_id,
1095 uncached_expr,
1096 optimizer_features,
1097 );
1098 }
1099 Ok(item)
1100 }
1101 Err((err, cached_expr)) => {
1102 if let Some(local_expr) = cached_expr {
1103 local_expression_cache.insert_cached_expression(global_id, local_expr);
1104 }
1105 Err(err)
1106 }
1107 }
1108 }
1109
    /// Parses `create_sql` against the system session catalog and builds the
    /// in-memory [`CatalogItem`] it describes.
    ///
    /// * `global_id` / `extra_versions` — IDs assigned to the item. For tables and
    ///   materialized views, `global_id` becomes the root relation version and
    ///   `extra_versions` supplies any additional versions.
    /// * `pcx` — optional plan context forwarded to `parse_plan`.
    /// * `is_retained_metrics_object` / `custom_logical_compaction_window` —
    ///   applied to tables, sources and indexes (see the per-arm notes below).
    /// * `cached_expr` — a locally optimized expression from the expression cache.
    ///   It is only consulted for views and materialized views, and only when its
    ///   `optimizer_features` equal the features currently in effect.
    /// * `previous_item` — the item's prior in-memory definition, if any. Its
    ///   optimized plan, physical plan and dataflow metainfo are copied onto the
    ///   new item. For views and materialized views, its raw and locally optimized
    ///   expressions are reused when the raw expression is unchanged.
    ///
    /// On success, returns the item together with `Some((expr, features))` when a
    /// view or materialized view expression was optimized from scratch, i.e. it
    /// came neither from `cached_expr` nor from `previous_item`.
    ///
    /// # Errors
    ///
    /// Every error path returns the error paired with the original `cached_expr`,
    /// so the caller can hand the cached expression back to the cache.
    ///
    /// # Panics
    ///
    /// Panics if a webhook table has no cluster ID, if a webhook source has no
    /// `in_cluster`, or if `previous_item` for a materialized view is not a
    /// materialized view.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        // Plan the statement; on failure, hand the unused cached expression back.
        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        // Becomes `Some` only when a view/MV expression is optimized from scratch.
        let mut uncached_expr = None;

        // Snapshot the previous item's plans now: `previous_item` is moved into
        // some match arms below, and these plans are re-applied at the end.
        let previous_plans = previous_item.as_ref().map(|item| {
            (
                item.optimized_plan().cloned(),
                item.physical_plan().cloned(),
                item.dataflow_metainfo().cloned(),
            )
        });

        let mut item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                // `global_id` is always registered as the root relation version.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    // For tables the caller-supplied window wins over the plan's.
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    // Webhook tables carry their cluster on the plan itself.
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            // Only ingestion exports and webhooks can back a table.
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    // Ingestion-based sources take their cluster from `in_cluster`
                    // and are rejected without one.
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        // Webhook *sources* use `in_cluster`; a plan-level cluster ID
                        // is unexpected here and only logged.
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                // Unlike tables and indexes, for sources the plan's window wins
                // over the caller-supplied one.
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                // Previous expressions are only reusable if the old item was a view.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
                    _ => None,
                });

                // Sources of the optimized expression, in priority order:
                // 1. the expression cache, if optimized with identical features;
                // 2. the previous view, if its raw expression is unchanged;
                // 3. a fresh optimization, reported back through `uncached_expr`.
                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimzed_expr) => optimzed_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Map each referenced global ID to its catalog item ID.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(typ, view.column_names),
                    locally_optimized_expr: optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                // `global_id` is always registered as the root relation version.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                let system_vars = session_catalog.system_vars();
                // Cluster-level feature overrides are layered on top of the
                // system-wide optimizer configuration.
                let overrides = self
                    .get_cluster(materialized_view.cluster_id)
                    .config
                    .features();
                let optimizer_config =
                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
                // Unlike views, a non-MV previous item is treated as a bug.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => (
                        materialized_view.raw_expr,
                        materialized_view.locally_optimized_expr,
                    ),
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                // Same priority as for views: cache, then previous item, then optimize.
                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);

                // Columns with a non-null assertion are forced non-nullable.
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);
                let desc = VersionedRelationDesc::new(desc);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                // Map each referenced global ID to its catalog item ID.
                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                // NOTE(review): `custom_logical_compaction_window` and
                // `is_retained_metrics_object` are not applied to materialized views;
                // only the plan's compaction window is used.
                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    collections,
                    raw_expr,
                    locally_optimized_expr: optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    replacement_target: materialized_view.replacement_target,
                    cluster_id: materialized_view.cluster_id,
                    target_replica: materialized_view.target_replica,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                    optimized_plan: None,
                    physical_plan: None,
                    dataflow_metainfo: None,
                })
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                // For indexes the caller-supplied window wins over the plan's.
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
                optimized_plan: None,
                physical_plan: None,
                dataflow_metainfo: None,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
                commit_interval: sink.commit_interval,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                // Validate that the type's description can be built before accepting it.
                if let Err(err) = typ.inner.desc(&session_catalog) {
                    return Err((err.into(), cached_expr));
                }
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            // Any other plan kind cannot come from a stored catalog entry.
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        // Carry the previous definition's plans over, for item kinds that have
        // plan fields (`plan_fields_mut` returns `None` otherwise).
        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
            {
                *optimized_plan = prev_optimized;
                *physical_plan = prev_physical;
                *dataflow_metainfo = prev_metainfo;
            }
        }

        Ok((item, uncached_expr))
    }
1553
1554 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1560 let restore = Arc::clone(&self.system_configuration);
1571 Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1572 let res = f(self);
1573 self.system_configuration = restore;
1574 res
1575 }
1576
1577 pub fn get_indexes_on(
1579 &self,
1580 id: GlobalId,
1581 cluster: ClusterId,
1582 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1583 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1584
1585 self.try_get_entry_by_global_id(&id)
1586 .into_iter()
1587 .map(move |e| {
1588 e.used_by()
1589 .iter()
1590 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1591 CatalogItem::Index(index) if index_matches(index) => {
1592 Some((index.global_id(), index))
1593 }
1594 _ => None,
1595 })
1596 })
1597 .flatten()
1598 }
1599
1600 pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1601 &self.database_by_id[database_id]
1602 }
1603
1604 pub(super) fn try_get_cluster_replica(
1609 &self,
1610 id: ClusterId,
1611 replica_id: ReplicaId,
1612 ) -> Option<&ClusterReplica> {
1613 self.try_get_cluster(id)
1614 .and_then(|cluster| cluster.replica(replica_id))
1615 }
1616
1617 pub(crate) fn get_cluster_replica(
1621 &self,
1622 cluster_id: ClusterId,
1623 replica_id: ReplicaId,
1624 ) -> &ClusterReplica {
1625 self.try_get_cluster_replica(cluster_id, replica_id)
1626 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1627 }
1628
1629 pub(super) fn resolve_replica_in_cluster(
1630 &self,
1631 cluster_id: &ClusterId,
1632 replica_name: &str,
1633 ) -> Result<&ClusterReplica, SqlCatalogError> {
1634 let cluster = self.get_cluster(*cluster_id);
1635 let replica_id = cluster
1636 .replica_id_by_name_
1637 .get(replica_name)
1638 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1639 Ok(&cluster.replicas_by_id_[replica_id])
1640 }
1641
1642 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1644 Ok(self.system_configuration.get(name)?)
1645 }
1646
1647 pub(super) fn parse_system_configuration(
1651 &self,
1652 name: &str,
1653 value: VarInput,
1654 ) -> Result<String, Error> {
1655 let value = self.system_configuration.parse(name, value)?;
1656 Ok(value.format())
1657 }
1658
1659 pub(super) fn resolve_schema_in_database(
1661 &self,
1662 database_spec: &ResolvedDatabaseSpecifier,
1663 schema_name: &str,
1664 conn_id: &ConnectionId,
1665 ) -> Result<&Schema, SqlCatalogError> {
1666 let schema = match database_spec {
1667 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1668 self.temporary_schemas.get(conn_id)
1669 }
1670 ResolvedDatabaseSpecifier::Ambient => self
1671 .ambient_schemas_by_name
1672 .get(schema_name)
1673 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1674 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1675 db.schemas_by_name
1676 .get(schema_name)
1677 .and_then(|id| db.schemas_by_id.get(id))
1678 }),
1679 };
1680 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1681 }
1682
1683 pub fn try_get_schema(
1688 &self,
1689 database_spec: &ResolvedDatabaseSpecifier,
1690 schema_spec: &SchemaSpecifier,
1691 conn_id: &ConnectionId,
1692 ) -> Option<&Schema> {
1693 match (database_spec, schema_spec) {
1695 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1696 self.temporary_schemas.get(conn_id)
1697 }
1698 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1699 self.ambient_schemas_by_id.get(id)
1700 }
1701 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1702 .database_by_id
1703 .get(database_id)
1704 .and_then(|db| db.schemas_by_id.get(schema_id)),
1705 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1706 unreachable!("temporary schemas are in the ambient database")
1707 }
1708 }
1709 }
1710
1711 pub fn get_schema(
1712 &self,
1713 database_spec: &ResolvedDatabaseSpecifier,
1714 schema_spec: &SchemaSpecifier,
1715 conn_id: &ConnectionId,
1716 ) -> &Schema {
1717 self.try_get_schema(database_spec, schema_spec, conn_id)
1719 .expect("schema must exist")
1720 }
1721
1722 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1723 self.database_by_id
1724 .values()
1725 .filter_map(|database| database.schemas_by_id.get(schema_id))
1726 .chain(self.ambient_schemas_by_id.values())
1727 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1728 .into_first()
1729 }
1730
1731 pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1732 self.temporary_schemas
1733 .values()
1734 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1735 .into_first()
1736 }
1737
1738 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1739 self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1740 }
1741
1742 pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1743 self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1744 }
1745
1746 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1747 self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1748 }
1749
1750 pub fn get_information_schema_id(&self) -> SchemaId {
1751 self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1752 }
1753
1754 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1755 self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1756 }
1757
1758 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1759 self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1760 }
1761
1762 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1763 self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1764 }
1765
1766 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1767 SYSTEM_SCHEMAS
1768 .iter()
1769 .map(|name| self.ambient_schemas_by_name[*name])
1770 }
1771
1772 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1773 self.system_schema_ids().contains(&id)
1774 }
1775
1776 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1777 match spec {
1778 SchemaSpecifier::Temporary => false,
1779 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1780 }
1781 }
1782
1783 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1784 UNSTABLE_SCHEMAS
1785 .iter()
1786 .map(|name| self.ambient_schemas_by_name[*name])
1787 }
1788
1789 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1790 self.unstable_schema_ids().contains(&id)
1791 }
1792
1793 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1794 match spec {
1795 SchemaSpecifier::Temporary => false,
1796 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1797 }
1798 }
1799
1800 pub fn create_temporary_schema(
1803 &mut self,
1804 conn_id: &ConnectionId,
1805 owner_id: RoleId,
1806 ) -> Result<(), Error> {
1807 let oid = INVALID_OID;
1812 self.temporary_schemas.insert(
1813 conn_id.clone(),
1814 Schema {
1815 name: QualifiedSchemaName {
1816 database: ResolvedDatabaseSpecifier::Ambient,
1817 schema: MZ_TEMP_SCHEMA.into(),
1818 },
1819 id: SchemaSpecifier::Temporary,
1820 oid,
1821 items: BTreeMap::new(),
1822 functions: BTreeMap::new(),
1823 types: BTreeMap::new(),
1824 owner_id,
1825 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1826 mz_sql::catalog::ObjectType::Schema,
1827 owner_id,
1828 )]),
1829 },
1830 );
1831 Ok(())
1832 }
1833
1834 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1836 std::iter::empty()
1837 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1838 if schema.id.is_temporary() {
1839 Some(schema.oid)
1840 } else {
1841 None
1842 }
1843 }))
1844 .chain(self.entry_by_id.values().filter_map(|entry| {
1845 if entry.item().is_temporary() {
1846 Some(entry.oid)
1847 } else {
1848 None
1849 }
1850 }))
1851 }
1852
1853 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1857 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1858 }
1859
1860 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1864 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1865 let log = match self.get_entry(&item_id).item() {
1866 CatalogItem::Log(log) => log,
1867 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1868 };
1869 (item_id, log.global_id)
1870 }
1871
1872 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1876 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1877 }
1878
1879 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1883 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1884 let schema = &self.ambient_schemas_by_id[schema_id];
1885 match builtin.catalog_item_type() {
1886 CatalogItemType::Type => schema.types[builtin.name()],
1887 CatalogItemType::Func => schema.functions[builtin.name()],
1888 CatalogItemType::Table
1889 | CatalogItemType::Source
1890 | CatalogItemType::Sink
1891 | CatalogItemType::View
1892 | CatalogItemType::MaterializedView
1893 | CatalogItemType::Index
1894 | CatalogItemType::Secret
1895 | CatalogItemType::Connection => schema.items[builtin.name()],
1896 }
1897 }
1898
1899 pub fn resolve_builtin_type_references(
1901 &self,
1902 builtin: &BuiltinType<NameReference>,
1903 ) -> BuiltinType<IdReference> {
1904 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1905 CatalogType::AclItem => CatalogType::AclItem,
1906 CatalogType::Array { element_reference } => CatalogType::Array {
1907 element_reference: self.get_system_type(element_reference).id,
1908 },
1909 CatalogType::List {
1910 element_reference,
1911 element_modifiers,
1912 } => CatalogType::List {
1913 element_reference: self.get_system_type(element_reference).id,
1914 element_modifiers: element_modifiers.clone(),
1915 },
1916 CatalogType::Map {
1917 key_reference,
1918 value_reference,
1919 key_modifiers,
1920 value_modifiers,
1921 } => CatalogType::Map {
1922 key_reference: self.get_system_type(key_reference).id,
1923 value_reference: self.get_system_type(value_reference).id,
1924 key_modifiers: key_modifiers.clone(),
1925 value_modifiers: value_modifiers.clone(),
1926 },
1927 CatalogType::Range { element_reference } => CatalogType::Range {
1928 element_reference: self.get_system_type(element_reference).id,
1929 },
1930 CatalogType::Record { fields } => CatalogType::Record {
1931 fields: fields
1932 .into_iter()
1933 .map(|f| CatalogRecordField {
1934 name: f.name.clone(),
1935 type_reference: self.get_system_type(f.type_reference).id,
1936 type_modifiers: f.type_modifiers.clone(),
1937 })
1938 .collect(),
1939 },
1940 CatalogType::Bool => CatalogType::Bool,
1941 CatalogType::Bytes => CatalogType::Bytes,
1942 CatalogType::Char => CatalogType::Char,
1943 CatalogType::Date => CatalogType::Date,
1944 CatalogType::Float32 => CatalogType::Float32,
1945 CatalogType::Float64 => CatalogType::Float64,
1946 CatalogType::Int16 => CatalogType::Int16,
1947 CatalogType::Int32 => CatalogType::Int32,
1948 CatalogType::Int64 => CatalogType::Int64,
1949 CatalogType::UInt16 => CatalogType::UInt16,
1950 CatalogType::UInt32 => CatalogType::UInt32,
1951 CatalogType::UInt64 => CatalogType::UInt64,
1952 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1953 CatalogType::Interval => CatalogType::Interval,
1954 CatalogType::Jsonb => CatalogType::Jsonb,
1955 CatalogType::Numeric => CatalogType::Numeric,
1956 CatalogType::Oid => CatalogType::Oid,
1957 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1958 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1959 CatalogType::Pseudo => CatalogType::Pseudo,
1960 CatalogType::RegClass => CatalogType::RegClass,
1961 CatalogType::RegProc => CatalogType::RegProc,
1962 CatalogType::RegType => CatalogType::RegType,
1963 CatalogType::String => CatalogType::String,
1964 CatalogType::Time => CatalogType::Time,
1965 CatalogType::Timestamp => CatalogType::Timestamp,
1966 CatalogType::TimestampTz => CatalogType::TimestampTz,
1967 CatalogType::Uuid => CatalogType::Uuid,
1968 CatalogType::VarChar => CatalogType::VarChar,
1969 CatalogType::Int2Vector => CatalogType::Int2Vector,
1970 CatalogType::MzAclItem => CatalogType::MzAclItem,
1971 };
1972
1973 BuiltinType {
1974 name: builtin.name,
1975 schema: builtin.schema,
1976 oid: builtin.oid,
1977 details: CatalogTypeDetails {
1978 array_id: builtin.details.array_id,
1979 typ,
1980 pg_metadata: builtin.details.pg_metadata.clone(),
1981 },
1982 }
1983 }
1984
    /// Returns the catalog configuration held by this state.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
1988
1989 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1990 match self.database_by_name.get(database_name) {
1991 Some(id) => Ok(&self.database_by_id[id]),
1992 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1993 }
1994 }
1995
1996 pub fn resolve_schema(
1997 &self,
1998 current_database: Option<&DatabaseId>,
1999 database_name: Option<&str>,
2000 schema_name: &str,
2001 conn_id: &ConnectionId,
2002 ) -> Result<&Schema, SqlCatalogError> {
2003 let database_spec = match database_name {
2004 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
2009 self.resolve_database(database)?.id().clone(),
2010 )),
2011 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2012 };
2013
2014 if let Some(database_spec) = database_spec {
2016 if let Ok(schema) =
2017 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2018 {
2019 return Ok(schema);
2020 }
2021 }
2022
2023 if let Ok(schema) = self.resolve_schema_in_database(
2025 &ResolvedDatabaseSpecifier::Ambient,
2026 schema_name,
2027 conn_id,
2028 ) {
2029 return Ok(schema);
2030 }
2031
2032 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2033 }
2034
2035 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
2039 self.ambient_schemas_by_name[name]
2040 }
2041
2042 pub fn resolve_search_path(
2043 &self,
2044 session: &dyn SessionMetadata,
2045 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2046 let database = self
2047 .database_by_name
2048 .get(session.database())
2049 .map(|id| id.clone());
2050
2051 session
2052 .search_path()
2053 .iter()
2054 .map(|schema| {
2055 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2056 })
2057 .filter_map(|schema| schema.ok())
2058 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2059 .collect()
2060 }
2061
2062 pub fn effective_search_path(
2063 &self,
2064 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2065 include_temp_schema: bool,
2066 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2067 let mut v = Vec::with_capacity(search_path.len() + 3);
2068 let temp_schema = (
2070 ResolvedDatabaseSpecifier::Ambient,
2071 SchemaSpecifier::Temporary,
2072 );
2073 if include_temp_schema && !search_path.contains(&temp_schema) {
2074 v.push(temp_schema);
2075 }
2076 let default_schemas = [
2077 (
2078 ResolvedDatabaseSpecifier::Ambient,
2079 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2080 ),
2081 (
2082 ResolvedDatabaseSpecifier::Ambient,
2083 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2084 ),
2085 ];
2086 for schema in default_schemas.into_iter() {
2087 if !search_path.contains(&schema) {
2088 v.push(schema);
2089 }
2090 }
2091 v.extend_from_slice(search_path);
2092 v
2093 }
2094
2095 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2096 let id = self
2097 .clusters_by_name
2098 .get(name)
2099 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2100 Ok(&self.clusters_by_id[id])
2101 }
2102
2103 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2104 let id = self
2105 .clusters_by_name
2106 .get(cluster.name)
2107 .expect("failed to lookup BuiltinCluster by name");
2108 self.clusters_by_id
2109 .get(id)
2110 .expect("failed to lookup BuiltinCluster by ID")
2111 }
2112
2113 pub fn resolve_cluster_replica(
2114 &self,
2115 cluster_replica_name: &QualifiedReplica,
2116 ) -> Result<&ClusterReplica, SqlCatalogError> {
2117 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2118 let replica_name = cluster_replica_name.replica.as_str();
2119 let replica_id = cluster
2120 .replica_id(replica_name)
2121 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2122 Ok(cluster.replica(replica_id).expect("Must exist"))
2123 }
2124
2125 #[allow(clippy::useless_let_if_seq)]
2131 pub fn resolve(
2132 &self,
2133 get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2134 current_database: Option<&DatabaseId>,
2135 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2136 name: &PartialItemName,
2137 conn_id: &ConnectionId,
2138 err_gen: fn(String) -> SqlCatalogError,
2139 ) -> Result<&CatalogEntry, SqlCatalogError> {
2140 let schemas = match &name.schema {
2145 Some(schema_name) => {
2146 match self.resolve_schema(
2147 current_database,
2148 name.database.as_deref(),
2149 schema_name,
2150 conn_id,
2151 ) {
2152 Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2153 Err(e) => return Err(e),
2154 }
2155 }
2156 None => match self
2157 .try_get_schema(
2158 &ResolvedDatabaseSpecifier::Ambient,
2159 &SchemaSpecifier::Temporary,
2160 conn_id,
2161 )
2162 .and_then(|schema| schema.items.get(&name.item))
2163 {
2164 Some(id) => return Ok(self.get_entry(id)),
2165 None => search_path.to_vec(),
2166 },
2167 };
2168
2169 for (database_spec, schema_spec) in &schemas {
2170 let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
2173 continue;
2174 };
2175
2176 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2177 return Ok(&self.entry_by_id[id]);
2178 }
2179 }
2180
2181 let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2186 if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2187 for schema_id in [
2188 self.get_mz_catalog_unstable_schema_id(),
2189 self.get_mz_introspection_schema_id(),
2190 ] {
2191 let schema = self.get_schema(
2192 &ResolvedDatabaseSpecifier::Ambient,
2193 &SchemaSpecifier::Id(schema_id),
2194 conn_id,
2195 );
2196
2197 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2198 debug!(
2199 github_27831 = true,
2200 "encountered use of outdated schema `mz_internal` for relation: {name}",
2201 );
2202 return Ok(&self.entry_by_id[id]);
2203 }
2204 }
2205 }
2206
2207 Err(err_gen(name.to_string()))
2208 }
2209
2210 pub fn resolve_entry(
2212 &self,
2213 current_database: Option<&DatabaseId>,
2214 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2215 name: &PartialItemName,
2216 conn_id: &ConnectionId,
2217 ) -> Result<&CatalogEntry, SqlCatalogError> {
2218 self.resolve(
2219 |schema| &schema.items,
2220 current_database,
2221 search_path,
2222 name,
2223 conn_id,
2224 SqlCatalogError::UnknownItem,
2225 )
2226 }
2227
2228 pub fn resolve_function(
2230 &self,
2231 current_database: Option<&DatabaseId>,
2232 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2233 name: &PartialItemName,
2234 conn_id: &ConnectionId,
2235 ) -> Result<&CatalogEntry, SqlCatalogError> {
2236 self.resolve(
2237 |schema| &schema.functions,
2238 current_database,
2239 search_path,
2240 name,
2241 conn_id,
2242 |name| SqlCatalogError::UnknownFunction {
2243 name,
2244 alternative: None,
2245 },
2246 )
2247 }
2248
2249 pub fn resolve_type(
2251 &self,
2252 current_database: Option<&DatabaseId>,
2253 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2254 name: &PartialItemName,
2255 conn_id: &ConnectionId,
2256 ) -> Result<&CatalogEntry, SqlCatalogError> {
2257 static NON_PG_CATALOG_TYPES: LazyLock<
2258 BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2259 > = LazyLock::new(|| {
2260 BUILTINS::types()
2261 .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2262 .map(|typ| (typ.name, typ))
2263 .collect()
2264 });
2265
2266 let entry = self.resolve(
2267 |schema| &schema.types,
2268 current_database,
2269 search_path,
2270 name,
2271 conn_id,
2272 |name| SqlCatalogError::UnknownType { name },
2273 )?;
2274
2275 if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2276 if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2277 warn!(
2278 "user specified an incorrect schema of {} for the type {}, which should be in \
2279 the {} schema. This works now due to a bug but will be fixed in a later release.",
2280 PG_CATALOG_SCHEMA.quoted(),
2281 typ.name.quoted(),
2282 typ.schema.quoted(),
2283 )
2284 }
2285 }
2286
2287 Ok(entry)
2288 }
2289
2290 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2292 match object_id {
2293 ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2294 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2295 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2296 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2297 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2298 ObjectId::ClusterReplica(cluster_replica_id) => {
2299 CommentObjectId::ClusterReplica(cluster_replica_id)
2300 }
2301 ObjectId::NetworkPolicy(network_policy_id) => {
2302 CommentObjectId::NetworkPolicy(network_policy_id)
2303 }
2304 }
2305 }
2306
2307 pub fn system_config(&self) -> &SystemVars {
2309 &self.system_configuration
2310 }
2311
2312 pub fn cluster_scoped_optimizer_overrides(
2316 &self,
2317 cluster_id: ClusterId,
2318 ) -> OptimizerFeatureOverrides {
2319 self.scoped_system_parameters
2320 .cluster
2321 .get(&cluster_id)
2322 .cloned()
2323 .map(OptimizerFeatureOverrides::from)
2324 .unwrap_or_default()
2325 }
2326
2327 pub fn scoped_system_parameters(&self) -> &ScopedParameters {
2330 &self.scoped_system_parameters
2331 }
2332
2333 pub fn system_config_mut(&mut self) -> &mut SystemVars {
2335 Arc::make_mut(&mut self.system_configuration)
2336 }
2337
2338 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2348 let mut dump = serde_json::to_value(&self).map_err(|e| {
2350 Error::new(ErrorKind::Unstructured(format!(
2351 "internal error: could not dump catalog: {}",
2354 e
2355 )))
2356 })?;
2357
2358 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2359 dump_obj.insert(
2361 "system_parameter_defaults".into(),
2362 serde_json::json!(self.system_config().defaults()),
2363 );
2364 if let Some(unfinalized_shards) = unfinalized_shards {
2366 dump_obj
2367 .get_mut("storage_metadata")
2368 .expect("known to exist")
2369 .as_object_mut()
2370 .expect("storage_metadata is an object")
2371 .insert(
2372 "unfinalized_shards".into(),
2373 serde_json::json!(unfinalized_shards),
2374 );
2375 }
2376 let temporary_gids: Vec<_> = self
2381 .entry_by_global_id
2382 .iter()
2383 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2384 .map(|(gid, _item_id)| *gid)
2385 .collect();
2386 if !temporary_gids.is_empty() {
2387 let gids = dump_obj
2388 .get_mut("entry_by_global_id")
2389 .expect("known_to_exist")
2390 .as_object_mut()
2391 .expect("entry_by_global_id is an object");
2392 for gid in temporary_gids {
2393 gids.remove(&gid.to_string());
2394 }
2395 }
2396 dump_obj.remove("role_auth_by_id");
2399
2400 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2402 }
2403
2404 pub fn availability_zones(&self) -> &[String] {
2405 &self.availability_zones
2406 }
2407
2408 pub fn concretize_replica_location(
2409 &self,
2410 location: mz_catalog::durable::ReplicaLocation,
2411 allowed_sizes: &Vec<String>,
2412 allowed_availability_zones: Option<&[String]>,
2413 allow_disabled: bool,
2414 ) -> Result<ReplicaLocation, Error> {
2415 let location = match location {
2416 mz_catalog::durable::ReplicaLocation::Unmanaged {
2417 storagectl_addrs,
2418 computectl_addrs,
2419 } => {
2420 if allowed_availability_zones.is_some() {
2421 return Err(Error {
2422 kind: ErrorKind::Internal(
2423 "tried concretize unmanaged replica with specific availability_zones"
2424 .to_string(),
2425 ),
2426 });
2427 }
2428 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2429 storagectl_addrs,
2430 computectl_addrs,
2431 })
2432 }
2433 mz_catalog::durable::ReplicaLocation::Managed {
2434 size,
2435 availability_zones,
2441 billed_as,
2442 internal,
2443 pending,
2444 } => {
2445 self.ensure_valid_replica_size(allowed_sizes, &size, allow_disabled)?;
2446 let cluster_replica_sizes = &self.cluster_replica_sizes;
2447
2448 ReplicaLocation::Managed(ManagedReplicaLocation {
2449 allocation: cluster_replica_sizes
2450 .0
2451 .get(&size)
2452 .expect("catalog out of sync")
2453 .clone(),
2454 availability_zones: match allowed_availability_zones {
2455 Some(azs) => azs.to_vec(),
2456 None => availability_zones,
2457 },
2458 size,
2459 billed_as,
2460 internal,
2461 pending,
2462 })
2463 }
2464 };
2465 Ok(location)
2466 }
2467
2468 pub(crate) fn ensure_valid_replica_size(
2469 &self,
2470 allowed_sizes: &[String],
2471 size: &String,
2472 allow_disabled: bool,
2473 ) -> Result<(), Error> {
2474 let cluster_replica_sizes = &self.cluster_replica_sizes;
2475
2476 if !cluster_replica_sizes.0.contains_key(size)
2477 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2478 || (!allow_disabled && cluster_replica_sizes.0[size].disabled)
2479 {
2480 let mut entries = cluster_replica_sizes
2481 .enabled_allocations()
2482 .collect::<Vec<_>>();
2483
2484 if !allowed_sizes.is_empty() {
2485 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2486 entries.retain(|(name, _)| allowed_sizes.contains(name));
2487 }
2488
2489 entries.sort_by_key(
2490 |(
2491 _name,
2492 ReplicaAllocation {
2493 scale, cpu_limit, ..
2494 },
2495 )| (scale, cpu_limit),
2496 );
2497
2498 Err(Error {
2499 kind: ErrorKind::InvalidClusterReplicaSize {
2500 size: size.to_owned(),
2501 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2502 },
2503 })
2504 } else {
2505 Ok(())
2506 }
2507 }
2508
2509 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2510 if role_id.is_builtin() {
2511 let role = self.get_role(role_id);
2512 Err(Error::new(ErrorKind::ReservedRoleName(
2513 role.name().to_string(),
2514 )))
2515 } else {
2516 Ok(())
2517 }
2518 }
2519
2520 pub fn ensure_not_reserved_network_policy(
2521 &self,
2522 network_policy_id: &NetworkPolicyId,
2523 ) -> Result<(), Error> {
2524 if network_policy_id.is_builtin() {
2525 let policy = self.get_network_policy(network_policy_id);
2526 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2527 policy.name.clone(),
2528 )))
2529 } else {
2530 Ok(())
2531 }
2532 }
2533
2534 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2535 let is_grantable = !role_id.is_public() && !role_id.is_system();
2536 if is_grantable {
2537 Ok(())
2538 } else {
2539 let role = self.get_role(role_id);
2540 Err(Error::new(ErrorKind::UngrantableRoleName(
2541 role.name().to_string(),
2542 )))
2543 }
2544 }
2545
2546 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2547 if role_id.is_system() {
2548 let role = self.get_role(role_id);
2549 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2550 role.name().to_string(),
2551 )))
2552 } else {
2553 Ok(())
2554 }
2555 }
2556
2557 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2558 if role_id.is_predefined() {
2559 let role = self.get_role(role_id);
2560 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2561 role.name().to_string(),
2562 )))
2563 } else {
2564 Ok(())
2565 }
2566 }
2567
2568 pub(crate) fn add_to_audit_log(
2571 system_configuration: &SystemVars,
2572 oracle_write_ts: mz_repr::Timestamp,
2573 session: Option<&ConnMeta>,
2574 tx: &mut mz_catalog::durable::Transaction,
2575 audit_events: &mut Vec<VersionedEvent>,
2576 event_type: EventType,
2577 object_type: ObjectType,
2578 details: EventDetails,
2579 ) -> Result<(), Error> {
2580 let user = session.map(|session| session.user().name.to_string());
2581
2582 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2585 Some(ts) => ts.into(),
2586 _ => oracle_write_ts.into(),
2587 };
2588 let id = tx.allocate_audit_log_id()?;
2589 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2590 audit_events.push(event.clone());
2591 tx.insert_audit_log_event(event);
2592 Ok(())
2593 }
2594
2595 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2596 match id {
2597 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2598 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2599 self.get_cluster_replica(*cluster_id, *replica_id)
2600 .owner_id(),
2601 ),
2602 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2603 ObjectId::Schema((database_spec, schema_spec)) => Some(
2604 self.get_schema(database_spec, schema_spec, conn_id)
2605 .owner_id(),
2606 ),
2607 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2608 ObjectId::Role(_) => None,
2609 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2610 }
2611 }
2612
2613 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2614 match object_id {
2615 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2616 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2617 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2618 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2619 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2620 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2621 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2622 }
2623 }
2624
2625 pub(super) fn get_system_object_type(
2626 &self,
2627 id: &SystemObjectId,
2628 ) -> mz_sql::catalog::SystemObjectType {
2629 match id {
2630 SystemObjectId::Object(object_id) => {
2631 SystemObjectType::Object(self.get_object_type(object_id))
2632 }
2633 SystemObjectId::System => SystemObjectType::System,
2634 }
2635 }
2636
2637 pub fn storage_metadata(&self) -> &StorageMetadata {
2641 &self.storage_metadata
2642 }
2643
2644 pub fn source_compaction_windows(
2646 &self,
2647 ids: impl IntoIterator<Item = CatalogItemId>,
2648 ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2649 let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2650 let mut seen = BTreeSet::new();
2651 for item_id in ids {
2652 if !seen.insert(item_id) {
2653 continue;
2654 }
2655 let entry = self.get_entry(&item_id);
2656 match entry.item() {
2657 CatalogItem::Source(source) => {
2658 let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2659 cws.entry(source_cw).or_default().insert(item_id);
2660 }
2661 CatalogItem::Table(table) => {
2662 let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2663 match &table.data_source {
2664 TableDataSource::DataSource {
2665 desc:
2666 DataSourceDesc::IngestionExport { .. }
2667 | DataSourceDesc::Webhook { .. },
2669 timeline: _,
2670 } => {
2671 cws.entry(table_cw).or_default().insert(item_id);
2672 }
2673 TableDataSource::TableWrites { .. } => {}
2676 TableDataSource::DataSource {
2677 desc:
2678 DataSourceDesc::Ingestion { .. }
2679 | DataSourceDesc::OldSyntaxIngestion { .. }
2680 | DataSourceDesc::Introspection(_)
2681 | DataSourceDesc::Progress
2682 | DataSourceDesc::Catalog,
2683 ..
2684 } => {
2685 unreachable!(
2686 "unexpected DataSourceDesc for table {item_id}: {:?}",
2687 table.data_source
2688 )
2689 }
2690 }
2691 }
2692 _ => {
2693 continue;
2695 }
2696 }
2697 }
2698 cws
2699 }
2700
2701 pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2702 match id {
2703 CommentObjectId::Table(id)
2704 | CommentObjectId::View(id)
2705 | CommentObjectId::MaterializedView(id)
2706 | CommentObjectId::Source(id)
2707 | CommentObjectId::Sink(id)
2708 | CommentObjectId::Index(id)
2709 | CommentObjectId::Func(id)
2710 | CommentObjectId::Connection(id)
2711 | CommentObjectId::Type(id)
2712 | CommentObjectId::Secret(id) => Some(*id),
2713 CommentObjectId::Role(_)
2714 | CommentObjectId::Database(_)
2715 | CommentObjectId::Schema(_)
2716 | CommentObjectId::Cluster(_)
2717 | CommentObjectId::ClusterReplica(_)
2718 | CommentObjectId::NetworkPolicy(_) => None,
2719 }
2720 }
2721
2722 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2723 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2724 }
2725
2726 pub fn comment_id_to_audit_log_name(
2727 &self,
2728 id: CommentObjectId,
2729 conn_id: &ConnectionId,
2730 ) -> String {
2731 match id {
2732 CommentObjectId::Table(id)
2733 | CommentObjectId::View(id)
2734 | CommentObjectId::MaterializedView(id)
2735 | CommentObjectId::Source(id)
2736 | CommentObjectId::Sink(id)
2737 | CommentObjectId::Index(id)
2738 | CommentObjectId::Func(id)
2739 | CommentObjectId::Connection(id)
2740 | CommentObjectId::Type(id)
2741 | CommentObjectId::Secret(id) => {
2742 let item = self.get_entry(&id);
2743 let name = self.resolve_full_name(item.name(), Some(conn_id));
2744 name.to_string()
2745 }
2746 CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2747 CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2748 CommentObjectId::Schema((spec, schema_id)) => {
2749 let schema = self.get_schema(&spec, &schema_id, conn_id);
2750 self.resolve_full_schema_name(&schema.name).to_string()
2751 }
2752 CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2753 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2754 let cluster = self.get_cluster(cluster_id);
2755 let replica = self.get_cluster_replica(cluster_id, replica_id);
2756 QualifiedReplica {
2757 cluster: Ident::new_unchecked(cluster.name.clone()),
2758 replica: Ident::new_unchecked(replica.name.clone()),
2759 }
2760 .to_string()
2761 }
2762 CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2763 }
2764 }
2765
2766 pub fn mock_authentication_nonce(&self) -> String {
2767 self.mock_authentication_nonce.clone().unwrap_or_default()
2768 }
2769}
2770
2771impl ConnectionResolver for CatalogState {
2772 fn resolve_connection(
2773 &self,
2774 id: CatalogItemId,
2775 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2776 use mz_storage_types::connections::Connection::*;
2777 match self
2778 .get_entry(&id)
2779 .connection()
2780 .expect("catalog out of sync")
2781 .details
2782 .to_connection()
2783 {
2784 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2785 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2786 Csr(conn) => Csr(conn.into_inline_connection(self)),
2787 GlueSchemaRegistry(conn) => GlueSchemaRegistry(conn.into_inline_connection(self)),
2788 Ssh(conn) => Ssh(conn),
2789 Aws(conn) => Aws(conn),
2790 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2791 Gcp(conn) => Gcp(conn),
2792 MySql(conn) => MySql(conn.into_inline_connection(self)),
2793 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2794 IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2795 }
2796 }
2797}
2798
2799impl OptimizerCatalog for CatalogState {
2800 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2801 CatalogState::get_entry_by_global_id(self, id)
2802 }
2803 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2804 CatalogState::get_entry(self, id)
2805 }
2806 fn resolve_full_name(
2807 &self,
2808 name: &QualifiedItemName,
2809 conn_id: Option<&ConnectionId>,
2810 ) -> FullItemName {
2811 CatalogState::resolve_full_name(self, name, conn_id)
2812 }
2813 fn get_indexes_on(
2814 &self,
2815 id: GlobalId,
2816 cluster: ClusterId,
2817 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2818 Box::new(CatalogState::get_indexes_on(self, id, cluster))
2819 }
2820}
2821
2822impl OptimizerCatalog for Catalog {
2823 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2824 self.state.get_entry_by_global_id(id)
2825 }
2826
2827 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2828 self.state.get_entry(id)
2829 }
2830
2831 fn resolve_full_name(
2832 &self,
2833 name: &QualifiedItemName,
2834 conn_id: Option<&ConnectionId>,
2835 ) -> FullItemName {
2836 self.state.resolve_full_name(name, conn_id)
2837 }
2838
2839 fn get_indexes_on(
2840 &self,
2841 id: GlobalId,
2842 cluster: ClusterId,
2843 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2844 Box::new(self.state.get_indexes_on(id, cluster))
2845 }
2846}
2847
impl Catalog {
    /// Converts a shared `Catalog` into a type-erased `Arc<dyn OptimizerCatalog>`.
    ///
    /// This is an unsizing coercion of the same `Arc`: no data is copied, and the returned
    /// handle shares ownership with the input.
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}