mz_adapter/catalog/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! In-memory metadata storage for the coordinator.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36    TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40    UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_ore::collections::CollectionExt;
45use mz_ore::now::NOW_ZERO;
46use mz_ore::soft_assert_no_log;
47use mz_ore::str::StrExt;
48use mz_pgrepr::oid::INVALID_OID;
49use mz_repr::adt::mz_acl_item::PrivilegeMap;
50use mz_repr::namespaces::{
51    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
52    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
53    UNSTABLE_SCHEMAS,
54};
55use mz_repr::network_policy_id::NetworkPolicyId;
56use mz_repr::optimize::OptimizerFeatures;
57use mz_repr::role_id::RoleId;
58use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector};
59use mz_secrets::InMemorySecretsController;
60use mz_sql::ast::Ident;
61use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
62use mz_sql::catalog::{
63    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
64    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
65    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
66    TypeReference,
67};
68use mz_sql::names::{
69    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
70    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
71    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
72};
73use mz_sql::plan::{
74    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
75    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
76    Plan, PlanContext,
77};
78use mz_sql::rbac;
79use mz_sql::session::metadata::SessionMetadata;
80use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
81use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
82use mz_sql_parser::ast::QualifiedReplica;
83use mz_storage_client::controller::StorageMetadata;
84use mz_storage_types::connections::ConnectionContext;
85use mz_storage_types::connections::inline::{
86    ConnectionResolver, InlinedConnection, IntoInlineConnection,
87};
88use serde::Serialize;
89use timely::progress::Antichain;
90use tokio::sync::mpsc;
91use tracing::{debug, warn};
92
93// DO NOT add any more imports from `crate` outside of `crate::catalog`.
94use crate::AdapterError;
95use crate::catalog::{Catalog, ConnCatalog};
96use crate::coord::ConnMeta;
97use crate::optimize::{self, Optimize, OptimizerCatalog};
98use crate::session::Session;
99
/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
///
/// Fields annotated with `#[serde(skip)]` are left out of those dumps. `entry_by_id` is dumped
/// through `skip_temp_items`, which omits every entry that has a connection ID.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects are partially derived from read-only state.

    // Databases, indexed by name and by ID.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    // Catalog entries keyed by item ID, plus an index from `GlobalId` to the owning item ID.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    // Ambient schemas, indexed by name and by ID.
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    // Clusters, indexed by name and by ID.
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    // Roles, indexed by name and by ID.
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    // Network policies, indexed by name and by ID.
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    // Per-role `RoleAuth` records, keyed by role ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    // Excluded from serialized dumps.
    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    // `SourceReferences` keyed by catalog item ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,

    // Mutable state not derived from the durable catalog.
    // One temporary schema per connection, keyed by connection ID.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only not derived from the durable catalog.
    // Note that the `#[serde(skip)]` below applies to `egress_addresses` only; the remaining
    // fields in this group are included in serialized dumps.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,
}
162
/// Keeps track of what expressions are cached or not during startup.
///
/// Both maps of the `Open` variant are keyed by [`GlobalId`]. In the `Closed` variant every
/// insertion is a no-op and every lookup comes back empty.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}
176
177impl LocalExpressionCache {
178    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
179        Self::Open {
180            cached_exprs,
181            uncached_exprs: BTreeMap::new(),
182        }
183    }
184
185    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
186        match self {
187            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
188            LocalExpressionCache::Closed => None,
189        }
190    }
191
192    /// Insert an expression that was cached, back into the cache. This is generally needed when
193    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
194    pub(super) fn insert_cached_expression(
195        &mut self,
196        id: GlobalId,
197        local_expressions: LocalExpressions,
198    ) {
199        match self {
200            LocalExpressionCache::Open { cached_exprs, .. } => {
201                cached_exprs.insert(id, local_expressions);
202            }
203            LocalExpressionCache::Closed => {}
204        }
205    }
206
207    /// Inform the cache that `id` was not found in the cache and that we should add it as
208    /// `local_mir` and `optimizer_features`.
209    pub(super) fn insert_uncached_expression(
210        &mut self,
211        id: GlobalId,
212        local_mir: OptimizedMirRelationExpr,
213        optimizer_features: OptimizerFeatures,
214    ) {
215        match self {
216            LocalExpressionCache::Open { uncached_exprs, .. } => {
217                let local_expr = LocalExpressions {
218                    local_mir,
219                    optimizer_features,
220                };
221                // If we are trying to cache the same item a second time, with a different
222                // expression, then we must be migrating the object or doing something else weird.
223                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
224                // version after a restart. Caching the migrated version may cause us to incorrectly
225                // think that the object has already been migrated. To simplify things, we cache
226                // neither.
227                let prev = uncached_exprs.remove(&id);
228                match prev {
229                    Some(prev) if prev == local_expr => {
230                        uncached_exprs.insert(id, local_expr);
231                    }
232                    None => {
233                        uncached_exprs.insert(id, local_expr);
234                    }
235                    Some(_) => {}
236                }
237            }
238            LocalExpressionCache::Closed => {}
239        }
240    }
241
242    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
243        match self {
244            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
245            LocalExpressionCache::Closed => BTreeMap::new(),
246        }
247    }
248}
249
250fn skip_temp_items<S>(
251    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
252    serializer: S,
253) -> Result<S::Ok, S::Error>
254where
255    S: serde::Serializer,
256{
257    mz_ore::serde::map_key_to_string(
258        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
259        serializer,
260    )
261}
262
263impl CatalogState {
264    /// Returns an empty [`CatalogState`] that can be used in tests.
265    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
266    // tests are structured in this repository.
267    pub fn empty_test() -> Self {
268        CatalogState {
269            database_by_name: Default::default(),
270            database_by_id: Default::default(),
271            entry_by_id: Default::default(),
272            entry_by_global_id: Default::default(),
273            ambient_schemas_by_name: Default::default(),
274            ambient_schemas_by_id: Default::default(),
275            temporary_schemas: Default::default(),
276            clusters_by_id: Default::default(),
277            clusters_by_name: Default::default(),
278            network_policies_by_name: Default::default(),
279            roles_by_name: Default::default(),
280            roles_by_id: Default::default(),
281            network_policies_by_id: Default::default(),
282            role_auth_by_id: Default::default(),
283            config: CatalogConfig {
284                start_time: Default::default(),
285                start_instant: Instant::now(),
286                nonce: Default::default(),
287                environment_id: EnvironmentId::for_tests(),
288                session_id: Default::default(),
289                build_info: &DUMMY_BUILD_INFO,
290                timestamp_interval: Default::default(),
291                now: NOW_ZERO.clone(),
292                connection_context: ConnectionContext::for_tests(Arc::new(
293                    InMemorySecretsController::new(),
294                )),
295                builtins_cfg: BuiltinsConfig {
296                    include_continual_tasks: true,
297                },
298                helm_chart_version: None,
299            },
300            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
301            availability_zones: Default::default(),
302            system_configuration: Default::default(),
303            egress_addresses: Default::default(),
304            aws_principal_context: Default::default(),
305            aws_privatelink_availability_zones: Default::default(),
306            http_host_name: Default::default(),
307            default_privileges: Default::default(),
308            system_privileges: Default::default(),
309            comments: Default::default(),
310            source_references: Default::default(),
311            storage_metadata: Default::default(),
312        }
313    }
314
315    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
316        let search_path = self.resolve_search_path(session);
317        let database = self
318            .database_by_name
319            .get(session.vars().database())
320            .map(|id| id.clone());
321        let state = match session.transaction().catalog_state() {
322            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
323            None => Cow::Borrowed(self),
324        };
325        ConnCatalog {
326            state,
327            unresolvable_ids: BTreeSet::new(),
328            conn_id: session.conn_id().clone(),
329            cluster: session.vars().cluster().into(),
330            database,
331            search_path,
332            role_id: session.current_role_id().clone(),
333            prepared_statements: Some(session.prepared_statements()),
334            notices_tx: session.retain_notice_transmitter(),
335        }
336    }
337
338    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog {
339        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
340        let cluster = self.system_configuration.default_cluster();
341
342        ConnCatalog {
343            state: Cow::Borrowed(self),
344            unresolvable_ids: BTreeSet::new(),
345            conn_id: SYSTEM_CONN_ID.clone(),
346            cluster,
347            database: self
348                .resolve_database(DEFAULT_DATABASE_NAME)
349                .ok()
350                .map(|db| db.id()),
351            // Leaving the system's search path empty allows us to catch issues
352            // where catalog object names have not been normalized correctly.
353            search_path: Vec::new(),
354            role_id,
355            prepared_statements: None,
356            notices_tx,
357        }
358    }
359
360    pub fn for_system_session(&self) -> ConnCatalog {
361        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
362    }
363
364    /// Returns an iterator over the deduplicated identifiers of all
365    /// objects this catalog entry transitively depends on (where
366    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
367    /// [`CatalogItem::references`]).
368    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
369        struct I<'a> {
370            queue: VecDeque<CatalogItemId>,
371            seen: BTreeSet<CatalogItemId>,
372            this: &'a CatalogState,
373        }
374        impl<'a> Iterator for I<'a> {
375            type Item = CatalogItemId;
376            fn next(&mut self) -> Option<Self::Item> {
377                if let Some(next) = self.queue.pop_front() {
378                    for child in self.this.get_entry(&next).item().uses() {
379                        if !self.seen.contains(&child) {
380                            self.queue.push_back(child);
381                            self.seen.insert(child);
382                        }
383                    }
384                    Some(next)
385                } else {
386                    None
387                }
388            }
389        }
390
391        I {
392            queue: [id].into_iter().collect(),
393            seen: [id].into_iter().collect(),
394            this: self,
395        }
396    }
397
398    /// Computes the IDs of any log sources this catalog entry transitively
399    /// depends on.
400    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
401        let mut out = Vec::new();
402        self.introspection_dependencies_inner(id, &mut out);
403        out
404    }
405
406    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
407        match self.get_entry(&id).item() {
408            CatalogItem::Log(_) => out.push(id),
409            item @ (CatalogItem::View(_)
410            | CatalogItem::MaterializedView(_)
411            | CatalogItem::Connection(_)
412            | CatalogItem::ContinualTask(_)) => {
413                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
414                for item_id in item.references().items() {
415                    self.introspection_dependencies_inner(*item_id, out);
416                }
417            }
418            CatalogItem::Sink(sink) => {
419                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
420                self.introspection_dependencies_inner(from_item_id, out)
421            }
422            CatalogItem::Index(idx) => {
423                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
424                self.introspection_dependencies_inner(on_item_id, out)
425            }
426            CatalogItem::Table(_)
427            | CatalogItem::Source(_)
428            | CatalogItem::Type(_)
429            | CatalogItem::Func(_)
430            | CatalogItem::Secret(_) => (),
431        }
432    }
433
434    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
435    ///
436    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
437    /// earlier in the list than the roots. This is particularly useful for the order to drop
438    /// objects.
439    pub(super) fn object_dependents(
440        &self,
441        object_ids: &Vec<ObjectId>,
442        conn_id: &ConnectionId,
443        seen: &mut BTreeSet<ObjectId>,
444    ) -> Vec<ObjectId> {
445        let mut dependents = Vec::new();
446        for object_id in object_ids {
447            match object_id {
448                ObjectId::Cluster(id) => {
449                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
450                }
451                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
452                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
453                ),
454                ObjectId::Database(id) => {
455                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
456                }
457                ObjectId::Schema((database_spec, schema_spec)) => {
458                    dependents.extend_from_slice(&self.schema_dependents(
459                        database_spec.clone(),
460                        schema_spec.clone(),
461                        conn_id,
462                        seen,
463                    ));
464                }
465                ObjectId::NetworkPolicy(id) => {
466                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
467                }
468                id @ ObjectId::Role(_) => {
469                    let unseen = seen.insert(id.clone());
470                    if unseen {
471                        dependents.push(id.clone());
472                    }
473                }
474                ObjectId::Item(id) => {
475                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
476                }
477            }
478        }
479        dependents
480    }
481
482    /// Returns all the IDs of all objects that depend on `cluster_id`, including `cluster_id`
483    /// itself.
484    ///
485    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
486    /// earlier in the list than the roots. This is particularly useful for the order to drop
487    /// objects.
488    fn cluster_dependents(
489        &self,
490        cluster_id: ClusterId,
491        seen: &mut BTreeSet<ObjectId>,
492    ) -> Vec<ObjectId> {
493        let mut dependents = Vec::new();
494        let object_id = ObjectId::Cluster(cluster_id);
495        if !seen.contains(&object_id) {
496            seen.insert(object_id.clone());
497            let cluster = self.get_cluster(cluster_id);
498            for item_id in cluster.bound_objects() {
499                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
500            }
501            for replica_id in cluster.replica_ids().values() {
502                dependents.extend_from_slice(&self.cluster_replica_dependents(
503                    cluster_id,
504                    *replica_id,
505                    seen,
506                ));
507            }
508            dependents.push(object_id);
509        }
510        dependents
511    }
512
513    /// Returns all the IDs of all objects that depend on `replica_id`, including `replica_id`
514    /// itself.
515    ///
516    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
517    /// earlier in the list than the roots. This is particularly useful for the order to drop
518    /// objects.
519    pub(super) fn cluster_replica_dependents(
520        &self,
521        cluster_id: ClusterId,
522        replica_id: ReplicaId,
523        seen: &mut BTreeSet<ObjectId>,
524    ) -> Vec<ObjectId> {
525        let mut dependents = Vec::new();
526        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
527        if !seen.contains(&object_id) {
528            seen.insert(object_id.clone());
529            dependents.push(object_id);
530        }
531        dependents
532    }
533
534    /// Returns all the IDs of all objects that depend on `database_id`, including `database_id`
535    /// itself.
536    ///
537    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
538    /// earlier in the list than the roots. This is particularly useful for the order to drop
539    /// objects.
540    fn database_dependents(
541        &self,
542        database_id: DatabaseId,
543        conn_id: &ConnectionId,
544        seen: &mut BTreeSet<ObjectId>,
545    ) -> Vec<ObjectId> {
546        let mut dependents = Vec::new();
547        let object_id = ObjectId::Database(database_id);
548        if !seen.contains(&object_id) {
549            seen.insert(object_id.clone());
550            let database = self.get_database(&database_id);
551            for schema_id in database.schema_ids().values() {
552                dependents.extend_from_slice(&self.schema_dependents(
553                    ResolvedDatabaseSpecifier::Id(database_id),
554                    SchemaSpecifier::Id(*schema_id),
555                    conn_id,
556                    seen,
557                ));
558            }
559            dependents.push(object_id);
560        }
561        dependents
562    }
563
564    /// Returns all the IDs of all objects that depend on `schema_id`, including `schema_id`
565    /// itself.
566    ///
567    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
568    /// earlier in the list than the roots. This is particularly useful for the order to drop
569    /// objects.
570    fn schema_dependents(
571        &self,
572        database_spec: ResolvedDatabaseSpecifier,
573        schema_spec: SchemaSpecifier,
574        conn_id: &ConnectionId,
575        seen: &mut BTreeSet<ObjectId>,
576    ) -> Vec<ObjectId> {
577        let mut dependents = Vec::new();
578        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
579        if !seen.contains(&object_id) {
580            seen.insert(object_id.clone());
581            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
582            for item_id in schema.item_ids() {
583                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
584            }
585            dependents.push(object_id)
586        }
587        dependents
588    }
589
590    /// Returns all the IDs of all objects that depend on `item_id`, including `item_id`
591    /// itself.
592    ///
593    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
594    /// earlier in the list than the roots. This is particularly useful for the order to drop
595    /// objects.
596    pub(super) fn item_dependents(
597        &self,
598        item_id: CatalogItemId,
599        seen: &mut BTreeSet<ObjectId>,
600    ) -> Vec<ObjectId> {
601        let mut dependents = Vec::new();
602        let object_id = ObjectId::Item(item_id);
603        if !seen.contains(&object_id) {
604            seen.insert(object_id.clone());
605            let entry = self.get_entry(&item_id);
606            for dependent_id in entry.used_by() {
607                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
608            }
609            dependents.push(object_id);
610            // We treat the progress collection as if it depends on the source
611            // for dropping. We have additional code in planning to create a
612            // kind of special-case "CASCADE" for this dependency.
613            if let Some(progress_id) = entry.progress_id() {
614                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
615            }
616        }
617        dependents
618    }
619
620    /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id`
621    /// itself.
622    ///
623    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
624    /// earlier in the list than the roots. This is particularly useful for the order to drop
625    /// objects.
626    pub(super) fn network_policy_dependents(
627        &self,
628        network_policy_id: NetworkPolicyId,
629        _seen: &mut BTreeSet<ObjectId>,
630    ) -> Vec<ObjectId> {
631        let object_id = ObjectId::NetworkPolicy(network_policy_id);
632        // Currently network policies have no dependents
633        // when we add the ability for users or sources/sinks to have policies
634        // this method will need to be updated.
635        vec![object_id]
636    }
637
638    /// Indicates whether the indicated item is considered stable or not.
639    ///
640    /// Only stable items can be used as dependencies of other catalog items.
641    fn is_stable(&self, id: CatalogItemId) -> bool {
642        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
643        !self.is_unstable_schema_specifier(spec)
644    }
645
646    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
647        if self.system_config().unsafe_enable_unstable_dependencies() {
648            return Ok(());
649        }
650
651        let unstable_dependencies: Vec<_> = item
652            .references()
653            .items()
654            .filter(|id| !self.is_stable(**id))
655            .map(|id| self.get_entry(id).name().item.clone())
656            .collect();
657
658        // It's okay to create a temporary object with unstable
659        // dependencies, since we will never need to reboot a catalog
660        // that contains it.
661        if unstable_dependencies.is_empty() || item.is_temporary() {
662            Ok(())
663        } else {
664            let object_type = item.typ().to_string();
665            Err(Error {
666                kind: ErrorKind::UnstableDependency {
667                    object_type,
668                    unstable_dependencies,
669                },
670            })
671        }
672    }
673
674    pub fn resolve_full_name(
675        &self,
676        name: &QualifiedItemName,
677        conn_id: Option<&ConnectionId>,
678    ) -> FullItemName {
679        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
680
681        let database = match &name.qualifiers.database_spec {
682            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
683            ResolvedDatabaseSpecifier::Id(id) => {
684                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
685            }
686        };
687        let schema = self
688            .get_schema(
689                &name.qualifiers.database_spec,
690                &name.qualifiers.schema_spec,
691                conn_id,
692            )
693            .name()
694            .schema
695            .clone();
696        FullItemName {
697            database,
698            schema,
699            item: name.item.clone(),
700        }
701    }
702
703    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
704        let database = match &name.database {
705            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
706            ResolvedDatabaseSpecifier::Id(id) => {
707                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
708            }
709        };
710        FullSchemaName {
711            database,
712            schema: name.schema.clone(),
713        }
714    }
715
716    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
717        self.entry_by_id
718            .get(id)
719            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
720    }
721
722    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
723        let item_id = self
724            .entry_by_global_id
725            .get(id)
726            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
727
728        let entry = self.get_entry(item_id).clone();
729        let version = match entry.item() {
730            CatalogItem::Table(table) => {
731                let (version, _) = table
732                    .collections
733                    .iter()
734                    .find(|(_verison, gid)| *gid == id)
735                    .expect("version to exist");
736                RelationVersionSelector::Specific(*version)
737            }
738            _ => RelationVersionSelector::Latest,
739        };
740        CatalogCollectionEntry { entry, version }
741    }
742
743    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
744        self.entry_by_id.iter()
745    }
746
747    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
748        let schema = self
749            .temporary_schemas
750            .get(conn)
751            .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
752        schema.items.values().copied().map(ObjectId::from)
753    }
754
755    /// Gets a type named `name` from exactly one of the system schemas.
756    ///
757    /// # Panics
758    /// - If `name` is not an entry in any system schema
759    /// - If more than one system schema has an entry named `name`.
760    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
761        let mut res = None;
762        for schema_id in self.system_schema_ids() {
763            let schema = &self.ambient_schemas_by_id[&schema_id];
764            if let Some(global_id) = schema.types.get(name) {
765                match res {
766                    None => res = Some(self.get_entry(global_id)),
767                    Some(_) => panic!(
768                        "only call get_system_type on objects uniquely identifiable in one system schema"
769                    ),
770                }
771            }
772        }
773
774        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
775    }
776
777    pub fn get_item_by_name(
778        &self,
779        name: &QualifiedItemName,
780        conn_id: &ConnectionId,
781    ) -> Option<&CatalogEntry> {
782        self.get_schema(
783            &name.qualifiers.database_spec,
784            &name.qualifiers.schema_spec,
785            conn_id,
786        )
787        .items
788        .get(&name.item)
789        .and_then(|id| self.try_get_entry(id))
790    }
791
792    pub fn get_type_by_name(
793        &self,
794        name: &QualifiedItemName,
795        conn_id: &ConnectionId,
796    ) -> Option<&CatalogEntry> {
797        self.get_schema(
798            &name.qualifiers.database_spec,
799            &name.qualifiers.schema_spec,
800            conn_id,
801        )
802        .types
803        .get(&name.item)
804        .and_then(|id| self.try_get_entry(id))
805    }
806
807    pub(super) fn find_available_name(
808        &self,
809        mut name: QualifiedItemName,
810        conn_id: &ConnectionId,
811    ) -> QualifiedItemName {
812        let mut i = 0;
813        let orig_item_name = name.item.clone();
814        while self.get_item_by_name(&name, conn_id).is_some() {
815            i += 1;
816            name.item = format!("{}{}", orig_item_name, i);
817        }
818        name
819    }
820
821    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
822        self.entry_by_id.get(id)
823    }
824
825    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
826        let item_id = self.entry_by_global_id.get(id)?;
827        self.try_get_entry(item_id)
828    }
829
830    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
831    /// object that returns rows.
832    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<RelationDesc>> {
833        let entry = self.try_get_entry_by_global_id(id)?;
834        let desc = match entry.item() {
835            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
836            // TODO(alter_table): Support schema evolution on sources.
837            other => other.desc_opt(RelationVersionSelector::Latest)?,
838        };
839        Some(desc)
840    }
841
842    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
843        self.try_get_cluster(cluster_id)
844            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
845    }
846
847    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
848        self.clusters_by_id.get(&cluster_id)
849    }
850
851    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
852        self.roles_by_id.get(id)
853    }
854
855    pub fn get_role(&self, id: &RoleId) -> &Role {
856        self.roles_by_id.get(id).expect("catalog out of sync")
857    }
858
859    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
860        self.roles_by_id.keys()
861    }
862
863    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
864        self.roles_by_name
865            .get(role_name)
866            .map(|id| &self.roles_by_id[id])
867    }
868
869    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
870        self.role_auth_by_id.get(id)
871    }
872
873    pub(super) fn try_get_network_policy_by_name(
874        &self,
875        policy_name: &str,
876    ) -> Option<&NetworkPolicy> {
877        self.network_policies_by_name
878            .get(policy_name)
879            .map(|id| &self.network_policies_by_id[id])
880    }
881
882    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
883        let mut membership = BTreeSet::new();
884        let mut queue = VecDeque::from(vec![id]);
885        while let Some(cur_id) = queue.pop_front() {
886            if !membership.contains(cur_id) {
887                membership.insert(cur_id.clone());
888                let role = self.get_role(cur_id);
889                soft_assert_no_log!(
890                    !role.membership().keys().contains(id),
891                    "circular membership exists in the catalog"
892                );
893                queue.extend(role.membership().keys());
894            }
895        }
896        membership.insert(RoleId::Public);
897        membership
898    }
899
900    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
901        self.network_policies_by_id
902            .get(id)
903            .expect("catalog out of sync")
904    }
905
906    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
907        self.network_policies_by_id.keys()
908    }
909
910    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
911    /// source.
912    ///
913    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
914    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
915        let entry = self.try_get_entry(id)?;
916        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
917        let name = self.resolve_full_name(entry.name(), None);
918        let host_name = self
919            .http_host_name
920            .as_ref()
921            .map(|x| x.as_str())
922            .unwrap_or_else(|| "HOST");
923
924        let RawDatabaseSpecifier::Name(database) = name.database else {
925            return None;
926        };
927
928        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
929        url.path_segments_mut()
930            .ok()?
931            .push(&database)
932            .push(&name.schema)
933            .push(&name.item);
934
935        Some(url)
936    }
937
938    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
939    ///
940    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
941    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
942    ///
943    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
944    /// their original state before the method returns.
945    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
946        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
947        // caller if this method changed the state of the catalog.
948        &mut self,
949        create_sql: &str,
950        force_if_exists_skip: bool,
951    ) -> Result<(Plan, ResolvedIds), AdapterError> {
952        self.with_enable_for_item_parsing(|state| {
953            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
954            let pcx = Some(&pcx);
955            let session_catalog = state.for_system_session();
956
957            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
958            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
959            let plan =
960                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
961
962            Ok((plan, resolved_ids))
963        })
964    }
965
966    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
967    #[mz_ore::instrument]
968    pub(crate) fn parse_plan(
969        create_sql: &str,
970        pcx: Option<&PlanContext>,
971        catalog: &ConnCatalog,
972    ) -> Result<(Plan, ResolvedIds), AdapterError> {
973        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
974        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
975        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
976
977        return Ok((plan, resolved_ids));
978    }
979
980    /// Parses the given SQL string into a pair of [`CatalogItem`].
981    pub(crate) fn deserialize_item(
982        &self,
983        global_id: GlobalId,
984        create_sql: &str,
985        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
986        local_expression_cache: &mut LocalExpressionCache,
987        previous_item: Option<CatalogItem>,
988    ) -> Result<CatalogItem, AdapterError> {
989        self.parse_item(
990            global_id,
991            create_sql,
992            extra_versions,
993            None,
994            false,
995            None,
996            local_expression_cache,
997            previous_item,
998        )
999    }
1000
1001    /// Parses the given SQL string into a `CatalogItem`.
1002    #[mz_ore::instrument]
1003    pub(crate) fn parse_item(
1004        &self,
1005        global_id: GlobalId,
1006        create_sql: &str,
1007        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1008        pcx: Option<&PlanContext>,
1009        is_retained_metrics_object: bool,
1010        custom_logical_compaction_window: Option<CompactionWindow>,
1011        local_expression_cache: &mut LocalExpressionCache,
1012        previous_item: Option<CatalogItem>,
1013    ) -> Result<CatalogItem, AdapterError> {
1014        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1015        match self.parse_item_inner(
1016            global_id,
1017            create_sql,
1018            extra_versions,
1019            pcx,
1020            is_retained_metrics_object,
1021            custom_logical_compaction_window,
1022            cached_expr,
1023            previous_item,
1024        ) {
1025            Ok((item, uncached_expr)) => {
1026                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1027                    local_expression_cache.insert_uncached_expression(
1028                        global_id,
1029                        uncached_expr,
1030                        optimizer_features,
1031                    );
1032                }
1033                Ok(item)
1034            }
1035            Err((err, cached_expr)) => {
1036                if let Some(local_expr) = cached_expr {
1037                    local_expression_cache.insert_cached_expression(global_id, local_expr);
1038                }
1039                Err(err)
1040            }
1041        }
1042    }
1043
    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
    ///
    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
    /// not cached.
    ///
    /// On failure returns an error and `cached_expr` so it can be used later.
    ///
    /// For views and materialized views the optimized expression is obtained from, in order of
    /// preference:
    /// 1. `cached_expr`, when its optimizer features equal the current configuration's;
    /// 2. `previous_item`, when its raw expression equals the newly planned one;
    /// 3. a fresh run of the optimizer.
    ///
    /// Only case 3 produces the optional second element of the success value.
    ///
    /// Plans other than the `CREATE` plans handled below yield an `ErrorKind::Corruption` error.
    ///
    /// # Panics
    /// - If `previous_item` is `Some` but is not a view (resp. materialized view) while the
    ///   planned item is one.
    /// - If a webhook table is planned without a `cluster_id`, or a webhook source is planned
    ///   without `in_cluster`.
    /// - If a dependency of a view or materialized view is unknown to the catalog (see
    ///   [`Self::get_entry_by_global_id`]).
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        // Every early return hands `cached_expr` back to the caller alongside the error.
        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        // Set only when a view / materialized view expression is optimized from scratch below.
        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                // All extra versions plus the root version, which maps to `global_id`.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    // The caller-provided window takes precedence over the planned one.
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            // Any other planned data source is rejected for tables.
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
                        DataSourceDesc::Ingestion {
                            ingestion_desc,
                            cluster_id: match in_cluster {
                                Some(id) => id,
                                None => {
                                    return Err((
                                        AdapterError::Unstructured(anyhow::anyhow!(
                                            "ingestion-based sources must have cluster specified"
                                        )),
                                        cached_expr,
                                    ));
                                }
                            },
                        }
                    }
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        // For sources the cluster comes from `in_cluster`, not from the
                        // data source description.
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                // NOTE(review): unlike tables and indexes, the planned window takes precedence
                // over the caller-provided one here; confirm this asymmetry is intended.
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => (view.raw_expr, view.optimized_expr),
                    item => unreachable!("expected view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    // The cached expression is only usable if it was produced with the same
                    // optimizer features as are configured now.
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // Build an optimizer for this VIEW.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimzed_expr) => optimzed_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        // Report the freshly optimized expression back to the caller.
                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Resolve all item dependencies from the HIR expression.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                // Same three-way choice as for views: cache hit, reuse of the previous item's
                // expressions, or a fresh optimization.
                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                // Columns covered by a non-null assertion are marked non-nullable in the
                // resulting relation type.
                let mut typ = optimized_expr.typ();
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                // Resolve all item dependencies from the HIR expression.
                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    global_id,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    cluster_id: materialized_view.cluster_id,
                    non_null_assertions: materialized_view.non_null_assertions,
                    // NOTE(review): the `custom_logical_compaction_window` argument is not
                    // consulted for materialized views; only the planned window is used.
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
                        Ok(ct) => ct,
                        Err(err) => return Err((err, cached_expr)),
                    };
                CatalogItem::ContinualTask(ct)
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                // The caller-provided window takes precedence over the planned one.
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                let desc = match typ.inner.desc(&session_catalog) {
                    Ok(desc) => desc,
                    Err(err) => return Err((err.into(), cached_expr)),
                };
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    desc,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            // Any plan that does not create a catalog item is reported as corruption.
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        Ok((item, uncached_expr))
    }
1421
1422    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1423    /// Calling this method will not permanently modify any system configuration variables.
1424    ///
1425    /// WARNING:
1426    /// Any modifications made to the system configuration variables in `f`, will be lost.
1427    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1428        // Enable catalog features that might be required during planning existing
1429        // catalog items. Existing catalog items might have been created while
1430        // a specific feature flag was turned on, so we need to ensure that this
1431        // is also the case during catalog rehydration in order to avoid panics.
1432        //
1433        // WARNING / CONTRACT:
1434        // 1. Features used in this method that related to parsing / planning
1435        //    should be `enable_for_item_parsing` set to `true`.
1436        // 2. After this step, feature flag configuration must not be
1437        //    overridden.
1438        let restore = self.system_configuration.clone();
1439        self.system_configuration.enable_for_item_parsing();
1440        let res = f(self);
1441        self.system_configuration = restore;
1442        res
1443    }
1444
1445    /// Returns all indexes on the given object and cluster known in the catalog.
1446    pub fn get_indexes_on(
1447        &self,
1448        id: GlobalId,
1449        cluster: ClusterId,
1450    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1451        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1452
1453        self.try_get_entry_by_global_id(&id)
1454            .into_iter()
1455            .map(move |e| {
1456                e.used_by()
1457                    .iter()
1458                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1459                        CatalogItem::Index(index) if index_matches(index) => {
1460                            Some((index.global_id(), index))
1461                        }
1462                        _ => None,
1463                    })
1464            })
1465            .flatten()
1466    }
1467
1468    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1469        &self.database_by_id[database_id]
1470    }
1471
1472    /// Gets a reference to the specified replica of the specified cluster.
1473    ///
1474    /// Returns `None` if either the cluster or the replica does not
1475    /// exist.
1476    pub(super) fn try_get_cluster_replica(
1477        &self,
1478        id: ClusterId,
1479        replica_id: ReplicaId,
1480    ) -> Option<&ClusterReplica> {
1481        self.try_get_cluster(id)
1482            .and_then(|cluster| cluster.replica(replica_id))
1483    }
1484
1485    /// Gets a reference to the specified replica of the specified cluster.
1486    ///
1487    /// Panics if either the cluster or the replica does not exist.
1488    pub(super) fn get_cluster_replica(
1489        &self,
1490        cluster_id: ClusterId,
1491        replica_id: ReplicaId,
1492    ) -> &ClusterReplica {
1493        self.try_get_cluster_replica(cluster_id, replica_id)
1494            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1495    }
1496
1497    pub(super) fn resolve_replica_in_cluster(
1498        &self,
1499        cluster_id: &ClusterId,
1500        replica_name: &str,
1501    ) -> Result<&ClusterReplica, SqlCatalogError> {
1502        let cluster = self.get_cluster(*cluster_id);
1503        let replica_id = cluster
1504            .replica_id_by_name_
1505            .get(replica_name)
1506            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1507        Ok(&cluster.replicas_by_id_[replica_id])
1508    }
1509
1510    /// Get system configuration `name`.
1511    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1512        Ok(self.system_configuration.get(name)?)
1513    }
1514
1515    /// Parse system configuration `name` with `value` int.
1516    ///
1517    /// Returns the parsed value as a string.
1518    pub(super) fn parse_system_configuration(
1519        &self,
1520        name: &str,
1521        value: VarInput,
1522    ) -> Result<String, Error> {
1523        let value = self.system_configuration.parse(name, value)?;
1524        Ok(value.format())
1525    }
1526
1527    /// Gets the schema map for the database matching `database_spec`.
1528    pub(super) fn resolve_schema_in_database(
1529        &self,
1530        database_spec: &ResolvedDatabaseSpecifier,
1531        schema_name: &str,
1532        conn_id: &ConnectionId,
1533    ) -> Result<&Schema, SqlCatalogError> {
1534        let schema = match database_spec {
1535            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1536                self.temporary_schemas.get(conn_id)
1537            }
1538            ResolvedDatabaseSpecifier::Ambient => self
1539                .ambient_schemas_by_name
1540                .get(schema_name)
1541                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1542            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1543                db.schemas_by_name
1544                    .get(schema_name)
1545                    .and_then(|id| db.schemas_by_id.get(id))
1546            }),
1547        };
1548        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1549    }
1550
1551    pub fn get_schema(
1552        &self,
1553        database_spec: &ResolvedDatabaseSpecifier,
1554        schema_spec: &SchemaSpecifier,
1555        conn_id: &ConnectionId,
1556    ) -> &Schema {
1557        // Keep in sync with `get_schemas_mut`
1558        match (database_spec, schema_spec) {
1559            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1560                &self.temporary_schemas[conn_id]
1561            }
1562            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1563                &self.ambient_schemas_by_id[id]
1564            }
1565
1566            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
1567                &self.database_by_id[database_id].schemas_by_id[schema_id]
1568            }
1569            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1570                unreachable!("temporary schemas are in the ambient database")
1571            }
1572        }
1573    }
1574
1575    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1576        self.database_by_id
1577            .values()
1578            .filter_map(|database| database.schemas_by_id.get(schema_id))
1579            .chain(self.ambient_schemas_by_id.values())
1580            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1581            .into_first()
1582    }
1583
1584    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1585        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1586    }
1587
1588    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1589        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1590    }
1591
1592    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1593        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1594    }
1595
1596    pub fn get_information_schema_id(&self) -> SchemaId {
1597        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1598    }
1599
1600    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1601        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1602    }
1603
1604    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1605        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1606    }
1607
1608    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1609        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1610    }
1611
1612    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1613        SYSTEM_SCHEMAS
1614            .iter()
1615            .map(|name| self.ambient_schemas_by_name[*name])
1616    }
1617
1618    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1619        self.system_schema_ids().contains(&id)
1620    }
1621
1622    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1623        match spec {
1624            SchemaSpecifier::Temporary => false,
1625            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1626        }
1627    }
1628
1629    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1630        UNSTABLE_SCHEMAS
1631            .iter()
1632            .map(|name| self.ambient_schemas_by_name[*name])
1633    }
1634
1635    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1636        self.unstable_schema_ids().contains(&id)
1637    }
1638
1639    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1640        match spec {
1641            SchemaSpecifier::Temporary => false,
1642            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1643        }
1644    }
1645
1646    /// Creates a new schema in the `Catalog` for temporary items
1647    /// indicated by the TEMPORARY or TEMP keywords.
1648    pub fn create_temporary_schema(
1649        &mut self,
1650        conn_id: &ConnectionId,
1651        owner_id: RoleId,
1652    ) -> Result<(), Error> {
1653        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1654        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1655        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1656        // all -1.
1657        let oid = INVALID_OID;
1658        self.temporary_schemas.insert(
1659            conn_id.clone(),
1660            Schema {
1661                name: QualifiedSchemaName {
1662                    database: ResolvedDatabaseSpecifier::Ambient,
1663                    schema: MZ_TEMP_SCHEMA.into(),
1664                },
1665                id: SchemaSpecifier::Temporary,
1666                oid,
1667                items: BTreeMap::new(),
1668                functions: BTreeMap::new(),
1669                types: BTreeMap::new(),
1670                owner_id,
1671                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1672                    mz_sql::catalog::ObjectType::Schema,
1673                    owner_id,
1674                )]),
1675            },
1676        );
1677        Ok(())
1678    }
1679
1680    /// Return all OIDs that are allocated to temporary objects.
1681    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1682        std::iter::empty()
1683            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1684                if schema.id.is_temporary() {
1685                    Some(schema.oid)
1686                } else {
1687                    None
1688                }
1689            }))
1690            .chain(self.entry_by_id.values().filter_map(|entry| {
1691                if entry.item().is_temporary() {
1692                    Some(entry.oid)
1693                } else {
1694                    None
1695                }
1696            }))
1697    }
1698
1699    /// Optimized lookup for a builtin table.
1700    ///
1701    /// Panics if the builtin table doesn't exist in the catalog.
1702    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1703        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1704    }
1705
1706    /// Optimized lookup for a builtin log.
1707    ///
1708    /// Panics if the builtin log doesn't exist in the catalog.
1709    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1710        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1711        let log = match self.get_entry(&item_id).item() {
1712            CatalogItem::Log(log) => log,
1713            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1714        };
1715        (item_id, log.global_id)
1716    }
1717
1718    /// Optimized lookup for a builtin storage collection.
1719    ///
1720    /// Panics if the builtin storage collection doesn't exist in the catalog.
1721    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1722        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1723    }
1724
1725    /// Optimized lookup for a builtin object.
1726    ///
1727    /// Panics if the builtin object doesn't exist in the catalog.
1728    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1729        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1730        let schema = &self.ambient_schemas_by_id[schema_id];
1731        match builtin.catalog_item_type() {
1732            CatalogItemType::Type => schema.types[builtin.name()],
1733            CatalogItemType::Func => schema.functions[builtin.name()],
1734            CatalogItemType::Table
1735            | CatalogItemType::Source
1736            | CatalogItemType::Sink
1737            | CatalogItemType::View
1738            | CatalogItemType::MaterializedView
1739            | CatalogItemType::Index
1740            | CatalogItemType::Secret
1741            | CatalogItemType::Connection
1742            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1743        }
1744    }
1745
1746    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
1747    pub fn resolve_builtin_type_references(
1748        &self,
1749        builtin: &BuiltinType<NameReference>,
1750    ) -> BuiltinType<IdReference> {
1751        let typ: CatalogType<IdReference> = match &builtin.details.typ {
1752            CatalogType::AclItem => CatalogType::AclItem,
1753            CatalogType::Array { element_reference } => CatalogType::Array {
1754                element_reference: self.get_system_type(element_reference).id,
1755            },
1756            CatalogType::List {
1757                element_reference,
1758                element_modifiers,
1759            } => CatalogType::List {
1760                element_reference: self.get_system_type(element_reference).id,
1761                element_modifiers: element_modifiers.clone(),
1762            },
1763            CatalogType::Map {
1764                key_reference,
1765                value_reference,
1766                key_modifiers,
1767                value_modifiers,
1768            } => CatalogType::Map {
1769                key_reference: self.get_system_type(key_reference).id,
1770                value_reference: self.get_system_type(value_reference).id,
1771                key_modifiers: key_modifiers.clone(),
1772                value_modifiers: value_modifiers.clone(),
1773            },
1774            CatalogType::Range { element_reference } => CatalogType::Range {
1775                element_reference: self.get_system_type(element_reference).id,
1776            },
1777            CatalogType::Record { fields } => CatalogType::Record {
1778                fields: fields
1779                    .into_iter()
1780                    .map(|f| CatalogRecordField {
1781                        name: f.name.clone(),
1782                        type_reference: self.get_system_type(f.type_reference).id,
1783                        type_modifiers: f.type_modifiers.clone(),
1784                    })
1785                    .collect(),
1786            },
1787            CatalogType::Bool => CatalogType::Bool,
1788            CatalogType::Bytes => CatalogType::Bytes,
1789            CatalogType::Char => CatalogType::Char,
1790            CatalogType::Date => CatalogType::Date,
1791            CatalogType::Float32 => CatalogType::Float32,
1792            CatalogType::Float64 => CatalogType::Float64,
1793            CatalogType::Int16 => CatalogType::Int16,
1794            CatalogType::Int32 => CatalogType::Int32,
1795            CatalogType::Int64 => CatalogType::Int64,
1796            CatalogType::UInt16 => CatalogType::UInt16,
1797            CatalogType::UInt32 => CatalogType::UInt32,
1798            CatalogType::UInt64 => CatalogType::UInt64,
1799            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1800            CatalogType::Interval => CatalogType::Interval,
1801            CatalogType::Jsonb => CatalogType::Jsonb,
1802            CatalogType::Numeric => CatalogType::Numeric,
1803            CatalogType::Oid => CatalogType::Oid,
1804            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1805            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1806            CatalogType::Pseudo => CatalogType::Pseudo,
1807            CatalogType::RegClass => CatalogType::RegClass,
1808            CatalogType::RegProc => CatalogType::RegProc,
1809            CatalogType::RegType => CatalogType::RegType,
1810            CatalogType::String => CatalogType::String,
1811            CatalogType::Time => CatalogType::Time,
1812            CatalogType::Timestamp => CatalogType::Timestamp,
1813            CatalogType::TimestampTz => CatalogType::TimestampTz,
1814            CatalogType::Uuid => CatalogType::Uuid,
1815            CatalogType::VarChar => CatalogType::VarChar,
1816            CatalogType::Int2Vector => CatalogType::Int2Vector,
1817            CatalogType::MzAclItem => CatalogType::MzAclItem,
1818        };
1819
1820        BuiltinType {
1821            name: builtin.name,
1822            schema: builtin.schema,
1823            oid: builtin.oid,
1824            details: CatalogTypeDetails {
1825                array_id: builtin.details.array_id,
1826                typ,
1827                pg_metadata: builtin.details.pg_metadata.clone(),
1828            },
1829        }
1830    }
1831
1832    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1833        &self.config
1834    }
1835
1836    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1837        match self.database_by_name.get(database_name) {
1838            Some(id) => Ok(&self.database_by_id[id]),
1839            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1840        }
1841    }
1842
1843    pub fn resolve_schema(
1844        &self,
1845        current_database: Option<&DatabaseId>,
1846        database_name: Option<&str>,
1847        schema_name: &str,
1848        conn_id: &ConnectionId,
1849    ) -> Result<&Schema, SqlCatalogError> {
1850        let database_spec = match database_name {
1851            // If a database is explicitly specified, validate it. Note that we
1852            // intentionally do not validate `current_database` to permit
1853            // querying `mz_catalog` with an invalid session database, e.g., so
1854            // that you can run `SHOW DATABASES` to *find* a valid database.
1855            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1856                self.resolve_database(database)?.id().clone(),
1857            )),
1858            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1859        };
1860
1861        // First try to find the schema in the named database.
1862        if let Some(database_spec) = database_spec {
1863            if let Ok(schema) =
1864                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1865            {
1866                return Ok(schema);
1867            }
1868        }
1869
1870        // Then fall back to the ambient database.
1871        if let Ok(schema) = self.resolve_schema_in_database(
1872            &ResolvedDatabaseSpecifier::Ambient,
1873            schema_name,
1874            conn_id,
1875        ) {
1876            return Ok(schema);
1877        }
1878
1879        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1880    }
1881
1882    /// Optimized lookup for a system schema.
1883    ///
1884    /// Panics if the system schema doesn't exist in the catalog.
1885    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1886        self.ambient_schemas_by_name[name]
1887    }
1888
1889    pub fn resolve_search_path(
1890        &self,
1891        session: &dyn SessionMetadata,
1892    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1893        let database = self
1894            .database_by_name
1895            .get(session.database())
1896            .map(|id| id.clone());
1897
1898        session
1899            .search_path()
1900            .iter()
1901            .map(|schema| {
1902                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1903            })
1904            .filter_map(|schema| schema.ok())
1905            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1906            .collect()
1907    }
1908
1909    pub fn effective_search_path(
1910        &self,
1911        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1912        include_temp_schema: bool,
1913    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1914        let mut v = Vec::with_capacity(search_path.len() + 3);
1915        // Temp schema is only included for relations and data types, not for functions and operators
1916        let temp_schema = (
1917            ResolvedDatabaseSpecifier::Ambient,
1918            SchemaSpecifier::Temporary,
1919        );
1920        if include_temp_schema && !search_path.contains(&temp_schema) {
1921            v.push(temp_schema);
1922        }
1923        let default_schemas = [
1924            (
1925                ResolvedDatabaseSpecifier::Ambient,
1926                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
1927            ),
1928            (
1929                ResolvedDatabaseSpecifier::Ambient,
1930                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
1931            ),
1932        ];
1933        for schema in default_schemas.into_iter() {
1934            if !search_path.contains(&schema) {
1935                v.push(schema);
1936            }
1937        }
1938        v.extend_from_slice(search_path);
1939        v
1940    }
1941
1942    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
1943        let id = self
1944            .clusters_by_name
1945            .get(name)
1946            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
1947        Ok(&self.clusters_by_id[id])
1948    }
1949
1950    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
1951        let id = self
1952            .clusters_by_name
1953            .get(cluster.name)
1954            .expect("failed to lookup BuiltinCluster by name");
1955        self.clusters_by_id
1956            .get(id)
1957            .expect("failed to lookup BuiltinCluster by ID")
1958    }
1959
1960    pub fn resolve_cluster_replica(
1961        &self,
1962        cluster_replica_name: &QualifiedReplica,
1963    ) -> Result<&ClusterReplica, SqlCatalogError> {
1964        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
1965        let replica_name = cluster_replica_name.replica.as_str();
1966        let replica_id = cluster
1967            .replica_id(replica_name)
1968            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1969        Ok(cluster.replica(replica_id).expect("Must exist"))
1970    }
1971
1972    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
1973    ///
1974    /// If `name` does not specify a database, the `current_database` is used.
1975    /// If `name` does not specify a schema, then the schemas in `search_path`
1976    /// are searched in order.
1977    #[allow(clippy::useless_let_if_seq)]
1978    pub fn resolve(
1979        &self,
1980        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
1981        current_database: Option<&DatabaseId>,
1982        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
1983        name: &PartialItemName,
1984        conn_id: &ConnectionId,
1985        err_gen: fn(String) -> SqlCatalogError,
1986    ) -> Result<&CatalogEntry, SqlCatalogError> {
1987        // If a schema name was specified, just try to find the item in that
1988        // schema. If no schema was specified, try to find the item in the connection's
1989        // temporary schema. If the item is not found, try to find the item in every
1990        // schema in the search path.
1991        let schemas = match &name.schema {
1992            Some(schema_name) => {
1993                match self.resolve_schema(
1994                    current_database,
1995                    name.database.as_deref(),
1996                    schema_name,
1997                    conn_id,
1998                ) {
1999                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2000                    Err(e) => return Err(e),
2001                }
2002            }
2003            None => match self
2004                .get_schema(
2005                    &ResolvedDatabaseSpecifier::Ambient,
2006                    &SchemaSpecifier::Temporary,
2007                    conn_id,
2008                )
2009                .items
2010                .get(&name.item)
2011            {
2012                Some(id) => return Ok(self.get_entry(id)),
2013                None => search_path.to_vec(),
2014            },
2015        };
2016
2017        for (database_spec, schema_spec) in &schemas {
2018            let schema = self.get_schema(database_spec, schema_spec, conn_id);
2019
2020            if let Some(id) = get_schema_entries(schema).get(&name.item) {
2021                return Ok(&self.entry_by_id[id]);
2022            }
2023        }
2024
2025        // Some relations that have previously lived in the `mz_internal` schema have been moved to
2026        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
2027        // automatically let uses of the old schema resolve to the new ones as well.
2028        // TODO(database-issues#8173) remove this after sufficient time has passed
2029        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2030        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2031            for schema_id in [
2032                self.get_mz_catalog_unstable_schema_id(),
2033                self.get_mz_introspection_schema_id(),
2034            ] {
2035                let schema = self.get_schema(
2036                    &ResolvedDatabaseSpecifier::Ambient,
2037                    &SchemaSpecifier::Id(schema_id),
2038                    conn_id,
2039                );
2040
2041                if let Some(id) = get_schema_entries(schema).get(&name.item) {
2042                    debug!(
2043                        github_27831 = true,
2044                        "encountered use of outdated schema `mz_internal` for relation: {name}",
2045                    );
2046                    return Ok(&self.entry_by_id[id]);
2047                }
2048            }
2049        }
2050
2051        Err(err_gen(name.to_string()))
2052    }
2053
2054    /// Resolves `name` to a non-function [`CatalogEntry`].
2055    pub fn resolve_entry(
2056        &self,
2057        current_database: Option<&DatabaseId>,
2058        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2059        name: &PartialItemName,
2060        conn_id: &ConnectionId,
2061    ) -> Result<&CatalogEntry, SqlCatalogError> {
2062        self.resolve(
2063            |schema| &schema.items,
2064            current_database,
2065            search_path,
2066            name,
2067            conn_id,
2068            SqlCatalogError::UnknownItem,
2069        )
2070    }
2071
2072    /// Resolves `name` to a function [`CatalogEntry`].
2073    pub fn resolve_function(
2074        &self,
2075        current_database: Option<&DatabaseId>,
2076        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2077        name: &PartialItemName,
2078        conn_id: &ConnectionId,
2079    ) -> Result<&CatalogEntry, SqlCatalogError> {
2080        self.resolve(
2081            |schema| &schema.functions,
2082            current_database,
2083            search_path,
2084            name,
2085            conn_id,
2086            |name| SqlCatalogError::UnknownFunction {
2087                name,
2088                alternative: None,
2089            },
2090        )
2091    }
2092
2093    /// Resolves `name` to a type [`CatalogEntry`].
2094    pub fn resolve_type(
2095        &self,
2096        current_database: Option<&DatabaseId>,
2097        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2098        name: &PartialItemName,
2099        conn_id: &ConnectionId,
2100    ) -> Result<&CatalogEntry, SqlCatalogError> {
2101        static NON_PG_CATALOG_TYPES: LazyLock<
2102            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2103        > = LazyLock::new(|| {
2104            BUILTINS::types()
2105                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2106                .map(|typ| (typ.name, typ))
2107                .collect()
2108        });
2109
2110        let entry = self.resolve(
2111            |schema| &schema.types,
2112            current_database,
2113            search_path,
2114            name,
2115            conn_id,
2116            |name| SqlCatalogError::UnknownType { name },
2117        )?;
2118
2119        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2120            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2121                warn!(
2122                    "user specified an incorrect schema of {} for the type {}, which should be in \
2123                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2124                    PG_CATALOG_SCHEMA.quoted(),
2125                    typ.name.quoted(),
2126                    typ.schema.quoted(),
2127                )
2128            }
2129        }
2130
2131        Ok(entry)
2132    }
2133
2134    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2135    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2136        match object_id {
2137            ObjectId::Item(item_id) => {
2138                let entry = self.get_entry(&item_id);
2139                match entry.item_type() {
2140                    CatalogItemType::Table => CommentObjectId::Table(item_id),
2141                    CatalogItemType::Source => CommentObjectId::Source(item_id),
2142                    CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2143                    CatalogItemType::View => CommentObjectId::View(item_id),
2144                    CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2145                    CatalogItemType::Index => CommentObjectId::Index(item_id),
2146                    CatalogItemType::Func => CommentObjectId::Func(item_id),
2147                    CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2148                    CatalogItemType::Type => CommentObjectId::Type(item_id),
2149                    CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2150                    CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2151                }
2152            }
2153            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2154            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2155            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2156            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2157            ObjectId::ClusterReplica(cluster_replica_id) => {
2158                CommentObjectId::ClusterReplica(cluster_replica_id)
2159            }
2160            ObjectId::NetworkPolicy(network_policy_id) => {
2161                CommentObjectId::NetworkPolicy(network_policy_id)
2162            }
2163        }
2164    }
2165
2166    /// Return current system configuration.
2167    pub fn system_config(&self) -> &SystemVars {
2168        &self.system_configuration
2169    }
2170
2171    /// Return a mutable reference to the current system configuration.
2172    pub fn system_config_mut(&mut self) -> &mut SystemVars {
2173        &mut self.system_configuration
2174    }
2175
2176    /// Serializes the catalog's in-memory state.
2177    ///
2178    /// There are no guarantees about the format of the serialized state, except
2179    /// that the serialized state for two identical catalogs will compare
2180    /// identically.
2181    ///
2182    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2183    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2184    /// argument.
2185    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2186        // Dump the base catalog.
2187        let mut dump = serde_json::to_value(&self).map_err(|e| {
2188            Error::new(ErrorKind::Unstructured(format!(
2189                // Don't panic here because we don't have compile-time failures for maps with
2190                // non-string keys.
2191                "internal error: could not dump catalog: {}",
2192                e
2193            )))
2194        })?;
2195
2196        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2197        // Stitch in system parameter defaults.
2198        dump_obj.insert(
2199            "system_parameter_defaults".into(),
2200            serde_json::json!(self.system_config().defaults()),
2201        );
2202        // Potentially overwrite unfinalized shards.
2203        if let Some(unfinalized_shards) = unfinalized_shards {
2204            dump_obj
2205                .get_mut("storage_metadata")
2206                .expect("known to exist")
2207                .as_object_mut()
2208                .expect("storage_metadata is an object")
2209                .insert(
2210                    "unfinalized_shards".into(),
2211                    serde_json::json!(unfinalized_shards),
2212                );
2213        }
2214        // Remove GlobalIds for temporary objects from the mapping.
2215        //
2216        // Post-test consistency checks with the durable catalog don't know about temporary items
2217        // since they're kept entirely in memory.
2218        let temporary_gids: Vec<_> = self
2219            .entry_by_global_id
2220            .iter()
2221            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2222            .map(|(gid, _item_id)| *gid)
2223            .collect();
2224        if !temporary_gids.is_empty() {
2225            let gids = dump_obj
2226                .get_mut("entry_by_global_id")
2227                .expect("known_to_exist")
2228                .as_object_mut()
2229                .expect("entry_by_global_id is an object");
2230            for gid in temporary_gids {
2231                gids.remove(&gid.to_string());
2232            }
2233        }
2234
2235        // Emit as pretty-printed JSON.
2236        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2237    }
2238
2239    pub fn availability_zones(&self) -> &[String] {
2240        &self.availability_zones
2241    }
2242
2243    pub fn concretize_replica_location(
2244        &self,
2245        location: mz_catalog::durable::ReplicaLocation,
2246        allowed_sizes: &Vec<String>,
2247        allowed_availability_zones: Option<&[String]>,
2248    ) -> Result<ReplicaLocation, Error> {
2249        let location = match location {
2250            mz_catalog::durable::ReplicaLocation::Unmanaged {
2251                storagectl_addrs,
2252                storage_addrs,
2253                computectl_addrs,
2254                compute_addrs,
2255                workers,
2256            } => {
2257                if allowed_availability_zones.is_some() {
2258                    return Err(Error {
2259                        kind: ErrorKind::Internal(
2260                            "tried concretize unmanaged replica with specific availability_zones"
2261                                .to_string(),
2262                        ),
2263                    });
2264                }
2265                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2266                    storagectl_addrs,
2267                    storage_addrs,
2268                    computectl_addrs,
2269                    compute_addrs,
2270                    workers,
2271                })
2272            }
2273            mz_catalog::durable::ReplicaLocation::Managed {
2274                size,
2275                availability_zone,
2276                disk,
2277                billed_as,
2278                internal,
2279                pending,
2280            } => {
2281                if allowed_availability_zones.is_some() && availability_zone.is_some() {
2282                    let message = "tried concretize managed replica with specific availability zones and availability zone";
2283                    return Err(Error {
2284                        kind: ErrorKind::Internal(message.to_string()),
2285                    });
2286                }
2287                self.ensure_valid_replica_size(allowed_sizes, &size)?;
2288                let cluster_replica_sizes = &self.cluster_replica_sizes;
2289
2290                ReplicaLocation::Managed(ManagedReplicaLocation {
2291                    allocation: cluster_replica_sizes
2292                        .0
2293                        .get(&size)
2294                        .expect("catalog out of sync")
2295                        .clone(),
2296                    availability_zones: match (availability_zone, allowed_availability_zones) {
2297                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2298                        (None, Some(azs)) if azs.is_empty() => {
2299                            ManagedReplicaAvailabilityZones::FromCluster(None)
2300                        }
2301                        (None, Some(azs)) => {
2302                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2303                        }
2304                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2305                    },
2306                    size,
2307                    disk,
2308                    billed_as,
2309                    internal,
2310                    pending,
2311                })
2312            }
2313        };
2314        Ok(location)
2315    }
2316
2317    pub(crate) fn ensure_valid_replica_size(
2318        &self,
2319        allowed_sizes: &[String],
2320        size: &String,
2321    ) -> Result<(), Error> {
2322        let cluster_replica_sizes = &self.cluster_replica_sizes;
2323
2324        if !cluster_replica_sizes.0.contains_key(size)
2325            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2326            || cluster_replica_sizes.0[size].disabled
2327        {
2328            let mut entries = cluster_replica_sizes
2329                .enabled_allocations()
2330                .collect::<Vec<_>>();
2331
2332            if !allowed_sizes.is_empty() {
2333                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2334                entries.retain(|(name, _)| allowed_sizes.contains(name));
2335            }
2336
2337            entries.sort_by_key(
2338                |(
2339                    _name,
2340                    ReplicaAllocation {
2341                        scale, cpu_limit, ..
2342                    },
2343                )| (scale, cpu_limit),
2344            );
2345
2346            Err(Error {
2347                kind: ErrorKind::InvalidClusterReplicaSize {
2348                    size: size.to_owned(),
2349                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2350                },
2351            })
2352        } else {
2353            Ok(())
2354        }
2355    }
2356
2357    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2358        if role_id.is_builtin() {
2359            let role = self.get_role(role_id);
2360            Err(Error::new(ErrorKind::ReservedRoleName(
2361                role.name().to_string(),
2362            )))
2363        } else {
2364            Ok(())
2365        }
2366    }
2367
2368    pub fn ensure_not_reserved_network_policy(
2369        &self,
2370        network_policy_id: &NetworkPolicyId,
2371    ) -> Result<(), Error> {
2372        if network_policy_id.is_builtin() {
2373            let policy = self.get_network_policy(network_policy_id);
2374            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2375                policy.name.clone(),
2376            )))
2377        } else {
2378            Ok(())
2379        }
2380    }
2381
2382    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2383        let is_grantable = !role_id.is_public() && !role_id.is_system();
2384        if is_grantable {
2385            Ok(())
2386        } else {
2387            let role = self.get_role(role_id);
2388            Err(Error::new(ErrorKind::UngrantableRoleName(
2389                role.name().to_string(),
2390            )))
2391        }
2392    }
2393
2394    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2395        if role_id.is_system() {
2396            let role = self.get_role(role_id);
2397            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2398                role.name().to_string(),
2399            )))
2400        } else {
2401            Ok(())
2402        }
2403    }
2404
2405    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2406        if role_id.is_predefined() {
2407            let role = self.get_role(role_id);
2408            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2409                role.name().to_string(),
2410            )))
2411        } else {
2412            Ok(())
2413        }
2414    }
2415
2416    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2417    // passing tx, and session?
2418    pub(crate) fn add_to_audit_log(
2419        system_configuration: &SystemVars,
2420        oracle_write_ts: mz_repr::Timestamp,
2421        session: Option<&ConnMeta>,
2422        tx: &mut mz_catalog::durable::Transaction,
2423        audit_events: &mut Vec<VersionedEvent>,
2424        event_type: EventType,
2425        object_type: ObjectType,
2426        details: EventDetails,
2427    ) -> Result<(), Error> {
2428        let user = session.map(|session| session.user().name.to_string());
2429
2430        // unsafe_mock_audit_event_timestamp can only be set to Some when running in unsafe mode.
2431
2432        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2433            Some(ts) => ts.into(),
2434            _ => oracle_write_ts.into(),
2435        };
2436        let id = tx.allocate_audit_log_id()?;
2437        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2438        audit_events.push(event.clone());
2439        tx.insert_audit_log_event(event);
2440        Ok(())
2441    }
2442
2443    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2444        match id {
2445            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2446            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2447                self.get_cluster_replica(*cluster_id, *replica_id)
2448                    .owner_id(),
2449            ),
2450            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2451            ObjectId::Schema((database_spec, schema_spec)) => Some(
2452                self.get_schema(database_spec, schema_spec, conn_id)
2453                    .owner_id(),
2454            ),
2455            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2456            ObjectId::Role(_) => None,
2457            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2458        }
2459    }
2460
2461    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2462        match object_id {
2463            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2464            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2465            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2466            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2467            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2468            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2469            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2470        }
2471    }
2472
2473    pub(super) fn get_system_object_type(
2474        &self,
2475        id: &SystemObjectId,
2476    ) -> mz_sql::catalog::SystemObjectType {
2477        match id {
2478            SystemObjectId::Object(object_id) => {
2479                SystemObjectType::Object(self.get_object_type(object_id))
2480            }
2481            SystemObjectId::System => SystemObjectType::System,
2482        }
2483    }
2484
2485    /// Returns a read-only view of the current [`StorageMetadata`].
2486    ///
2487    /// To write to this struct, you must use a catalog transaction.
2488    pub fn storage_metadata(&self) -> &StorageMetadata {
2489        &self.storage_metadata
2490    }
2491
2492    /// For the Sources ids in `ids`, return the compaction windows for all `ids` and additional ids
2493    /// that propagate from them. Specifically, if `ids` contains a source, it and all of its
2494    /// source exports will be added to the result.
2495    pub fn source_compaction_windows(
2496        &self,
2497        ids: impl IntoIterator<Item = CatalogItemId>,
2498    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2499        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2500        let mut ids = VecDeque::from_iter(ids);
2501        let mut seen = BTreeSet::new();
2502        while let Some(item_id) = ids.pop_front() {
2503            if !seen.insert(item_id) {
2504                continue;
2505            }
2506            let entry = self.get_entry(&item_id);
2507            match entry.item() {
2508                CatalogItem::Source(source) => {
2509                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2510                    match source.data_source {
2511                        DataSourceDesc::Ingestion { .. } => {
2512                            // For sources, look up each dependent source export and propagate.
2513                            cws.entry(source_cw).or_default().insert(item_id);
2514                            ids.extend(entry.used_by());
2515                        }
2516                        DataSourceDesc::IngestionExport { ingestion_id, .. } => {
2517                            // For subsources, look up the parent source and propagate the compaction
2518                            // window.
2519                            let ingestion = self
2520                                .get_entry(&ingestion_id)
2521                                .source()
2522                                .expect("must be source");
2523                            let cw = ingestion
2524                                .custom_logical_compaction_window
2525                                .unwrap_or(source_cw);
2526                            cws.entry(cw).or_default().insert(item_id);
2527                        }
2528                        DataSourceDesc::Introspection(_)
2529                        | DataSourceDesc::Progress
2530                        | DataSourceDesc::Webhook { .. } => {
2531                            cws.entry(source_cw).or_default().insert(item_id);
2532                        }
2533                    }
2534                }
2535                CatalogItem::Table(table) => match &table.data_source {
2536                    TableDataSource::DataSource {
2537                        desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
2538                        timeline: _,
2539                    } => {
2540                        let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2541                        let ingestion = self
2542                            .get_entry(ingestion_id)
2543                            .source()
2544                            .expect("must be source");
2545                        let cw = ingestion
2546                            .custom_logical_compaction_window
2547                            .unwrap_or(table_cw);
2548                        cws.entry(cw).or_default().insert(item_id);
2549                    }
2550                    _ => {}
2551                },
2552                _ => {
2553                    // Views could depend on sources, so ignore them if added by used_by above.
2554                    continue;
2555                }
2556            }
2557        }
2558        cws
2559    }
2560
2561    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2562        match id {
2563            CommentObjectId::Table(id)
2564            | CommentObjectId::View(id)
2565            | CommentObjectId::MaterializedView(id)
2566            | CommentObjectId::Source(id)
2567            | CommentObjectId::Sink(id)
2568            | CommentObjectId::Index(id)
2569            | CommentObjectId::Func(id)
2570            | CommentObjectId::Connection(id)
2571            | CommentObjectId::Type(id)
2572            | CommentObjectId::Secret(id)
2573            | CommentObjectId::ContinualTask(id) => Some(*id),
2574            CommentObjectId::Role(_)
2575            | CommentObjectId::Database(_)
2576            | CommentObjectId::Schema(_)
2577            | CommentObjectId::Cluster(_)
2578            | CommentObjectId::ClusterReplica(_)
2579            | CommentObjectId::NetworkPolicy(_) => None,
2580        }
2581    }
2582
2583    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2584        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2585    }
2586
2587    pub fn comment_id_to_audit_log_name(
2588        &self,
2589        id: CommentObjectId,
2590        conn_id: &ConnectionId,
2591    ) -> String {
2592        match id {
2593            CommentObjectId::Table(id)
2594            | CommentObjectId::View(id)
2595            | CommentObjectId::MaterializedView(id)
2596            | CommentObjectId::Source(id)
2597            | CommentObjectId::Sink(id)
2598            | CommentObjectId::Index(id)
2599            | CommentObjectId::Func(id)
2600            | CommentObjectId::Connection(id)
2601            | CommentObjectId::Type(id)
2602            | CommentObjectId::Secret(id)
2603            | CommentObjectId::ContinualTask(id) => {
2604                let item = self.get_entry(&id);
2605                let name = self.resolve_full_name(item.name(), Some(conn_id));
2606                name.to_string()
2607            }
2608            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2609            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2610            CommentObjectId::Schema((spec, schema_id)) => {
2611                let schema = self.get_schema(&spec, &schema_id, conn_id);
2612                self.resolve_full_schema_name(&schema.name).to_string()
2613            }
2614            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2615            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2616                let cluster = self.get_cluster(cluster_id);
2617                let replica = self.get_cluster_replica(cluster_id, replica_id);
2618                QualifiedReplica {
2619                    cluster: Ident::new_unchecked(cluster.name.clone()),
2620                    replica: Ident::new_unchecked(replica.name.clone()),
2621                }
2622                .to_string()
2623            }
2624            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2625        }
2626    }
2627}
2628
2629impl ConnectionResolver for CatalogState {
2630    fn resolve_connection(
2631        &self,
2632        id: CatalogItemId,
2633    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2634        use mz_storage_types::connections::Connection::*;
2635        match self
2636            .get_entry(&id)
2637            .connection()
2638            .expect("catalog out of sync")
2639            .details
2640            .to_connection()
2641        {
2642            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2643            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2644            Csr(conn) => Csr(conn.into_inline_connection(self)),
2645            Ssh(conn) => Ssh(conn),
2646            Aws(conn) => Aws(conn),
2647            AwsPrivatelink(conn) => AwsPrivatelink(conn),
2648            MySql(conn) => MySql(conn.into_inline_connection(self)),
2649            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2650        }
2651    }
2652}
2653
2654impl OptimizerCatalog for CatalogState {
2655    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2656        CatalogState::get_entry_by_global_id(self, id)
2657    }
2658    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2659        CatalogState::get_entry(self, id)
2660    }
2661    fn resolve_full_name(
2662        &self,
2663        name: &QualifiedItemName,
2664        conn_id: Option<&ConnectionId>,
2665    ) -> FullItemName {
2666        CatalogState::resolve_full_name(self, name, conn_id)
2667    }
2668    fn get_indexes_on(
2669        &self,
2670        id: GlobalId,
2671        cluster: ClusterId,
2672    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2673        Box::new(CatalogState::get_indexes_on(self, id, cluster))
2674    }
2675}
2676
/// Exposes the catalog to the optimizer by delegating every method to the catalog's `state`.
impl OptimizerCatalog for Catalog {
    /// Looks up a collection entry by its global ID.
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    /// Looks up a catalog entry by its item ID.
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        // Note: this is the state's item-ID lookup, not this trait's `get_entry`, which takes
        // a `GlobalId`.
        self.state.get_entry(id)
    }

    /// Resolves a qualified item name into a full item name.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    /// Returns a boxed iterator over the indexes on `id` in `cluster`.
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}
2702
2703impl Catalog {
2704    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2705        self
2706    }
2707}