// mz_adapter/catalog/state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! In-memory metadata storage for the coordinator.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36    TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40    UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55    UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62    VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
67use mz_sql::catalog::{
68    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
69    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
70    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
71    TypeReference,
72};
73use mz_sql::names::{
74    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81    Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91    ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use serde::Serialize;
94use timely::progress::Antichain;
95use tokio::sync::mpsc;
96use tracing::{debug, warn};
97
98// DO NOT add any more imports from `crate` outside of `crate::catalog`.
99use crate::AdapterError;
100use crate::catalog::{Catalog, ConnCatalog};
101use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
102use crate::optimize::{self, Optimize, OptimizerCatalog};
103use crate::session::Session;
104
/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects are partially derived from read-only state.
    //
    // Each `*_by_name` map stores only the ID; the matching `*_by_id` map stores the full
    // object, so a lookup by name is a two-step resolution.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    // Temporary (connection-scoped) items are omitted from serialized dumps; see
    // `skip_temp_items`.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    // Maps each collection's `GlobalId` back to the item that owns it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    // Schemas that do not belong to any database (system schemas like `mz_catalog`).
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    // Authentication data per role; no `*_by_name` companion map.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    pub(super) system_privileges: Arc<PrivilegeMap>,
    pub(super) comments: Arc<CommentsMap>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: Arc<StorageMetadata>,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog.
    // Temporary schemas are created lazily, keyed by the owning connection.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only networking/cloud context, not derived from the durable catalog.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only license state, not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
172
/// Keeps track of what expressions are cached or not during startup.
/// It's also used during catalog transactions to avoid re-optimizing CREATE VIEW / CREATE MAT VIEW
/// statements when going back and forth between durable catalog operations and in-memory catalog
/// operations.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        /// Entries are consumed via `remove_cached_expression`.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        /// Accumulated via `insert_uncached_expression` and drained via
        /// `into_uncached_exprs`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}
189
190impl LocalExpressionCache {
191    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
192        Self::Open {
193            cached_exprs,
194            uncached_exprs: BTreeMap::new(),
195        }
196    }
197
198    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
199        match self {
200            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
201            LocalExpressionCache::Closed => None,
202        }
203    }
204
205    /// Insert an expression that was cached, back into the cache. This is generally needed when
206    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
207    pub(super) fn insert_cached_expression(
208        &mut self,
209        id: GlobalId,
210        local_expressions: LocalExpressions,
211    ) {
212        match self {
213            LocalExpressionCache::Open { cached_exprs, .. } => {
214                cached_exprs.insert(id, local_expressions);
215            }
216            LocalExpressionCache::Closed => {}
217        }
218    }
219
220    /// Inform the cache that `id` was not found in the cache and that we should add it as
221    /// `local_mir` and `optimizer_features`.
222    pub(super) fn insert_uncached_expression(
223        &mut self,
224        id: GlobalId,
225        local_mir: OptimizedMirRelationExpr,
226        optimizer_features: OptimizerFeatures,
227    ) {
228        match self {
229            LocalExpressionCache::Open { uncached_exprs, .. } => {
230                let local_expr = LocalExpressions {
231                    local_mir,
232                    optimizer_features,
233                };
234                // If we are trying to cache the same item a second time, with a different
235                // expression, then we must be migrating the object or doing something else weird.
236                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
237                // version after a restart. Caching the migrated version may cause us to incorrectly
238                // think that the object has already been migrated. To simplify things, we cache
239                // neither.
240                let prev = uncached_exprs.remove(&id);
241                match prev {
242                    Some(prev) if prev == local_expr => {
243                        uncached_exprs.insert(id, local_expr);
244                    }
245                    None => {
246                        uncached_exprs.insert(id, local_expr);
247                    }
248                    Some(_) => {}
249                }
250            }
251            LocalExpressionCache::Closed => {}
252        }
253    }
254
255    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
256        match self {
257            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
258            LocalExpressionCache::Closed => BTreeMap::new(),
259        }
260    }
261}
262
263fn skip_temp_items<S>(
264    entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
265    serializer: S,
266) -> Result<S::Ok, S::Error>
267where
268    S: serde::Serializer,
269{
270    mz_ore::serde::map_key_to_string(
271        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
272        serializer,
273    )
274}
275
276impl CatalogState {
277    /// Returns an empty [`CatalogState`] that can be used in tests.
278    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
279    // tests are structured in this repository.
280    pub fn empty_test() -> Self {
281        CatalogState {
282            database_by_name: Default::default(),
283            database_by_id: Default::default(),
284            entry_by_id: Default::default(),
285            entry_by_global_id: Default::default(),
286            ambient_schemas_by_name: Default::default(),
287            ambient_schemas_by_id: Default::default(),
288            temporary_schemas: Default::default(),
289            clusters_by_id: Default::default(),
290            clusters_by_name: Default::default(),
291            network_policies_by_name: Default::default(),
292            roles_by_name: Default::default(),
293            roles_by_id: Default::default(),
294            network_policies_by_id: Default::default(),
295            role_auth_by_id: Default::default(),
296            config: CatalogConfig {
297                start_time: Default::default(),
298                start_instant: Instant::now(),
299                nonce: Default::default(),
300                environment_id: EnvironmentId::for_tests(),
301                session_id: Default::default(),
302                build_info: &DUMMY_BUILD_INFO,
303                now: NOW_ZERO.clone(),
304                connection_context: ConnectionContext::for_tests(Arc::new(
305                    InMemorySecretsController::new(),
306                )),
307                builtins_cfg: BuiltinsConfig {
308                    include_continual_tasks: true,
309                },
310                helm_chart_version: None,
311            },
312            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
313            availability_zones: Default::default(),
314            system_configuration: Arc::new(SystemVars::default()),
315            egress_addresses: Default::default(),
316            aws_principal_context: Default::default(),
317            aws_privatelink_availability_zones: Default::default(),
318            http_host_name: Default::default(),
319            default_privileges: Arc::new(DefaultPrivileges::default()),
320            system_privileges: Arc::new(PrivilegeMap::default()),
321            comments: Arc::new(CommentsMap::default()),
322            source_references: Default::default(),
323            storage_metadata: Arc::new(StorageMetadata::default()),
324            license_key: ValidatedLicenseKey::for_tests(),
325            mock_authentication_nonce: Default::default(),
326        }
327    }
328
329    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
330        let search_path = self.resolve_search_path(session);
331        let database = self
332            .database_by_name
333            .get(session.vars().database())
334            .map(|id| id.clone());
335        let state = match session.transaction().catalog_state() {
336            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
337            None => Cow::Borrowed(self),
338        };
339        ConnCatalog {
340            state,
341            unresolvable_ids: BTreeSet::new(),
342            conn_id: session.conn_id().clone(),
343            cluster: session.vars().cluster().into(),
344            database,
345            search_path,
346            role_id: session.current_role_id().clone(),
347            prepared_statements: Some(session.prepared_statements()),
348            portals: Some(session.portals()),
349            notices_tx: session.retain_notice_transmitter(),
350        }
351    }
352
353    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
354        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
355        let cluster = self.system_configuration.default_cluster();
356
357        ConnCatalog {
358            state: Cow::Borrowed(self),
359            unresolvable_ids: BTreeSet::new(),
360            conn_id: SYSTEM_CONN_ID.clone(),
361            cluster,
362            database: self
363                .resolve_database(DEFAULT_DATABASE_NAME)
364                .ok()
365                .map(|db| db.id()),
366            // Leaving the system's search path empty allows us to catch issues
367            // where catalog object names have not been normalized correctly.
368            search_path: Vec::new(),
369            role_id,
370            prepared_statements: None,
371            portals: None,
372            notices_tx,
373        }
374    }
375
    /// Returns a [`ConnCatalog`] that acts as the `mz_system` role with no
    /// attached session (see [`CatalogState::for_sessionless_user`]).
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }
379
380    /// Returns an iterator over the deduplicated identifiers of all
381    /// objects this catalog entry transitively depends on (where
382    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
383    /// [`CatalogItem::references`]).
384    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
385        struct I<'a> {
386            queue: VecDeque<CatalogItemId>,
387            seen: BTreeSet<CatalogItemId>,
388            this: &'a CatalogState,
389        }
390        impl<'a> Iterator for I<'a> {
391            type Item = CatalogItemId;
392            fn next(&mut self) -> Option<Self::Item> {
393                if let Some(next) = self.queue.pop_front() {
394                    for child in self.this.get_entry(&next).item().uses() {
395                        if !self.seen.contains(&child) {
396                            self.queue.push_back(child);
397                            self.seen.insert(child);
398                        }
399                    }
400                    Some(next)
401                } else {
402                    None
403                }
404            }
405        }
406
407        I {
408            queue: [id].into_iter().collect(),
409            seen: [id].into_iter().collect(),
410            this: self,
411        }
412    }
413
414    /// Computes the IDs of any log sources this catalog entry transitively
415    /// depends on.
416    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
417        let mut out = Vec::new();
418        self.introspection_dependencies_inner(id, &mut out);
419        out
420    }
421
    /// Recursive helper for [`CatalogState::introspection_dependencies`]:
    /// appends to `out` the IDs of log sources reachable from `id`.
    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            // A log is itself an introspection dependency.
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            // Sinks and indexes each recurse into their single upstream collection.
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            // Recursion stops at these item types; they contribute nothing.
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }
449
450    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
451    ///
452    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
453    /// earlier in the list than the roots. This is particularly useful for the order to drop
454    /// objects.
455    pub(super) fn object_dependents(
456        &self,
457        object_ids: &Vec<ObjectId>,
458        conn_id: &ConnectionId,
459        seen: &mut BTreeSet<ObjectId>,
460    ) -> Vec<ObjectId> {
461        let mut dependents = Vec::new();
462        for object_id in object_ids {
463            match object_id {
464                ObjectId::Cluster(id) => {
465                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
466                }
467                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
468                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
469                ),
470                ObjectId::Database(id) => {
471                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
472                }
473                ObjectId::Schema((database_spec, schema_spec)) => {
474                    dependents.extend_from_slice(&self.schema_dependents(
475                        database_spec.clone(),
476                        schema_spec.clone(),
477                        conn_id,
478                        seen,
479                    ));
480                }
481                ObjectId::NetworkPolicy(id) => {
482                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
483                }
484                id @ ObjectId::Role(_) => {
485                    let unseen = seen.insert(id.clone());
486                    if unseen {
487                        dependents.push(id.clone());
488                    }
489                }
490                ObjectId::Item(id) => {
491                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
492                }
493            }
494        }
495        dependents
496    }
497
498    /// Returns all the IDs of all objects that depend on `cluster_id`, including `cluster_id`
499    /// itself.
500    ///
501    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
502    /// earlier in the list than the roots. This is particularly useful for the order to drop
503    /// objects.
504    fn cluster_dependents(
505        &self,
506        cluster_id: ClusterId,
507        seen: &mut BTreeSet<ObjectId>,
508    ) -> Vec<ObjectId> {
509        let mut dependents = Vec::new();
510        let object_id = ObjectId::Cluster(cluster_id);
511        if !seen.contains(&object_id) {
512            seen.insert(object_id.clone());
513            let cluster = self.get_cluster(cluster_id);
514            for item_id in cluster.bound_objects() {
515                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
516            }
517            for replica_id in cluster.replica_ids().values() {
518                dependents.extend_from_slice(&self.cluster_replica_dependents(
519                    cluster_id,
520                    *replica_id,
521                    seen,
522                ));
523            }
524            dependents.push(object_id);
525        }
526        dependents
527    }
528
529    /// Returns all the IDs of all objects that depend on `replica_id`, including `replica_id`
530    /// itself.
531    ///
532    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
533    /// earlier in the list than the roots. This is particularly useful for the order to drop
534    /// objects.
535    pub(super) fn cluster_replica_dependents(
536        &self,
537        cluster_id: ClusterId,
538        replica_id: ReplicaId,
539        seen: &mut BTreeSet<ObjectId>,
540    ) -> Vec<ObjectId> {
541        let mut dependents = Vec::new();
542        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
543        if !seen.contains(&object_id) {
544            seen.insert(object_id.clone());
545            dependents.push(object_id);
546        }
547        dependents
548    }
549
550    /// Returns all the IDs of all objects that depend on `database_id`, including `database_id`
551    /// itself.
552    ///
553    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
554    /// earlier in the list than the roots. This is particularly useful for the order to drop
555    /// objects.
556    fn database_dependents(
557        &self,
558        database_id: DatabaseId,
559        conn_id: &ConnectionId,
560        seen: &mut BTreeSet<ObjectId>,
561    ) -> Vec<ObjectId> {
562        let mut dependents = Vec::new();
563        let object_id = ObjectId::Database(database_id);
564        if !seen.contains(&object_id) {
565            seen.insert(object_id.clone());
566            let database = self.get_database(&database_id);
567            for schema_id in database.schema_ids().values() {
568                dependents.extend_from_slice(&self.schema_dependents(
569                    ResolvedDatabaseSpecifier::Id(database_id),
570                    SchemaSpecifier::Id(*schema_id),
571                    conn_id,
572                    seen,
573                ));
574            }
575            dependents.push(object_id);
576        }
577        dependents
578    }
579
580    /// Returns all the IDs of all objects that depend on `schema_id`, including `schema_id`
581    /// itself.
582    ///
583    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
584    /// earlier in the list than the roots. This is particularly useful for the order to drop
585    /// objects.
586    fn schema_dependents(
587        &self,
588        database_spec: ResolvedDatabaseSpecifier,
589        schema_spec: SchemaSpecifier,
590        conn_id: &ConnectionId,
591        seen: &mut BTreeSet<ObjectId>,
592    ) -> Vec<ObjectId> {
593        let mut dependents = Vec::new();
594        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
595        if !seen.contains(&object_id) {
596            seen.insert(object_id.clone());
597            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
598            for item_id in schema.item_ids() {
599                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
600            }
601            dependents.push(object_id)
602        }
603        dependents
604    }
605
606    /// Returns all the IDs of all objects that depend on `item_id`, including `item_id`
607    /// itself.
608    ///
609    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
610    /// earlier in the list than the roots. This is particularly useful for the order to drop
611    /// objects.
612    pub(super) fn item_dependents(
613        &self,
614        item_id: CatalogItemId,
615        seen: &mut BTreeSet<ObjectId>,
616    ) -> Vec<ObjectId> {
617        let mut dependents = Vec::new();
618        let object_id = ObjectId::Item(item_id);
619        if !seen.contains(&object_id) {
620            seen.insert(object_id.clone());
621            let entry = self.get_entry(&item_id);
622            for dependent_id in entry.used_by() {
623                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
624            }
625            dependents.push(object_id);
626            // We treat the progress collection as if it depends on the source
627            // for dropping. We have additional code in planning to create a
628            // kind of special-case "CASCADE" for this dependency.
629            if let Some(progress_id) = entry.progress_id() {
630                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
631            }
632        }
633        dependents
634    }
635
636    /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id`
637    /// itself.
638    ///
639    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
640    /// earlier in the list than the roots. This is particularly useful for the order to drop
641    /// objects.
642    pub(super) fn network_policy_dependents(
643        &self,
644        network_policy_id: NetworkPolicyId,
645        _seen: &mut BTreeSet<ObjectId>,
646    ) -> Vec<ObjectId> {
647        let object_id = ObjectId::NetworkPolicy(network_policy_id);
648        // Currently network policies have no dependents
649        // when we add the ability for users or sources/sinks to have policies
650        // this method will need to be updated.
651        vec![object_id]
652    }
653
654    /// Indicates whether the indicated item is considered stable or not.
655    ///
656    /// Only stable items can be used as dependencies of other catalog items.
657    fn is_stable(&self, id: CatalogItemId) -> bool {
658        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
659        !self.is_unstable_schema_specifier(spec)
660    }
661
662    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
663        if self.system_config().unsafe_enable_unstable_dependencies() {
664            return Ok(());
665        }
666
667        let unstable_dependencies: Vec<_> = item
668            .references()
669            .items()
670            .filter(|id| !self.is_stable(**id))
671            .map(|id| self.get_entry(id).name().item.clone())
672            .collect();
673
674        // It's okay to create a temporary object with unstable
675        // dependencies, since we will never need to reboot a catalog
676        // that contains it.
677        if unstable_dependencies.is_empty() || item.is_temporary() {
678            Ok(())
679        } else {
680            let object_type = item.typ().to_string();
681            Err(Error {
682                kind: ErrorKind::UnstableDependency {
683                    object_type,
684                    unstable_dependencies,
685                },
686            })
687        }
688    }
689
690    pub fn resolve_full_name(
691        &self,
692        name: &QualifiedItemName,
693        conn_id: Option<&ConnectionId>,
694    ) -> FullItemName {
695        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
696
697        let database = match &name.qualifiers.database_spec {
698            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
699            ResolvedDatabaseSpecifier::Id(id) => {
700                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
701            }
702        };
703        // For temporary schemas, we know the name is always MZ_TEMP_SCHEMA,
704        // and the schema may not exist yet if no temporary items have been created.
705        let schema = match &name.qualifiers.schema_spec {
706            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
707            _ => self
708                .get_schema(
709                    &name.qualifiers.database_spec,
710                    &name.qualifiers.schema_spec,
711                    conn_id,
712                )
713                .name()
714                .schema
715                .clone(),
716        };
717        FullItemName {
718            database,
719            schema,
720            item: name.item.clone(),
721        }
722    }
723
724    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
725        let database = match &name.database {
726            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
727            ResolvedDatabaseSpecifier::Id(id) => {
728                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
729            }
730        };
731        FullSchemaName {
732            database,
733            schema: name.schema.clone(),
734        }
735    }
736
737    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
738        self.entry_by_id
739            .get(id)
740            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
741    }
742
743    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
744        let item_id = self
745            .entry_by_global_id
746            .get(id)
747            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
748
749        let entry = self.get_entry(item_id).clone();
750        let version = match entry.item() {
751            CatalogItem::Table(table) => {
752                let (version, _) = table
753                    .collections
754                    .iter()
755                    .find(|(_verison, gid)| *gid == id)
756                    .expect("version to exist");
757                RelationVersionSelector::Specific(*version)
758            }
759            _ => RelationVersionSelector::Latest,
760        };
761        CatalogCollectionEntry { entry, version }
762    }
763
764    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
765        self.entry_by_id.iter()
766    }
767
768    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
769        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
770        self.temporary_schemas
771            .get(conn)
772            .into_iter()
773            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
774    }
775
776    /// Returns true if a temporary schema exists for the given connection.
777    ///
778    /// Temporary schemas are created lazily when the first temporary object is created
779    /// for a connection, so this may return false for connections that haven't created
780    /// any temporary objects.
781    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
782        self.temporary_schemas.contains_key(conn)
783    }
784
785    /// Gets a type named `name` from exactly one of the system schemas.
786    ///
787    /// # Panics
788    /// - If `name` is not an entry in any system schema
789    /// - If more than one system schema has an entry named `name`.
790    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
791        let mut res = None;
792        for schema_id in self.system_schema_ids() {
793            let schema = &self.ambient_schemas_by_id[&schema_id];
794            if let Some(global_id) = schema.types.get(name) {
795                match res {
796                    None => res = Some(self.get_entry(global_id)),
797                    Some(_) => panic!(
798                        "only call get_system_type on objects uniquely identifiable in one system schema"
799                    ),
800                }
801            }
802        }
803
804        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
805    }
806
807    pub fn get_item_by_name(
808        &self,
809        name: &QualifiedItemName,
810        conn_id: &ConnectionId,
811    ) -> Option<&CatalogEntry> {
812        self.get_schema(
813            &name.qualifiers.database_spec,
814            &name.qualifiers.schema_spec,
815            conn_id,
816        )
817        .items
818        .get(&name.item)
819        .and_then(|id| self.try_get_entry(id))
820    }
821
822    pub fn get_type_by_name(
823        &self,
824        name: &QualifiedItemName,
825        conn_id: &ConnectionId,
826    ) -> Option<&CatalogEntry> {
827        self.get_schema(
828            &name.qualifiers.database_spec,
829            &name.qualifiers.schema_spec,
830            conn_id,
831        )
832        .types
833        .get(&name.item)
834        .and_then(|id| self.try_get_entry(id))
835    }
836
837    pub(super) fn find_available_name(
838        &self,
839        mut name: QualifiedItemName,
840        conn_id: &ConnectionId,
841    ) -> QualifiedItemName {
842        let mut i = 0;
843        let orig_item_name = name.item.clone();
844        while self.get_item_by_name(&name, conn_id).is_some() {
845            i += 1;
846            name.item = format!("{}{}", orig_item_name, i);
847        }
848        name
849    }
850
851    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
852        self.entry_by_id.get(id)
853    }
854
855    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
856        let item_id = self.entry_by_global_id.get(id)?;
857        self.try_get_entry(item_id)
858    }
859
860    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
861    /// object that returns rows.
862    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
863        let entry = self.try_get_entry_by_global_id(id)?;
864        let desc = match entry.item() {
865            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
866            // TODO(alter_table): Support schema evolution on sources.
867            other => other.relation_desc(RelationVersionSelector::Latest)?,
868        };
869        Some(desc)
870    }
871
872    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
873        self.try_get_cluster(cluster_id)
874            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
875    }
876
877    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
878        self.clusters_by_id.get(&cluster_id)
879    }
880
881    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
882        self.roles_by_id.get(id)
883    }
884
885    pub fn get_role(&self, id: &RoleId) -> &Role {
886        self.roles_by_id.get(id).expect("catalog out of sync")
887    }
888
889    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
890        self.roles_by_id.keys()
891    }
892
893    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
894        self.roles_by_name
895            .get(role_name)
896            .map(|id| &self.roles_by_id[id])
897    }
898
899    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
900        self.role_auth_by_id
901            .get(id)
902            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
903    }
904
905    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
906        self.role_auth_by_id.get(id)
907    }
908
909    pub(super) fn try_get_network_policy_by_name(
910        &self,
911        policy_name: &str,
912    ) -> Option<&NetworkPolicy> {
913        self.network_policies_by_name
914            .get(policy_name)
915            .map(|id| &self.network_policies_by_id[id])
916    }
917
    /// Returns the transitive closure of role memberships for `id`, including
    /// `id` itself and the implicit `PUBLIC` pseudo-role.
    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        // Breadth-first traversal of the role membership graph. `membership`
        // doubles as the visited set, so even an (invalid) membership cycle
        // cannot cause an infinite loop.
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                // Cycles are tolerated by the traversal above, but they should
                // never exist; surface them in debug/soft-assert builds.
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        // Every role is implicitly a member of PUBLIC.
        membership.insert(RoleId::Public);
        membership
    }
935
936    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
937        self.network_policies_by_id
938            .get(id)
939            .expect("catalog out of sync")
940    }
941
942    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
943        self.network_policies_by_id.keys()
944    }
945
946    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
947    /// source.
948    ///
949    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
950    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
951        let entry = self.try_get_entry(id)?;
952        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
953        let name = self.resolve_full_name(entry.name(), None);
954        let host_name = self
955            .http_host_name
956            .as_ref()
957            .map(|x| x.as_str())
958            .unwrap_or_else(|| "HOST");
959
960        let RawDatabaseSpecifier::Name(database) = name.database else {
961            return None;
962        };
963
964        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
965        url.path_segments_mut()
966            .ok()?
967            .push(&database)
968            .push(&name.schema)
969            .push(&name.item);
970
971        Some(url)
972    }
973
    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
    ///
    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
    ///
    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
    /// their original state before the method returns.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
        // caller if this method changed the state of the catalog.
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            // `force_if_exists_skip` makes IF NOT EXISTS-style errors non-fatal
            // during planning.
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            // Plan as the system session: stored item definitions are not tied
            // to any user connection.
            let session_catalog = state.for_system_session();

            // Parse, resolve names to ids, then plan the single statement.
            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }
1001
1002    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
1003    #[mz_ore::instrument]
1004    pub(crate) fn parse_plan(
1005        create_sql: &str,
1006        pcx: Option<&PlanContext>,
1007        catalog: &ConnCatalog,
1008    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1009        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1010        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1011        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1012
1013        Ok((plan, resolved_ids))
1014    }
1015
    /// Parses the given SQL string into a pair of [`CatalogItem`].
    ///
    /// Thin wrapper around [`CatalogState::parse_item`] that supplies the
    /// defaults used when rehydrating items from durable storage: no plan
    /// context, not a retained-metrics object, and no custom compaction
    /// window.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,  // pcx: no plan context when deserializing
            false, // is_retained_metrics_object
            None,  // custom_logical_compaction_window
            local_expression_cache,
            previous_item,
        )
    }
1036
    /// Parses the given SQL string into a `CatalogItem`.
    ///
    /// Manages `local_expression_cache` around the call to
    /// [`CatalogState::parse_item_inner`]: the cached expression for
    /// `global_id` is removed up front, re-inserted if parsing fails (so it is
    /// not lost), and on success any freshly optimized expression is recorded
    /// as uncached so it can be persisted later.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        // Take ownership of the cached expression; it is handed to
        // `parse_item_inner`, which returns it back on failure.
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                // `uncached_expr` is Some only when the expression had to be
                // optimized from scratch; remember it for future caching.
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                // Restore the cached expression we removed above so a failed
                // parse does not evict it.
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }
1079
1080    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
1081    ///
1082    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
1083    /// not cached.
1084    ///
1085    /// On failure returns an error and `cached_expr` so it can be used later.
1086    #[mz_ore::instrument]
1087    pub(crate) fn parse_item_inner(
1088        &self,
1089        global_id: GlobalId,
1090        create_sql: &str,
1091        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1092        pcx: Option<&PlanContext>,
1093        is_retained_metrics_object: bool,
1094        custom_logical_compaction_window: Option<CompactionWindow>,
1095        cached_expr: Option<LocalExpressions>,
1096        previous_item: Option<CatalogItem>,
1097    ) -> Result<
1098        (
1099            CatalogItem,
1100            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1101        ),
1102        (AdapterError, Option<LocalExpressions>),
1103    > {
1104        let session_catalog = self.for_system_session();
1105
1106        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1107            Ok((plan, resolved_ids)) => (plan, resolved_ids),
1108            Err(err) => return Err((err, cached_expr)),
1109        };
1110
1111        let mut uncached_expr = None;
1112
1113        let item = match plan {
1114            Plan::CreateTable(CreateTablePlan { table, .. }) => {
1115                let collections = extra_versions
1116                    .iter()
1117                    .map(|(version, gid)| (*version, *gid))
1118                    .chain([(RelationVersion::root(), global_id)].into_iter())
1119                    .collect();
1120
1121                CatalogItem::Table(Table {
1122                    create_sql: Some(table.create_sql),
1123                    desc: table.desc,
1124                    collections,
1125                    conn_id: None,
1126                    resolved_ids,
1127                    custom_logical_compaction_window: custom_logical_compaction_window
1128                        .or(table.compaction_window),
1129                    is_retained_metrics_object,
1130                    data_source: match table.data_source {
1131                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1132                            TableDataSource::TableWrites { defaults }
1133                        }
1134                        mz_sql::plan::TableDataSource::DataSource {
1135                            desc: data_source_desc,
1136                            timeline,
1137                        } => match data_source_desc {
1138                            mz_sql::plan::DataSourceDesc::IngestionExport {
1139                                ingestion_id,
1140                                external_reference,
1141                                details,
1142                                data_config,
1143                            } => TableDataSource::DataSource {
1144                                desc: DataSourceDesc::IngestionExport {
1145                                    ingestion_id,
1146                                    external_reference,
1147                                    details,
1148                                    data_config,
1149                                },
1150                                timeline,
1151                            },
1152                            mz_sql::plan::DataSourceDesc::Webhook {
1153                                validate_using,
1154                                body_format,
1155                                headers,
1156                                cluster_id,
1157                            } => TableDataSource::DataSource {
1158                                desc: DataSourceDesc::Webhook {
1159                                    validate_using,
1160                                    body_format,
1161                                    headers,
1162                                    cluster_id: cluster_id
1163                                        .expect("Webhook Tables must have a cluster_id set"),
1164                                },
1165                                timeline,
1166                            },
1167                            _ => {
1168                                return Err((
1169                                    AdapterError::Unstructured(anyhow::anyhow!(
1170                                        "unsupported data source for table"
1171                                    )),
1172                                    cached_expr,
1173                                ));
1174                            }
1175                        },
1176                    },
1177                })
1178            }
1179            Plan::CreateSource(CreateSourcePlan {
1180                source,
1181                timeline,
1182                in_cluster,
1183                ..
1184            }) => CatalogItem::Source(Source {
1185                create_sql: Some(source.create_sql),
1186                data_source: match source.data_source {
1187                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1188                        desc,
1189                        cluster_id: match in_cluster {
1190                            Some(id) => id,
1191                            None => {
1192                                return Err((
1193                                    AdapterError::Unstructured(anyhow::anyhow!(
1194                                        "ingestion-based sources must have cluster specified"
1195                                    )),
1196                                    cached_expr,
1197                                ));
1198                            }
1199                        },
1200                    },
1201                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1202                        desc,
1203                        progress_subsource,
1204                        data_config,
1205                        details,
1206                    } => DataSourceDesc::OldSyntaxIngestion {
1207                        desc,
1208                        progress_subsource,
1209                        data_config,
1210                        details,
1211                        cluster_id: match in_cluster {
1212                            Some(id) => id,
1213                            None => {
1214                                return Err((
1215                                    AdapterError::Unstructured(anyhow::anyhow!(
1216                                        "ingestion-based sources must have cluster specified"
1217                                    )),
1218                                    cached_expr,
1219                                ));
1220                            }
1221                        },
1222                    },
1223                    mz_sql::plan::DataSourceDesc::IngestionExport {
1224                        ingestion_id,
1225                        external_reference,
1226                        details,
1227                        data_config,
1228                    } => DataSourceDesc::IngestionExport {
1229                        ingestion_id,
1230                        external_reference,
1231                        details,
1232                        data_config,
1233                    },
1234                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1235                    mz_sql::plan::DataSourceDesc::Webhook {
1236                        validate_using,
1237                        body_format,
1238                        headers,
1239                        cluster_id,
1240                    } => {
1241                        mz_ore::soft_assert_or_log!(
1242                            cluster_id.is_none(),
1243                            "cluster_id set at Source level for Webhooks"
1244                        );
1245                        DataSourceDesc::Webhook {
1246                            validate_using,
1247                            body_format,
1248                            headers,
1249                            cluster_id: in_cluster
1250                                .expect("webhook sources must use an existing cluster"),
1251                        }
1252                    }
1253                },
1254                desc: source.desc,
1255                global_id,
1256                timeline,
1257                resolved_ids,
1258                custom_logical_compaction_window: source
1259                    .compaction_window
1260                    .or(custom_logical_compaction_window),
1261                is_retained_metrics_object,
1262            }),
1263            Plan::CreateView(CreateViewPlan { view, .. }) => {
1264                // Collect optimizer parameters.
1265                let optimizer_config =
1266                    optimize::OptimizerConfig::from(session_catalog.system_vars());
1267                let previous_exprs = previous_item.map(|item| match item {
1268                    CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
1269                    _ => None,
1270                });
1271
1272                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1273                    (Some(local_expr), _)
1274                        if local_expr.optimizer_features == optimizer_config.features =>
1275                    {
1276                        debug!("local expression cache hit for {global_id:?}");
1277                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1278                    }
1279                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1280                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1281                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1282                    }
1283                    (cached_expr, _) => {
1284                        let optimizer_features = optimizer_config.features.clone();
1285                        // Build an optimizer for this VIEW.
1286                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1287
1288                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
1289                        let raw_expr = view.expr;
1290                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1291                            Ok(optimzed_expr) => optimzed_expr,
1292                            Err(err) => return Err((err.into(), cached_expr)),
1293                        };
1294
1295                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1296
1297                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1298                    }
1299                };
1300
1301                // Resolve all item dependencies from the HIR expression.
1302                let dependencies: BTreeSet<_> = raw_expr
1303                    .depends_on()
1304                    .into_iter()
1305                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1306                    .collect();
1307
1308                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1309                CatalogItem::View(View {
1310                    create_sql: view.create_sql,
1311                    global_id,
1312                    raw_expr,
1313                    desc: RelationDesc::new(typ, view.column_names),
1314                    optimized_expr,
1315                    conn_id: None,
1316                    resolved_ids,
1317                    dependencies: DependencyIds(dependencies),
1318                })
1319            }
1320            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1321                materialized_view, ..
1322            }) => {
1323                let collections = extra_versions
1324                    .iter()
1325                    .map(|(version, gid)| (*version, *gid))
1326                    .chain([(RelationVersion::root(), global_id)].into_iter())
1327                    .collect();
1328
1329                // Collect optimizer parameters.
1330                let system_vars = session_catalog.system_vars();
1331                let overrides = self
1332                    .get_cluster(materialized_view.cluster_id)
1333                    .config
1334                    .features();
1335                let optimizer_config =
1336                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
1337                let previous_exprs = previous_item.map(|item| match item {
1338                    CatalogItem::MaterializedView(materialized_view) => {
1339                        (materialized_view.raw_expr, materialized_view.optimized_expr)
1340                    }
1341                    item => unreachable!("expected materialized view, found: {item:#?}"),
1342                });
1343
1344                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1345                    (Some(local_expr), _)
1346                        if local_expr.optimizer_features == optimizer_config.features =>
1347                    {
1348                        debug!("local expression cache hit for {global_id:?}");
1349                        (
1350                            Arc::new(materialized_view.expr),
1351                            Arc::new(local_expr.local_mir),
1352                        )
1353                    }
1354                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1355                    (_, Some((raw_expr, optimized_expr)))
1356                        if *raw_expr == materialized_view.expr =>
1357                    {
1358                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1359                    }
1360                    (cached_expr, _) => {
1361                        let optimizer_features = optimizer_config.features.clone();
1362                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
1363                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1364
1365                        let raw_expr = materialized_view.expr;
1366                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1367                            Ok(optimized_expr) => optimized_expr,
1368                            Err(err) => return Err((err.into(), cached_expr)),
1369                        };
1370
1371                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1372
1373                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1374                    }
1375                };
1376                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1377
1378                for &i in &materialized_view.non_null_assertions {
1379                    typ.column_types[i].nullable = false;
1380                }
1381                let desc = RelationDesc::new(typ, materialized_view.column_names);
1382                let desc = VersionedRelationDesc::new(desc);
1383
1384                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1385
1386                // Resolve all item dependencies from the HIR expression.
1387                let dependencies = raw_expr
1388                    .depends_on()
1389                    .into_iter()
1390                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1391                    .collect();
1392
1393                CatalogItem::MaterializedView(MaterializedView {
1394                    create_sql: materialized_view.create_sql,
1395                    collections,
1396                    raw_expr,
1397                    optimized_expr,
1398                    desc,
1399                    resolved_ids,
1400                    dependencies,
1401                    replacement_target: materialized_view.replacement_target,
1402                    cluster_id: materialized_view.cluster_id,
1403                    target_replica: materialized_view.target_replica,
1404                    non_null_assertions: materialized_view.non_null_assertions,
1405                    custom_logical_compaction_window: materialized_view.compaction_window,
1406                    refresh_schedule: materialized_view.refresh_schedule,
1407                    initial_as_of,
1408                })
1409            }
1410            Plan::CreateContinualTask(plan) => {
1411                let ct =
1412                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1413                        Ok(ct) => ct,
1414                        Err(err) => return Err((err, cached_expr)),
1415                    };
1416                CatalogItem::ContinualTask(ct)
1417            }
1418            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1419                create_sql: index.create_sql,
1420                global_id,
1421                on: index.on,
1422                keys: index.keys.into(),
1423                conn_id: None,
1424                resolved_ids,
1425                cluster_id: index.cluster_id,
1426                custom_logical_compaction_window: custom_logical_compaction_window
1427                    .or(index.compaction_window),
1428                is_retained_metrics_object,
1429            }),
1430            Plan::CreateSink(CreateSinkPlan {
1431                sink,
1432                with_snapshot,
1433                in_cluster,
1434                ..
1435            }) => CatalogItem::Sink(Sink {
1436                create_sql: sink.create_sql,
1437                global_id,
1438                from: sink.from,
1439                connection: sink.connection,
1440                envelope: sink.envelope,
1441                version: sink.version,
1442                with_snapshot,
1443                resolved_ids,
1444                cluster_id: in_cluster,
1445                commit_interval: sink.commit_interval,
1446            }),
1447            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1448                // Even if we don't need the `RelationDesc` here, error out
1449                // early and eagerly, as a kind of soft assertion that we _can_
1450                // build the `RelationDesc` when needed.
1451                if let Err(err) = typ.inner.desc(&session_catalog) {
1452                    return Err((err.into(), cached_expr));
1453                }
1454                CatalogItem::Type(Type {
1455                    create_sql: Some(typ.create_sql),
1456                    global_id,
1457                    details: CatalogTypeDetails {
1458                        array_id: None,
1459                        typ: typ.inner,
1460                        pg_metadata: None,
1461                    },
1462                    resolved_ids,
1463                })
1464            }
1465            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1466                create_sql: secret.create_sql,
1467                global_id,
1468            }),
1469            Plan::CreateConnection(CreateConnectionPlan {
1470                connection:
1471                    mz_sql::plan::Connection {
1472                        create_sql,
1473                        details,
1474                    },
1475                ..
1476            }) => CatalogItem::Connection(Connection {
1477                create_sql,
1478                global_id,
1479                details,
1480                resolved_ids,
1481            }),
1482            _ => {
1483                return Err((
1484                    Error::new(ErrorKind::Corruption {
1485                        detail: "catalog entry generated inappropriate plan".to_string(),
1486                    })
1487                    .into(),
1488                    cached_expr,
1489                ));
1490            }
1491        };
1492
1493        Ok((item, uncached_expr))
1494    }
1495
1496    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1497    /// Calling this method will not permanently modify any system configuration variables.
1498    ///
1499    /// WARNING:
1500    /// Any modifications made to the system configuration variables in `f`, will be lost.
1501    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1502        // Enable catalog features that might be required during planning existing
1503        // catalog items. Existing catalog items might have been created while
1504        // a specific feature flag was turned on, so we need to ensure that this
1505        // is also the case during catalog rehydration in order to avoid panics.
1506        //
1507        // WARNING / CONTRACT:
1508        // 1. Features used in this method that related to parsing / planning
1509        //    should be `enable_for_item_parsing` set to `true`.
1510        // 2. After this step, feature flag configuration must not be
1511        //    overridden.
1512        let restore = Arc::clone(&self.system_configuration);
1513        Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1514        let res = f(self);
1515        self.system_configuration = restore;
1516        res
1517    }
1518
1519    /// Returns all indexes on the given object and cluster known in the catalog.
1520    pub fn get_indexes_on(
1521        &self,
1522        id: GlobalId,
1523        cluster: ClusterId,
1524    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1525        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1526
1527        self.try_get_entry_by_global_id(&id)
1528            .into_iter()
1529            .map(move |e| {
1530                e.used_by()
1531                    .iter()
1532                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1533                        CatalogItem::Index(index) if index_matches(index) => {
1534                            Some((index.global_id(), index))
1535                        }
1536                        _ => None,
1537                    })
1538            })
1539            .flatten()
1540    }
1541
    /// Returns the in-memory [`Database`] with the given ID.
    ///
    /// Panics if no database with `database_id` exists in the catalog
    /// (index into `database_by_id` is unchecked).
    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }
1545
1546    /// Gets a reference to the specified replica of the specified cluster.
1547    ///
1548    /// Returns `None` if either the cluster or the replica does not
1549    /// exist.
1550    pub(super) fn try_get_cluster_replica(
1551        &self,
1552        id: ClusterId,
1553        replica_id: ReplicaId,
1554    ) -> Option<&ClusterReplica> {
1555        self.try_get_cluster(id)
1556            .and_then(|cluster| cluster.replica(replica_id))
1557    }
1558
1559    /// Gets a reference to the specified replica of the specified cluster.
1560    ///
1561    /// Panics if either the cluster or the replica does not exist.
1562    pub(crate) fn get_cluster_replica(
1563        &self,
1564        cluster_id: ClusterId,
1565        replica_id: ReplicaId,
1566    ) -> &ClusterReplica {
1567        self.try_get_cluster_replica(cluster_id, replica_id)
1568            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1569    }
1570
1571    pub(super) fn resolve_replica_in_cluster(
1572        &self,
1573        cluster_id: &ClusterId,
1574        replica_name: &str,
1575    ) -> Result<&ClusterReplica, SqlCatalogError> {
1576        let cluster = self.get_cluster(*cluster_id);
1577        let replica_id = cluster
1578            .replica_id_by_name_
1579            .get(replica_name)
1580            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1581        Ok(&cluster.replicas_by_id_[replica_id])
1582    }
1583
1584    /// Get system configuration `name`.
1585    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1586        Ok(self.system_configuration.get(name)?)
1587    }
1588
    /// Parse system configuration `name` with `value` into a typed value.
    ///
    /// Returns the parsed value formatted as a string, or an error if the
    /// variable does not exist or `value` is not valid for it.
    pub(super) fn parse_system_configuration(
        &self,
        name: &str,
        value: VarInput,
    ) -> Result<String, Error> {
        let value = self.system_configuration.parse(name, value)?;
        Ok(value.format())
    }
1600
1601    /// Gets the schema map for the database matching `database_spec`.
1602    pub(super) fn resolve_schema_in_database(
1603        &self,
1604        database_spec: &ResolvedDatabaseSpecifier,
1605        schema_name: &str,
1606        conn_id: &ConnectionId,
1607    ) -> Result<&Schema, SqlCatalogError> {
1608        let schema = match database_spec {
1609            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1610                self.temporary_schemas.get(conn_id)
1611            }
1612            ResolvedDatabaseSpecifier::Ambient => self
1613                .ambient_schemas_by_name
1614                .get(schema_name)
1615                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1616            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1617                db.schemas_by_name
1618                    .get(schema_name)
1619                    .and_then(|id| db.schemas_by_id.get(id))
1620            }),
1621        };
1622        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1623    }
1624
1625    /// Try to get a schema, returning `None` if it doesn't exist.
1626    ///
1627    /// For temporary schemas, returns `None` if the schema hasn't been created yet
1628    /// (temporary schemas are created lazily when the first temporary object is created).
1629    pub fn try_get_schema(
1630        &self,
1631        database_spec: &ResolvedDatabaseSpecifier,
1632        schema_spec: &SchemaSpecifier,
1633        conn_id: &ConnectionId,
1634    ) -> Option<&Schema> {
1635        // Keep in sync with `get_schema` and `get_schemas_mut`
1636        match (database_spec, schema_spec) {
1637            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1638                self.temporary_schemas.get(conn_id)
1639            }
1640            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1641                self.ambient_schemas_by_id.get(id)
1642            }
1643            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1644                .database_by_id
1645                .get(database_id)
1646                .and_then(|db| db.schemas_by_id.get(schema_id)),
1647            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1648                unreachable!("temporary schemas are in the ambient database")
1649            }
1650        }
1651    }
1652
1653    pub fn get_schema(
1654        &self,
1655        database_spec: &ResolvedDatabaseSpecifier,
1656        schema_spec: &SchemaSpecifier,
1657        conn_id: &ConnectionId,
1658    ) -> &Schema {
1659        // Keep in sync with `try_get_schema` and `get_schemas_mut`
1660        self.try_get_schema(database_spec, schema_spec, conn_id)
1661            .expect("schema must exist")
1662    }
1663
1664    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1665        self.database_by_id
1666            .values()
1667            .filter_map(|database| database.schemas_by_id.get(schema_id))
1668            .chain(self.ambient_schemas_by_id.values())
1669            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1670            .into_first()
1671    }
1672
1673    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1674        self.temporary_schemas
1675            .values()
1676            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1677            .into_first()
1678    }
1679
    /// Returns the ID of the ambient `mz_catalog` schema.
    ///
    /// Panics if the schema is missing, which would indicate catalog corruption.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }
1683
    /// Returns the ID of the ambient `mz_catalog_unstable` schema.
    ///
    /// Panics if the schema is missing, which would indicate catalog corruption.
    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }
1687
    /// Returns the ID of the ambient `pg_catalog` schema.
    ///
    /// Panics if the schema is missing, which would indicate catalog corruption.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }
1691
    /// Returns the ID of the ambient `information_schema` schema.
    ///
    /// Panics if the schema is missing, which would indicate catalog corruption.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }
1695
    /// Returns the ID of the ambient `mz_internal` schema.
    ///
    /// Panics if the schema is missing, which would indicate catalog corruption.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }
1699
    /// Returns the ID of the ambient `mz_introspection` schema.
    ///
    /// Panics if the schema is missing, which would indicate catalog corruption.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }
1703
    /// Returns the ID of the ambient `mz_unsafe` schema.
    ///
    /// Panics if the schema is missing, which would indicate catalog corruption.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }
1707
1708    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1709        SYSTEM_SCHEMAS
1710            .iter()
1711            .map(|name| self.ambient_schemas_by_name[*name])
1712    }
1713
1714    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1715        self.system_schema_ids().contains(&id)
1716    }
1717
1718    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1719        match spec {
1720            SchemaSpecifier::Temporary => false,
1721            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1722        }
1723    }
1724
1725    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1726        UNSTABLE_SCHEMAS
1727            .iter()
1728            .map(|name| self.ambient_schemas_by_name[*name])
1729    }
1730
1731    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1732        self.unstable_schema_ids().contains(&id)
1733    }
1734
1735    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1736        match spec {
1737            SchemaSpecifier::Temporary => false,
1738            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1739        }
1740    }
1741
1742    /// Creates a new schema in the `Catalog` for temporary items
1743    /// indicated by the TEMPORARY or TEMP keywords.
1744    pub fn create_temporary_schema(
1745        &mut self,
1746        conn_id: &ConnectionId,
1747        owner_id: RoleId,
1748    ) -> Result<(), Error> {
1749        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1750        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1751        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1752        // all -1.
1753        let oid = INVALID_OID;
1754        self.temporary_schemas.insert(
1755            conn_id.clone(),
1756            Schema {
1757                name: QualifiedSchemaName {
1758                    database: ResolvedDatabaseSpecifier::Ambient,
1759                    schema: MZ_TEMP_SCHEMA.into(),
1760                },
1761                id: SchemaSpecifier::Temporary,
1762                oid,
1763                items: BTreeMap::new(),
1764                functions: BTreeMap::new(),
1765                types: BTreeMap::new(),
1766                owner_id,
1767                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1768                    mz_sql::catalog::ObjectType::Schema,
1769                    owner_id,
1770                )]),
1771            },
1772        );
1773        Ok(())
1774    }
1775
1776    /// Return all OIDs that are allocated to temporary objects.
1777    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1778        std::iter::empty()
1779            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1780                if schema.id.is_temporary() {
1781                    Some(schema.oid)
1782                } else {
1783                    None
1784                }
1785            }))
1786            .chain(self.entry_by_id.values().filter_map(|entry| {
1787                if entry.item().is_temporary() {
1788                    Some(entry.oid)
1789                } else {
1790                    None
1791                }
1792            }))
1793    }
1794
    /// Optimized lookup for a builtin table.
    ///
    /// Delegates to [`Self::resolve_builtin_object`] with the table wrapped in
    /// the `Builtin::Table` variant.
    ///
    /// Panics if the builtin table doesn't exist in the catalog.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
    }
1801
1802    /// Optimized lookup for a builtin log.
1803    ///
1804    /// Panics if the builtin log doesn't exist in the catalog.
1805    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1806        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1807        let log = match self.get_entry(&item_id).item() {
1808            CatalogItem::Log(log) => log,
1809            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1810        };
1811        (item_id, log.global_id)
1812    }
1813
    /// Optimized lookup for a builtin storage collection.
    ///
    /// Delegates to [`Self::resolve_builtin_object`] with the source wrapped in
    /// the `Builtin::Source` variant.
    ///
    /// Panics if the builtin storage collection doesn't exist in the catalog.
    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
    }
1820
1821    /// Optimized lookup for a builtin object.
1822    ///
1823    /// Panics if the builtin object doesn't exist in the catalog.
1824    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1825        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1826        let schema = &self.ambient_schemas_by_id[schema_id];
1827        match builtin.catalog_item_type() {
1828            CatalogItemType::Type => schema.types[builtin.name()],
1829            CatalogItemType::Func => schema.functions[builtin.name()],
1830            CatalogItemType::Table
1831            | CatalogItemType::Source
1832            | CatalogItemType::Sink
1833            | CatalogItemType::View
1834            | CatalogItemType::MaterializedView
1835            | CatalogItemType::Index
1836            | CatalogItemType::Secret
1837            | CatalogItemType::Connection
1838            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1839        }
1840    }
1841
    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
    ///
    /// Composite types (arrays, lists, maps, ranges, records) have their
    /// element/key/value *name* references replaced with the catalog IDs of the
    /// corresponding system types, via `get_system_type`. Scalar types carry no
    /// references and map to themselves unchanged.
    ///
    /// Panics if a referenced system type does not exist in the catalog
    /// (assuming `get_system_type` panics on a missing type — confirm at its
    /// definition).
    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            // Composite types: swap each name reference for the system type's ID,
            // cloning any modifiers unchanged.
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            // Scalar types contain no references; they resolve to themselves.
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        // Everything except the type details is copied through verbatim.
        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }
1927
    /// Returns the catalog's configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
1931
1932    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1933        match self.database_by_name.get(database_name) {
1934            Some(id) => Ok(&self.database_by_id[id]),
1935            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1936        }
1937    }
1938
1939    pub fn resolve_schema(
1940        &self,
1941        current_database: Option<&DatabaseId>,
1942        database_name: Option<&str>,
1943        schema_name: &str,
1944        conn_id: &ConnectionId,
1945    ) -> Result<&Schema, SqlCatalogError> {
1946        let database_spec = match database_name {
1947            // If a database is explicitly specified, validate it. Note that we
1948            // intentionally do not validate `current_database` to permit
1949            // querying `mz_catalog` with an invalid session database, e.g., so
1950            // that you can run `SHOW DATABASES` to *find* a valid database.
1951            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1952                self.resolve_database(database)?.id().clone(),
1953            )),
1954            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1955        };
1956
1957        // First try to find the schema in the named database.
1958        if let Some(database_spec) = database_spec {
1959            if let Ok(schema) =
1960                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1961            {
1962                return Ok(schema);
1963            }
1964        }
1965
1966        // Then fall back to the ambient database.
1967        if let Ok(schema) = self.resolve_schema_in_database(
1968            &ResolvedDatabaseSpecifier::Ambient,
1969            schema_name,
1970            conn_id,
1971        ) {
1972            return Ok(schema);
1973        }
1974
1975        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1976    }
1977
    /// Optimized lookup for a system schema.
    ///
    /// Looks the schema up directly by name in the ambient namespace.
    ///
    /// Panics if the system schema doesn't exist in the catalog.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }
1984
1985    pub fn resolve_search_path(
1986        &self,
1987        session: &dyn SessionMetadata,
1988    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1989        let database = self
1990            .database_by_name
1991            .get(session.database())
1992            .map(|id| id.clone());
1993
1994        session
1995            .search_path()
1996            .iter()
1997            .map(|schema| {
1998                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1999            })
2000            .filter_map(|schema| schema.ok())
2001            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2002            .collect()
2003    }
2004
2005    pub fn effective_search_path(
2006        &self,
2007        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2008        include_temp_schema: bool,
2009    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2010        let mut v = Vec::with_capacity(search_path.len() + 3);
2011        // Temp schema is only included for relations and data types, not for functions and operators
2012        let temp_schema = (
2013            ResolvedDatabaseSpecifier::Ambient,
2014            SchemaSpecifier::Temporary,
2015        );
2016        if include_temp_schema && !search_path.contains(&temp_schema) {
2017            v.push(temp_schema);
2018        }
2019        let default_schemas = [
2020            (
2021                ResolvedDatabaseSpecifier::Ambient,
2022                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2023            ),
2024            (
2025                ResolvedDatabaseSpecifier::Ambient,
2026                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2027            ),
2028        ];
2029        for schema in default_schemas.into_iter() {
2030            if !search_path.contains(&schema) {
2031                v.push(schema);
2032            }
2033        }
2034        v.extend_from_slice(search_path);
2035        v
2036    }
2037
2038    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2039        let id = self
2040            .clusters_by_name
2041            .get(name)
2042            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2043        Ok(&self.clusters_by_id[id])
2044    }
2045
2046    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2047        let id = self
2048            .clusters_by_name
2049            .get(cluster.name)
2050            .expect("failed to lookup BuiltinCluster by name");
2051        self.clusters_by_id
2052            .get(id)
2053            .expect("failed to lookup BuiltinCluster by ID")
2054    }
2055
2056    pub fn resolve_cluster_replica(
2057        &self,
2058        cluster_replica_name: &QualifiedReplica,
2059    ) -> Result<&ClusterReplica, SqlCatalogError> {
2060        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2061        let replica_name = cluster_replica_name.replica.as_str();
2062        let replica_id = cluster
2063            .replica_id(replica_name)
2064            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2065        Ok(cluster.replica(replica_id).expect("Must exist"))
2066    }
2067
    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
    ///
    /// If `name` does not specify a database, the `current_database` is used.
    /// If `name` does not specify a schema, then the schemas in `search_path`
    /// are searched in order.
    ///
    /// `get_schema_entries` selects which per-schema namespace to search
    /// (items, functions, or types), and `err_gen` constructs the error
    /// returned when the item is not found anywhere.
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // If a schema name was specified, just try to find the item in that
        // schema. If no schema was specified, try to find the item in the connection's
        // temporary schema. If the item is not found, try to find the item in every
        // schema in the search path.
        let schemas = match &name.schema {
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            // NOTE: the temp-schema shortcut searches `items` regardless of
            // `get_schema_entries` — presumably temporary objects only exist in
            // the items namespace.
            None => match self
                .try_get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .and_then(|schema| schema.items.get(&name.item))
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            // Use try_get_schema because the temp schema might not exist yet
            // (it's created lazily when the first temp object is created).
            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
                continue;
            };

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Some relations that have previously lived in the `mz_internal` schema have been moved to
        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
        // automatically let uses of the old schema resolve to the new ones as well.
        // TODO(database-issues#8173) remove this after sufficient time has passed
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }
2152
    /// Resolves `name` to a non-function [`CatalogEntry`].
    ///
    /// Thin wrapper over [`Self::resolve`] that searches each schema's `items`
    /// namespace and reports misses as `UnknownItem`.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.items,
            current_database,
            search_path,
            name,
            conn_id,
            SqlCatalogError::UnknownItem,
        )
    }
2170
    /// Resolves `name` to a function [`CatalogEntry`].
    ///
    /// Thin wrapper over [`Self::resolve`] that searches each schema's
    /// `functions` namespace and reports misses as `UnknownFunction` (with no
    /// suggested alternative).
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.functions,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownFunction {
                name,
                alternative: None,
            },
        )
    }
2191
2192    /// Resolves `name` to a type [`CatalogEntry`].
2193    pub fn resolve_type(
2194        &self,
2195        current_database: Option<&DatabaseId>,
2196        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2197        name: &PartialItemName,
2198        conn_id: &ConnectionId,
2199    ) -> Result<&CatalogEntry, SqlCatalogError> {
2200        static NON_PG_CATALOG_TYPES: LazyLock<
2201            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2202        > = LazyLock::new(|| {
2203            BUILTINS::types()
2204                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2205                .map(|typ| (typ.name, typ))
2206                .collect()
2207        });
2208
2209        let entry = self.resolve(
2210            |schema| &schema.types,
2211            current_database,
2212            search_path,
2213            name,
2214            conn_id,
2215            |name| SqlCatalogError::UnknownType { name },
2216        )?;
2217
2218        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2219            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2220                warn!(
2221                    "user specified an incorrect schema of {} for the type {}, which should be in \
2222                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2223                    PG_CATALOG_SCHEMA.quoted(),
2224                    typ.name.quoted(),
2225                    typ.schema.quoted(),
2226                )
2227            }
2228        }
2229
2230        Ok(entry)
2231    }
2232
2233    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2234    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2235        match object_id {
2236            ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2237            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2238            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2239            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2240            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2241            ObjectId::ClusterReplica(cluster_replica_id) => {
2242                CommentObjectId::ClusterReplica(cluster_replica_id)
2243            }
2244            ObjectId::NetworkPolicy(network_policy_id) => {
2245                CommentObjectId::NetworkPolicy(network_policy_id)
2246            }
2247        }
2248    }
2249
    /// Return current system configuration.
    ///
    /// Read-only access; use [`CatalogState::system_config_mut`] to mutate.
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }
2254
    /// Return a mutable reference to the current system configuration.
    ///
    /// `Arc::make_mut` gives copy-on-write semantics: if other `Arc` handles
    /// to the configuration exist, it is cloned first so their view is
    /// unaffected by the mutation.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        Arc::make_mut(&mut self.system_configuration)
    }
2259
2260    /// Serializes the catalog's in-memory state.
2261    ///
2262    /// There are no guarantees about the format of the serialized state, except
2263    /// that the serialized state for two identical catalogs will compare
2264    /// identically.
2265    ///
2266    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2267    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2268    /// argument.
2269    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2270        // Dump the base catalog.
2271        let mut dump = serde_json::to_value(&self).map_err(|e| {
2272            Error::new(ErrorKind::Unstructured(format!(
2273                // Don't panic here because we don't have compile-time failures for maps with
2274                // non-string keys.
2275                "internal error: could not dump catalog: {}",
2276                e
2277            )))
2278        })?;
2279
2280        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2281        // Stitch in system parameter defaults.
2282        dump_obj.insert(
2283            "system_parameter_defaults".into(),
2284            serde_json::json!(self.system_config().defaults()),
2285        );
2286        // Potentially overwrite unfinalized shards.
2287        if let Some(unfinalized_shards) = unfinalized_shards {
2288            dump_obj
2289                .get_mut("storage_metadata")
2290                .expect("known to exist")
2291                .as_object_mut()
2292                .expect("storage_metadata is an object")
2293                .insert(
2294                    "unfinalized_shards".into(),
2295                    serde_json::json!(unfinalized_shards),
2296                );
2297        }
2298        // Remove GlobalIds for temporary objects from the mapping.
2299        //
2300        // Post-test consistency checks with the durable catalog don't know about temporary items
2301        // since they're kept entirely in memory.
2302        let temporary_gids: Vec<_> = self
2303            .entry_by_global_id
2304            .iter()
2305            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2306            .map(|(gid, _item_id)| *gid)
2307            .collect();
2308        if !temporary_gids.is_empty() {
2309            let gids = dump_obj
2310                .get_mut("entry_by_global_id")
2311                .expect("known_to_exist")
2312                .as_object_mut()
2313                .expect("entry_by_global_id is an object");
2314            for gid in temporary_gids {
2315                gids.remove(&gid.to_string());
2316            }
2317        }
2318        // We exclude role_auth_by_id because it contains password information
2319        // which should not be included in the dump.
2320        dump_obj.remove("role_auth_by_id");
2321
2322        // Emit as pretty-printed JSON.
2323        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2324    }
2325
    /// Returns the availability zones known to the catalog.
    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }
2329
2330    pub fn concretize_replica_location(
2331        &self,
2332        location: mz_catalog::durable::ReplicaLocation,
2333        allowed_sizes: &Vec<String>,
2334        allowed_availability_zones: Option<&[String]>,
2335    ) -> Result<ReplicaLocation, Error> {
2336        let location = match location {
2337            mz_catalog::durable::ReplicaLocation::Unmanaged {
2338                storagectl_addrs,
2339                computectl_addrs,
2340            } => {
2341                if allowed_availability_zones.is_some() {
2342                    return Err(Error {
2343                        kind: ErrorKind::Internal(
2344                            "tried concretize unmanaged replica with specific availability_zones"
2345                                .to_string(),
2346                        ),
2347                    });
2348                }
2349                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2350                    storagectl_addrs,
2351                    computectl_addrs,
2352                })
2353            }
2354            mz_catalog::durable::ReplicaLocation::Managed {
2355                size,
2356                availability_zone,
2357                billed_as,
2358                internal,
2359                pending,
2360            } => {
2361                if allowed_availability_zones.is_some() && availability_zone.is_some() {
2362                    let message = "tried concretize managed replica with specific availability zones and availability zone";
2363                    return Err(Error {
2364                        kind: ErrorKind::Internal(message.to_string()),
2365                    });
2366                }
2367                self.ensure_valid_replica_size(allowed_sizes, &size)?;
2368                let cluster_replica_sizes = &self.cluster_replica_sizes;
2369
2370                ReplicaLocation::Managed(ManagedReplicaLocation {
2371                    allocation: cluster_replica_sizes
2372                        .0
2373                        .get(&size)
2374                        .expect("catalog out of sync")
2375                        .clone(),
2376                    availability_zones: match (availability_zone, allowed_availability_zones) {
2377                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2378                        (None, Some(azs)) if azs.is_empty() => {
2379                            ManagedReplicaAvailabilityZones::FromCluster(None)
2380                        }
2381                        (None, Some(azs)) => {
2382                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2383                        }
2384                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2385                    },
2386                    size,
2387                    billed_as,
2388                    internal,
2389                    pending,
2390                })
2391            }
2392        };
2393        Ok(location)
2394    }
2395
2396    /// Return whether the given replica size requests a disk.
2397    ///
2398    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
2399    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
2400    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
2401    /// not something we can know at this point.
2402    ///
2403    /// # Panics
2404    ///
2405    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
2406    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2407        let alloc = &self.cluster_replica_sizes.0[size];
2408        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2409    }
2410
2411    pub(crate) fn ensure_valid_replica_size(
2412        &self,
2413        allowed_sizes: &[String],
2414        size: &String,
2415    ) -> Result<(), Error> {
2416        let cluster_replica_sizes = &self.cluster_replica_sizes;
2417
2418        if !cluster_replica_sizes.0.contains_key(size)
2419            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2420            || cluster_replica_sizes.0[size].disabled
2421        {
2422            let mut entries = cluster_replica_sizes
2423                .enabled_allocations()
2424                .collect::<Vec<_>>();
2425
2426            if !allowed_sizes.is_empty() {
2427                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2428                entries.retain(|(name, _)| allowed_sizes.contains(name));
2429            }
2430
2431            entries.sort_by_key(
2432                |(
2433                    _name,
2434                    ReplicaAllocation {
2435                        scale, cpu_limit, ..
2436                    },
2437                )| (scale, cpu_limit),
2438            );
2439
2440            Err(Error {
2441                kind: ErrorKind::InvalidClusterReplicaSize {
2442                    size: size.to_owned(),
2443                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2444                },
2445            })
2446        } else {
2447            Ok(())
2448        }
2449    }
2450
2451    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2452        if role_id.is_builtin() {
2453            let role = self.get_role(role_id);
2454            Err(Error::new(ErrorKind::ReservedRoleName(
2455                role.name().to_string(),
2456            )))
2457        } else {
2458            Ok(())
2459        }
2460    }
2461
2462    pub fn ensure_not_reserved_network_policy(
2463        &self,
2464        network_policy_id: &NetworkPolicyId,
2465    ) -> Result<(), Error> {
2466        if network_policy_id.is_builtin() {
2467            let policy = self.get_network_policy(network_policy_id);
2468            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2469                policy.name.clone(),
2470            )))
2471        } else {
2472            Ok(())
2473        }
2474    }
2475
2476    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2477        let is_grantable = !role_id.is_public() && !role_id.is_system();
2478        if is_grantable {
2479            Ok(())
2480        } else {
2481            let role = self.get_role(role_id);
2482            Err(Error::new(ErrorKind::UngrantableRoleName(
2483                role.name().to_string(),
2484            )))
2485        }
2486    }
2487
2488    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2489        if role_id.is_system() {
2490            let role = self.get_role(role_id);
2491            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2492                role.name().to_string(),
2493            )))
2494        } else {
2495            Ok(())
2496        }
2497    }
2498
2499    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2500        if role_id.is_predefined() {
2501            let role = self.get_role(role_id);
2502            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2503                role.name().to_string(),
2504            )))
2505        } else {
2506            Ok(())
2507        }
2508    }
2509
2510    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2511    // passing tx, and session?
2512    pub(crate) fn add_to_audit_log(
2513        system_configuration: &SystemVars,
2514        oracle_write_ts: mz_repr::Timestamp,
2515        session: Option<&ConnMeta>,
2516        tx: &mut mz_catalog::durable::Transaction,
2517        audit_events: &mut Vec<VersionedEvent>,
2518        event_type: EventType,
2519        object_type: ObjectType,
2520        details: EventDetails,
2521    ) -> Result<(), Error> {
2522        let user = session.map(|session| session.user().name.to_string());
2523
2524        // unsafe_mock_audit_event_timestamp can only be set to Some when running in unsafe mode.
2525
2526        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2527            Some(ts) => ts.into(),
2528            _ => oracle_write_ts.into(),
2529        };
2530        let id = tx.allocate_audit_log_id()?;
2531        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2532        audit_events.push(event.clone());
2533        tx.insert_audit_log_event(event);
2534        Ok(())
2535    }
2536
2537    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2538        match id {
2539            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2540            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2541                self.get_cluster_replica(*cluster_id, *replica_id)
2542                    .owner_id(),
2543            ),
2544            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2545            ObjectId::Schema((database_spec, schema_spec)) => Some(
2546                self.get_schema(database_spec, schema_spec, conn_id)
2547                    .owner_id(),
2548            ),
2549            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2550            ObjectId::Role(_) => None,
2551            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2552        }
2553    }
2554
2555    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2556        match object_id {
2557            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2558            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2559            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2560            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2561            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2562            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2563            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2564        }
2565    }
2566
2567    pub(super) fn get_system_object_type(
2568        &self,
2569        id: &SystemObjectId,
2570    ) -> mz_sql::catalog::SystemObjectType {
2571        match id {
2572            SystemObjectId::Object(object_id) => {
2573                SystemObjectType::Object(self.get_object_type(object_id))
2574            }
2575            SystemObjectId::System => SystemObjectType::System,
2576        }
2577    }
2578
    /// Returns a read-only view of the current [`StorageMetadata`].
    ///
    /// To write to this struct, you must use a catalog transaction.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2585
    /// For the Sources ids in `ids`, return their compaction windows.
    ///
    /// Groups the (deduplicated) ids by the compaction window that applies to
    /// them: a source's, or a source-fed table's, custom logical compaction
    /// window, falling back to the default when none is set. Ids that resolve
    /// to other kinds of items (e.g. views pulled in via dependency
    /// traversal) are skipped.
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        // Process each id at most once, even if `ids` contains duplicates.
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    cws.entry(source_cw).or_default().insert(item_id);
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::IngestionExport { .. }
                                // Also match webhook tables (source-to-table migration).
                                | DataSourceDesc::Webhook { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        // Regular tables handle compaction directly in
                        // catalog_implications, not through this function.
                        TableDataSource::TableWrites { .. } => {}
                        // These data source kinds are never used for tables;
                        // reaching this arm indicates a catalog invariant
                        // violation.
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::Ingestion { .. }
                                | DataSourceDesc::OldSyntaxIngestion { .. }
                                | DataSourceDesc::Introspection(_)
                                | DataSourceDesc::Progress
                                | DataSourceDesc::Catalog,
                            ..
                        } => {
                            unreachable!(
                                "unexpected DataSourceDesc for table {item_id}: {:?}",
                                table.data_source
                            )
                        }
                    }
                }
                _ => {
                    // Views could depend on sources, so ignore them if added by used_by above.
                    continue;
                }
            }
        }
        cws
    }
2642
2643    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2644        match id {
2645            CommentObjectId::Table(id)
2646            | CommentObjectId::View(id)
2647            | CommentObjectId::MaterializedView(id)
2648            | CommentObjectId::Source(id)
2649            | CommentObjectId::Sink(id)
2650            | CommentObjectId::Index(id)
2651            | CommentObjectId::Func(id)
2652            | CommentObjectId::Connection(id)
2653            | CommentObjectId::Type(id)
2654            | CommentObjectId::Secret(id)
2655            | CommentObjectId::ContinualTask(id) => Some(*id),
2656            CommentObjectId::Role(_)
2657            | CommentObjectId::Database(_)
2658            | CommentObjectId::Schema(_)
2659            | CommentObjectId::Cluster(_)
2660            | CommentObjectId::ClusterReplica(_)
2661            | CommentObjectId::NetworkPolicy(_) => None,
2662        }
2663    }
2664
2665    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2666        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2667    }
2668
    /// Renders a human-readable name for the object a comment is attached to,
    /// suitable for inclusion in audit-log events.
    ///
    /// Item comments resolve to the item's fully qualified name; schema
    /// comments to the full schema name; replica comments to a
    /// `cluster.replica` pair; all other objects to their plain name.
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            // All item-carrying variants share one resolution path.
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }
2709
2710    pub fn mock_authentication_nonce(&self) -> String {
2711        self.mock_authentication_nonce.clone().unwrap_or_default()
2712    }
2713}
2714
impl ConnectionResolver for CatalogState {
    /// Looks up connection `id` in the catalog and returns it with any
    /// catalog-referenced connections inlined.
    ///
    /// Panics ("catalog out of sync") if `id` does not refer to a connection
    /// item.
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            // Connection kinds that can reference other catalog connections
            // are inlined; the remaining kinds pass through unchanged.
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}
2740
/// Forwards [`OptimizerCatalog`] to the corresponding inherent
/// [`CatalogState`] methods.
///
/// The fully-qualified call syntax is deliberate: the trait's `get_entry`
/// takes a `GlobalId` while the inherent `get_entry` takes a
/// `CatalogItemId`, so plain method syntax would resolve to the wrong
/// (inherent) method.
impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}
2763
2764impl OptimizerCatalog for Catalog {
2765    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2766        self.state.get_entry_by_global_id(id)
2767    }
2768
2769    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2770        self.state.get_entry(id)
2771    }
2772
2773    fn resolve_full_name(
2774        &self,
2775        name: &QualifiedItemName,
2776        conn_id: Option<&ConnectionId>,
2777    ) -> FullItemName {
2778        self.state.resolve_full_name(name, conn_id)
2779    }
2780
2781    fn get_indexes_on(
2782        &self,
2783        id: GlobalId,
2784        cluster: ClusterId,
2785    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2786        Box::new(self.state.get_indexes_on(id, cluster))
2787    }
2788}
2789
impl Catalog {
    /// Upcasts an `Arc<Catalog>` into an `Arc<dyn OptimizerCatalog>` trait
    /// object; the unsizing coercion happens implicitly in return position.
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}