mz_adapter/catalog/
state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-memory metadata storage for the coordinator.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
};
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
    TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
    UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::DiskLimit;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
use mz_ore::str::StrExt;
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::{
    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
    UNSTABLE_SCHEMAS,
};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector};
use mz_secrets::InMemorySecretsController;
use mz_sql::ast::Ident;
use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
    TypeReference,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
    Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageMetadata;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{
    ConnectionResolver, InlinedConnection, IntoInlineConnection,
};
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{debug, warn};

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::AdapterError;
use crate::catalog::{Catalog, ConnCatalog};
use crate::coord::ConnMeta;
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::session::Session;

/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Not all of these fields are 100% derived from the durable catalog; the
    // exceptions are:
    //  - Temporary items.
    //  - Certain objects that are partially derived from read-only state.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}

/// Keeps track of which expressions are cached or not during startup.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}

impl LocalExpressionCache {
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    /// Insert an expression that was previously cached back into the cache. This is generally
    /// needed when parsing/planning an expression fails, but we don't want to lose the cached
    /// expression.
    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    /// Inform the cache that `id` was not found in the cache, so that it can be recorded as
    /// uncached with the given `local_mir` and `optimizer_features`.
    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                // If we are trying to cache the same item a second time, with a different
                // expression, then we must be migrating the object or doing something else weird.
                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
                // version after a restart. Caching the migrated version may cause us to incorrectly
                // think that the object has already been migrated. To simplify things, we cache
                // neither.
                let prev = uncached_exprs.remove(&id);
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}
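
// A minimal usage sketch (illustrative only, not part of the original source) of the
// cache protocol above, assuming a hypothetical `plan_item` helper with the same result
// shape as `CatalogState::parse_item_inner`: pop the cached expression first, then
// report the outcome back to the cache.
//
//     let mut cache = LocalExpressionCache::new(cached_exprs);
//     let cached = cache.remove_cached_expression(&global_id);
//     match plan_item(create_sql, cached) {
//         // Cache miss: record the freshly optimized expression so it can be written
//         // back to durable storage via `into_uncached_exprs` after startup.
//         Ok((item, Some((local_mir, features)))) => {
//             cache.insert_uncached_expression(global_id, local_mir, features);
//         }
//         // Cache hit: nothing to record.
//         Ok((item, None)) => {}
//         // Failure: return the unused cached expression so it isn't lost.
//         Err((err, Some(local_expr))) => {
//             cache.insert_cached_expression(global_id, local_expr);
//         }
//         Err((err, None)) => {}
//     }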

fn skip_temp_items<S>(
    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    mz_ore::serde::map_key_to_string(
        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
        serializer,
    )
}

impl CatalogState {
    /// Returns an empty [`CatalogState`] that can be used in tests.
    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
    // tests are structured in this repository.
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                timestamp_interval: Default::default(),
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Default::default(),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Default::default(),
            system_privileges: Default::default(),
            comments: Default::default(),
            source_references: Default::default(),
            storage_metadata: Default::default(),
            license_key: ValidatedLicenseKey::for_tests(),
            mock_authentication_nonce: Default::default(),
        }
    }

    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        let search_path = self.resolve_search_path(session);
        let database = self
            .database_by_name
            .get(session.vars().database())
            .cloned();
        let state = match session.transaction().catalog_state() {
            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
            None => Cow::Borrowed(self),
        };
        ConnCatalog {
            state,
            unresolvable_ids: BTreeSet::new(),
            conn_id: session.conn_id().clone(),
            cluster: session.vars().cluster().into(),
            database,
            search_path,
            role_id: session.current_role_id().clone(),
            prepared_statements: Some(session.prepared_statements()),
            portals: Some(session.portals()),
            notices_tx: session.retain_notice_transmitter(),
        }
    }

    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            // Leaving the system's search path empty allows us to catch issues
            // where catalog object names have not been normalized correctly.
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            portals: None,
            notices_tx,
        }
    }

    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }

    /// Returns an iterator over the deduplicated identifiers of all
    /// objects this catalog entry transitively depends on (where
    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
    /// [`CatalogItem::references`]).
    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
        struct I<'a> {
            queue: VecDeque<CatalogItemId>,
            seen: BTreeSet<CatalogItemId>,
            this: &'a CatalogState,
        }
        impl<'a> Iterator for I<'a> {
            type Item = CatalogItemId;
            fn next(&mut self) -> Option<Self::Item> {
                if let Some(next) = self.queue.pop_front() {
                    for child in self.this.get_entry(&next).item().uses() {
                        if !self.seen.contains(&child) {
                            self.queue.push_back(child);
                            self.seen.insert(child);
                        }
                    }
                    Some(next)
                } else {
                    None
                }
            }
        }

        I {
            queue: [id].into_iter().collect(),
            seen: [id].into_iter().collect(),
            this: self,
        }
    }
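
    // Illustrative usage (not part of the original source): the iterator above performs a
    // breadth-first walk seeded with `id` itself, so `id` is always yielded first and every
    // transitively used object appears exactly once. Assuming bindings `state: &CatalogState`
    // and `item_id: CatalogItemId`:
    //
    //     let reachable: Vec<CatalogItemId> = state.transitive_uses(item_id).collect();
    //     assert_eq!(reachable.first(), Some(&item_id));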

    /// Computes the IDs of any log sources this catalog entry transitively
    /// depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }

    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }
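
    // Illustrative usage (not part of the original source): this answers "which
    // introspection logs does this item read from?", e.g. to validate that a view over
    // introspection data is only used where those logs are available. Only
    // `CatalogItem::Log` IDs end up in the result.
    //
    //     let log_ids: Vec<CatalogItemId> = state.introspection_dependencies(view_id);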

    /// Returns the IDs of all objects that depend on `object_ids`, including `object_ids`
    /// themselves.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful when determining the
    /// order in which to drop objects.
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }
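
    // Illustrative usage (not part of the original source): computing a safe drop order.
    // Because the result is in reverse dependency order, iterating it front to back drops
    // every dependent before the object it depends on. `to_drop` and `conn_id` are assumed
    // bindings.
    //
    //     let mut seen = BTreeSet::new();
    //     let drop_order = state.object_dependents(&to_drop, &conn_id, &mut seen);
    //     for object_id in drop_order {
    //         // Drop `object_id`; leaves come first, roots last.
    //     }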

    /// Returns the IDs of all objects that depend on `cluster_id`, including `cluster_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful when determining the
    /// order in which to drop objects.
    fn cluster_dependents(
        &self,
        cluster_id: ClusterId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Cluster(cluster_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let cluster = self.get_cluster(cluster_id);
            for item_id in cluster.bound_objects() {
                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
            }
            for replica_id in cluster.replica_ids().values() {
                dependents.extend_from_slice(&self.cluster_replica_dependents(
                    cluster_id,
                    *replica_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `replica_id`, including `replica_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful when determining the
    /// order in which to drop objects.
    pub(super) fn cluster_replica_dependents(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `database_id`, including `database_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful when determining the
    /// order in which to drop objects.
    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `schema_id`, including `schema_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful when determining the
    /// order in which to drop objects.
    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            dependents.push(object_id)
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `item_id`, including `item_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful when determining the
    /// order in which to drop objects.
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            // We treat the progress collection as if it depends on the source
            // for dropping. We have additional code in planning to create a
            // kind of special-case "CASCADE" for this dependency.
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }
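
    // A minimal standalone sketch (illustrative, not part of the original source) of the
    // traversal pattern used by `item_dependents` and its siblings: a depth-first walk
    // with a `seen` set that pushes a node only after all of its dependents, which is
    // exactly what yields the reverse dependency (leaves-first) order documented above.
    //
    //     fn dependents_of(
    //         graph: &BTreeMap<u64, Vec<u64>>, // id -> ids that depend on it
    //         id: u64,
    //         seen: &mut BTreeSet<u64>,
    //         out: &mut Vec<u64>,
    //     ) {
    //         if seen.insert(id) {
    //             for dep in graph.get(&id).into_iter().flatten() {
    //                 dependents_of(graph, *dep, seen, out);
    //             }
    //             out.push(id); // dependents (leaves) were pushed first
    //         }
    //     }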

    /// Returns the IDs of all objects that depend on `network_policy_id`, including
    /// `network_policy_id` itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful when determining the
    /// order in which to drop objects.
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        // Currently, network policies have no dependents. When we add the ability for users
        // or sources/sinks to have policies, this method will need to be updated.
        vec![object_id]
    }

    /// Indicates whether the given item is considered stable or not.
    ///
    /// Only stable items can be used as dependencies of other catalog items.
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }

    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
        if self.system_config().unsafe_enable_unstable_dependencies() {
            return Ok(());
        }

        let unstable_dependencies: Vec<_> = item
            .references()
            .items()
            .filter(|id| !self.is_stable(**id))
            .map(|id| self.get_entry(id).name().item.clone())
            .collect();

        // It's okay to create a temporary object with unstable
        // dependencies, since we will never need to reboot a catalog
        // that contains it.
        if unstable_dependencies.is_empty() || item.is_temporary() {
            Ok(())
        } else {
            let object_type = item.typ().to_string();
            Err(Error {
                kind: ErrorKind::UnstableDependency {
                    object_type,
                    unstable_dependencies,
                },
            })
        }
    }
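
    // Illustrative usage (not part of the original source): the check is skipped entirely
    // when the `unsafe_enable_unstable_dependencies` system variable is enabled, and
    // temporary items are always exempt because they never survive a restart.
    //
    //     match state.check_unstable_dependencies(&item) {
    //         Ok(()) => { /* safe to create */ }
    //         Err(err) => { /* ErrorKind::UnstableDependency { object_type, .. } */ }
    //     }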

    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        let schema = self
            .get_schema(
                &name.qualifiers.database_spec,
                &name.qualifiers.schema_spec,
                conn_id,
            )
            .name()
            .schema
            .clone();
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }

    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        let database = match &name.database {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        FullSchemaName {
            database,
            schema: name.schema.clone(),
        }
    }

    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }

    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        let item_id = self
            .entry_by_global_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));

        let entry = self.get_entry(item_id).clone();
        let version = match entry.item() {
            CatalogItem::Table(table) => {
                let (version, _) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("version to exist");
                RelationVersionSelector::Specific(*version)
            }
            _ => RelationVersionSelector::Latest,
        };
        CatalogCollectionEntry { entry, version }
    }
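
    // Illustrative note (not part of the original source): tables keep one `GlobalId` per
    // `RelationVersion`, so resolving an entry through a `GlobalId` pins it to that
    // specific version; all other item kinds resolve to the latest version.
    //
    //     let collection = state.get_entry_by_global_id(&gid);
    //     match collection.version {
    //         RelationVersionSelector::Specific(_v) => { /* a particular table version */ }
    //         RelationVersionSelector::Latest => { /* any non-table item */ }
    //     }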

    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }

    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        let schema = self
            .temporary_schemas
            .get(conn)
            .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
        schema.items.values().copied().map(ObjectId::from)
    }

    /// Gets a type named `name` from exactly one of the system schemas.
    ///
    /// # Panics
    /// - If `name` is not an entry in any system schema
    /// - If more than one system schema has an entry named `name`.
    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }

    pub fn get_item_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .items
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub(super) fn find_available_name(
        &self,
        mut name: QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> QualifiedItemName {
        let mut i = 0;
        let orig_item_name = name.item.clone();
        while self.get_item_by_name(&name, conn_id).is_some() {
            i += 1;
            name.item = format!("{}{}", orig_item_name, i);
        }
        name
    }
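
    // Illustrative behavior (not part of the original source): the loop above appends an
    // increasing numeric suffix until the name is free. If "foo" and "foo1" are both
    // taken, asking for "foo" yields "foo2" (`qualified_name` is an assumed helper that
    // produces a `QualifiedItemName`).
    //
    //     let name = state.find_available_name(qualified_name("foo"), &conn_id);
    //     assert_eq!(name.item, "foo2");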

    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }

    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        let item_id = self.entry_by_global_id.get(id)?;
        self.try_get_entry(item_id)
    }

    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
    /// object that returns rows.
    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            // TODO(alter_table): Support schema evolution on sources.
            other => other.desc_opt(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }

    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }

    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }

    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }

    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }

    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }

    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }

    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }

    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }

    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        membership.insert(RoleId::Public);
        membership
    }
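
    // Illustrative usage (not part of the original source): the returned set always
    // contains the starting role and `RoleId::Public`, plus every role reachable through
    // direct or transitive role grants.
    //
    //     let membership = state.collect_role_membership(&role_id);
    //     assert!(membership.contains(&role_id));
    //     assert!(membership.contains(&RoleId::Public));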

    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }

    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
    /// source.
    ///
    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or_else(|| "HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }
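
    // Illustrative result shape (not part of the original source): assuming
    // `http_host_name` is `Some("example.com")` and the source is
    // `my_db.my_schema.my_webhook`, the returned URL is
    //
    //     https://example.com/api/webhook/my_db/my_schema/my_webhook
    //
    // `Url::path_segments_mut` percent-encodes each pushed segment, which is why the
    // identifiers in the URL are URL-encoded as noted above.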

    /// Parses the given SQL string into a [`Plan`] and its [`ResolvedIds`].
    ///
    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
    ///
    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
    /// their original state before the method returns.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
        // caller if this method changed the state of the catalog.
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }

    /// Parses the given SQL string into a [`Plan`] and its [`ResolvedIds`].
    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }
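
    // Illustrative usage (not part of the original source): the parse -> resolve -> plan
    // pipeline above is how a stored `create_sql` string is turned back into a `Plan`
    // plus the IDs it depends on.
    //
    //     let session_catalog = state.for_system_session();
    //     let (plan, resolved_ids) =
    //         CatalogState::parse_plan(create_sql, None, &session_catalog)?;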

    /// Parses the given SQL string into a [`CatalogItem`].
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }

    /// Parses the given SQL string into a `CatalogItem`.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }

    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
    ///
    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
    /// not cached.
    ///
    /// On failure returns an error and `cached_expr` so it can be used later.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => (view.raw_expr, view.optimized_expr),
                    item => unreachable!("expected view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // Build an optimizer for this VIEW.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Resolve all item dependencies from the HIR expression.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = optimized_expr.typ();
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                // Resolve all item dependencies from the HIR expression.
                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    global_id,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    cluster_id: materialized_view.cluster_id,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1372                        Ok(ct) => ct,
1373                        Err(err) => return Err((err, cached_expr)),
1374                    };
1375                CatalogItem::ContinualTask(ct)
1376            }
1377            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1378                create_sql: index.create_sql,
1379                global_id,
1380                on: index.on,
1381                keys: index.keys.into(),
1382                conn_id: None,
1383                resolved_ids,
1384                cluster_id: index.cluster_id,
1385                custom_logical_compaction_window: custom_logical_compaction_window
1386                    .or(index.compaction_window),
1387                is_retained_metrics_object,
1388            }),
1389            Plan::CreateSink(CreateSinkPlan {
1390                sink,
1391                with_snapshot,
1392                in_cluster,
1393                ..
1394            }) => CatalogItem::Sink(Sink {
1395                create_sql: sink.create_sql,
1396                global_id,
1397                from: sink.from,
1398                connection: sink.connection,
1399                envelope: sink.envelope,
1400                version: sink.version,
1401                with_snapshot,
1402                resolved_ids,
1403                cluster_id: in_cluster,
1404            }),
1405            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1406                let desc = match typ.inner.desc(&session_catalog) {
1407                    Ok(desc) => desc,
1408                    Err(err) => return Err((err.into(), cached_expr)),
1409                };
1410                CatalogItem::Type(Type {
1411                    create_sql: Some(typ.create_sql),
1412                    global_id,
1413                    desc,
1414                    details: CatalogTypeDetails {
1415                        array_id: None,
1416                        typ: typ.inner,
1417                        pg_metadata: None,
1418                    },
1419                    resolved_ids,
1420                })
1421            }
1422            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1423                create_sql: secret.create_sql,
1424                global_id,
1425            }),
1426            Plan::CreateConnection(CreateConnectionPlan {
1427                connection:
1428                    mz_sql::plan::Connection {
1429                        create_sql,
1430                        details,
1431                    },
1432                ..
1433            }) => CatalogItem::Connection(Connection {
1434                create_sql,
1435                global_id,
1436                details,
1437                resolved_ids,
1438            }),
1439            _ => {
1440                return Err((
1441                    Error::new(ErrorKind::Corruption {
1442                        detail: "catalog entry generated inappropriate plan".to_string(),
1443                    })
1444                    .into(),
1445                    cached_expr,
1446                ));
1447            }
1448        };
1449
1450        Ok((item, uncached_expr))
1451    }
1452
    /// Executes function `f` on `self` with all "enable_for_item_parsing" feature flags enabled.
    /// Calling this method does not permanently modify any system configuration variables.
    ///
    /// WARNING:
    /// Any modifications made to the system configuration variables in `f` will be lost.
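    ///
    /// A sketch of the intended usage (`plan_existing_item` is a hypothetical
    /// helper, not part of this module):
    ///
    /// ```ignore
    /// // Re-plan an existing catalog item with all parsing-related feature
    /// // flags enabled; the flag overrides are dropped again afterwards.
    /// let plan = state.with_enable_for_item_parsing(|state| plan_existing_item(state));
    /// ```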
    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
        // Enable catalog features that might be required when planning existing
        // catalog items. Existing catalog items might have been created while
        // a specific feature flag was turned on, so we need to ensure that this
        // is also the case during catalog rehydration in order to avoid panics.
        //
        // WARNING / CONTRACT:
        // 1. Features used in this method that relate to parsing / planning
        //    should have `enable_for_item_parsing` set to `true`.
        // 2. After this step, feature flag configuration must not be
        //    overridden.
        let restore = self.system_configuration.clone();
        self.system_configuration.enable_for_item_parsing();
        let res = f(self);
        self.system_configuration = restore;
        res
    }

    /// Returns all indexes on the given object and cluster known in the catalog.
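    ///
    /// A usage sketch (the IDs are illustrative):
    ///
    /// ```ignore
    /// // Find every index on `relation_gid` that lives on `cluster_id`.
    /// for (index_gid, index) in state.get_indexes_on(relation_gid, cluster_id) {
    ///     println!("index {index_gid} on keys {:?}", index.keys);
    /// }
    /// ```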
    pub fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> impl Iterator<Item = (GlobalId, &Index)> {
        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;

        self.try_get_entry_by_global_id(&id)
            .into_iter()
            .map(move |e| {
                e.used_by()
                    .iter()
                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
                        CatalogItem::Index(index) if index_matches(index) => {
                            Some((index.global_id(), index))
                        }
                        _ => None,
                    })
            })
            .flatten()
    }

    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }

    /// Gets a reference to the specified replica of the specified cluster.
    ///
    /// Returns `None` if either the cluster or the replica does not
    /// exist.
    pub(super) fn try_get_cluster_replica(
        &self,
        id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.try_get_cluster(id)
            .and_then(|cluster| cluster.replica(replica_id))
    }

    /// Gets a reference to the specified replica of the specified cluster.
    ///
    /// Panics if either the cluster or the replica does not exist.
    pub(super) fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.try_get_cluster_replica(cluster_id, replica_id)
            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
    }

    pub(super) fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        let cluster = self.get_cluster(*cluster_id);
        let replica_id = cluster
            .replica_id_by_name_
            .get(replica_name)
            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
        Ok(&cluster.replicas_by_id_[replica_id])
    }

    /// Get system configuration `name`.
    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
        Ok(self.system_configuration.get(name)?)
    }

    /// Parses the given `value` for the system configuration `name`.
    ///
    /// Returns the parsed value as a string.
    pub(super) fn parse_system_configuration(
        &self,
        name: &str,
        value: VarInput,
    ) -> Result<String, Error> {
        let value = self.system_configuration.parse(name, value)?;
        Ok(value.format())
    }

    /// Resolves `schema_name` to a [`Schema`] in the database matching `database_spec`.
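    ///
    /// A usage sketch (the database ID and session are illustrative):
    ///
    /// ```ignore
    /// // Look up the `public` schema of a specific database.
    /// let schema = state.resolve_schema_in_database(
    ///     &ResolvedDatabaseSpecifier::Id(database_id),
    ///     "public",
    ///     session.conn_id(),
    /// )?;
    /// ```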
    pub(super) fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        let schema = match database_spec {
            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
                self.temporary_schemas.get(conn_id)
            }
            ResolvedDatabaseSpecifier::Ambient => self
                .ambient_schemas_by_name
                .get(schema_name)
                .and_then(|id| self.ambient_schemas_by_id.get(id)),
            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
                db.schemas_by_name
                    .get(schema_name)
                    .and_then(|id| db.schemas_by_id.get(id))
            }),
        };
        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
    }

    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        // Keep in sync with `get_schemas_mut`
        match (database_spec, schema_spec) {
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
                &self.temporary_schemas[conn_id]
            }
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
                &self.ambient_schemas_by_id[id]
            }

            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
                &self.database_by_id[database_id].schemas_by_id[schema_id]
            }
            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
                unreachable!("temporary schemas are in the ambient database")
            }
        }
    }

    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.database_by_id
            .values()
            .filter_map(|database| database.schemas_by_id.get(schema_id))
            .chain(self.ambient_schemas_by_id.values())
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }

    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }

    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }

    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }

    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }

    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }

    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }

    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }

    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        SYSTEM_SCHEMAS
            .iter()
            .map(|name| self.ambient_schemas_by_name[*name])
    }

    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
        self.system_schema_ids().contains(&id)
    }

    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
        match spec {
            SchemaSpecifier::Temporary => false,
            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
        }
    }

    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        UNSTABLE_SCHEMAS
            .iter()
            .map(|name| self.ambient_schemas_by_name[*name])
    }

    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
        self.unstable_schema_ids().contains(&id)
    }

    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
        match spec {
            SchemaSpecifier::Temporary => false,
            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
        }
    }

    /// Creates a new schema in the `Catalog` for temporary items
    /// indicated by the TEMPORARY or TEMP keywords.
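    ///
    /// A sketch of when this is called (the session values are illustrative):
    ///
    /// ```ignore
    /// // Lazily create the connection's temporary schema the first time the
    /// // session needs a home for a TEMPORARY item.
    /// state.create_temporary_schema(session.conn_id(), session_role_id)?;
    /// ```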
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
        // all -1.
        let oid = INVALID_OID;
        self.temporary_schemas.insert(
            conn_id.clone(),
            Schema {
                name: QualifiedSchemaName {
                    database: ResolvedDatabaseSpecifier::Ambient,
                    schema: MZ_TEMP_SCHEMA.into(),
                },
                id: SchemaSpecifier::Temporary,
                oid,
                items: BTreeMap::new(),
                functions: BTreeMap::new(),
                types: BTreeMap::new(),
                owner_id,
                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Schema,
                    owner_id,
                )]),
            },
        );
        Ok(())
    }

    /// Return all OIDs that are allocated to temporary objects.
    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
        std::iter::empty()
            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
                if schema.id.is_temporary() {
                    Some(schema.oid)
                } else {
                    None
                }
            }))
            .chain(self.entry_by_id.values().filter_map(|entry| {
                if entry.item().is_temporary() {
                    Some(entry.oid)
                } else {
                    None
                }
            }))
    }

    /// Optimized lookup for a builtin table.
    ///
    /// Panics if the builtin table doesn't exist in the catalog.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
    }

    /// Optimized lookup for a builtin log.
    ///
    /// Panics if the builtin log doesn't exist in the catalog.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
        let log = match self.get_entry(&item_id).item() {
            CatalogItem::Log(log) => log,
            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
        };
        (item_id, log.global_id)
    }

    /// Optimized lookup for a builtin storage collection.
    ///
    /// Panics if the builtin storage collection doesn't exist in the catalog.
    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
    }

    /// Optimized lookup for a builtin object.
    ///
    /// Panics if the builtin object doesn't exist in the catalog.
    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
        let schema = &self.ambient_schemas_by_id[schema_id];
        match builtin.catalog_item_type() {
            CatalogItemType::Type => schema.types[builtin.name()],
            CatalogItemType::Func => schema.functions[builtin.name()],
            CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::Sink
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
        }
    }

    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }

    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }

    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        match self.database_by_name.get(database_name) {
            Some(id) => Ok(&self.database_by_id[id]),
            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
        }
    }

    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        let database_spec = match database_name {
            // If a database is explicitly specified, validate it. Note that we
            // intentionally do not validate `current_database` to permit
            // querying `mz_catalog` with an invalid session database, e.g., so
            // that you can run `SHOW DATABASES` to *find* a valid database.
            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
                self.resolve_database(database)?.id().clone(),
            )),
            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
        };

        // First try to find the schema in the named database.
        if let Some(database_spec) = database_spec {
            if let Ok(schema) =
                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
            {
                return Ok(schema);
            }
        }

        // Then fall back to the ambient database.
        if let Ok(schema) = self.resolve_schema_in_database(
            &ResolvedDatabaseSpecifier::Ambient,
            schema_name,
            conn_id,
        ) {
            return Ok(schema);
        }

        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
    }

    /// Optimized lookup for a system schema.
    ///
    /// Panics if the system schema doesn't exist in the catalog.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }

    pub fn resolve_search_path(
        &self,
        session: &dyn SessionMetadata,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        let database = self
            .database_by_name
            .get(session.database())
            .map(|id| id.clone());

        session
            .search_path()
            .iter()
            .map(|schema| {
                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
            })
            .filter_map(|schema| schema.ok())
            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
            .collect()
    }

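    /// Returns `search_path` with the implicit schemas prepended: the
    /// connection's temporary schema (when `include_temp_schema` is set),
    /// followed by `mz_catalog` and `pg_catalog`, skipping any of these that
    /// already appear in `search_path`.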
    pub fn effective_search_path(
        &self,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        include_temp_schema: bool,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        let mut v = Vec::with_capacity(search_path.len() + 3);
        // Temp schema is only included for relations and data types, not for functions and operators
        let temp_schema = (
            ResolvedDatabaseSpecifier::Ambient,
            SchemaSpecifier::Temporary,
        );
        if include_temp_schema && !search_path.contains(&temp_schema) {
            v.push(temp_schema);
        }
        let default_schemas = [
            (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
            ),
            (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
            ),
        ];
        for schema in default_schemas.into_iter() {
            if !search_path.contains(&schema) {
                v.push(schema);
            }
        }
        v.extend_from_slice(search_path);
        v
    }

    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        let id = self
            .clusters_by_name
            .get(name)
            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
        Ok(&self.clusters_by_id[id])
    }

    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        let id = self
            .clusters_by_name
            .get(cluster.name)
            .expect("failed to lookup BuiltinCluster by name");
        self.clusters_by_id
            .get(id)
            .expect("failed to lookup BuiltinCluster by ID")
    }

    pub fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
        let replica_name = cluster_replica_name.replica.as_str();
        let replica_id = cluster
            .replica_id(replica_name)
            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
        Ok(cluster.replica(replica_id).expect("Must exist"))
    }

    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
    ///
    /// If `name` does not specify a database, the `current_database` is used.
    /// If `name` does not specify a schema, then the schemas in `search_path`
    /// are searched in order.
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // If a schema name was specified, just try to find the item in that
        // schema. If no schema was specified, try to find the item in the connection's
        // temporary schema. If the item is not found, try to find the item in every
        // schema in the search path.
        let schemas = match &name.schema {
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            None => match self
                .get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .items
                .get(&name.item)
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            let schema = self.get_schema(database_spec, schema_spec, conn_id);

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Some relations that previously lived in the `mz_internal` schema have been moved to
        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
        // automatically let references to the old schema resolve to the new ones as well.
        // TODO(database-issues#8173) remove this after sufficient time has passed
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }

    /// Resolves `name` to a non-function [`CatalogEntry`].
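    ///
    /// A usage sketch (the name and session values are illustrative):
    ///
    /// ```ignore
    /// // Resolve the unqualified name `t` the way a query reference would be:
    /// // the temporary schema first, then each schema on the search path.
    /// let name = PartialItemName {
    ///     database: None,
    ///     schema: None,
    ///     item: "t".into(),
    /// };
    /// let entry = state.resolve_entry(
    ///     current_database.as_ref(),
    ///     &search_path,
    ///     &name,
    ///     session.conn_id(),
    /// )?;
    /// ```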
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.items,
            current_database,
            search_path,
            name,
            conn_id,
            SqlCatalogError::UnknownItem,
        )
    }

    /// Resolves `name` to a function [`CatalogEntry`].
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.functions,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownFunction {
                name,
                alternative: None,
            },
        )
    }

    /// Resolves `name` to a type [`CatalogEntry`].
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        static NON_PG_CATALOG_TYPES: LazyLock<
            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
        > = LazyLock::new(|| {
            BUILTINS::types()
                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
                .map(|typ| (typ.name, typ))
                .collect()
        });

        let entry = self.resolve(
            |schema| &schema.types,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownType { name },
        )?;

        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
                warn!(
                    "user specified an incorrect schema of {} for the type {}, which should be in \
                    the {} schema. This works now due to a bug but will be fixed in a later release.",
                    PG_CATALOG_SCHEMA.quoted(),
                    typ.name.quoted(),
                    typ.schema.quoted(),
                )
            }
        }

        Ok(entry)
    }

    /// Gets the [`CommentObjectId`] corresponding to the given [`ObjectId`].
    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
        match object_id {
            ObjectId::Item(item_id) => {
                let entry = self.get_entry(&item_id);
                match entry.item_type() {
                    CatalogItemType::Table => CommentObjectId::Table(item_id),
                    CatalogItemType::Source => CommentObjectId::Source(item_id),
                    CatalogItemType::Sink => CommentObjectId::Sink(item_id),
                    CatalogItemType::View => CommentObjectId::View(item_id),
                    CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
                    CatalogItemType::Index => CommentObjectId::Index(item_id),
                    CatalogItemType::Func => CommentObjectId::Func(item_id),
                    CatalogItemType::Connection => CommentObjectId::Connection(item_id),
                    CatalogItemType::Type => CommentObjectId::Type(item_id),
                    CatalogItemType::Secret => CommentObjectId::Secret(item_id),
                    CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
                }
            }
            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
            ObjectId::ClusterReplica(cluster_replica_id) => {
                CommentObjectId::ClusterReplica(cluster_replica_id)
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                CommentObjectId::NetworkPolicy(network_policy_id)
            }
        }
    }

    /// Return current system configuration.
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }

    /// Return a mutable reference to the current system configuration.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        &mut self.system_configuration
    }

    /// Serializes the catalog's in-memory state.
    ///
    /// There are no guarantees about the format of the serialized state, except
    /// that the serialized state for two identical catalogs will compare
    /// identically.
    ///
    /// Consumers that need to overwrite the `unfinalized_shards` catalog field can do so by
    /// passing `Some` for the `unfinalized_shards` argument.
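    ///
    /// A sketch of how a consistency check might use this (the two states are
    /// illustrative):
    ///
    /// ```ignore
    /// // Compare two catalogs while ignoring which shards are unfinalized.
    /// let dump_a = state_a.dump(Some(BTreeSet::new()))?;
    /// let dump_b = state_b.dump(Some(BTreeSet::new()))?;
    /// assert_eq!(dump_a, dump_b);
    /// ```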
    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
        // Dump the base catalog.
        let mut dump = serde_json::to_value(&self).map_err(|e| {
            Error::new(ErrorKind::Unstructured(format!(
                // Don't panic here because we don't have compile-time failures for maps with
                // non-string keys.
                "internal error: could not dump catalog: {}",
                e
            )))
        })?;

        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
        // Stitch in system parameter defaults.
        dump_obj.insert(
            "system_parameter_defaults".into(),
            serde_json::json!(self.system_config().defaults()),
        );
        // Potentially overwrite unfinalized shards.
        if let Some(unfinalized_shards) = unfinalized_shards {
            dump_obj
                .get_mut("storage_metadata")
                .expect("known to exist")
                .as_object_mut()
                .expect("storage_metadata is an object")
                .insert(
                    "unfinalized_shards".into(),
                    serde_json::json!(unfinalized_shards),
                );
        }
        // Remove GlobalIds for temporary objects from the mapping.
        //
        // Post-test consistency checks with the durable catalog don't know about temporary items
        // since they're kept entirely in memory.
        let temporary_gids: Vec<_> = self
            .entry_by_global_id
            .iter()
            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
            .map(|(gid, _item_id)| *gid)
            .collect();
        if !temporary_gids.is_empty() {
            let gids = dump_obj
                .get_mut("entry_by_global_id")
                .expect("known to exist")
                .as_object_mut()
                .expect("entry_by_global_id is an object");
            for gid in temporary_gids {
                gids.remove(&gid.to_string());
            }
        }

        // Emit as pretty-printed JSON.
        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
    }

    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }

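    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into an
    /// in-memory [`ReplicaLocation`], validating the requested size against
    /// `allowed_sizes` and resolving the availability zones that managed
    /// replicas may be scheduled in.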
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried to concretize an unmanaged replica with specific availability zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried to concretize a managed replica with both allowed availability zones and a specific availability zone";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some(azs)) if azs.is_empty() => {
                            ManagedReplicaAvailabilityZones::FromCluster(None)
                        }
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }

    /// Return whether the given replica size requests a disk.
    ///
    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
    /// not something we can know at this point.
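    ///
    /// A sketch of the rule (the size names are illustrative):
    ///
    /// ```ignore
    /// // A size with swap disabled and a nonzero disk limit requests disk;
    /// // a swap-enabled size does not, regardless of its disk limit.
    /// assert!(state.cluster_replica_size_has_disk("standard-1"));
    /// assert!(!state.cluster_replica_size_has_disk("swap-1"));
    /// ```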
    ///
    /// # Panics
    ///
    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
        let alloc = &self.cluster_replica_sizes.0[size];
        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
    }

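    /// Ensures that `size` names a known, enabled replica size that is also
    /// contained in `allowed_sizes` (when `allowed_sizes` is non-empty),
    /// returning an error listing the valid alternatives otherwise.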
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        let cluster_replica_sizes = &self.cluster_replica_sizes;

        if !cluster_replica_sizes.0.contains_key(size)
            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
            || cluster_replica_sizes.0[size].disabled
        {
            let mut entries = cluster_replica_sizes
                .enabled_allocations()
                .collect::<Vec<_>>();

            if !allowed_sizes.is_empty() {
                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
                entries.retain(|(name, _)| allowed_sizes.contains(name));
            }

            entries.sort_by_key(
                |(
                    _name,
                    ReplicaAllocation {
                        scale, cpu_limit, ..
                    },
                )| (scale, cpu_limit),
            );

            Err(Error {
                kind: ErrorKind::InvalidClusterReplicaSize {
                    size: size.to_owned(),
                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
                },
            })
        } else {
            Ok(())
        }
    }

    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_builtin() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        if network_policy_id.is_builtin() {
            let policy = self.get_network_policy(network_policy_id);
            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
                policy.name.clone(),
            )))
        } else {
            Ok(())
        }
    }

    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        let is_grantable = !role_id.is_public() && !role_id.is_system();
        if is_grantable {
            Ok(())
        } else {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::UngrantableRoleName(
                role.name().to_string(),
            )))
        }
    }

    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_system() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_predefined() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
    // passing `tx` and `session`?
    pub(crate) fn add_to_audit_log(
        system_configuration: &SystemVars,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        tx: &mut mz_catalog::durable::Transaction,
        audit_events: &mut Vec<VersionedEvent>,
        event_type: EventType,
        object_type: ObjectType,
        details: EventDetails,
    ) -> Result<(), Error> {
        let user = session.map(|session| session.user().name.to_string());

        // `unsafe_mock_audit_event_timestamp` can only be set to `Some` when running in unsafe
        // mode.
        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
            Some(ts) => ts.into(),
            _ => oracle_write_ts.into(),
        };
        let id = tx.allocate_audit_log_id()?;
        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
        audit_events.push(event.clone());
        tx.insert_audit_log_event(event);
        Ok(())
    }

    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
        match id {
            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
                self.get_cluster_replica(*cluster_id, *replica_id)
                    .owner_id(),
            ),
            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
            ObjectId::Schema((database_spec, schema_spec)) => Some(
                self.get_schema(database_spec, schema_spec, conn_id)
                    .owner_id(),
            ),
            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
            ObjectId::Role(_) => None,
            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
        }
    }

    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        match object_id {
            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
        }
    }

    pub(super) fn get_system_object_type(
        &self,
        id: &SystemObjectId,
    ) -> mz_sql::catalog::SystemObjectType {
        match id {
            SystemObjectId::Object(object_id) => {
                SystemObjectType::Object(self.get_object_type(object_id))
            }
            SystemObjectId::System => SystemObjectType::System,
        }
    }

    /// Returns a read-only view of the current [`StorageMetadata`].
    ///
    /// To write to this struct, you must use a catalog transaction.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }

    /// For the source IDs in `ids`, returns their compaction windows.
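    ///
    /// A usage sketch (the item IDs are illustrative):
    ///
    /// ```ignore
    /// // Group a set of ingestion items by their effective compaction window.
    /// let windows = state.source_compaction_windows([source_a, source_b]);
    /// for (window, items) in windows {
    ///     println!("{window:?}: {items:?}");
    /// }
    /// ```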
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    match source.data_source {
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::IngestionExport { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                        DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                    }
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc: DataSourceDesc::IngestionExport { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        _ => {}
                    }
                }
                _ => {
                    // Views could depend on sources, so ignore them if they were added via
                    // `used_by` above.
2572                    continue;
2573                }
2574            }
2575        }
2576        cws
2577    }
2578
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }

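    /// Returns the [`CatalogEntry`] for a [`CommentObjectId`], or `None` if
    /// the comment target is not a catalog item.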
    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
    }

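    /// Renders the name of the object identified by `id` for use in the audit
    /// log, resolving item and schema names relative to `conn_id`.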
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }

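    /// Returns the mock authentication nonce, or the empty string if none is
    /// configured.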
    pub fn mock_authentication_nonce(&self) -> String {
        self.mock_authentication_nonce.clone().unwrap_or_default()
    }
}

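// Resolving a connection inlines the connections it references (for example,
// the SSH tunnel a Kafka or Postgres connection may point at); kinds that
// cannot reference other connections (SSH, AWS, AWS PrivateLink) are returned
// as-is.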
impl ConnectionResolver for CatalogState {
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}

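// `OptimizerCatalog` is implemented for both `CatalogState` and `Catalog`,
// in each case by delegating to the inherent `CatalogState` methods, so the
// optimizer can work against either type.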
impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}

impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}

impl Catalog {
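    /// Upcasts an `Arc<Catalog>` into an `Arc<dyn OptimizerCatalog>` trait
    /// object.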
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}