use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
};
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
    TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
    UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::DiskLimit;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
use mz_ore::str::StrExt;
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::{
    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
    UNSTABLE_SCHEMAS,
};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector};
use mz_secrets::InMemorySecretsController;
use mz_sql::ast::Ident;
use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
    TypeReference,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
    Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageMetadata;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{
    ConnectionResolver, InlinedConnection, IntoInlineConnection,
};
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{debug, warn};

use crate::AdapterError;
use crate::catalog::{Catalog, ConnCatalog};
use crate::coord::ConnMeta;
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::session::Session;

#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,

    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}

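/// A cache of optimized [`LocalExpressions`], keyed by [`GlobalId`].
///
/// While `Open`, previously cached expressions can be removed for reuse and
/// newly optimized expressions are accumulated so the caller can later write
/// them back to the durable expression cache. A `Closed` cache ignores all
/// reads and writes.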
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    Open {
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    Closed,
}

impl LocalExpressionCache {
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                let prev = uncached_exprs.remove(&id);
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}

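/// Serializes `entries` with map keys rendered as strings, skipping any
/// temporary items (i.e., entries associated with a connection ID).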
fn skip_temp_items<S>(
    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    mz_ore::serde::map_key_to_string(
        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
        serializer,
    )
}

impl CatalogState {
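    /// Returns an empty [`CatalogState`] suitable for use in tests.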
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                timestamp_interval: Default::default(),
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Default::default(),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Default::default(),
            system_privileges: Default::default(),
            comments: Default::default(),
            source_references: Default::default(),
            storage_metadata: Default::default(),
            license_key: ValidatedLicenseKey::for_tests(),
        }
    }

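    /// Returns a [`ConnCatalog`] scoped to `session`, resolving the session's
    /// active database and search path. If the session's transaction carries
    /// its own catalog state, that state is used in place of `self`.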
    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        let search_path = self.resolve_search_path(session);
        let database = self
            .database_by_name
            .get(session.vars().database())
            .map(|id| id.clone());
        let state = match session.transaction().catalog_state() {
            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
            None => Cow::Borrowed(self),
        };
        ConnCatalog {
            state,
            unresolvable_ids: BTreeSet::new(),
            conn_id: session.conn_id().clone(),
            cluster: session.vars().cluster().into(),
            database,
            search_path,
            role_id: session.current_role_id().clone(),
            prepared_statements: Some(session.prepared_statements()),
            notices_tx: session.retain_notice_transmitter(),
        }
    }

    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            notices_tx,
        }
    }

    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }

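    /// Returns an iterator over `id` and every [`CatalogItemId`] that `id`
    /// transitively uses, in breadth-first order starting with `id` itself.
    ///
    /// A minimal usage sketch (the binding names here are illustrative, not
    /// part of this API):
    ///
    /// ```ignore
    /// let reachable: BTreeSet<_> = state.transitive_uses(item_id).collect();
    /// assert!(reachable.contains(&item_id)); // the root is always yielded first
    /// ```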
    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
        struct I<'a> {
            queue: VecDeque<CatalogItemId>,
            seen: BTreeSet<CatalogItemId>,
            this: &'a CatalogState,
        }
        impl<'a> Iterator for I<'a> {
            type Item = CatalogItemId;
            fn next(&mut self) -> Option<Self::Item> {
                if let Some(next) = self.queue.pop_front() {
                    for child in self.this.get_entry(&next).item().uses() {
                        if !self.seen.contains(&child) {
                            self.queue.push_back(child);
                            self.seen.insert(child);
                        }
                    }
                    Some(next)
                } else {
                    None
                }
            }
        }

        I {
            queue: [id].into_iter().collect(),
            seen: [id].into_iter().collect(),
            this: self,
        }
    }

    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }

    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }

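    /// Returns all objects that transitively depend on any object in
    /// `object_ids`, tracking visited objects in `seen` so shared dependents
    /// are reported only once. Each object's dependents are emitted before the
    /// object itself, so the result can be dropped front to back.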
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }

    fn cluster_dependents(
        &self,
        cluster_id: ClusterId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Cluster(cluster_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let cluster = self.get_cluster(cluster_id);
            for item_id in cluster.bound_objects() {
                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
            }
            for replica_id in cluster.replica_ids().values() {
                dependents.extend_from_slice(&self.cluster_replica_dependents(
                    cluster_id,
                    *replica_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    pub(super) fn cluster_replica_dependents(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            dependents.push(object_id);
        }
        dependents
    }

    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            dependents.push(object_id)
        }
        dependents
    }

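    /// Returns all objects that transitively depend on the item `item_id`,
    /// including `item_id` itself and, where present, its progress collection.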
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }

    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        vec![object_id]
    }

    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }

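    /// Returns an error if `item` depends on any item in an unstable schema,
    /// unless unstable dependencies are enabled via the system configuration
    /// or `item` is temporary.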
    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
        if self.system_config().unsafe_enable_unstable_dependencies() {
            return Ok(());
        }

        let unstable_dependencies: Vec<_> = item
            .references()
            .items()
            .filter(|id| !self.is_stable(**id))
            .map(|id| self.get_entry(id).name().item.clone())
            .collect();

        if unstable_dependencies.is_empty() || item.is_temporary() {
            Ok(())
        } else {
            let object_type = item.typ().to_string();
            Err(Error {
                kind: ErrorKind::UnstableDependency {
                    object_type,
                    unstable_dependencies,
                },
            })
        }
    }

    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        let schema = self
            .get_schema(
                &name.qualifiers.database_spec,
                &name.qualifiers.schema_spec,
                conn_id,
            )
            .name()
            .schema
            .clone();
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }

    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        let database = match &name.database {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        FullSchemaName {
            database,
            schema: name.schema.clone(),
        }
    }

    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }

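    /// Returns the [`CatalogCollectionEntry`] for the collection identified by
    /// `id`. Tables may have multiple versioned collections, so their entry is
    /// pinned to the [`RelationVersion`] that `id` refers to; every other item
    /// type uses the latest version.
    ///
    /// Panics if `id` is not in the catalog.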
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        let item_id = self
            .entry_by_global_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));

        let entry = self.get_entry(item_id).clone();
        let version = match entry.item() {
            CatalogItem::Table(table) => {
                let (version, _) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("version to exist");
                RelationVersionSelector::Specific(*version)
            }
            _ => RelationVersionSelector::Latest,
        };
        CatalogCollectionEntry { entry, version }
    }

    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }

    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        let schema = self
            .temporary_schemas
            .get(conn)
            .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
        schema.items.values().copied().map(ObjectId::from)
    }

    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }

    pub fn get_item_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .items
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub(super) fn find_available_name(
        &self,
        mut name: QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> QualifiedItemName {
        let mut i = 0;
        let orig_item_name = name.item.clone();
        while self.get_item_by_name(&name, conn_id).is_some() {
            i += 1;
            name.item = format!("{}{}", orig_item_name, i);
        }
        name
    }

    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }

    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        let item_id = self.entry_by_global_id.get(id)?;
        self.try_get_entry(item_id)
    }

    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            other => other.desc_opt(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }

    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }

    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }

    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }

    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }

    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }

    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }

    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }

    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }

    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        membership.insert(RoleId::Public);
        membership
    }

    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }

    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or_else(|| "HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }

    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }

    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }

    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }

    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }

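    /// The fallible core of [`CatalogState::parse_item`]: plans `create_sql`
    /// and converts the resulting plan into a [`CatalogItem`].
    ///
    /// If `cached_expr` is present and its optimizer features match the
    /// current configuration, the cached expression is used instead of
    /// re-optimizing; otherwise the freshly optimized expression is returned
    /// alongside the item so the caller can cache it. On error, the unused
    /// `cached_expr` is handed back for re-insertion into the cache.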
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
                        DataSourceDesc::Ingestion {
                            ingestion_desc,
                            cluster_id: match in_cluster {
                                Some(id) => id,
                                None => {
                                    return Err((
                                        AdapterError::Unstructured(anyhow::anyhow!(
                                            "ingestion-based sources must have cluster specified"
                                        )),
                                        cached_expr,
                                    ));
                                }
                            },
                        }
                    }
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => (view.raw_expr, view.optimized_expr),
                    item => unreachable!("expected view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    (_, Some((raw_expr, optimized_expr))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = optimized_expr.typ();
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    global_id,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    cluster_id: materialized_view.cluster_id,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
                        Ok(ct) => ct,
                        Err(err) => return Err((err, cached_expr)),
                    };
                CatalogItem::ContinualTask(ct)
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                let desc = match typ.inner.desc(&session_catalog) {
                    Ok(desc) => desc,
                    Err(err) => return Err((err.into(), cached_expr)),
                };
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    desc,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        Ok((item, uncached_expr))
    }

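    /// Runs `f` with every feature required for item parsing temporarily
    /// enabled in the system configuration, restoring the original
    /// configuration afterwards.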
    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
        let restore = self.system_configuration.clone();
        self.system_configuration.enable_for_item_parsing();
        let res = f(self);
        self.system_configuration = restore;
        res
    }

    pub fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> impl Iterator<Item = (GlobalId, &Index)> {
        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;

        self.try_get_entry_by_global_id(&id)
            .into_iter()
            .map(move |e| {
                e.used_by()
                    .iter()
                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
                        CatalogItem::Index(index) if index_matches(index) => {
                            Some((index.global_id(), index))
                        }
                        _ => None,
                    })
            })
            .flatten()
    }

    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }

    pub(super) fn try_get_cluster_replica(
        &self,
        id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.try_get_cluster(id)
            .and_then(|cluster| cluster.replica(replica_id))
    }

    pub(super) fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.try_get_cluster_replica(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(super) fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        let cluster = self.get_cluster(*cluster_id);
        let replica_id = cluster
            .replica_id_by_name_
            .get(replica_name)
            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
        Ok(&cluster.replicas_by_id_[replica_id])
    }

    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
        Ok(self.system_configuration.get(name)?)
    }

    pub(super) fn parse_system_configuration(
        &self,
        name: &str,
        value: VarInput,
    ) -> Result<String, Error> {
        let value = self.system_configuration.parse(name, value)?;
        Ok(value.format())
    }

    pub(super) fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        let schema = match database_spec {
            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
                self.temporary_schemas.get(conn_id)
            }
            ResolvedDatabaseSpecifier::Ambient => self
                .ambient_schemas_by_name
                .get(schema_name)
                .and_then(|id| self.ambient_schemas_by_id.get(id)),
            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
                db.schemas_by_name
                    .get(schema_name)
                    .and_then(|id| db.schemas_by_id.get(id))
            }),
        };
        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
    }

    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        match (database_spec, schema_spec) {
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
                &self.temporary_schemas[conn_id]
            }
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
                &self.ambient_schemas_by_id[id]
            }

            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
                &self.database_by_id[database_id].schemas_by_id[schema_id]
            }
            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
                unreachable!("temporary schemas are in the ambient database")
            }
        }
    }

    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.database_by_id
            .values()
            .filter_map(|database| database.schemas_by_id.get(schema_id))
            .chain(self.ambient_schemas_by_id.values())
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }

    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }

    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }

    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }

    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }

    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }

    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }

    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }

    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        SYSTEM_SCHEMAS
            .iter()
            .map(|name| self.ambient_schemas_by_name[*name])
    }

    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
        self.system_schema_ids().contains(&id)
    }

    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
        match spec {
            SchemaSpecifier::Temporary => false,
            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
        }
    }

    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        UNSTABLE_SCHEMAS
            .iter()
            .map(|name| self.ambient_schemas_by_name[*name])
    }

    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
        self.unstable_schema_ids().contains(&id)
    }

    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
        match spec {
            SchemaSpecifier::Temporary => false,
            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
        }
    }

    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        let oid = INVALID_OID;
        self.temporary_schemas.insert(
            conn_id.clone(),
            Schema {
                name: QualifiedSchemaName {
                    database: ResolvedDatabaseSpecifier::Ambient,
                    schema: MZ_TEMP_SCHEMA.into(),
                },
                id: SchemaSpecifier::Temporary,
                oid,
                items: BTreeMap::new(),
                functions: BTreeMap::new(),
                types: BTreeMap::new(),
                owner_id,
                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Schema,
                    owner_id,
                )]),
            },
        );
        Ok(())
    }

    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
        std::iter::empty()
            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
                if schema.id.is_temporary() {
                    Some(schema.oid)
                } else {
                    None
                }
            }))
            .chain(self.entry_by_id.values().filter_map(|entry| {
                if entry.item().is_temporary() {
                    Some(entry.oid)
                } else {
                    None
                }
            }))
    }

    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
    }

    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
        let log = match self.get_entry(&item_id).item() {
            CatalogItem::Log(log) => log,
            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
        };
        (item_id, log.global_id)
    }

    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
    }

    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
        let schema = &self.ambient_schemas_by_id[schema_id];
        match builtin.catalog_item_type() {
            CatalogItemType::Type => schema.types[builtin.name()],
            CatalogItemType::Func => schema.functions[builtin.name()],
            CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::Sink
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
        }
    }

    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }

    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }

    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        match self.database_by_name.get(database_name) {
            Some(id) => Ok(&self.database_by_id[id]),
            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
        }
    }

    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        let database_spec = match database_name {
            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
                self.resolve_database(database)?.id().clone(),
            )),
            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
        };

        if let Some(database_spec) = database_spec {
            if let Ok(schema) =
                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
            {
                return Ok(schema);
            }
        }

        if let Ok(schema) = self.resolve_schema_in_database(
            &ResolvedDatabaseSpecifier::Ambient,
            schema_name,
            conn_id,
        ) {
            return Ok(schema);
        }

        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
    }

    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }

    pub fn resolve_search_path(
        &self,
        session: &dyn SessionMetadata,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        let database = self
            .database_by_name
            .get(session.database())
            .map(|id| id.clone());

        session
            .search_path()
            .iter()
            .map(|schema| {
                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
            })
            .filter_map(|schema| schema.ok())
            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
            .collect()
    }

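    /// Returns the effective search path: the temporary schema (when
    /// requested), then `mz_catalog` and `pg_catalog`, then `search_path`
    /// itself, without duplicating schemas already present in `search_path`.
    ///
    /// A rough sketch of the resulting order for an empty input path:
    ///
    /// ```ignore
    /// let path = state.effective_search_path(&[], true);
    /// // path = [temporary, mz_catalog, pg_catalog]
    /// ```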
    pub fn effective_search_path(
        &self,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        include_temp_schema: bool,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        let mut v = Vec::with_capacity(search_path.len() + 3);
        let temp_schema = (
            ResolvedDatabaseSpecifier::Ambient,
            SchemaSpecifier::Temporary,
        );
        if include_temp_schema && !search_path.contains(&temp_schema) {
            v.push(temp_schema);
        }
        let default_schemas = [
            (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
            ),
            (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
            ),
        ];
        for schema in default_schemas.into_iter() {
            if !search_path.contains(&schema) {
                v.push(schema);
            }
        }
        v.extend_from_slice(search_path);
        v
    }

    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        let id = self
            .clusters_by_name
            .get(name)
            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
        Ok(&self.clusters_by_id[id])
    }

    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        let id = self
            .clusters_by_name
            .get(cluster.name)
            .expect("failed to lookup BuiltinCluster by name");
        self.clusters_by_id
            .get(id)
            .expect("failed to lookup BuiltinCluster by ID")
    }

    pub fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
        let replica_name = cluster_replica_name.replica.as_str();
        let replica_id = cluster
            .replica_id(replica_name)
            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
        Ok(cluster.replica(replica_id).expect("Must exist"))
    }

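    /// Resolves `name` to a [`CatalogEntry`] using the schema map selected by
    /// `get_schema_entries`. If `name` carries an explicit schema, only that
    /// schema is searched; otherwise the connection's temporary schema is
    /// consulted first, followed by each schema in `search_path`. Lookups that
    /// name the outdated `mz_internal` schema additionally fall back to
    /// `mz_catalog_unstable` and `mz_introspection`.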
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        let schemas = match &name.schema {
            Some(schema_name) => {
                let schema = self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                )?;
                vec![(schema.name.database.clone(), schema.id.clone())]
            }
            None => match self
                .get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .items
                .get(&name.item)
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            let schema = self.get_schema(database_spec, schema_spec, conn_id);

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

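        // Some relations that once lived in `mz_internal` now live in
        // `mz_catalog_unstable` or `mz_introspection`. To ease that
        // migration, fall back to searching those schemas whenever the
        // search path includes `mz_internal`, logging any use of the
        // outdated schema below.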
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }

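    /// Resolves `name` to a [`CatalogEntry`] in a schema's item namespace.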
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.items,
            current_database,
            search_path,
            name,
            conn_id,
            SqlCatalogError::UnknownItem,
        )
    }

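    /// Resolves `name` to a function [`CatalogEntry`].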
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.functions,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownFunction {
                name,
                alternative: None,
            },
        )
    }

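    /// Resolves `name` to a type [`CatalogEntry`].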
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        static NON_PG_CATALOG_TYPES: LazyLock<
            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
        > = LazyLock::new(|| {
            BUILTINS::types()
                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
                .map(|typ| (typ.name, typ))
                .collect()
        });

        let entry = self.resolve(
            |schema| &schema.types,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownType { name },
        )?;

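        // A handful of built-in types live outside `pg_catalog` but have
        // historically resolved through it anyway. Warn (rather than error)
        // when a user explicitly qualifies such a type with `pg_catalog`.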
        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
                warn!(
                    "user specified the schema {} for the type {}, which belongs in \
                    the {} schema. This works today due to a bug, but will be fixed in a \
                    later release.",
                    PG_CATALOG_SCHEMA.quoted(),
                    typ.name.quoted(),
                    typ.schema.quoted(),
                );
            }
        }

        Ok(entry)
    }

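    /// Maps an [`ObjectId`] to the [`CommentObjectId`] used to store comments
    /// for that object.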
    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
        match object_id {
            ObjectId::Item(item_id) => {
                let entry = self.get_entry(&item_id);
                match entry.item_type() {
                    CatalogItemType::Table => CommentObjectId::Table(item_id),
                    CatalogItemType::Source => CommentObjectId::Source(item_id),
                    CatalogItemType::Sink => CommentObjectId::Sink(item_id),
                    CatalogItemType::View => CommentObjectId::View(item_id),
                    CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
                    CatalogItemType::Index => CommentObjectId::Index(item_id),
                    CatalogItemType::Func => CommentObjectId::Func(item_id),
                    CatalogItemType::Connection => CommentObjectId::Connection(item_id),
                    CatalogItemType::Type => CommentObjectId::Type(item_id),
                    CatalogItemType::Secret => CommentObjectId::Secret(item_id),
                    CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
                }
            }
            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
            ObjectId::ClusterReplica(cluster_replica_id) => {
                CommentObjectId::ClusterReplica(cluster_replica_id)
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                CommentObjectId::NetworkPolicy(network_policy_id)
            }
        }
    }

2172
2173 pub fn system_config(&self) -> &SystemVars {
2175 &self.system_configuration
2176 }
2177
2178 pub fn system_config_mut(&mut self) -> &mut SystemVars {
2180 &mut self.system_configuration
2181 }
2182
2183 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2193 let mut dump = serde_json::to_value(&self).map_err(|e| {
2195 Error::new(ErrorKind::Unstructured(format!(
2196 "internal error: could not dump catalog: {}",
2199 e
2200 )))
2201 })?;
2202
2203 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2204 dump_obj.insert(
2206 "system_parameter_defaults".into(),
2207 serde_json::json!(self.system_config().defaults()),
2208 );
2209 if let Some(unfinalized_shards) = unfinalized_shards {
2211 dump_obj
2212 .get_mut("storage_metadata")
2213 .expect("known to exist")
2214 .as_object_mut()
2215 .expect("storage_metadata is an object")
2216 .insert(
2217 "unfinalized_shards".into(),
2218 serde_json::json!(unfinalized_shards),
2219 );
2220 }
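        // Temporary items are scoped to a single connection, so strip them
        // from the dump to keep its contents independent of active sessions.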
        let temporary_gids: Vec<_> = self
            .entry_by_global_id
            .iter()
            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
            .map(|(gid, _item_id)| *gid)
            .collect();
        if !temporary_gids.is_empty() {
            let gids = dump_obj
                .get_mut("entry_by_global_id")
                .expect("known to exist")
                .as_object_mut()
                .expect("entry_by_global_id is an object");
            for gid in temporary_gids {
                gids.remove(&gid.to_string());
            }
        }

        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
    }

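    /// Returns the configured availability zones.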
    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }

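    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into a
    /// controller [`ReplicaLocation`], validating the replica size and the
    /// availability zone configuration along the way.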
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &[String],
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried to concretize an unmanaged replica with specific availability zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried to concretize a managed replica with both a specific \
                                   availability zone and a set of allowed availability zones";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some(azs)) if azs.is_empty() => {
                            ManagedReplicaAvailabilityZones::FromCluster(None)
                        }
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }

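    /// Returns whether replicas of the named size get disk, i.e., whether the
    /// size's disk limit is anything other than zero.
    ///
    /// Panics if `size` is not a known replica size.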
    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
        let alloc = &self.cluster_replica_sizes.0[size];
        alloc.disk_limit != Some(DiskLimit::ZERO)
    }

2325
2326 pub(crate) fn ensure_valid_replica_size(
2327 &self,
2328 allowed_sizes: &[String],
2329 size: &String,
2330 ) -> Result<(), Error> {
2331 let cluster_replica_sizes = &self.cluster_replica_sizes;
2332
2333 if !cluster_replica_sizes.0.contains_key(size)
2334 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2335 || cluster_replica_sizes.0[size].disabled
2336 {
2337 let mut entries = cluster_replica_sizes
2338 .enabled_allocations()
2339 .collect::<Vec<_>>();
2340
2341 if !allowed_sizes.is_empty() {
2342 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2343 entries.retain(|(name, _)| allowed_sizes.contains(name));
2344 }
2345
2346 entries.sort_by_key(
2347 |(
2348 _name,
2349 ReplicaAllocation {
2350 scale, cpu_limit, ..
2351 },
2352 )| (scale, cpu_limit),
2353 );
2354
2355 Err(Error {
2356 kind: ErrorKind::InvalidClusterReplicaSize {
2357 size: size.to_owned(),
2358 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2359 },
2360 })
2361 } else {
2362 Ok(())
2363 }
2364 }
2365
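    /// Returns an error if `role_id` refers to a builtin role, whose name is
    /// reserved.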
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_builtin() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        if network_policy_id.is_builtin() {
            let policy = self.get_network_policy(network_policy_id);
            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
                policy.name.clone(),
            )))
        } else {
            Ok(())
        }
    }

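    /// Returns an error if `role_id` cannot be granted: the PUBLIC
    /// pseudo-role and system roles are not grantable.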
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        let is_grantable = !role_id.is_public() && !role_id.is_system();
        if is_grantable {
            Ok(())
        } else {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::UngrantableRoleName(
                role.name().to_string(),
            )))
        }
    }

    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_system() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_predefined() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

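    /// Appends an event to the audit log: allocates an ID, constructs a
    /// [`VersionedEvent`], records it in the durable transaction, and pushes
    /// it onto `audit_events`.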
    pub(crate) fn add_to_audit_log(
        system_configuration: &SystemVars,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        tx: &mut mz_catalog::durable::Transaction,
        audit_events: &mut Vec<VersionedEvent>,
        event_type: EventType,
        object_type: ObjectType,
        details: EventDetails,
    ) -> Result<(), Error> {
        let user = session.map(|session| session.user().name.to_string());

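        // Unsafe configurations may mock the audit-event timestamp (e.g., in
        // tests); otherwise, use the timestamp oracle's write timestamp.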
        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
            Some(ts) => ts.into(),
            None => oracle_write_ts.into(),
        };
        let id = tx.allocate_audit_log_id()?;
        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
        audit_events.push(event.clone());
        tx.insert_audit_log_event(event);
        Ok(())
    }

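    /// Returns the owning role of the given object, if the object type has an
    /// owner (roles themselves do not).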
    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
        match id {
            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
                self.get_cluster_replica(*cluster_id, *replica_id)
                    .owner_id(),
            ),
            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
            ObjectId::Schema((database_spec, schema_spec)) => Some(
                self.get_schema(database_spec, schema_spec, conn_id)
                    .owner_id(),
            ),
            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
            ObjectId::Role(_) => None,
            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
        }
    }

    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        match object_id {
            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
        }
    }

    pub(super) fn get_system_object_type(
        &self,
        id: &SystemObjectId,
    ) -> mz_sql::catalog::SystemObjectType {
        match id {
            SystemObjectId::Object(object_id) => {
                SystemObjectType::Object(self.get_object_type(object_id))
            }
            SystemObjectId::System => SystemObjectType::System,
        }
    }

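    /// Returns the [`StorageMetadata`] known to the catalog.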
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }

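    /// Groups the given items by their compaction window. Sources always
    /// participate; tables participate when they are fed by an ingestion
    /// export; all other item types are skipped. Duplicate IDs are processed
    /// once.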
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    match source.data_source {
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::IngestionExport { .. }
                        | DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                    }
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc: DataSourceDesc::IngestionExport { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        _ => {}
                    }
                }
                _ => continue,
            }
        }
        cws
    }

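    /// Returns the [`CatalogItemId`] behind a [`CommentObjectId`], if the
    /// comment is attached to a catalog item (rather than, e.g., a role or a
    /// cluster).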
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }

    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
    }

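    /// Renders the name of the commented-on object as it should appear in the
    /// audit log.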
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }
}

impl ConnectionResolver for CatalogState {
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}

impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}

impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}

impl Catalog {
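    /// Upcasts a shared [`Catalog`] handle into a shared [`OptimizerCatalog`]
    /// trait object.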
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}