mz_adapter/catalog/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! In-memory metadata storage for the coordinator.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36    TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40    UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55    UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::OptimizerFeatures;
59use mz_repr::role_id::RoleId;
60use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector};
61use mz_secrets::InMemorySecretsController;
62use mz_sql::ast::Ident;
63use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
67    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
68    TypeReference,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
72    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{
76    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
77    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
78    Plan, PlanContext,
79};
80use mz_sql::rbac;
81use mz_sql::session::metadata::SessionMetadata;
82use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
83use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
84use mz_sql_parser::ast::QualifiedReplica;
85use mz_storage_client::controller::StorageMetadata;
86use mz_storage_types::connections::ConnectionContext;
87use mz_storage_types::connections::inline::{
88    ConnectionResolver, InlinedConnection, IntoInlineConnection,
89};
90use serde::Serialize;
91use timely::progress::Antichain;
92use tokio::sync::mpsc;
93use tracing::{debug, warn};
94
95// DO NOT add any more imports from `crate` outside of `crate::catalog`.
96use crate::AdapterError;
97use crate::catalog::{Catalog, ConnCatalog};
98use crate::coord::ConnMeta;
99use crate::optimize::{self, Optimize, OptimizerCatalog};
100use crate::session::Session;
101
/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk. Fields marked `#[serde(skip)]` are left out of
/// those dumps, and `entry_by_id` omits temporary items (see `skip_temp_items`).
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects are partially derived from read-only state.
    //
    // Each `*_by_name` map is a name index into the matching `*_by_id` map.
    /// Database name -> database ID.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    /// All databases, keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    /// All catalog items, keyed by item ID. Entries with a connection ID (temporary items) are
    /// excluded when serializing.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Maps each collection's [`GlobalId`] to the ID of the catalog item that owns it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    /// Schema name -> ID for schemas that do not belong to a database (e.g. system schemas).
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    /// Schemas that do not belong to a database, keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Cluster name -> cluster ID.
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    /// All clusters, keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    /// Role name -> role ID.
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    /// All roles, keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    /// Network policy name -> network policy ID.
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    /// All network policies, keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    /// Role authentication data, keyed by role ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    /// System variables. Not included in serialized dumps.
    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    /// Source references, keyed by catalog item ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog.
    /// Temporary schemas, keyed by the connection that owns them.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only state not derived from the durable catalog.
    //
    // NOTE(review): `#[serde(skip)]` only applies to the field directly below it
    // (`egress_addresses`); the other fields in this group are serialized. Confirm that is
    // intended.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
