use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
};
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
    TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
    UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::DiskLimit;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
use mz_ore::str::StrExt;
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::{
    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
    UNSTABLE_SCHEMAS,
};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
    VersionedRelationDesc,
};
use mz_secrets::InMemorySecretsController;
use mz_sql::ast::Ident;
use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
    TypeReference,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
    Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageMetadata;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{
    ConnectionResolver, InlinedConnection, IntoInlineConnection,
};
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{debug, warn};

use crate::AdapterError;
use crate::catalog::{Catalog, ConnCatalog};
use crate::coord::ConnMeta;
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::session::Session;

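/// The in-memory representation of the catalog: databases, schemas, items,
/// clusters, replicas, roles, network policies, and related configuration.
///
/// The `Serialize` implementation exists for debug output; fields marked
/// `#[serde(skip)]` are omitted from that output.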
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,
    pub(super) mock_authentication_nonce: Option<String>,

    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is open and accepting reads and writes.
    Open {
        /// Expressions found in the cache at startup.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Expressions missing from the cache that should be written back.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is closed; reads always miss and writes are dropped.
    Closed,
}

impl LocalExpressionCache {
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    /// Returns a previously removed expression to the cache, without marking
    /// it for write-back.
    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    /// Records that `id` was missing from the cache, so that its optimized
    /// expression can be written back later.
    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                // If the same ID is inserted twice with different expressions,
                // then optimization is not deterministic for this item and it
                // is unsafe to cache: drop the entry instead of writing either
                // version back.
                let prev = uncached_exprs.remove(&id);
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}
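
// A minimal sketch of how `LocalExpressionCache` is threaded through item
// parsing, assuming a `cached_exprs` map loaded from the durable expression
// cache (the bindings below are illustrative, not defined in this module):
//
//     let mut cache = LocalExpressionCache::new(cached_exprs);
//     // ... parse items, consuming cache hits and recording misses ...
//     let to_persist = cache.into_uncached_exprs();
//     // ... write `to_persist` back to the durable cache ...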

fn skip_temp_items<S>(
    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    mz_ore::serde::map_key_to_string(
        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
        serializer,
    )
}

impl CatalogState {
    /// Returns an empty [`CatalogState`], for use in tests.
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                timestamp_interval: Default::default(),
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Default::default(),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Default::default(),
            system_privileges: Default::default(),
            comments: Default::default(),
            source_references: Default::default(),
            storage_metadata: Default::default(),
            license_key: ValidatedLicenseKey::for_tests(),
            mock_authentication_nonce: Default::default(),
        }
    }

    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        let search_path = self.resolve_search_path(session);
        let database = self
            .database_by_name
            .get(session.vars().database())
            .map(|id| id.clone());
        let state = match session.transaction().catalog_state() {
            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
            None => Cow::Borrowed(self),
        };
        ConnCatalog {
            state,
            unresolvable_ids: BTreeSet::new(),
            conn_id: session.conn_id().clone(),
            cluster: session.vars().cluster().into(),
            database,
            search_path,
            role_id: session.current_role_id().clone(),
            prepared_statements: Some(session.prepared_statements()),
            portals: Some(session.portals()),
            notices_tx: session.retain_notice_transmitter(),
        }
    }

    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            portals: None,
            notices_tx,
        }
    }

    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }
    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
        struct I<'a> {
            queue: VecDeque<CatalogItemId>,
            seen: BTreeSet<CatalogItemId>,
            this: &'a CatalogState,
        }
        impl<'a> Iterator for I<'a> {
            type Item = CatalogItemId;
            fn next(&mut self) -> Option<Self::Item> {
                if let Some(next) = self.queue.pop_front() {
                    for child in self.this.get_entry(&next).item().uses() {
                        if !self.seen.contains(&child) {
                            self.queue.push_back(child);
                            self.seen.insert(child);
                        }
                    }
                    Some(next)
                } else {
                    None
                }
            }
        }

        I {
            queue: [id].into_iter().collect(),
            seen: [id].into_iter().collect(),
            this: self,
        }
    }

    /// Returns the IDs of all introspection sources (logs) that `id`
    /// transitively depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }

    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }

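    /// Returns the IDs of all objects that transitively depend on any of
    /// `object_ids`, including the objects themselves. Each object appears
    /// after its dependents, and `seen` records the objects already visited.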
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }

    fn cluster_dependents(
        &self,
        cluster_id: ClusterId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Cluster(cluster_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let cluster = self.get_cluster(cluster_id);
            for item_id in cluster.bound_objects() {
                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
            }
            for replica_id in cluster.replica_ids().values() {
                dependents.extend_from_slice(&self.cluster_replica_dependents(
                    cluster_id,
                    *replica_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    pub(super) fn cluster_replica_dependents(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            dependents.push(object_id);
        }
        dependents
    }

    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            dependents.push(object_id)
        }
        dependents
    }

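    /// Returns the IDs of all objects that transitively depend on `item_id`,
    /// including `item_id` itself, with dependents listed before the items
    /// they depend on.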
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            // The progress collection of a source does not appear in
            // `used_by`, so chase it explicitly.
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }

    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        // Network policies have no dependents, so the policy itself is the
        // only member of its dependent set.
        vec![object_id]
    }

    /// Indicates whether the item identified by `id` is considered stable,
    /// i.e. does not live in an unstable schema.
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }

    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
        if self.system_config().unsafe_enable_unstable_dependencies() {
            return Ok(());
        }

        let unstable_dependencies: Vec<_> = item
            .references()
            .items()
            .filter(|id| !self.is_stable(**id))
            .map(|id| self.get_entry(id).name().item.clone())
            .collect();

        // Temporary items cannot outlive the current process, so unstable
        // dependencies are acceptable for them.
        if unstable_dependencies.is_empty() || item.is_temporary() {
            Ok(())
        } else {
            let object_type = item.typ().to_string();
            Err(Error {
                kind: ErrorKind::UnstableDependency {
                    object_type,
                    unstable_dependencies,
                },
            })
        }
    }

    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        let schema = self
            .get_schema(
                &name.qualifiers.database_spec,
                &name.qualifiers.schema_spec,
                conn_id,
            )
            .name()
            .schema
            .clone();
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }

    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        let database = match &name.database {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        FullSchemaName {
            database,
            schema: name.schema.clone(),
        }
    }

    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }

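    /// Returns the [`CatalogCollectionEntry`] for the collection identified by
    /// `id`. For tables, which may have multiple versions, the entry is pinned
    /// to the exact [`RelationVersion`] that `id` refers to; all other items
    /// resolve to their latest version.
    ///
    /// Panics if `id` is not in the catalog.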
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        let item_id = self
            .entry_by_global_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));

        let entry = self.get_entry(item_id).clone();
        let version = match entry.item() {
            CatalogItem::Table(table) => {
                let (version, _) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("version to exist");
                RelationVersionSelector::Specific(*version)
            }
            _ => RelationVersionSelector::Latest,
        };
        CatalogCollectionEntry { entry, version }
    }

    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }

    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        let schema = self
            .temporary_schemas
            .get(conn)
            .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
        schema.items.values().copied().map(ObjectId::from)
    }

    /// Gets the type named `name` from exactly one of the system schemas.
    ///
    /// # Panics
    ///
    /// Panics if `name` exists in more than one of the system schemas, or in
    /// none of them.
    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }

    pub fn get_item_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .items
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

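    /// Returns a name derived from `name` that is not already used by another
    /// item in the same schema, appending the smallest integer suffix that
    /// makes the name free.
    ///
    /// An illustrative sketch of the behavior, assuming a hypothetical
    /// `qualified` helper that builds a [`QualifiedItemName`]: if `"foo"` and
    /// `"foo1"` are both taken, then
    ///
    /// ```ignore
    /// let available = state.find_available_name(qualified("foo"), &conn_id);
    /// assert_eq!(available.item, "foo2");
    /// ```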
    pub(super) fn find_available_name(
        &self,
        mut name: QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> QualifiedItemName {
        let mut i = 0;
        let orig_item_name = name.item.clone();
        while self.get_item_by_name(&name, conn_id).is_some() {
            i += 1;
            name.item = format!("{}{}", orig_item_name, i);
        }
        name
    }

    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }

    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        let item_id = self.entry_by_global_id.get(id)?;
        self.try_get_entry(item_id)
    }

    /// Returns the [`RelationDesc`] for the relation identified by `id`, if
    /// one exists. For tables the description is pinned to the version that
    /// `id` refers to.
    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            other => other.desc_opt(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }

    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }

    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }

    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }

    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }

    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }

    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }

    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
        self.role_auth_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
    }

    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }

    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }

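    /// Returns the set of all roles that `id` is transitively a member of,
    /// including `id` itself and the implicit `PUBLIC` role.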
    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        membership.insert(RoleId::Public);
        membership
    }

    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }

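    /// Returns the URL that webhook requests for the item identified by `id`
    /// should be sent to, or `None` if `id` does not exist or is not
    /// addressable (e.g. it lives in the ambient database).
    ///
    /// A hedged sketch of the resulting shape, assuming a host name of
    /// `"example.com"` and an item `materialize.public.my_webhook` (bindings
    /// are illustrative):
    ///
    /// ```ignore
    /// let url = state.try_get_webhook_url(&item_id).unwrap();
    /// assert_eq!(
    ///     url.as_str(),
    ///     "https://example.com/api/webhook/materialize/public/my_webhook",
    /// );
    /// ```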
    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or_else(|| "HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }

    /// Parses and plans the given `CREATE` SQL with all item-parsing feature
    /// flags temporarily enabled. See
    /// [`CatalogState::with_enable_for_item_parsing`] for details.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }

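    /// Parses and plans a single `CREATE` statement against the given catalog
    /// snapshot. This is intended to be a pure function of its inputs, which
    /// is what makes it usable both for new statements and for re-parsing
    /// items loaded from durable state.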
    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }

    /// Parses the `create_sql` of a catalog item that is being loaded from
    /// durable state, consulting (and updating) the local expression cache.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }

    /// Parses the given SQL string into a `CatalogItem`, handling the
    /// bookkeeping for the local expression cache: cache hits are consumed,
    /// and newly optimized expressions are recorded for write-back.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                // On error, return the unused cache entry so a later retry can
                // still benefit from it.
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }

    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr`
    /// if it is still valid for the current optimizer feature flags.
    ///
    /// On success, additionally returns any newly optimized expression that
    /// should be written back to the expression cache; on failure, returns the
    /// unused `cached_expr` alongside the error so the caller can restore it.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = optimized_expr.typ();
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);
                let desc = VersionedRelationDesc::new(desc);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    collections,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    replacement_target: materialized_view.replacement_target,
                    cluster_id: materialized_view.cluster_id,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
                        Ok(ct) => ct,
                        Err(err) => return Err((err, cached_expr)),
                    };
                CatalogItem::ContinualTask(ct)
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                // Validate that the type definition is resolvable before
                // storing it in the catalog.
                if let Err(err) = typ.inner.desc(&session_catalog) {
                    return Err((err.into(), cached_expr));
                }
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        Ok((item, uncached_expr))
    }

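    /// Runs `f` with every feature flag needed for item parsing temporarily
    /// enabled in [`SystemVars`], then restores the previous configuration.
    ///
    /// This exists so that items created while a flag was enabled can still be
    /// re-parsed after a restart, even if the flag has since been turned off.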
    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
        let restore = self.system_configuration.clone();
        self.system_configuration.enable_for_item_parsing();
        let res = f(self);
        self.system_configuration = restore;
        res
    }

    /// Returns all indexes on `id` in `cluster` known to the catalog.
    pub fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> impl Iterator<Item = (GlobalId, &Index)> {
        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;

        self.try_get_entry_by_global_id(&id)
            .into_iter()
            .map(move |e| {
                e.used_by()
                    .iter()
                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
                        CatalogItem::Index(index) if index_matches(index) => {
                            Some((index.global_id(), index))
                        }
                        _ => None,
                    })
            })
            .flatten()
    }

    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }

    pub(super) fn try_get_cluster_replica(
        &self,
        id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.try_get_cluster(id)
            .and_then(|cluster| cluster.replica(replica_id))
    }

    pub(crate) fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.try_get_cluster_replica(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(super) fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        let cluster = self.get_cluster(*cluster_id);
        let replica_id = cluster
            .replica_id_by_name_
            .get(replica_name)
            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
        Ok(&cluster.replicas_by_id_[replica_id])
    }

    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
        Ok(self.system_configuration.get(name)?)
    }

    /// Parses the given value for the named system configuration, returning
    /// the formatted, canonical representation of the value.
    pub(super) fn parse_system_configuration(
        &self,
        name: &str,
        value: VarInput,
    ) -> Result<String, Error> {
        let value = self.system_configuration.parse(name, value)?;
        Ok(value.format())
    }

    /// Resolves the named schema in the given database, taking the temporary
    /// schema of `conn_id` into account for the ambient database.
    pub(super) fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        let schema = match database_spec {
            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
                self.temporary_schemas.get(conn_id)
            }
            ResolvedDatabaseSpecifier::Ambient => self
                .ambient_schemas_by_name
                .get(schema_name)
                .and_then(|id| self.ambient_schemas_by_id.get(id)),
            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
                db.schemas_by_name
                    .get(schema_name)
                    .and_then(|id| db.schemas_by_id.get(id))
            }),
        };
        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
    }

    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        match (database_spec, schema_spec) {
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
                &self.temporary_schemas[conn_id]
            }
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
                &self.ambient_schemas_by_id[id]
            }

            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
                &self.database_by_id[database_id].schemas_by_id[schema_id]
            }
            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
                unreachable!("temporary schemas are in the ambient database")
            }
        }
    }

    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.database_by_id
            .values()
            .filter_map(|database| database.schemas_by_id.get(schema_id))
            .chain(self.ambient_schemas_by_id.values())
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }

    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.temporary_schemas
            .values()
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }

    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }

    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }

    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }

    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }

    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }

    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }

    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }

    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        SYSTEM_SCHEMAS
            .iter()
            .map(|name| self.ambient_schemas_by_name[*name])
    }

    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
        self.system_schema_ids().contains(&id)
    }

    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
        match spec {
            SchemaSpecifier::Temporary => false,
            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
        }
    }

    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        UNSTABLE_SCHEMAS
            .iter()
            .map(|name| self.ambient_schemas_by_name[*name])
    }

    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
        self.unstable_schema_ids().contains(&id)
    }

    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
        match spec {
            SchemaSpecifier::Temporary => false,
            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
        }
    }

    /// Creates a new temporary schema for the given connection.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        // Temporary schemas do not need valid OIDs.
        let oid = INVALID_OID;
        self.temporary_schemas.insert(
            conn_id.clone(),
            Schema {
                name: QualifiedSchemaName {
                    database: ResolvedDatabaseSpecifier::Ambient,
                    schema: MZ_TEMP_SCHEMA.into(),
                },
                id: SchemaSpecifier::Temporary,
                oid,
                items: BTreeMap::new(),
                functions: BTreeMap::new(),
                types: BTreeMap::new(),
                owner_id,
                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Schema,
                    owner_id,
                )]),
            },
        );
        Ok(())
    }

    /// Returns the OIDs of all temporary schemas and items.
    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
        std::iter::empty()
            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
                if schema.id.is_temporary() {
                    Some(schema.oid)
                } else {
                    None
                }
            }))
            .chain(self.entry_by_id.values().filter_map(|entry| {
                if entry.item().is_temporary() {
                    Some(entry.oid)
                } else {
                    None
                }
            }))
    }

    /// Returns the [`CatalogItemId`] of the builtin table `builtin`.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
    }

    /// Returns the [`CatalogItemId`] and [`GlobalId`] of the builtin log
    /// `builtin`.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
        let log = match self.get_entry(&item_id).item() {
            CatalogItem::Log(log) => log,
            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
        };
        (item_id, log.global_id)
    }

    /// Returns the [`CatalogItemId`] of the builtin source `builtin`.
    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
    }

    /// Returns the [`CatalogItemId`] of the builtin object `builtin`.
    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
        let schema = &self.ambient_schemas_by_id[schema_id];
        match builtin.catalog_item_type() {
            CatalogItemType::Type => schema.types[builtin.name()],
            CatalogItemType::Func => schema.functions[builtin.name()],
            CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::Sink
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
        }
    }

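    /// Converts a [`BuiltinType`] whose type references are human-readable
    /// names into one whose references are catalog IDs, by looking each
    /// referenced type name up in the system schemas.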
    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }

    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }

    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        match self.database_by_name.get(database_name) {
            Some(id) => Ok(&self.database_by_id[id]),
            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
        }
    }

    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        let database_spec = match database_name {
            // If a database name was given, resolve it; otherwise fall back to
            // the connection's current database.
            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
                self.resolve_database(database)?.id().clone(),
            )),
            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
        };

        // First try to find the schema in the resolved database.
        if let Some(database_spec) = database_spec {
            if let Ok(schema) =
                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
            {
                return Ok(schema);
            }
        }

        // Then fall back to the ambient schemas.
        if let Ok(schema) = self.resolve_schema_in_database(
            &ResolvedDatabaseSpecifier::Ambient,
            schema_name,
            conn_id,
        ) {
            return Ok(schema);
        }

        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
    }

    /// Returns the ID of the ambient schema with the given system schema name.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }

    pub fn resolve_search_path(
        &self,
        session: &dyn SessionMetadata,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        let database = self
            .database_by_name
            .get(session.database())
            .map(|id| id.clone());

        session
            .search_path()
            .iter()
            .map(|schema| {
                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
            })
            .filter_map(|schema| schema.ok())
            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
            .collect()
    }

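    /// Returns the effective search path: the given search path with the
    /// temporary schema (optionally) and the `mz_catalog` and `pg_catalog`
    /// schemas prepended, unless they already appear in the search path.
    ///
    /// A hedged sketch of the effect, assuming a session search path of just
    /// `public` (bindings are illustrative):
    ///
    /// ```ignore
    /// let effective = state.effective_search_path(&search_path, false);
    /// // Yields, in order: mz_catalog, pg_catalog, public.
    /// assert_eq!(effective.len(), 3);
    /// ```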
1965 pub fn effective_search_path(
1966 &self,
1967 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1968 include_temp_schema: bool,
1969 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1970 let mut v = Vec::with_capacity(search_path.len() + 3);
1971 let temp_schema = (
1973 ResolvedDatabaseSpecifier::Ambient,
1974 SchemaSpecifier::Temporary,
1975 );
1976 if include_temp_schema && !search_path.contains(&temp_schema) {
1977 v.push(temp_schema);
1978 }
1979 let default_schemas = [
1980 (
1981 ResolvedDatabaseSpecifier::Ambient,
1982 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
1983 ),
1984 (
1985 ResolvedDatabaseSpecifier::Ambient,
1986 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
1987 ),
1988 ];
1989 for schema in default_schemas.into_iter() {
1990 if !search_path.contains(&schema) {
1991 v.push(schema);
1992 }
1993 }
1994 v.extend_from_slice(search_path);
1995 v
1996 }
1997
1998 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
1999 let id = self
2000 .clusters_by_name
2001 .get(name)
2002 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2003 Ok(&self.clusters_by_id[id])
2004 }
2005
2006 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2007 let id = self
2008 .clusters_by_name
2009 .get(cluster.name)
2010 .expect("failed to lookup BuiltinCluster by name");
2011 self.clusters_by_id
2012 .get(id)
2013 .expect("failed to lookup BuiltinCluster by ID")
2014 }
2015
    pub fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
        let replica_name = cluster_replica_name.replica.as_str();
        let replica_id = cluster
            .replica_id(replica_name)
            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
        Ok(cluster.replica(replica_id).expect("Must exist"))
    }

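    /// Resolves a [`PartialItemName`] to a [`CatalogEntry`].
    ///
    /// When `name` has no explicit schema, the temporary schema is consulted
    /// first, then each schema in `search_path` in order. `get_schema_entries`
    /// selects which namespace of a schema to search (items, functions, or
    /// types), and `err_gen` constructs the error returned when nothing
    /// matches.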
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        let schemas = match &name.schema {
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            None => match self
                .get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .items
                .get(&name.item)
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

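        // Check the schemas in order, returning the first entry that matches.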
        for (database_spec, schema_spec) in &schemas {
            let schema = self.get_schema(database_spec, schema_spec, conn_id);

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

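        // Some builtin objects have moved out of `mz_internal` into
        // `mz_catalog_unstable` or `mz_introspection`. If `mz_internal` is in
        // the search list, also check those schemas so that stale references
        // continue to resolve, logging when this fallback fires.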
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }

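    /// Resolves `name` to a non-function [`CatalogEntry`].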
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.items,
            current_database,
            search_path,
            name,
            conn_id,
            SqlCatalogError::UnknownItem,
        )
    }

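    /// Resolves `name` to a function [`CatalogEntry`].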
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.functions,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownFunction {
                name,
                alternative: None,
            },
        )
    }

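    /// Resolves `name` to a type [`CatalogEntry`], warning when a builtin type
    /// that lives outside `pg_catalog` is referenced through the `pg_catalog`
    /// schema.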
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        static NON_PG_CATALOG_TYPES: LazyLock<
            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
        > = LazyLock::new(|| {
            BUILTINS::types()
                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
                .map(|typ| (typ.name, typ))
                .collect()
        });

        let entry = self.resolve(
            |schema| &schema.types,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownType { name },
        )?;

        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
                warn!(
                    "user specified an incorrect schema of {} for the type {}, which should be in \
                     the {} schema. This works now due to a bug but will be fixed in a later release.",
                    PG_CATALOG_SCHEMA.quoted(),
                    typ.name.quoted(),
                    typ.schema.quoted(),
                );
            }
        }

        Ok(entry)
    }

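    /// Maps an [`ObjectId`] to the [`CommentObjectId`] under which comments on
    /// that object are stored.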
    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
        match object_id {
            ObjectId::Item(item_id) => {
                let entry = self.get_entry(&item_id);
                match entry.item_type() {
                    CatalogItemType::Table => CommentObjectId::Table(item_id),
                    CatalogItemType::Source => CommentObjectId::Source(item_id),
                    CatalogItemType::Sink => CommentObjectId::Sink(item_id),
                    CatalogItemType::View => CommentObjectId::View(item_id),
                    CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
                    CatalogItemType::Index => CommentObjectId::Index(item_id),
                    CatalogItemType::Func => CommentObjectId::Func(item_id),
                    CatalogItemType::Connection => CommentObjectId::Connection(item_id),
                    CatalogItemType::Type => CommentObjectId::Type(item_id),
                    CatalogItemType::Secret => CommentObjectId::Secret(item_id),
                    CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
                }
            }
            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
            ObjectId::ClusterReplica(cluster_replica_id) => {
                CommentObjectId::ClusterReplica(cluster_replica_id)
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                CommentObjectId::NetworkPolicy(network_policy_id)
            }
        }
    }

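    /// Returns the system configuration.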
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }

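    /// Returns a mutable reference to the system configuration.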
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        &mut self.system_configuration
    }

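    /// Serializes the catalog state to a pretty-printed JSON string, primarily
    /// for debugging. When `unfinalized_shards` is provided, it is spliced
    /// into the dumped storage metadata.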
    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
        let mut dump = serde_json::to_value(&self).map_err(|e| {
            Error::new(ErrorKind::Unstructured(format!(
                "internal error: could not dump catalog: {}",
                e
            )))
        })?;

        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
        dump_obj.insert(
            "system_parameter_defaults".into(),
            serde_json::json!(self.system_config().defaults()),
        );
        if let Some(unfinalized_shards) = unfinalized_shards {
            dump_obj
                .get_mut("storage_metadata")
                .expect("known to exist")
                .as_object_mut()
                .expect("storage_metadata is an object")
                .insert(
                    "unfinalized_shards".into(),
                    serde_json::json!(unfinalized_shards),
                );
        }
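        // Strip temporary (connection-scoped) items from `entry_by_global_id`;
        // they exist only for the lifetime of a session.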
        let temporary_gids: Vec<_> = self
            .entry_by_global_id
            .iter()
            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
            .map(|(gid, _item_id)| *gid)
            .collect();
        if !temporary_gids.is_empty() {
            let gids = dump_obj
                .get_mut("entry_by_global_id")
                .expect("known to exist")
                .as_object_mut()
                .expect("entry_by_global_id is an object");
            for gid in temporary_gids {
                gids.remove(&gid.to_string());
            }
        }
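        // Never include role authentication information in a dump.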
        dump_obj.remove("role_auth_by_id");

        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
    }

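    /// Returns the configured availability zones.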
    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }

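    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into a
    /// controller [`ReplicaLocation`], validating the replica size and
    /// availability-zone constraints along the way.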
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &[String],
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried to concretize an unmanaged replica with specific availability zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried to concretize a managed replica with both a specific \
                        availability zone and a list of allowed availability zones";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some(azs)) if azs.is_empty() => {
                            ManagedReplicaAvailabilityZones::FromCluster(None)
                        }
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }

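    /// Reports whether the replica size `size` is provisioned with disk: swap
    /// must be disabled and the disk limit must be nonzero.
    ///
    /// Panics if `size` is not a known replica size.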
    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
        let alloc = &self.cluster_replica_sizes.0[size];
        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
    }

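    /// Ensures that `size` names a known, enabled replica size that is also in
    /// `allowed_sizes` (when that list is non-empty), returning an error that
    /// lists the acceptable sizes otherwise.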
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        let cluster_replica_sizes = &self.cluster_replica_sizes;

        if !cluster_replica_sizes.0.contains_key(size)
            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
            || cluster_replica_sizes.0[size].disabled
        {
            let mut entries = cluster_replica_sizes
                .enabled_allocations()
                .collect::<Vec<_>>();

            if !allowed_sizes.is_empty() {
                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
                entries.retain(|(name, _)| allowed_sizes.contains(name));
            }

            entries.sort_by_key(
                |(
                    _name,
                    ReplicaAllocation {
                        scale, cpu_limit, ..
                    },
                )| (scale, cpu_limit),
            );

            Err(Error {
                kind: ErrorKind::InvalidClusterReplicaSize {
                    size: size.to_owned(),
                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
                },
            })
        } else {
            Ok(())
        }
    }

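    /// Returns an error if `role_id` refers to a builtin role.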
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_builtin() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

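    /// Returns an error if `network_policy_id` refers to a builtin network policy.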
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        if network_policy_id.is_builtin() {
            let policy = self.get_network_policy(network_policy_id);
            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
                policy.name.clone(),
            )))
        } else {
            Ok(())
        }
    }

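    /// Returns an error if `role_id` cannot be granted, i.e. if it refers to
    /// the `PUBLIC` pseudo-role or to a system role.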
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        let is_grantable = !role_id.is_public() && !role_id.is_system();
        if is_grantable {
            Ok(())
        } else {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::UngrantableRoleName(
                role.name().to_string(),
            )))
        }
    }

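    /// Returns an error if `role_id` refers to a system role.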
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_system() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

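    /// Returns an error if `role_id` refers to a predefined role.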
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_predefined() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

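    /// Creates a new audit-log event, recording it both in the in-memory
    /// `audit_events` buffer and in the durable catalog transaction `tx`.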
    pub(crate) fn add_to_audit_log(
        system_configuration: &SystemVars,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        tx: &mut mz_catalog::durable::Transaction,
        audit_events: &mut Vec<VersionedEvent>,
        event_type: EventType,
        object_type: ObjectType,
        details: EventDetails,
    ) -> Result<(), Error> {
        let user = session.map(|session| session.user().name.to_string());

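        // In unsafe/test configurations, a mocked timestamp may override the
        // oracle's write timestamp for audit events.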
        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
            Some(ts) => ts.into(),
            None => oracle_write_ts.into(),
        };
        let id = tx.allocate_audit_log_id()?;
        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
        audit_events.push(event.clone());
        tx.insert_audit_log_event(event);
        Ok(())
    }

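    /// Returns the owner of the object identified by `id`, or `None` for
    /// objects (such as roles) that have no owner.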
    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
        match id {
            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
                self.get_cluster_replica(*cluster_id, *replica_id)
                    .owner_id(),
            ),
            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
            ObjectId::Schema((database_spec, schema_spec)) => Some(
                self.get_schema(database_spec, schema_spec, conn_id)
                    .owner_id(),
            ),
            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
            ObjectId::Role(_) => None,
            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
        }
    }

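    /// Returns the SQL object type of the object identified by `object_id`.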
    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        match object_id {
            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
        }
    }

    pub(super) fn get_system_object_type(
        &self,
        id: &SystemObjectId,
    ) -> mz_sql::catalog::SystemObjectType {
        match id {
            SystemObjectId::Object(object_id) => {
                SystemObjectType::Object(self.get_object_type(object_id))
            }
            SystemObjectId::System => SystemObjectType::System,
        }
    }

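    /// Returns the current storage metadata.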
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }

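    /// Groups the given item IDs by their effective compaction window,
    /// skipping duplicates and items that are neither sources nor
    /// ingestion-backed tables. Items without a custom window fall under the
    /// default window.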
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    match source.data_source {
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::IngestionExport { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                        DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                    }
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc: DataSourceDesc::IngestionExport { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        _ => {}
                    }
                }
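                // Not a source or ingestion-backed table; nothing to track.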
                _ => {
                    continue;
                }
            }
        }
        cws
    }

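    /// Returns the [`CatalogItemId`] behind a [`CommentObjectId`], or `None`
    /// when the comment target (role, database, schema, cluster, replica, or
    /// network policy) is not a catalog item.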
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }

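    /// Returns the [`CatalogEntry`] for a [`CommentObjectId`], if the target is a catalog item.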
    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
    }

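    /// Renders the human-readable name of a comment target for use in audit
    /// log events.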
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }

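    /// Returns the mock authentication nonce, or an empty string if none is set.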
    pub fn mock_authentication_nonce(&self) -> String {
        self.mock_authentication_nonce.clone().unwrap_or_default()
    }
}

impl ConnectionResolver for CatalogState {
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
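            // Connections that can reference other catalog connections are
            // inlined; SSH, AWS, and AWS PrivateLink connections pass through
            // unchanged.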
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}

impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}

impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}

impl Catalog {
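    /// Upcasts an `Arc<Catalog>` to an `Arc<dyn OptimizerCatalog>`.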
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}