169
/// Keeps track of what expressions are cached or not during startup.
///
/// While `Open`, previously cached expressions can be taken out of `cached_exprs`. Expressions
/// that were not found in the cache are recorded in `uncached_exprs`, which
/// `into_uncached_exprs` hands back. `Closed` turns every cache operation into a no-op.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}
183
184impl LocalExpressionCache {
185    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
186        Self::Open {
187            cached_exprs,
188            uncached_exprs: BTreeMap::new(),
189        }
190    }
191
192    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
193        match self {
194            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
195            LocalExpressionCache::Closed => None,
196        }
197    }
198
199    /// Insert an expression that was cached, back into the cache. This is generally needed when
200    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
201    pub(super) fn insert_cached_expression(
202        &mut self,
203        id: GlobalId,
204        local_expressions: LocalExpressions,
205    ) {
206        match self {
207            LocalExpressionCache::Open { cached_exprs, .. } => {
208                cached_exprs.insert(id, local_expressions);
209            }
210            LocalExpressionCache::Closed => {}
211        }
212    }
213
214    /// Inform the cache that `id` was not found in the cache and that we should add it as
215    /// `local_mir` and `optimizer_features`.
216    pub(super) fn insert_uncached_expression(
217        &mut self,
218        id: GlobalId,
219        local_mir: OptimizedMirRelationExpr,
220        optimizer_features: OptimizerFeatures,
221    ) {
222        match self {
223            LocalExpressionCache::Open { uncached_exprs, .. } => {
224                let local_expr = LocalExpressions {
225                    local_mir,
226                    optimizer_features,
227                };
228                // If we are trying to cache the same item a second time, with a different
229                // expression, then we must be migrating the object or doing something else weird.
230                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
231                // version after a restart. Caching the migrated version may cause us to incorrectly
232                // think that the object has already been migrated. To simplify things, we cache
233                // neither.
234                let prev = uncached_exprs.remove(&id);
235                match prev {
236                    Some(prev) if prev == local_expr => {
237                        uncached_exprs.insert(id, local_expr);
238                    }
239                    None => {
240                        uncached_exprs.insert(id, local_expr);
241                    }
242                    Some(_) => {}
243                }
244            }
245            LocalExpressionCache::Closed => {}
246        }
247    }
248
249    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
250        match self {
251            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
252            LocalExpressionCache::Closed => BTreeMap::new(),
253        }
254    }
255}
256
257fn skip_temp_items<S>(
258    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
259    serializer: S,
260) -> Result<S::Ok, S::Error>
261where
262    S: serde::Serializer,
263{
264    mz_ore::serde::map_key_to_string(
265        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
266        serializer,
267    )
268}
269
270impl CatalogState {
271    /// Returns an empty [`CatalogState`] that can be used in tests.
272    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
273    // tests are structured in this repository.
274    pub fn empty_test() -> Self {
275        CatalogState {
276            database_by_name: Default::default(),
277            database_by_id: Default::default(),
278            entry_by_id: Default::default(),
279            entry_by_global_id: Default::default(),
280            ambient_schemas_by_name: Default::default(),
281            ambient_schemas_by_id: Default::default(),
282            temporary_schemas: Default::default(),
283            clusters_by_id: Default::default(),
284            clusters_by_name: Default::default(),
285            network_policies_by_name: Default::default(),
286            roles_by_name: Default::default(),
287            roles_by_id: Default::default(),
288            network_policies_by_id: Default::default(),
289            role_auth_by_id: Default::default(),
290            config: CatalogConfig {
291                start_time: Default::default(),
292                start_instant: Instant::now(),
293                nonce: Default::default(),
294                environment_id: EnvironmentId::for_tests(),
295                session_id: Default::default(),
296                build_info: &DUMMY_BUILD_INFO,
297                timestamp_interval: Default::default(),
298                now: NOW_ZERO.clone(),
299                connection_context: ConnectionContext::for_tests(Arc::new(
300                    InMemorySecretsController::new(),
301                )),
302                builtins_cfg: BuiltinsConfig {
303                    include_continual_tasks: true,
304                },
305                helm_chart_version: None,
306            },
307            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
308            availability_zones: Default::default(),
309            system_configuration: Default::default(),
310            egress_addresses: Default::default(),
311            aws_principal_context: Default::default(),
312            aws_privatelink_availability_zones: Default::default(),
313            http_host_name: Default::default(),
314            default_privileges: Default::default(),
315            system_privileges: Default::default(),
316            comments: Default::default(),
317            source_references: Default::default(),
318            storage_metadata: Default::default(),
319            license_key: ValidatedLicenseKey::for_tests(),
320            mock_authentication_nonce: Default::default(),
321        }
322    }
323
324    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
325        let search_path = self.resolve_search_path(session);
326        let database = self
327            .database_by_name
328            .get(session.vars().database())
329            .map(|id| id.clone());
330        let state = match session.transaction().catalog_state() {
331            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
332            None => Cow::Borrowed(self),
333        };
334        ConnCatalog {
335            state,
336            unresolvable_ids: BTreeSet::new(),
337            conn_id: session.conn_id().clone(),
338            cluster: session.vars().cluster().into(),
339            database,
340            search_path,
341            role_id: session.current_role_id().clone(),
342            prepared_statements: Some(session.prepared_statements()),
343            portals: Some(session.portals()),
344            notices_tx: session.retain_notice_transmitter(),
345        }
346    }
347
348    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
349        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
350        let cluster = self.system_configuration.default_cluster();
351
352        ConnCatalog {
353            state: Cow::Borrowed(self),
354            unresolvable_ids: BTreeSet::new(),
355            conn_id: SYSTEM_CONN_ID.clone(),
356            cluster,
357            database: self
358                .resolve_database(DEFAULT_DATABASE_NAME)
359                .ok()
360                .map(|db| db.id()),
361            // Leaving the system's search path empty allows us to catch issues
362            // where catalog object names have not been normalized correctly.
363            search_path: Vec::new(),
364            role_id,
365            prepared_statements: None,
366            portals: None,
367            notices_tx,
368        }
369    }
370
371    pub fn for_system_session(&self) -> ConnCatalog<'_> {
372        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
373    }
374
375    /// Returns an iterator over the deduplicated identifiers of all
376    /// objects this catalog entry transitively depends on (where
377    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
378    /// [`CatalogItem::references`]).
379    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
380        struct I<'a> {
381            queue: VecDeque<CatalogItemId>,
382            seen: BTreeSet<CatalogItemId>,
383            this: &'a CatalogState,
384        }
385        impl<'a> Iterator for I<'a> {
386            type Item = CatalogItemId;
387            fn next(&mut self) -> Option<Self::Item> {
388                if let Some(next) = self.queue.pop_front() {
389                    for child in self.this.get_entry(&next).item().uses() {
390                        if !self.seen.contains(&child) {
391                            self.queue.push_back(child);
392                            self.seen.insert(child);
393                        }
394                    }
395                    Some(next)
396                } else {
397                    None
398                }
399            }
400        }
401
402        I {
403            queue: [id].into_iter().collect(),
404            seen: [id].into_iter().collect(),
405            this: self,
406        }
407    }
408
409    /// Computes the IDs of any log sources this catalog entry transitively
410    /// depends on.
411    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
412        let mut out = Vec::new();
413        self.introspection_dependencies_inner(id, &mut out);
414        out
415    }
416
417    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
418        match self.get_entry(&id).item() {
419            CatalogItem::Log(_) => out.push(id),
420            item @ (CatalogItem::View(_)
421            | CatalogItem::MaterializedView(_)
422            | CatalogItem::Connection(_)
423            | CatalogItem::ContinualTask(_)) => {
424                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
425                for item_id in item.references().items() {
426                    self.introspection_dependencies_inner(*item_id, out);
427                }
428            }
429            CatalogItem::Sink(sink) => {
430                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
431                self.introspection_dependencies_inner(from_item_id, out)
432            }
433            CatalogItem::Index(idx) => {
434                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
435                self.introspection_dependencies_inner(on_item_id, out)
436            }
437            CatalogItem::Table(_)
438            | CatalogItem::Source(_)
439            | CatalogItem::Type(_)
440            | CatalogItem::Func(_)
441            | CatalogItem::Secret(_) => (),
442        }
443    }
444
445    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
446    ///
447    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
448    /// earlier in the list than the roots. This is particularly useful for the order to drop
449    /// objects.
450    pub(super) fn object_dependents(
451        &self,
452        object_ids: &Vec<ObjectId>,
453        conn_id: &ConnectionId,
454        seen: &mut BTreeSet<ObjectId>,
455    ) -> Vec<ObjectId> {
456        let mut dependents = Vec::new();
457        for object_id in object_ids {
458            match object_id {
459                ObjectId::Cluster(id) => {
460                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
461                }
462                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
463                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
464                ),
465                ObjectId::Database(id) => {
466                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
467                }
468                ObjectId::Schema((database_spec, schema_spec)) => {
469                    dependents.extend_from_slice(&self.schema_dependents(
470                        database_spec.clone(),
471                        schema_spec.clone(),
472                        conn_id,
473                        seen,
474                    ));
475                }
476                ObjectId::NetworkPolicy(id) => {
477                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
478                }
479                id @ ObjectId::Role(_) => {
480                    let unseen = seen.insert(id.clone());
481                    if unseen {
482                        dependents.push(id.clone());
483                    }
484                }
485                ObjectId::Item(id) => {
486                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
487                }
488            }
489        }
490        dependents
491    }
492
493    /// Returns all the IDs of all objects that depend on `cluster_id`, including `cluster_id`
494    /// itself.
495    ///
496    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
497    /// earlier in the list than the roots. This is particularly useful for the order to drop
498    /// objects.
499    fn cluster_dependents(
500        &self,
501        cluster_id: ClusterId,
502        seen: &mut BTreeSet<ObjectId>,
503    ) -> Vec<ObjectId> {
504        let mut dependents = Vec::new();
505        let object_id = ObjectId::Cluster(cluster_id);
506        if !seen.contains(&object_id) {
507            seen.insert(object_id.clone());
508            let cluster = self.get_cluster(cluster_id);
509            for item_id in cluster.bound_objects() {
510                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
511            }
512            for replica_id in cluster.replica_ids().values() {
513                dependents.extend_from_slice(&self.cluster_replica_dependents(
514                    cluster_id,
515                    *replica_id,
516                    seen,
517                ));
518            }
519            dependents.push(object_id);
520        }
521        dependents
522    }
523
524    /// Returns all the IDs of all objects that depend on `replica_id`, including `replica_id`
525    /// itself.
526    ///
527    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
528    /// earlier in the list than the roots. This is particularly useful for the order to drop
529    /// objects.
530    pub(super) fn cluster_replica_dependents(
531        &self,
532        cluster_id: ClusterId,
533        replica_id: ReplicaId,
534        seen: &mut BTreeSet<ObjectId>,
535    ) -> Vec<ObjectId> {
536        let mut dependents = Vec::new();
537        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
538        if !seen.contains(&object_id) {
539            seen.insert(object_id.clone());
540            dependents.push(object_id);
541        }
542        dependents
543    }
544
545    /// Returns all the IDs of all objects that depend on `database_id`, including `database_id`
546    /// itself.
547    ///
548    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
549    /// earlier in the list than the roots. This is particularly useful for the order to drop
550    /// objects.
551    fn database_dependents(
552        &self,
553        database_id: DatabaseId,
554        conn_id: &ConnectionId,
555        seen: &mut BTreeSet<ObjectId>,
556    ) -> Vec<ObjectId> {
557        let mut dependents = Vec::new();
558        let object_id = ObjectId::Database(database_id);
559        if !seen.contains(&object_id) {
560            seen.insert(object_id.clone());
561            let database = self.get_database(&database_id);
562            for schema_id in database.schema_ids().values() {
563                dependents.extend_from_slice(&self.schema_dependents(
564                    ResolvedDatabaseSpecifier::Id(database_id),
565                    SchemaSpecifier::Id(*schema_id),
566                    conn_id,
567                    seen,
568                ));
569            }
570            dependents.push(object_id);
571        }
572        dependents
573    }
574
575    /// Returns all the IDs of all objects that depend on `schema_id`, including `schema_id`
576    /// itself.
577    ///
578    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
579    /// earlier in the list than the roots. This is particularly useful for the order to drop
580    /// objects.
581    fn schema_dependents(
582        &self,
583        database_spec: ResolvedDatabaseSpecifier,
584        schema_spec: SchemaSpecifier,
585        conn_id: &ConnectionId,
586        seen: &mut BTreeSet<ObjectId>,
587    ) -> Vec<ObjectId> {
588        let mut dependents = Vec::new();
589        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
590        if !seen.contains(&object_id) {
591            seen.insert(object_id.clone());
592            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
593            for item_id in schema.item_ids() {
594                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
595            }
596            dependents.push(object_id)
597        }
598        dependents
599    }
600
601    /// Returns all the IDs of all objects that depend on `item_id`, including `item_id`
602    /// itself.
603    ///
604    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
605    /// earlier in the list than the roots. This is particularly useful for the order to drop
606    /// objects.
607    pub(super) fn item_dependents(
608        &self,
609        item_id: CatalogItemId,
610        seen: &mut BTreeSet<ObjectId>,
611    ) -> Vec<ObjectId> {
612        let mut dependents = Vec::new();
613        let object_id = ObjectId::Item(item_id);
614        if !seen.contains(&object_id) {
615            seen.insert(object_id.clone());
616            let entry = self.get_entry(&item_id);
617            for dependent_id in entry.used_by() {
618                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
619            }
620            dependents.push(object_id);
621            // We treat the progress collection as if it depends on the source
622            // for dropping. We have additional code in planning to create a
623            // kind of special-case "CASCADE" for this dependency.
624            if let Some(progress_id) = entry.progress_id() {
625                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
626            }
627        }
628        dependents
629    }
630
631    /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id`
632    /// itself.
633    ///
634    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
635    /// earlier in the list than the roots. This is particularly useful for the order to drop
636    /// objects.
637    pub(super) fn network_policy_dependents(
638        &self,
639        network_policy_id: NetworkPolicyId,
640        _seen: &mut BTreeSet<ObjectId>,
641    ) -> Vec<ObjectId> {
642        let object_id = ObjectId::NetworkPolicy(network_policy_id);
643        // Currently network policies have no dependents
644        // when we add the ability for users or sources/sinks to have policies
645        // this method will need to be updated.
646        vec![object_id]
647    }
648
649    /// Indicates whether the indicated item is considered stable or not.
650    ///
651    /// Only stable items can be used as dependencies of other catalog items.
652    fn is_stable(&self, id: CatalogItemId) -> bool {
653        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
654        !self.is_unstable_schema_specifier(spec)
655    }
656
657    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
658        if self.system_config().unsafe_enable_unstable_dependencies() {
659            return Ok(());
660        }
661
662        let unstable_dependencies: Vec<_> = item
663            .references()
664            .items()
665            .filter(|id| !self.is_stable(**id))
666            .map(|id| self.get_entry(id).name().item.clone())
667            .collect();
668
669        // It's okay to create a temporary object with unstable
670        // dependencies, since we will never need to reboot a catalog
671        // that contains it.
672        if unstable_dependencies.is_empty() || item.is_temporary() {
673            Ok(())
674        } else {
675            let object_type = item.typ().to_string();
676            Err(Error {
677                kind: ErrorKind::UnstableDependency {
678                    object_type,
679                    unstable_dependencies,
680                },
681            })
682        }
683    }
684
685    pub fn resolve_full_name(
686        &self,
687        name: &QualifiedItemName,
688        conn_id: Option<&ConnectionId>,
689    ) -> FullItemName {
690        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
691
692        let database = match &name.qualifiers.database_spec {
693            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
694            ResolvedDatabaseSpecifier::Id(id) => {
695                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
696            }
697        };
698        let schema = self
699            .get_schema(
700                &name.qualifiers.database_spec,
701                &name.qualifiers.schema_spec,
702                conn_id,
703            )
704            .name()
705            .schema
706            .clone();
707        FullItemName {
708            database,
709            schema,
710            item: name.item.clone(),
711        }
712    }
713
714    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
715        let database = match &name.database {
716            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
717            ResolvedDatabaseSpecifier::Id(id) => {
718                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
719            }
720        };
721        FullSchemaName {
722            database,
723            schema: name.schema.clone(),
724        }
725    }
726
727    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
728        self.entry_by_id
729            .get(id)
730            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
731    }
732
733    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
734        let item_id = self
735            .entry_by_global_id
736            .get(id)
737            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
738
739        let entry = self.get_entry(item_id).clone();
740        let version = match entry.item() {
741            CatalogItem::Table(table) => {
742                let (version, _) = table
743                    .collections
744                    .iter()
745                    .find(|(_verison, gid)| *gid == id)
746                    .expect("version to exist");
747                RelationVersionSelector::Specific(*version)
748            }
749            _ => RelationVersionSelector::Latest,
750        };
751        CatalogCollectionEntry { entry, version }
752    }
753
754    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
755        self.entry_by_id.iter()
756    }
757
758    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
759        let schema = self
760            .temporary_schemas
761            .get(conn)
762            .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
763        schema.items.values().copied().map(ObjectId::from)
764    }
765
766    /// Gets a type named `name` from exactly one of the system schemas.
767    ///
768    /// # Panics
769    /// - If `name` is not an entry in any system schema
770    /// - If more than one system schema has an entry named `name`.
771    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
772        let mut res = None;
773        for schema_id in self.system_schema_ids() {
774            let schema = &self.ambient_schemas_by_id[&schema_id];
775            if let Some(global_id) = schema.types.get(name) {
776                match res {
777                    None => res = Some(self.get_entry(global_id)),
778                    Some(_) => panic!(
779                        "only call get_system_type on objects uniquely identifiable in one system schema"
780                    ),
781                }
782            }
783        }
784
785        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
786    }
787
788    pub fn get_item_by_name(
789        &self,
790        name: &QualifiedItemName,
791        conn_id: &ConnectionId,
792    ) -> Option<&CatalogEntry> {
793        self.get_schema(
794            &name.qualifiers.database_spec,
795            &name.qualifiers.schema_spec,
796            conn_id,
797        )
798        .items
799        .get(&name.item)
800        .and_then(|id| self.try_get_entry(id))
801    }
802
803    pub fn get_type_by_name(
804        &self,
805        name: &QualifiedItemName,
806        conn_id: &ConnectionId,
807    ) -> Option<&CatalogEntry> {
808        self.get_schema(
809            &name.qualifiers.database_spec,
810            &name.qualifiers.schema_spec,
811            conn_id,
812        )
813        .types
814        .get(&name.item)
815        .and_then(|id| self.try_get_entry(id))
816    }
817
818    pub(super) fn find_available_name(
819        &self,
820        mut name: QualifiedItemName,
821        conn_id: &ConnectionId,
822    ) -> QualifiedItemName {
823        let mut i = 0;
824        let orig_item_name = name.item.clone();
825        while self.get_item_by_name(&name, conn_id).is_some() {
826            i += 1;
827            name.item = format!("{}{}", orig_item_name, i);
828        }
829        name
830    }
831
832    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
833        self.entry_by_id.get(id)
834    }
835
836    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
837        let item_id = self.entry_by_global_id.get(id)?;
838        self.try_get_entry(item_id)
839    }
840
841    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
842    /// object that returns rows.
843    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
844        let entry = self.try_get_entry_by_global_id(id)?;
845        let desc = match entry.item() {
846            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
847            // TODO(alter_table): Support schema evolution on sources.
848            other => other.desc_opt(RelationVersionSelector::Latest)?,
849        };
850        Some(desc)
851    }
852
853    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
854        self.try_get_cluster(cluster_id)
855            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
856    }
857
858    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
859        self.clusters_by_id.get(&cluster_id)
860    }
861
862    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
863        self.roles_by_id.get(id)
864    }
865
866    pub fn get_role(&self, id: &RoleId) -> &Role {
867        self.roles_by_id.get(id).expect("catalog out of sync")
868    }
869
870    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
871        self.roles_by_id.keys()
872    }
873
874    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
875        self.roles_by_name
876            .get(role_name)
877            .map(|id| &self.roles_by_id[id])
878    }
879
880    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
881        self.role_auth_by_id
882            .get(id)
883            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
884    }
885
886    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
887        self.role_auth_by_id.get(id)
888    }
889
890    pub(super) fn try_get_network_policy_by_name(
891        &self,
892        policy_name: &str,
893    ) -> Option<&NetworkPolicy> {
894        self.network_policies_by_name
895            .get(policy_name)
896            .map(|id| &self.network_policies_by_id[id])
897    }
898
899    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
900        let mut membership = BTreeSet::new();
901        let mut queue = VecDeque::from(vec![id]);
902        while let Some(cur_id) = queue.pop_front() {
903            if !membership.contains(cur_id) {
904                membership.insert(cur_id.clone());
905                let role = self.get_role(cur_id);
906                soft_assert_no_log!(
907                    !role.membership().keys().contains(id),
908                    "circular membership exists in the catalog"
909                );
910                queue.extend(role.membership().keys());
911            }
912        }
913        membership.insert(RoleId::Public);
914        membership
915    }
916
917    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
918        self.network_policies_by_id
919            .get(id)
920            .expect("catalog out of sync")
921    }
922
923    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
924        self.network_policies_by_id.keys()
925    }
926
927    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
928    /// source.
929    ///
930    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
931    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
932        let entry = self.try_get_entry(id)?;
933        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
934        let name = self.resolve_full_name(entry.name(), None);
935        let host_name = self
936            .http_host_name
937            .as_ref()
938            .map(|x| x.as_str())
939            .unwrap_or_else(|| "HOST");
940
941        let RawDatabaseSpecifier::Name(database) = name.database else {
942            return None;
943        };
944
945        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
946        url.path_segments_mut()
947            .ok()?
948            .push(&database)
949            .push(&name.schema)
950            .push(&name.item);
951
952        Some(url)
953    }
954
955    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
956    ///
957    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
958    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
959    ///
960    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
961    /// their original state before the method returns.
962    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
963        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
964        // caller if this method changed the state of the catalog.
965        &mut self,
966        create_sql: &str,
967        force_if_exists_skip: bool,
968    ) -> Result<(Plan, ResolvedIds), AdapterError> {
969        self.with_enable_for_item_parsing(|state| {
970            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
971            let pcx = Some(&pcx);
972            let session_catalog = state.for_system_session();
973
974            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
975            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
976            let plan =
977                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
978
979            Ok((plan, resolved_ids))
980        })
981    }
982
983    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
984    #[mz_ore::instrument]
985    pub(crate) fn parse_plan(
986        create_sql: &str,
987        pcx: Option<&PlanContext>,
988        catalog: &ConnCatalog,
989    ) -> Result<(Plan, ResolvedIds), AdapterError> {
990        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
991        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
992        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
993
994        return Ok((plan, resolved_ids));
995    }
996
997    /// Parses the given SQL string into a pair of [`CatalogItem`].
998    pub(crate) fn deserialize_item(
999        &self,
1000        global_id: GlobalId,
1001        create_sql: &str,
1002        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1003        local_expression_cache: &mut LocalExpressionCache,
1004        previous_item: Option<CatalogItem>,
1005    ) -> Result<CatalogItem, AdapterError> {
1006        self.parse_item(
1007            global_id,
1008            create_sql,
1009            extra_versions,
1010            None,
1011            false,
1012            None,
1013            local_expression_cache,
1014            previous_item,
1015        )
1016    }
1017
1018    /// Parses the given SQL string into a `CatalogItem`.
1019    #[mz_ore::instrument]
1020    pub(crate) fn parse_item(
1021        &self,
1022        global_id: GlobalId,
1023        create_sql: &str,
1024        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1025        pcx: Option<&PlanContext>,
1026        is_retained_metrics_object: bool,
1027        custom_logical_compaction_window: Option<CompactionWindow>,
1028        local_expression_cache: &mut LocalExpressionCache,
1029        previous_item: Option<CatalogItem>,
1030    ) -> Result<CatalogItem, AdapterError> {
1031        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1032        match self.parse_item_inner(
1033            global_id,
1034            create_sql,
1035            extra_versions,
1036            pcx,
1037            is_retained_metrics_object,
1038            custom_logical_compaction_window,
1039            cached_expr,
1040            previous_item,
1041        ) {
1042            Ok((item, uncached_expr)) => {
1043                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1044                    local_expression_cache.insert_uncached_expression(
1045                        global_id,
1046                        uncached_expr,
1047                        optimizer_features,
1048                    );
1049                }
1050                Ok(item)
1051            }
1052            Err((err, cached_expr)) => {
1053                if let Some(local_expr) = cached_expr {
1054                    local_expression_cache.insert_cached_expression(global_id, local_expr);
1055                }
1056                Err(err)
1057            }
1058        }
1059    }
1060
    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
    ///
    /// The SQL is planned against a system session (`for_system_session`), and the resulting
    /// [`Plan`] is converted into the matching `CatalogItem` variant. Only the `CREATE` plans for
    /// tables, sources, views, materialized views, continual tasks, indexes, sinks, types,
    /// secrets and connections are accepted. Any other plan yields a `Corruption` error.
    ///
    /// For views and materialized views, the locally optimized expression comes from the first
    /// applicable source, in this order:
    /// 1. `cached_expr`, if it was produced with the current optimizer features.
    /// 2. `previous_item`, if its raw expression equals the newly planned one.
    /// 3. A fresh optimization pass.
    ///
    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
    /// freshly optimized (i.e. neither taken from the cache nor reused from `previous_item`).
    ///
    /// On failure returns an error and `cached_expr` so it can be used later.
    ///
    /// # Panics
    ///
    /// - If a webhook-backed table plan has no `cluster_id`, or a webhook source plan has no
    ///   `in_cluster`.
    /// - If `previous_item` is given for a materialized view but is not a materialized view.
    /// - If a view or materialized view depends on a global id that is not in the catalog
    ///   (via `get_entry_by_global_id`).
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        // Every error path below hands `cached_expr` back so the caller can re-cache it.
        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        // Only set when a (materialized) view had to be freshly optimized.
        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                // A table owns one collection per relation version: the `extra_versions` plus
                // the root version, which maps to `global_id`.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    // For tables, the explicitly passed window takes precedence over the plan's.
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    // NOTE(review): panics rather than returning an error like
                                    // the other invalid-plan paths in this method.
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        // For webhook sources the cluster comes from the plan's `in_cluster`;
                        // a data-source-level `cluster_id` is unexpected here.
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                // NOTE(review): unlike tables and indexes, sources prefer the plan's compaction
                // window over `custom_logical_compaction_window`; confirm this is intended.
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                // A `previous_item` of a different kind is tolerated here: it maps to
                // `Some(None)` and is never reused.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    // Only trust the cache if it was produced with the current optimizer features.
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // Build an optimizer for this VIEW.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimzed_expr) => optimzed_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        // Report the fresh expression so the caller can cache it.
                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Resolve all item dependencies from the HIR expression.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                // Unlike the view arm, a `previous_item` of another kind is treated as a bug.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    // Only trust the cache if it was produced with the current optimizer features.
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        // Report the fresh expression so the caller can cache it.
                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                // Apply the plan's non-null assertions to the optimized output type. Indexing
                // panics on an out-of-range column; presumably the planner validated them.
                let mut typ = optimized_expr.typ();
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                // Resolve all item dependencies from the HIR expression.
                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    global_id,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    cluster_id: materialized_view.cluster_id,
                    non_null_assertions: materialized_view.non_null_assertions,
                    // NOTE(review): `custom_logical_compaction_window` is ignored here; only the
                    // plan's window is used. Confirm this is intended.
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
                        Ok(ct) => ct,
                        Err(err) => return Err((err, cached_expr)),
                    };
                CatalogItem::ContinualTask(ct)
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                // For indexes, the explicitly passed window takes precedence over the plan's.
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                let desc = match typ.inner.desc(&session_catalog) {
                    Ok(desc) => desc,
                    Err(err) => return Err((err.into(), cached_expr)),
                };
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    desc,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            // Stored `create_sql` should only ever plan to one of the CREATE plans above.
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        Ok((item, uncached_expr))
    }
1458
1459    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1460    /// Calling this method will not permanently modify any system configuration variables.
1461    ///
1462    /// WARNING:
1463    /// Any modifications made to the system configuration variables in `f`, will be lost.
1464    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1465        // Enable catalog features that might be required during planning existing
1466        // catalog items. Existing catalog items might have been created while
1467        // a specific feature flag was turned on, so we need to ensure that this
1468        // is also the case during catalog rehydration in order to avoid panics.
1469        //
1470        // WARNING / CONTRACT:
1471        // 1. Features used in this method that related to parsing / planning
1472        //    should be `enable_for_item_parsing` set to `true`.
1473        // 2. After this step, feature flag configuration must not be
1474        //    overridden.
1475        let restore = self.system_configuration.clone();
1476        self.system_configuration.enable_for_item_parsing();
1477        let res = f(self);
1478        self.system_configuration = restore;
1479        res
1480    }
1481
1482    /// Returns all indexes on the given object and cluster known in the catalog.
1483    pub fn get_indexes_on(
1484        &self,
1485        id: GlobalId,
1486        cluster: ClusterId,
1487    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1488        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1489
1490        self.try_get_entry_by_global_id(&id)
1491            .into_iter()
1492            .map(move |e| {
1493                e.used_by()
1494                    .iter()
1495                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1496                        CatalogItem::Index(index) if index_matches(index) => {
1497                            Some((index.global_id(), index))
1498                        }
1499                        _ => None,
1500                    })
1501            })
1502            .flatten()
1503    }
1504
1505    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1506        &self.database_by_id[database_id]
1507    }
1508
1509    /// Gets a reference to the specified replica of the specified cluster.
1510    ///
1511    /// Returns `None` if either the cluster or the replica does not
1512    /// exist.
1513    pub(super) fn try_get_cluster_replica(
1514        &self,
1515        id: ClusterId,
1516        replica_id: ReplicaId,
1517    ) -> Option<&ClusterReplica> {
1518        self.try_get_cluster(id)
1519            .and_then(|cluster| cluster.replica(replica_id))
1520    }
1521
1522    /// Gets a reference to the specified replica of the specified cluster.
1523    ///
1524    /// Panics if either the cluster or the replica does not exist.
1525    pub(super) fn get_cluster_replica(
1526        &self,
1527        cluster_id: ClusterId,
1528        replica_id: ReplicaId,
1529    ) -> &ClusterReplica {
1530        self.try_get_cluster_replica(cluster_id, replica_id)
1531            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1532    }
1533
1534    pub(super) fn resolve_replica_in_cluster(
1535        &self,
1536        cluster_id: &ClusterId,
1537        replica_name: &str,
1538    ) -> Result<&ClusterReplica, SqlCatalogError> {
1539        let cluster = self.get_cluster(*cluster_id);
1540        let replica_id = cluster
1541            .replica_id_by_name_
1542            .get(replica_name)
1543            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1544        Ok(&cluster.replicas_by_id_[replica_id])
1545    }
1546
1547    /// Get system configuration `name`.
1548    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1549        Ok(self.system_configuration.get(name)?)
1550    }
1551
1552    /// Parse system configuration `name` with `value` int.
1553    ///
1554    /// Returns the parsed value as a string.
1555    pub(super) fn parse_system_configuration(
1556        &self,
1557        name: &str,
1558        value: VarInput,
1559    ) -> Result<String, Error> {
1560        let value = self.system_configuration.parse(name, value)?;
1561        Ok(value.format())
1562    }
1563
1564    /// Gets the schema map for the database matching `database_spec`.
1565    pub(super) fn resolve_schema_in_database(
1566        &self,
1567        database_spec: &ResolvedDatabaseSpecifier,
1568        schema_name: &str,
1569        conn_id: &ConnectionId,
1570    ) -> Result<&Schema, SqlCatalogError> {
1571        let schema = match database_spec {
1572            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1573                self.temporary_schemas.get(conn_id)
1574            }
1575            ResolvedDatabaseSpecifier::Ambient => self
1576                .ambient_schemas_by_name
1577                .get(schema_name)
1578                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1579            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1580                db.schemas_by_name
1581                    .get(schema_name)
1582                    .and_then(|id| db.schemas_by_id.get(id))
1583            }),
1584        };
1585        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1586    }
1587
1588    pub fn get_schema(
1589        &self,
1590        database_spec: &ResolvedDatabaseSpecifier,
1591        schema_spec: &SchemaSpecifier,
1592        conn_id: &ConnectionId,
1593    ) -> &Schema {
1594        // Keep in sync with `get_schemas_mut`
1595        match (database_spec, schema_spec) {
1596            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1597                &self.temporary_schemas[conn_id]
1598            }
1599            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1600                &self.ambient_schemas_by_id[id]
1601            }
1602
1603            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
1604                &self.database_by_id[database_id].schemas_by_id[schema_id]
1605            }
1606            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1607                unreachable!("temporary schemas are in the ambient database")
1608            }
1609        }
1610    }
1611
1612    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1613        self.database_by_id
1614            .values()
1615            .filter_map(|database| database.schemas_by_id.get(schema_id))
1616            .chain(self.ambient_schemas_by_id.values())
1617            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1618            .into_first()
1619    }
1620
1621    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1622        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1623    }
1624
1625    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1626        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1627    }
1628
1629    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1630        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1631    }
1632
1633    pub fn get_information_schema_id(&self) -> SchemaId {
1634        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1635    }
1636
1637    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1638        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1639    }
1640
1641    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1642        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1643    }
1644
1645    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1646        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1647    }
1648
1649    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1650        SYSTEM_SCHEMAS
1651            .iter()
1652            .map(|name| self.ambient_schemas_by_name[*name])
1653    }
1654
1655    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1656        self.system_schema_ids().contains(&id)
1657    }
1658
1659    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1660        match spec {
1661            SchemaSpecifier::Temporary => false,
1662            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1663        }
1664    }
1665
1666    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1667        UNSTABLE_SCHEMAS
1668            .iter()
1669            .map(|name| self.ambient_schemas_by_name[*name])
1670    }
1671
1672    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1673        self.unstable_schema_ids().contains(&id)
1674    }
1675
1676    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1677        match spec {
1678            SchemaSpecifier::Temporary => false,
1679            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1680        }
1681    }
1682
1683    /// Creates a new schema in the `Catalog` for temporary items
1684    /// indicated by the TEMPORARY or TEMP keywords.
1685    pub fn create_temporary_schema(
1686        &mut self,
1687        conn_id: &ConnectionId,
1688        owner_id: RoleId,
1689    ) -> Result<(), Error> {
1690        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1691        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1692        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1693        // all -1.
1694        let oid = INVALID_OID;
1695        self.temporary_schemas.insert(
1696            conn_id.clone(),
1697            Schema {
1698                name: QualifiedSchemaName {
1699                    database: ResolvedDatabaseSpecifier::Ambient,
1700                    schema: MZ_TEMP_SCHEMA.into(),
1701                },
1702                id: SchemaSpecifier::Temporary,
1703                oid,
1704                items: BTreeMap::new(),
1705                functions: BTreeMap::new(),
1706                types: BTreeMap::new(),
1707                owner_id,
1708                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1709                    mz_sql::catalog::ObjectType::Schema,
1710                    owner_id,
1711                )]),
1712            },
1713        );
1714        Ok(())
1715    }
1716
1717    /// Return all OIDs that are allocated to temporary objects.
1718    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1719        std::iter::empty()
1720            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1721                if schema.id.is_temporary() {
1722                    Some(schema.oid)
1723                } else {
1724                    None
1725                }
1726            }))
1727            .chain(self.entry_by_id.values().filter_map(|entry| {
1728                if entry.item().is_temporary() {
1729                    Some(entry.oid)
1730                } else {
1731                    None
1732                }
1733            }))
1734    }
1735
1736    /// Optimized lookup for a builtin table.
1737    ///
1738    /// Panics if the builtin table doesn't exist in the catalog.
1739    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1740        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1741    }
1742
1743    /// Optimized lookup for a builtin log.
1744    ///
1745    /// Panics if the builtin log doesn't exist in the catalog.
1746    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1747        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1748        let log = match self.get_entry(&item_id).item() {
1749            CatalogItem::Log(log) => log,
1750            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1751        };
1752        (item_id, log.global_id)
1753    }
1754
1755    /// Optimized lookup for a builtin storage collection.
1756    ///
1757    /// Panics if the builtin storage collection doesn't exist in the catalog.
1758    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1759        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1760    }
1761
1762    /// Optimized lookup for a builtin object.
1763    ///
1764    /// Panics if the builtin object doesn't exist in the catalog.
1765    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1766        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1767        let schema = &self.ambient_schemas_by_id[schema_id];
1768        match builtin.catalog_item_type() {
1769            CatalogItemType::Type => schema.types[builtin.name()],
1770            CatalogItemType::Func => schema.functions[builtin.name()],
1771            CatalogItemType::Table
1772            | CatalogItemType::Source
1773            | CatalogItemType::Sink
1774            | CatalogItemType::View
1775            | CatalogItemType::MaterializedView
1776            | CatalogItemType::Index
1777            | CatalogItemType::Secret
1778            | CatalogItemType::Connection
1779            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1780        }
1781    }
1782
1783    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
1784    pub fn resolve_builtin_type_references(
1785        &self,
1786        builtin: &BuiltinType<NameReference>,
1787    ) -> BuiltinType<IdReference> {
1788        let typ: CatalogType<IdReference> = match &builtin.details.typ {
1789            CatalogType::AclItem => CatalogType::AclItem,
1790            CatalogType::Array { element_reference } => CatalogType::Array {
1791                element_reference: self.get_system_type(element_reference).id,
1792            },
1793            CatalogType::List {
1794                element_reference,
1795                element_modifiers,
1796            } => CatalogType::List {
1797                element_reference: self.get_system_type(element_reference).id,
1798                element_modifiers: element_modifiers.clone(),
1799            },
1800            CatalogType::Map {
1801                key_reference,
1802                value_reference,
1803                key_modifiers,
1804                value_modifiers,
1805            } => CatalogType::Map {
1806                key_reference: self.get_system_type(key_reference).id,
1807                value_reference: self.get_system_type(value_reference).id,
1808                key_modifiers: key_modifiers.clone(),
1809                value_modifiers: value_modifiers.clone(),
1810            },
1811            CatalogType::Range { element_reference } => CatalogType::Range {
1812                element_reference: self.get_system_type(element_reference).id,
1813            },
1814            CatalogType::Record { fields } => CatalogType::Record {
1815                fields: fields
1816                    .into_iter()
1817                    .map(|f| CatalogRecordField {
1818                        name: f.name.clone(),
1819                        type_reference: self.get_system_type(f.type_reference).id,
1820                        type_modifiers: f.type_modifiers.clone(),
1821                    })
1822                    .collect(),
1823            },
1824            CatalogType::Bool => CatalogType::Bool,
1825            CatalogType::Bytes => CatalogType::Bytes,
1826            CatalogType::Char => CatalogType::Char,
1827            CatalogType::Date => CatalogType::Date,
1828            CatalogType::Float32 => CatalogType::Float32,
1829            CatalogType::Float64 => CatalogType::Float64,
1830            CatalogType::Int16 => CatalogType::Int16,
1831            CatalogType::Int32 => CatalogType::Int32,
1832            CatalogType::Int64 => CatalogType::Int64,
1833            CatalogType::UInt16 => CatalogType::UInt16,
1834            CatalogType::UInt32 => CatalogType::UInt32,
1835            CatalogType::UInt64 => CatalogType::UInt64,
1836            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1837            CatalogType::Interval => CatalogType::Interval,
1838            CatalogType::Jsonb => CatalogType::Jsonb,
1839            CatalogType::Numeric => CatalogType::Numeric,
1840            CatalogType::Oid => CatalogType::Oid,
1841            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1842            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1843            CatalogType::Pseudo => CatalogType::Pseudo,
1844            CatalogType::RegClass => CatalogType::RegClass,
1845            CatalogType::RegProc => CatalogType::RegProc,
1846            CatalogType::RegType => CatalogType::RegType,
1847            CatalogType::String => CatalogType::String,
1848            CatalogType::Time => CatalogType::Time,
1849            CatalogType::Timestamp => CatalogType::Timestamp,
1850            CatalogType::TimestampTz => CatalogType::TimestampTz,
1851            CatalogType::Uuid => CatalogType::Uuid,
1852            CatalogType::VarChar => CatalogType::VarChar,
1853            CatalogType::Int2Vector => CatalogType::Int2Vector,
1854            CatalogType::MzAclItem => CatalogType::MzAclItem,
1855        };
1856
1857        BuiltinType {
1858            name: builtin.name,
1859            schema: builtin.schema,
1860            oid: builtin.oid,
1861            details: CatalogTypeDetails {
1862                array_id: builtin.details.array_id,
1863                typ,
1864                pg_metadata: builtin.details.pg_metadata.clone(),
1865            },
1866        }
1867    }
1868
1869    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1870        &self.config
1871    }
1872
1873    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1874        match self.database_by_name.get(database_name) {
1875            Some(id) => Ok(&self.database_by_id[id]),
1876            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1877        }
1878    }
1879
1880    pub fn resolve_schema(
1881        &self,
1882        current_database: Option<&DatabaseId>,
1883        database_name: Option<&str>,
1884        schema_name: &str,
1885        conn_id: &ConnectionId,
1886    ) -> Result<&Schema, SqlCatalogError> {
1887        let database_spec = match database_name {
1888            // If a database is explicitly specified, validate it. Note that we
1889            // intentionally do not validate `current_database` to permit
1890            // querying `mz_catalog` with an invalid session database, e.g., so
1891            // that you can run `SHOW DATABASES` to *find* a valid database.
1892            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1893                self.resolve_database(database)?.id().clone(),
1894            )),
1895            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1896        };
1897
1898        // First try to find the schema in the named database.
1899        if let Some(database_spec) = database_spec {
1900            if let Ok(schema) =
1901                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1902            {
1903                return Ok(schema);
1904            }
1905        }
1906
1907        // Then fall back to the ambient database.
1908        if let Ok(schema) = self.resolve_schema_in_database(
1909            &ResolvedDatabaseSpecifier::Ambient,
1910            schema_name,
1911            conn_id,
1912        ) {
1913            return Ok(schema);
1914        }
1915
1916        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1917    }
1918
1919    /// Optimized lookup for a system schema.
1920    ///
1921    /// Panics if the system schema doesn't exist in the catalog.
1922    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1923        self.ambient_schemas_by_name[name]
1924    }
1925
1926    pub fn resolve_search_path(
1927        &self,
1928        session: &dyn SessionMetadata,
1929    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1930        let database = self
1931            .database_by_name
1932            .get(session.database())
1933            .map(|id| id.clone());
1934
1935        session
1936            .search_path()
1937            .iter()
1938            .map(|schema| {
1939                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1940            })
1941            .filter_map(|schema| schema.ok())
1942            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1943            .collect()
1944    }
1945
1946    pub fn effective_search_path(
1947        &self,
1948        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1949        include_temp_schema: bool,
1950    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1951        let mut v = Vec::with_capacity(search_path.len() + 3);
1952        // Temp schema is only included for relations and data types, not for functions and operators
1953        let temp_schema = (
1954            ResolvedDatabaseSpecifier::Ambient,
1955            SchemaSpecifier::Temporary,
1956        );
1957        if include_temp_schema && !search_path.contains(&temp_schema) {
1958            v.push(temp_schema);
1959        }
1960        let default_schemas = [
1961            (
1962                ResolvedDatabaseSpecifier::Ambient,
1963                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
1964            ),
1965            (
1966                ResolvedDatabaseSpecifier::Ambient,
1967                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
1968            ),
1969        ];
1970        for schema in default_schemas.into_iter() {
1971            if !search_path.contains(&schema) {
1972                v.push(schema);
1973            }
1974        }
1975        v.extend_from_slice(search_path);
1976        v
1977    }
1978
1979    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
1980        let id = self
1981            .clusters_by_name
1982            .get(name)
1983            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
1984        Ok(&self.clusters_by_id[id])
1985    }
1986
1987    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
1988        let id = self
1989            .clusters_by_name
1990            .get(cluster.name)
1991            .expect("failed to lookup BuiltinCluster by name");
1992        self.clusters_by_id
1993            .get(id)
1994            .expect("failed to lookup BuiltinCluster by ID")
1995    }
1996
1997    pub fn resolve_cluster_replica(
1998        &self,
1999        cluster_replica_name: &QualifiedReplica,
2000    ) -> Result<&ClusterReplica, SqlCatalogError> {
2001        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2002        let replica_name = cluster_replica_name.replica.as_str();
2003        let replica_id = cluster
2004            .replica_id(replica_name)
2005            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2006        Ok(cluster.replica(replica_id).expect("Must exist"))
2007    }
2008
2009    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
2010    ///
2011    /// If `name` does not specify a database, the `current_database` is used.
2012    /// If `name` does not specify a schema, then the schemas in `search_path`
2013    /// are searched in order.
2014    #[allow(clippy::useless_let_if_seq)]
2015    pub fn resolve(
2016        &self,
2017        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2018        current_database: Option<&DatabaseId>,
2019        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2020        name: &PartialItemName,
2021        conn_id: &ConnectionId,
2022        err_gen: fn(String) -> SqlCatalogError,
2023    ) -> Result<&CatalogEntry, SqlCatalogError> {
2024        // If a schema name was specified, just try to find the item in that
2025        // schema. If no schema was specified, try to find the item in the connection's
2026        // temporary schema. If the item is not found, try to find the item in every
2027        // schema in the search path.
2028        let schemas = match &name.schema {
2029            Some(schema_name) => {
2030                match self.resolve_schema(
2031                    current_database,
2032                    name.database.as_deref(),
2033                    schema_name,
2034                    conn_id,
2035                ) {
2036                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2037                    Err(e) => return Err(e),
2038                }
2039            }
2040            None => match self
2041                .get_schema(
2042                    &ResolvedDatabaseSpecifier::Ambient,
2043                    &SchemaSpecifier::Temporary,
2044                    conn_id,
2045                )
2046                .items
2047                .get(&name.item)
2048            {
2049                Some(id) => return Ok(self.get_entry(id)),
2050                None => search_path.to_vec(),
2051            },
2052        };
2053
2054        for (database_spec, schema_spec) in &schemas {
2055            let schema = self.get_schema(database_spec, schema_spec, conn_id);
2056
2057            if let Some(id) = get_schema_entries(schema).get(&name.item) {
2058                return Ok(&self.entry_by_id[id]);
2059            }
2060        }
2061
2062        // Some relations that have previously lived in the `mz_internal` schema have been moved to
2063        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
2064        // automatically let uses of the old schema resolve to the new ones as well.
2065        // TODO(database-issues#8173) remove this after sufficient time has passed
2066        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2067        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2068            for schema_id in [
2069                self.get_mz_catalog_unstable_schema_id(),
2070                self.get_mz_introspection_schema_id(),
2071            ] {
2072                let schema = self.get_schema(
2073                    &ResolvedDatabaseSpecifier::Ambient,
2074                    &SchemaSpecifier::Id(schema_id),
2075                    conn_id,
2076                );
2077
2078                if let Some(id) = get_schema_entries(schema).get(&name.item) {
2079                    debug!(
2080                        github_27831 = true,
2081                        "encountered use of outdated schema `mz_internal` for relation: {name}",
2082                    );
2083                    return Ok(&self.entry_by_id[id]);
2084                }
2085            }
2086        }
2087
2088        Err(err_gen(name.to_string()))
2089    }
2090
2091    /// Resolves `name` to a non-function [`CatalogEntry`].
2092    pub fn resolve_entry(
2093        &self,
2094        current_database: Option<&DatabaseId>,
2095        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2096        name: &PartialItemName,
2097        conn_id: &ConnectionId,
2098    ) -> Result<&CatalogEntry, SqlCatalogError> {
2099        self.resolve(
2100            |schema| &schema.items,
2101            current_database,
2102            search_path,
2103            name,
2104            conn_id,
2105            SqlCatalogError::UnknownItem,
2106        )
2107    }
2108
2109    /// Resolves `name` to a function [`CatalogEntry`].
2110    pub fn resolve_function(
2111        &self,
2112        current_database: Option<&DatabaseId>,
2113        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2114        name: &PartialItemName,
2115        conn_id: &ConnectionId,
2116    ) -> Result<&CatalogEntry, SqlCatalogError> {
2117        self.resolve(
2118            |schema| &schema.functions,
2119            current_database,
2120            search_path,
2121            name,
2122            conn_id,
2123            |name| SqlCatalogError::UnknownFunction {
2124                name,
2125                alternative: None,
2126            },
2127        )
2128    }
2129
2130    /// Resolves `name` to a type [`CatalogEntry`].
2131    pub fn resolve_type(
2132        &self,
2133        current_database: Option<&DatabaseId>,
2134        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2135        name: &PartialItemName,
2136        conn_id: &ConnectionId,
2137    ) -> Result<&CatalogEntry, SqlCatalogError> {
2138        static NON_PG_CATALOG_TYPES: LazyLock<
2139            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2140        > = LazyLock::new(|| {
2141            BUILTINS::types()
2142                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2143                .map(|typ| (typ.name, typ))
2144                .collect()
2145        });
2146
2147        let entry = self.resolve(
2148            |schema| &schema.types,
2149            current_database,
2150            search_path,
2151            name,
2152            conn_id,
2153            |name| SqlCatalogError::UnknownType { name },
2154        )?;
2155
2156        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2157            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2158                warn!(
2159                    "user specified an incorrect schema of {} for the type {}, which should be in \
2160                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2161                    PG_CATALOG_SCHEMA.quoted(),
2162                    typ.name.quoted(),
2163                    typ.schema.quoted(),
2164                )
2165            }
2166        }
2167
2168        Ok(entry)
2169    }
2170
2171    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2172    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2173        match object_id {
2174            ObjectId::Item(item_id) => {
2175                let entry = self.get_entry(&item_id);
2176                match entry.item_type() {
2177                    CatalogItemType::Table => CommentObjectId::Table(item_id),
2178                    CatalogItemType::Source => CommentObjectId::Source(item_id),
2179                    CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2180                    CatalogItemType::View => CommentObjectId::View(item_id),
2181                    CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2182                    CatalogItemType::Index => CommentObjectId::Index(item_id),
2183                    CatalogItemType::Func => CommentObjectId::Func(item_id),
2184                    CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2185                    CatalogItemType::Type => CommentObjectId::Type(item_id),
2186                    CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2187                    CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2188                }
2189            }
2190            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2191            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2192            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2193            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2194            ObjectId::ClusterReplica(cluster_replica_id) => {
2195                CommentObjectId::ClusterReplica(cluster_replica_id)
2196            }
2197            ObjectId::NetworkPolicy(network_policy_id) => {
2198                CommentObjectId::NetworkPolicy(network_policy_id)
2199            }
2200        }
2201    }
2202
2203    /// Return current system configuration.
2204    pub fn system_config(&self) -> &SystemVars {
2205        &self.system_configuration
2206    }
2207
2208    /// Return a mutable reference to the current system configuration.
2209    pub fn system_config_mut(&mut self) -> &mut SystemVars {
2210        &mut self.system_configuration
2211    }
2212
2213    /// Serializes the catalog's in-memory state.
2214    ///
2215    /// There are no guarantees about the format of the serialized state, except
2216    /// that the serialized state for two identical catalogs will compare
2217    /// identically.
2218    ///
2219    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2220    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2221    /// argument.
2222    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2223        // Dump the base catalog.
2224        let mut dump = serde_json::to_value(&self).map_err(|e| {
2225            Error::new(ErrorKind::Unstructured(format!(
2226                // Don't panic here because we don't have compile-time failures for maps with
2227                // non-string keys.
2228                "internal error: could not dump catalog: {}",
2229                e
2230            )))
2231        })?;
2232
2233        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2234        // Stitch in system parameter defaults.
2235        dump_obj.insert(
2236            "system_parameter_defaults".into(),
2237            serde_json::json!(self.system_config().defaults()),
2238        );
2239        // Potentially overwrite unfinalized shards.
2240        if let Some(unfinalized_shards) = unfinalized_shards {
2241            dump_obj
2242                .get_mut("storage_metadata")
2243                .expect("known to exist")
2244                .as_object_mut()
2245                .expect("storage_metadata is an object")
2246                .insert(
2247                    "unfinalized_shards".into(),
2248                    serde_json::json!(unfinalized_shards),
2249                );
2250        }
2251        // Remove GlobalIds for temporary objects from the mapping.
2252        //
2253        // Post-test consistency checks with the durable catalog don't know about temporary items
2254        // since they're kept entirely in memory.
2255        let temporary_gids: Vec<_> = self
2256            .entry_by_global_id
2257            .iter()
2258            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2259            .map(|(gid, _item_id)| *gid)
2260            .collect();
2261        if !temporary_gids.is_empty() {
2262            let gids = dump_obj
2263                .get_mut("entry_by_global_id")
2264                .expect("known_to_exist")
2265                .as_object_mut()
2266                .expect("entry_by_global_id is an object");
2267            for gid in temporary_gids {
2268                gids.remove(&gid.to_string());
2269            }
2270        }
2271        // We exclude role_auth_by_id because it contains password information
2272        // which should not be included in the dump.
2273        dump_obj.remove("role_auth_by_id");
2274
2275        // Emit as pretty-printed JSON.
2276        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2277    }
2278
2279    pub fn availability_zones(&self) -> &[String] {
2280        &self.availability_zones
2281    }
2282
2283    pub fn concretize_replica_location(
2284        &self,
2285        location: mz_catalog::durable::ReplicaLocation,
2286        allowed_sizes: &Vec<String>,
2287        allowed_availability_zones: Option<&[String]>,
2288    ) -> Result<ReplicaLocation, Error> {
2289        let location = match location {
2290            mz_catalog::durable::ReplicaLocation::Unmanaged {
2291                storagectl_addrs,
2292                computectl_addrs,
2293            } => {
2294                if allowed_availability_zones.is_some() {
2295                    return Err(Error {
2296                        kind: ErrorKind::Internal(
2297                            "tried concretize unmanaged replica with specific availability_zones"
2298                                .to_string(),
2299                        ),
2300                    });
2301                }
2302                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2303                    storagectl_addrs,
2304                    computectl_addrs,
2305                })
2306            }
2307            mz_catalog::durable::ReplicaLocation::Managed {
2308                size,
2309                availability_zone,
2310                billed_as,
2311                internal,
2312                pending,
2313            } => {
2314                if allowed_availability_zones.is_some() && availability_zone.is_some() {
2315                    let message = "tried concretize managed replica with specific availability zones and availability zone";
2316                    return Err(Error {
2317                        kind: ErrorKind::Internal(message.to_string()),
2318                    });
2319                }
2320                self.ensure_valid_replica_size(allowed_sizes, &size)?;
2321                let cluster_replica_sizes = &self.cluster_replica_sizes;
2322
2323                ReplicaLocation::Managed(ManagedReplicaLocation {
2324                    allocation: cluster_replica_sizes
2325                        .0
2326                        .get(&size)
2327                        .expect("catalog out of sync")
2328                        .clone(),
2329                    availability_zones: match (availability_zone, allowed_availability_zones) {
2330                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2331                        (None, Some(azs)) if azs.is_empty() => {
2332                            ManagedReplicaAvailabilityZones::FromCluster(None)
2333                        }
2334                        (None, Some(azs)) => {
2335                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2336                        }
2337                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2338                    },
2339                    size,
2340                    billed_as,
2341                    internal,
2342                    pending,
2343                })
2344            }
2345        };
2346        Ok(location)
2347    }
2348
2349    /// Return whether the given replica size requests a disk.
2350    ///
2351    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
2352    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
2353    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
2354    /// not something we can know at this point.
2355    ///
2356    /// # Panics
2357    ///
2358    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
2359    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2360        let alloc = &self.cluster_replica_sizes.0[size];
2361        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2362    }
2363
2364    pub(crate) fn ensure_valid_replica_size(
2365        &self,
2366        allowed_sizes: &[String],
2367        size: &String,
2368    ) -> Result<(), Error> {
2369        let cluster_replica_sizes = &self.cluster_replica_sizes;
2370
2371        if !cluster_replica_sizes.0.contains_key(size)
2372            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2373            || cluster_replica_sizes.0[size].disabled
2374        {
2375            let mut entries = cluster_replica_sizes
2376                .enabled_allocations()
2377                .collect::<Vec<_>>();
2378
2379            if !allowed_sizes.is_empty() {
2380                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2381                entries.retain(|(name, _)| allowed_sizes.contains(name));
2382            }
2383
2384            entries.sort_by_key(
2385                |(
2386                    _name,
2387                    ReplicaAllocation {
2388                        scale, cpu_limit, ..
2389                    },
2390                )| (scale, cpu_limit),
2391            );
2392
2393            Err(Error {
2394                kind: ErrorKind::InvalidClusterReplicaSize {
2395                    size: size.to_owned(),
2396                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2397                },
2398            })
2399        } else {
2400            Ok(())
2401        }
2402    }
2403
2404    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2405        if role_id.is_builtin() {
2406            let role = self.get_role(role_id);
2407            Err(Error::new(ErrorKind::ReservedRoleName(
2408                role.name().to_string(),
2409            )))
2410        } else {
2411            Ok(())
2412        }
2413    }
2414
2415    pub fn ensure_not_reserved_network_policy(
2416        &self,
2417        network_policy_id: &NetworkPolicyId,
2418    ) -> Result<(), Error> {
2419        if network_policy_id.is_builtin() {
2420            let policy = self.get_network_policy(network_policy_id);
2421            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2422                policy.name.clone(),
2423            )))
2424        } else {
2425            Ok(())
2426        }
2427    }
2428
2429    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2430        let is_grantable = !role_id.is_public() && !role_id.is_system();
2431        if is_grantable {
2432            Ok(())
2433        } else {
2434            let role = self.get_role(role_id);
2435            Err(Error::new(ErrorKind::UngrantableRoleName(
2436                role.name().to_string(),
2437            )))
2438        }
2439    }
2440
2441    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2442        if role_id.is_system() {
2443            let role = self.get_role(role_id);
2444            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2445                role.name().to_string(),
2446            )))
2447        } else {
2448            Ok(())
2449        }
2450    }
2451
2452    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2453        if role_id.is_predefined() {
2454            let role = self.get_role(role_id);
2455            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2456                role.name().to_string(),
2457            )))
2458        } else {
2459            Ok(())
2460        }
2461    }
2462
2463    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2464    // passing tx, and session?
2465    pub(crate) fn add_to_audit_log(
2466        system_configuration: &SystemVars,
2467        oracle_write_ts: mz_repr::Timestamp,
2468        session: Option<&ConnMeta>,
2469        tx: &mut mz_catalog::durable::Transaction,
2470        audit_events: &mut Vec<VersionedEvent>,
2471        event_type: EventType,
2472        object_type: ObjectType,
2473        details: EventDetails,
2474    ) -> Result<(), Error> {
2475        let user = session.map(|session| session.user().name.to_string());
2476
2477        // unsafe_mock_audit_event_timestamp can only be set to Some when running in unsafe mode.
2478
2479        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2480            Some(ts) => ts.into(),
2481            _ => oracle_write_ts.into(),
2482        };
2483        let id = tx.allocate_audit_log_id()?;
2484        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2485        audit_events.push(event.clone());
2486        tx.insert_audit_log_event(event);
2487        Ok(())
2488    }
2489
2490    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2491        match id {
2492            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2493            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2494                self.get_cluster_replica(*cluster_id, *replica_id)
2495                    .owner_id(),
2496            ),
2497            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2498            ObjectId::Schema((database_spec, schema_spec)) => Some(
2499                self.get_schema(database_spec, schema_spec, conn_id)
2500                    .owner_id(),
2501            ),
2502            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2503            ObjectId::Role(_) => None,
2504            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2505        }
2506    }
2507
2508    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2509        match object_id {
2510            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2511            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2512            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2513            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2514            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2515            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2516            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2517        }
2518    }
2519
2520    pub(super) fn get_system_object_type(
2521        &self,
2522        id: &SystemObjectId,
2523    ) -> mz_sql::catalog::SystemObjectType {
2524        match id {
2525            SystemObjectId::Object(object_id) => {
2526                SystemObjectType::Object(self.get_object_type(object_id))
2527            }
2528            SystemObjectId::System => SystemObjectType::System,
2529        }
2530    }
2531
2532    /// Returns a read-only view of the current [`StorageMetadata`].
2533    ///
2534    /// To write to this struct, you must use a catalog transaction.
2535    pub fn storage_metadata(&self) -> &StorageMetadata {
2536        &self.storage_metadata
2537    }
2538
2539    /// For the Sources ids in `ids`, return their compaction windows.
2540    pub fn source_compaction_windows(
2541        &self,
2542        ids: impl IntoIterator<Item = CatalogItemId>,
2543    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2544        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2545        let mut seen = BTreeSet::new();
2546        for item_id in ids {
2547            if !seen.insert(item_id) {
2548                continue;
2549            }
2550            let entry = self.get_entry(&item_id);
2551            match entry.item() {
2552                CatalogItem::Source(source) => {
2553                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2554                    match source.data_source {
2555                        DataSourceDesc::Ingestion { .. }
2556                        | DataSourceDesc::OldSyntaxIngestion { .. }
2557                        | DataSourceDesc::IngestionExport { .. } => {
2558                            cws.entry(source_cw).or_default().insert(item_id);
2559                        }
2560                        DataSourceDesc::Introspection(_)
2561                        | DataSourceDesc::Progress
2562                        | DataSourceDesc::Webhook { .. } => {
2563                            cws.entry(source_cw).or_default().insert(item_id);
2564                        }
2565                    }
2566                }
2567                CatalogItem::Table(table) => {
2568                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2569                    match &table.data_source {
2570                        TableDataSource::DataSource {
2571                            desc: DataSourceDesc::IngestionExport { .. },
2572                            timeline: _,
2573                        } => {
2574                            cws.entry(table_cw).or_default().insert(item_id);
2575                        }
2576                        _ => {}
2577                    }
2578                }
2579                _ => {
2580                    // Views could depend on sources, so ignore them if added by used_by above.
2581                    continue;
2582                }
2583            }
2584        }
2585        cws
2586    }
2587
2588    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2589        match id {
2590            CommentObjectId::Table(id)
2591            | CommentObjectId::View(id)
2592            | CommentObjectId::MaterializedView(id)
2593            | CommentObjectId::Source(id)
2594            | CommentObjectId::Sink(id)
2595            | CommentObjectId::Index(id)
2596            | CommentObjectId::Func(id)
2597            | CommentObjectId::Connection(id)
2598            | CommentObjectId::Type(id)
2599            | CommentObjectId::Secret(id)
2600            | CommentObjectId::ContinualTask(id) => Some(*id),
2601            CommentObjectId::Role(_)
2602            | CommentObjectId::Database(_)
2603            | CommentObjectId::Schema(_)
2604            | CommentObjectId::Cluster(_)
2605            | CommentObjectId::ClusterReplica(_)
2606            | CommentObjectId::NetworkPolicy(_) => None,
2607        }
2608    }
2609
2610    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2611        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2612    }
2613
2614    pub fn comment_id_to_audit_log_name(
2615        &self,
2616        id: CommentObjectId,
2617        conn_id: &ConnectionId,
2618    ) -> String {
2619        match id {
2620            CommentObjectId::Table(id)
2621            | CommentObjectId::View(id)
2622            | CommentObjectId::MaterializedView(id)
2623            | CommentObjectId::Source(id)
2624            | CommentObjectId::Sink(id)
2625            | CommentObjectId::Index(id)
2626            | CommentObjectId::Func(id)
2627            | CommentObjectId::Connection(id)
2628            | CommentObjectId::Type(id)
2629            | CommentObjectId::Secret(id)
2630            | CommentObjectId::ContinualTask(id) => {
2631                let item = self.get_entry(&id);
2632                let name = self.resolve_full_name(item.name(), Some(conn_id));
2633                name.to_string()
2634            }
2635            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2636            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2637            CommentObjectId::Schema((spec, schema_id)) => {
2638                let schema = self.get_schema(&spec, &schema_id, conn_id);
2639                self.resolve_full_schema_name(&schema.name).to_string()
2640            }
2641            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2642            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2643                let cluster = self.get_cluster(cluster_id);
2644                let replica = self.get_cluster_replica(cluster_id, replica_id);
2645                QualifiedReplica {
2646                    cluster: Ident::new_unchecked(cluster.name.clone()),
2647                    replica: Ident::new_unchecked(replica.name.clone()),
2648                }
2649                .to_string()
2650            }
2651            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2652        }
2653    }
2654
2655    pub fn mock_authentication_nonce(&self) -> String {
2656        self.mock_authentication_nonce.clone().unwrap_or_default()
2657    }
2658}
2659
2660impl ConnectionResolver for CatalogState {
2661    fn resolve_connection(
2662        &self,
2663        id: CatalogItemId,
2664    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2665        use mz_storage_types::connections::Connection::*;
2666        match self
2667            .get_entry(&id)
2668            .connection()
2669            .expect("catalog out of sync")
2670            .details
2671            .to_connection()
2672        {
2673            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2674            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2675            Csr(conn) => Csr(conn.into_inline_connection(self)),
2676            Ssh(conn) => Ssh(conn),
2677            Aws(conn) => Aws(conn),
2678            AwsPrivatelink(conn) => AwsPrivatelink(conn),
2679            MySql(conn) => MySql(conn.into_inline_connection(self)),
2680            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2681            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2682        }
2683    }
2684}
2685
2686impl OptimizerCatalog for CatalogState {
2687    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2688        CatalogState::get_entry_by_global_id(self, id)
2689    }
2690    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2691        CatalogState::get_entry(self, id)
2692    }
2693    fn resolve_full_name(
2694        &self,
2695        name: &QualifiedItemName,
2696        conn_id: Option<&ConnectionId>,
2697    ) -> FullItemName {
2698        CatalogState::resolve_full_name(self, name, conn_id)
2699    }
2700    fn get_indexes_on(
2701        &self,
2702        id: GlobalId,
2703        cluster: ClusterId,
2704    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2705        Box::new(CatalogState::get_indexes_on(self, id, cluster))
2706    }
2707}
2708
2709impl OptimizerCatalog for Catalog {
2710    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2711        self.state.get_entry_by_global_id(id)
2712    }
2713
2714    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2715        self.state.get_entry(id)
2716    }
2717
2718    fn resolve_full_name(
2719        &self,
2720        name: &QualifiedItemName,
2721        conn_id: Option<&ConnectionId>,
2722    ) -> FullItemName {
2723        self.state.resolve_full_name(name, conn_id)
2724    }
2725
2726    fn get_indexes_on(
2727        &self,
2728        id: GlobalId,
2729        cluster: ClusterId,
2730    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2731        Box::new(self.state.get_indexes_on(id, cluster))
2732    }
2733}
2734
2735impl Catalog {
2736    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2737        self
2738    }
2739}