
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-memory metadata storage for the coordinator.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
};
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
    TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
    UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::DiskLimit;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
use mz_ore::str::StrExt;
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::{
    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
    UNSTABLE_SCHEMAS,
};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector};
use mz_secrets::InMemorySecretsController;
use mz_sql::ast::Ident;
use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
    TypeReference,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
    Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageMetadata;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{
    ConnectionResolver, InlinedConnection, IntoInlineConnection,
};
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{debug, warn};

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::AdapterError;
use crate::catalog::{Catalog, ConnCatalog};
use crate::coord::ConnMeta;
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::session::Session;

/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human-readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Objects that are partially derived from read-only state.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,

    // Mutable state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}

/// Keeps track of what expressions are cached or not during startup.
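///
/// A minimal sketch of the intended startup flow, assuming hypothetical `id`, `local_mir`, and
/// `features` values (illustrative only, so the example is not compiled):
///
/// ```ignore
/// let mut cache = LocalExpressionCache::new(cached_exprs);
/// match cache.remove_cached_expression(&id) {
///     // Cache hit: reuse the previously optimized expression.
///     Some(local_expr) => { /* use local_expr.local_mir */ }
///     // Cache miss: optimize from scratch and record the result so it can
///     // be written back to the durable cache after startup.
///     None => cache.insert_uncached_expression(id, local_mir, features),
/// }
/// let to_persist = cache.into_uncached_exprs();
/// ```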
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}

impl LocalExpressionCache {
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    /// Inserts an expression that was previously cached back into the cache. This is generally
    /// needed when parsing/planning an expression fails, but we don't want to lose the cached
    /// expression.
    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    /// Informs the cache that `id` was not found in the cache and that it should be added with
    /// `local_mir` and `optimizer_features`.
    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                // If we are trying to cache the same item a second time, with a different
                // expression, then we must be migrating the object or doing something else weird.
                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
                // version after a restart. Caching the migrated version may cause us to incorrectly
                // think that the object has already been migrated. To simplify things, we cache
                // neither.
                let prev = uncached_exprs.remove(&id);
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}

fn skip_temp_items<S>(
    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    mz_ore::serde::map_key_to_string(
        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
        serializer,
    )
}

impl CatalogState {
    /// Returns an empty [`CatalogState`] that can be used in tests.
    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
    // tests are structured in this repository.
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                timestamp_interval: Default::default(),
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Default::default(),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Default::default(),
            system_privileges: Default::default(),
            comments: Default::default(),
            source_references: Default::default(),
            storage_metadata: Default::default(),
            license_key: ValidatedLicenseKey::for_tests(),
        }
    }

    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        let search_path = self.resolve_search_path(session);
        let database = self
            .database_by_name
            .get(session.vars().database())
            .cloned();
        let state = match session.transaction().catalog_state() {
            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
            None => Cow::Borrowed(self),
        };
        ConnCatalog {
            state,
            unresolvable_ids: BTreeSet::new(),
            conn_id: session.conn_id().clone(),
            cluster: session.vars().cluster().into(),
            database,
            search_path,
            role_id: session.current_role_id().clone(),
            prepared_statements: Some(session.prepared_statements()),
            notices_tx: session.retain_notice_transmitter(),
        }
    }

    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            // Leaving the system's search path empty allows us to catch issues
            // where catalog object names have not been normalized correctly.
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            notices_tx,
        }
    }

    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }

    /// Returns an iterator over the deduplicated identifiers of all
    /// objects this catalog entry transitively depends on (where
    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
    /// [`CatalogItem::references`]).
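    ///
    /// The iterator yields `id` itself first. A minimal sketch, assuming hypothetical `state`
    /// and `id` values (not compiled):
    ///
    /// ```ignore
    /// // Each transitive dependency appears exactly once, starting with `id`.
    /// let uses: Vec<CatalogItemId> = state.transitive_uses(id).collect();
    /// assert_eq!(uses.first(), Some(&id));
    /// ```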
    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
        struct I<'a> {
            queue: VecDeque<CatalogItemId>,
            seen: BTreeSet<CatalogItemId>,
            this: &'a CatalogState,
        }
        impl<'a> Iterator for I<'a> {
            type Item = CatalogItemId;
            fn next(&mut self) -> Option<Self::Item> {
                if let Some(next) = self.queue.pop_front() {
                    for child in self.this.get_entry(&next).item().uses() {
                        if !self.seen.contains(&child) {
                            self.queue.push_back(child);
                            self.seen.insert(child);
                        }
                    }
                    Some(next)
                } else {
                    None
                }
            }
        }

        I {
            queue: [id].into_iter().collect(),
            seen: [id].into_iter().collect(),
            this: self,
        }
    }

    /// Computes the IDs of any log sources this catalog entry transitively
    /// depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }

    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }

    /// Returns the IDs of all objects that depend on `ids`, including `ids` themselves.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful as the order in which to
    /// drop objects.
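    ///
    /// A minimal sketch, assuming a hypothetical view `v` that reads from a table `t`
    /// (not compiled):
    ///
    /// ```ignore
    /// let order = state.object_dependents(&vec![ObjectId::Item(t)], &conn_id, &mut BTreeSet::new());
    /// // The dependent view sorts before the table it depends on.
    /// assert_eq!(order, vec![ObjectId::Item(v), ObjectId::Item(t)]);
    /// ```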
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                id @ ObjectId::Role(_) => {
                    // Roles have no dependents, so only record the role itself.
                    if seen.insert(id.clone()) {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `cluster_id`, including `cluster_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful as the order in which to
    /// drop objects.
    fn cluster_dependents(
        &self,
        cluster_id: ClusterId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Cluster(cluster_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let cluster = self.get_cluster(cluster_id);
            for item_id in cluster.bound_objects() {
                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
            }
            for replica_id in cluster.replica_ids().values() {
                dependents.extend_from_slice(&self.cluster_replica_dependents(
                    cluster_id,
                    *replica_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `replica_id`, including `replica_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful as the order in which to
    /// drop objects.
    pub(super) fn cluster_replica_dependents(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `database_id`, including `database_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful as the order in which to
    /// drop objects.
    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `schema_id`, including `schema_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful as the order in which to
    /// drop objects.
    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `item_id`, including `item_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful as the order in which to
    /// drop objects.
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            // We treat the progress collection as if it depends on the source
            // for dropping. We have additional code in planning to create a
            // kind of special-case "CASCADE" for this dependency.
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `network_policy_id`, including
    /// `network_policy_id` itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful as the order in which to
    /// drop objects.
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        // Currently, network policies have no dependents. When we add the ability for users or
        // sources/sinks to have policies, this method will need to be updated.
        vec![object_id]
    }

    /// Indicates whether the given item is considered stable or not.
    ///
    /// Only stable items can be used as dependencies of other catalog items.
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }

    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
        if self.system_config().unsafe_enable_unstable_dependencies() {
            return Ok(());
        }

        let unstable_dependencies: Vec<_> = item
            .references()
            .items()
            .filter(|id| !self.is_stable(**id))
            .map(|id| self.get_entry(id).name().item.clone())
            .collect();

        // It's okay to create a temporary object with unstable
        // dependencies, since we will never need to reboot a catalog
        // that contains it.
        if unstable_dependencies.is_empty() || item.is_temporary() {
            Ok(())
        } else {
            let object_type = item.typ().to_string();
            Err(Error {
                kind: ErrorKind::UnstableDependency {
                    object_type,
                    unstable_dependencies,
                },
            })
        }
    }

    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        let schema = self
            .get_schema(
                &name.qualifiers.database_spec,
                &name.qualifiers.schema_spec,
                conn_id,
            )
            .name()
            .schema
            .clone();
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }

    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        let database = match &name.database {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        FullSchemaName {
            database,
            schema: name.schema.clone(),
        }
    }

    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }

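    /// Returns the entry for `id`. For tables, the returned entry is pinned to the specific
    /// [`RelationVersion`] that `id` refers to; all other items use
    /// [`RelationVersionSelector::Latest`].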
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        let item_id = self
            .entry_by_global_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));

        let entry = self.get_entry(item_id).clone();
        let version = match entry.item() {
            CatalogItem::Table(table) => {
                let (version, _) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("version to exist");
                RelationVersionSelector::Specific(*version)
            }
            _ => RelationVersionSelector::Latest,
        };
        CatalogCollectionEntry { entry, version }
    }

    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }

    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        let schema = self
            .temporary_schemas
            .get(conn)
            .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
        schema.items.values().copied().map(ObjectId::from)
    }

    /// Gets a type named `name` from exactly one of the system schemas.
    ///
    /// # Panics
    /// - If `name` is not an entry in any system schema
    /// - If more than one system schema has an entry named `name`.
    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }

    pub fn get_item_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .items
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }
    pub(super) fn find_available_name(
        &self,
        mut name: QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> QualifiedItemName {
        let mut i = 0;
        let orig_item_name = name.item.clone();
        while self.get_item_by_name(&name, conn_id).is_some() {
            i += 1;
            name.item = format!("{}{}", orig_item_name, i);
        }
        name
    }

    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }

    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        let item_id = self.entry_by_global_id.get(id)?;
        self.try_get_entry(item_id)
    }

    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
    /// object that returns rows.
    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            // TODO(alter_table): Support schema evolution on sources.
            other => other.desc_opt(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }

    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }

    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }

    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }

    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }

    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }

    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }

    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }

    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }

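    /// Returns the set of all roles that `id` is transitively a member of, including `id`
    /// itself and the implicit `PUBLIC` role.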
    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        membership.insert(RoleId::Public);
        membership
    }

    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }

    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
    /// source.
    ///
    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
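    ///
    /// For example, a webhook source `my_db.my_schema.my_source` served from host
    /// `example.com` (hypothetical names) maps to
    /// `https://example.com/api/webhook/my_db/my_schema/my_source`.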
    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or("HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }

    /// Parses the given SQL string into a pair of a [`Plan`] and its [`ResolvedIds`].
    ///
    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
    ///
    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
    /// their original state before the method returns.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
        // caller if this method changed the state of the catalog.
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }

    /// Parses the given SQL string into a pair of a [`Plan`] and its [`ResolvedIds`].
    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }

    /// Parses the given SQL string into a [`CatalogItem`].
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }

    /// Parses the given SQL string into a `CatalogItem`.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }

    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's `Some`.
    ///
    /// On success, returns the `CatalogItem` and an optimized expression iff the expression was
    /// not cached.
    ///
    /// On failure, returns an error and `cached_expr` so it can be used later.
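    ///
    /// Note that a cached expression is only reused when its [`OptimizerFeatures`] match the
    /// current optimizer configuration; otherwise the expression is re-optimized from scratch.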
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)])
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
                        DataSourceDesc::Ingestion {
                            ingestion_desc,
                            cluster_id: match in_cluster {
                                Some(id) => id,
                                None => {
                                    return Err((
                                        AdapterError::Unstructured(anyhow::anyhow!(
                                            "ingestion-based sources must have cluster specified"
                                        )),
                                        cached_expr,
                                    ));
                                }
                            },
                        }
                    }
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => (view.raw_expr, view.optimized_expr),
                    item => unreachable!("expected view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // Build an optimizer for this VIEW.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Resolve all item dependencies from the HIR expression.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = optimized_expr.typ();
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                // Resolve all item dependencies from the HIR expression.
                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    global_id,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    cluster_id: materialized_view.cluster_id,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
1345            Plan::CreateContinualTask(plan) => {
1346                let ct =
1347                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1348                        Ok(ct) => ct,
1349                        Err(err) => return Err((err, cached_expr)),
1350                    };
1351                CatalogItem::ContinualTask(ct)
1352            }
1353            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1354                create_sql: index.create_sql,
1355                global_id,
1356                on: index.on,
1357                keys: index.keys.into(),
1358                conn_id: None,
1359                resolved_ids,
1360                cluster_id: index.cluster_id,
1361                custom_logical_compaction_window: custom_logical_compaction_window
1362                    .or(index.compaction_window),
1363                is_retained_metrics_object,
1364            }),
1365            Plan::CreateSink(CreateSinkPlan {
1366                sink,
1367                with_snapshot,
1368                in_cluster,
1369                ..
1370            }) => CatalogItem::Sink(Sink {
1371                create_sql: sink.create_sql,
1372                global_id,
1373                from: sink.from,
1374                connection: sink.connection,
1375                envelope: sink.envelope,
1376                version: sink.version,
1377                with_snapshot,
1378                resolved_ids,
1379                cluster_id: in_cluster,
1380            }),
1381            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1382                let desc = match typ.inner.desc(&session_catalog) {
1383                    Ok(desc) => desc,
1384                    Err(err) => return Err((err.into(), cached_expr)),
1385                };
1386                CatalogItem::Type(Type {
1387                    create_sql: Some(typ.create_sql),
1388                    global_id,
1389                    desc,
1390                    details: CatalogTypeDetails {
1391                        array_id: None,
1392                        typ: typ.inner,
1393                        pg_metadata: None,
1394                    },
1395                    resolved_ids,
1396                })
1397            }
1398            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1399                create_sql: secret.create_sql,
1400                global_id,
1401            }),
1402            Plan::CreateConnection(CreateConnectionPlan {
1403                connection:
1404                    mz_sql::plan::Connection {
1405                        create_sql,
1406                        details,
1407                    },
1408                ..
1409            }) => CatalogItem::Connection(Connection {
1410                create_sql,
1411                global_id,
1412                details,
1413                resolved_ids,
1414            }),
1415            _ => {
1416                return Err((
1417                    Error::new(ErrorKind::Corruption {
1418                        detail: "catalog entry generated inappropriate plan".to_string(),
1419                    })
1420                    .into(),
1421                    cached_expr,
1422                ));
1423            }
1424        };
1425
1426        Ok((item, uncached_expr))
1427    }
1428
1429    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1430    /// Calling this method will not permanently modify any system configuration variables.
1431    ///
1432    /// WARNING:
1433    /// Any modifications made to the system configuration variables in `f` will be lost.
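    ///
    /// A hedged usage sketch (the closure body is illustrative, not an API
    /// guaranteed by this file):
    ///
    /// ```ignore
    /// let res = state.with_enable_for_item_parsing(|state| {
    ///     // All parsing-related feature flags are enabled in here; any
    ///     // changes made to system vars inside the closure are discarded.
    ///     state.system_config().enable_rbac_checks() // hypothetical read
    /// });
    /// ```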
1434    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1435        // Enable catalog features that might be required during planning existing
1436        // catalog items. Existing catalog items might have been created while
1437        // a specific feature flag was turned on, so we need to ensure that this
1438        // is also the case during catalog rehydration in order to avoid panics.
1439        //
1440        // WARNING / CONTRACT:
1441        // 1. Features used in this method that relate to parsing / planning
1442        //    should have `enable_for_item_parsing` set to `true`.
1443        // 2. After this step, feature flag configuration must not be
1444        //    overridden.
1445        let restore = self.system_configuration.clone();
1446        self.system_configuration.enable_for_item_parsing();
1447        let res = f(self);
1448        self.system_configuration = restore;
1449        res
1450    }
1451
1452    /// Returns all indexes on the given object and cluster known in the catalog.
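    ///
    /// A minimal sketch of consuming the iterator (ids are hypothetical):
    ///
    /// ```ignore
    /// for (index_gid, index) in state.get_indexes_on(object_gid, cluster_id) {
    ///     // Each yielded `index` is an index on `object_gid` that lives on
    ///     // `cluster_id`.
    ///     println!("{index_gid}: {}", index.create_sql);
    /// }
    /// ```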
1453    pub fn get_indexes_on(
1454        &self,
1455        id: GlobalId,
1456        cluster: ClusterId,
1457    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1458        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1459
1460        self.try_get_entry_by_global_id(&id)
1461            .into_iter()
1462            .flat_map(move |e| {
1463                e.used_by()
1464                    .iter()
1465                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1466                        CatalogItem::Index(index) if index_matches(index) => {
1467                            Some((index.global_id(), index))
1468                        }
1469                        _ => None,
1470                    })
1471            })
1473    }
1474
1475    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1476        &self.database_by_id[database_id]
1477    }
1478
1479    /// Gets a reference to the specified replica of the specified cluster.
1480    ///
1481    /// Returns `None` if either the cluster or the replica does not
1482    /// exist.
1483    pub(super) fn try_get_cluster_replica(
1484        &self,
1485        id: ClusterId,
1486        replica_id: ReplicaId,
1487    ) -> Option<&ClusterReplica> {
1488        self.try_get_cluster(id)
1489            .and_then(|cluster| cluster.replica(replica_id))
1490    }
1491
1492    /// Gets a reference to the specified replica of the specified cluster.
1493    ///
1494    /// Panics if either the cluster or the replica does not exist.
1495    pub(super) fn get_cluster_replica(
1496        &self,
1497        cluster_id: ClusterId,
1498        replica_id: ReplicaId,
1499    ) -> &ClusterReplica {
1500        self.try_get_cluster_replica(cluster_id, replica_id)
1501            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1502    }
1503
1504    pub(super) fn resolve_replica_in_cluster(
1505        &self,
1506        cluster_id: &ClusterId,
1507        replica_name: &str,
1508    ) -> Result<&ClusterReplica, SqlCatalogError> {
1509        let cluster = self.get_cluster(*cluster_id);
1510        let replica_id = cluster
1511            .replica_id_by_name_
1512            .get(replica_name)
1513            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1514        Ok(&cluster.replicas_by_id_[replica_id])
1515    }
1516
1517    /// Get system configuration `name`.
1518    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1519        Ok(self.system_configuration.get(name)?)
1520    }
1521
1522    /// Parse system configuration `name` with `value` as input.
1523    ///
1524    /// Returns the parsed value as a string.
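    ///
    /// A hedged sketch (the variable name and value are illustrative):
    ///
    /// ```ignore
    /// let formatted =
    ///     state.parse_system_configuration("max_tables", VarInput::Flat("25"))?;
    /// assert_eq!(formatted, "25");
    /// ```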
1525    pub(super) fn parse_system_configuration(
1526        &self,
1527        name: &str,
1528        value: VarInput,
1529    ) -> Result<String, Error> {
1530        let value = self.system_configuration.parse(name, value)?;
1531        Ok(value.format())
1532    }
1533
1534    /// Gets the schema map for the database matching `database_spec`.
1535    pub(super) fn resolve_schema_in_database(
1536        &self,
1537        database_spec: &ResolvedDatabaseSpecifier,
1538        schema_name: &str,
1539        conn_id: &ConnectionId,
1540    ) -> Result<&Schema, SqlCatalogError> {
1541        let schema = match database_spec {
1542            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1543                self.temporary_schemas.get(conn_id)
1544            }
1545            ResolvedDatabaseSpecifier::Ambient => self
1546                .ambient_schemas_by_name
1547                .get(schema_name)
1548                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1549            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1550                db.schemas_by_name
1551                    .get(schema_name)
1552                    .and_then(|id| db.schemas_by_id.get(id))
1553            }),
1554        };
1555        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1556    }
1557
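    /// Gets the schema identified by `database_spec` and `schema_spec`,
    /// resolving the temporary schema via `conn_id`.
    ///
    /// Panics if the schema does not exist.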
1558    pub fn get_schema(
1559        &self,
1560        database_spec: &ResolvedDatabaseSpecifier,
1561        schema_spec: &SchemaSpecifier,
1562        conn_id: &ConnectionId,
1563    ) -> &Schema {
1564        // Keep in sync with `get_schemas_mut`
1565        match (database_spec, schema_spec) {
1566            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1567                &self.temporary_schemas[conn_id]
1568            }
1569            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1570                &self.ambient_schemas_by_id[id]
1571            }
1572
1573            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
1574                &self.database_by_id[database_id].schemas_by_id[schema_id]
1575            }
1576            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1577                unreachable!("temporary schemas are in the ambient database")
1578            }
1579        }
1580    }
1581
1582    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1583        self.database_by_id
1584            .values()
1585            .filter_map(|database| database.schemas_by_id.get(schema_id))
1586            .chain(self.ambient_schemas_by_id.values())
1587            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1588            .into_first()
1589    }
1590
1591    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1592        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1593    }
1594
1595    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1596        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1597    }
1598
1599    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1600        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1601    }
1602
1603    pub fn get_information_schema_id(&self) -> SchemaId {
1604        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1605    }
1606
1607    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1608        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1609    }
1610
1611    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1612        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1613    }
1614
1615    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1616        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1617    }
1618
1619    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1620        SYSTEM_SCHEMAS
1621            .iter()
1622            .map(|name| self.ambient_schemas_by_name[*name])
1623    }
1624
1625    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1626        self.system_schema_ids().contains(&id)
1627    }
1628
1629    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1630        match spec {
1631            SchemaSpecifier::Temporary => false,
1632            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1633        }
1634    }
1635
1636    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1637        UNSTABLE_SCHEMAS
1638            .iter()
1639            .map(|name| self.ambient_schemas_by_name[*name])
1640    }
1641
1642    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1643        self.unstable_schema_ids().contains(&id)
1644    }
1645
1646    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1647        match spec {
1648            SchemaSpecifier::Temporary => false,
1649            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1650        }
1651    }
1652
1653    /// Creates a new schema in the `Catalog` for temporary items
1654    /// indicated by the TEMPORARY or TEMP keywords.
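    ///
    /// For example, planning `CREATE TEMPORARY VIEW ...` requires the
    /// connection's temporary schema to exist first; a hedged sketch:
    ///
    /// ```ignore
    /// state.create_temporary_schema(&conn_id, owner_role_id)?;
    /// // Temporary items for `conn_id` now resolve in the `mz_temp` schema.
    /// ```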
1655    pub fn create_temporary_schema(
1656        &mut self,
1657        conn_id: &ConnectionId,
1658        owner_id: RoleId,
1659    ) -> Result<(), Error> {
1660        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1661        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1662        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1663        // all -1.
1664        let oid = INVALID_OID;
1665        self.temporary_schemas.insert(
1666            conn_id.clone(),
1667            Schema {
1668                name: QualifiedSchemaName {
1669                    database: ResolvedDatabaseSpecifier::Ambient,
1670                    schema: MZ_TEMP_SCHEMA.into(),
1671                },
1672                id: SchemaSpecifier::Temporary,
1673                oid,
1674                items: BTreeMap::new(),
1675                functions: BTreeMap::new(),
1676                types: BTreeMap::new(),
1677                owner_id,
1678                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1679                    mz_sql::catalog::ObjectType::Schema,
1680                    owner_id,
1681                )]),
1682            },
1683        );
1684        Ok(())
1685    }
1686
1687    /// Return all OIDs that are allocated to temporary objects.
1688    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1689        std::iter::empty()
1690            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1691                if schema.id.is_temporary() {
1692                    Some(schema.oid)
1693                } else {
1694                    None
1695                }
1696            }))
1697            .chain(self.entry_by_id.values().filter_map(|entry| {
1698                if entry.item().is_temporary() {
1699                    Some(entry.oid)
1700                } else {
1701                    None
1702                }
1703            }))
1704    }
1705
1706    /// Optimized lookup for a builtin table.
1707    ///
1708    /// Panics if the builtin table doesn't exist in the catalog.
1709    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1710        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1711    }
1712
1713    /// Optimized lookup for a builtin log.
1714    ///
1715    /// Panics if the builtin log doesn't exist in the catalog.
1716    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1717        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1718        let log = match self.get_entry(&item_id).item() {
1719            CatalogItem::Log(log) => log,
1720            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1721        };
1722        (item_id, log.global_id)
1723    }
1724
1725    /// Optimized lookup for a builtin storage collection.
1726    ///
1727    /// Panics if the builtin storage collection doesn't exist in the catalog.
1728    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1729        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1730    }
1731
1732    /// Optimized lookup for a builtin object.
1733    ///
1734    /// Panics if the builtin object doesn't exist in the catalog.
1735    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1736        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1737        let schema = &self.ambient_schemas_by_id[schema_id];
1738        match builtin.catalog_item_type() {
1739            CatalogItemType::Type => schema.types[builtin.name()],
1740            CatalogItemType::Func => schema.functions[builtin.name()],
1741            CatalogItemType::Table
1742            | CatalogItemType::Source
1743            | CatalogItemType::Sink
1744            | CatalogItemType::View
1745            | CatalogItemType::MaterializedView
1746            | CatalogItemType::Index
1747            | CatalogItemType::Secret
1748            | CatalogItemType::Connection
1749            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1750        }
1751    }
1752
1753    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
1754    pub fn resolve_builtin_type_references(
1755        &self,
1756        builtin: &BuiltinType<NameReference>,
1757    ) -> BuiltinType<IdReference> {
1758        let typ: CatalogType<IdReference> = match &builtin.details.typ {
1759            CatalogType::AclItem => CatalogType::AclItem,
1760            CatalogType::Array { element_reference } => CatalogType::Array {
1761                element_reference: self.get_system_type(element_reference).id,
1762            },
1763            CatalogType::List {
1764                element_reference,
1765                element_modifiers,
1766            } => CatalogType::List {
1767                element_reference: self.get_system_type(element_reference).id,
1768                element_modifiers: element_modifiers.clone(),
1769            },
1770            CatalogType::Map {
1771                key_reference,
1772                value_reference,
1773                key_modifiers,
1774                value_modifiers,
1775            } => CatalogType::Map {
1776                key_reference: self.get_system_type(key_reference).id,
1777                value_reference: self.get_system_type(value_reference).id,
1778                key_modifiers: key_modifiers.clone(),
1779                value_modifiers: value_modifiers.clone(),
1780            },
1781            CatalogType::Range { element_reference } => CatalogType::Range {
1782                element_reference: self.get_system_type(element_reference).id,
1783            },
1784            CatalogType::Record { fields } => CatalogType::Record {
1785                fields: fields
1786                    .iter()
1787                    .map(|f| CatalogRecordField {
1788                        name: f.name.clone(),
1789                        type_reference: self.get_system_type(f.type_reference).id,
1790                        type_modifiers: f.type_modifiers.clone(),
1791                    })
1792                    .collect(),
1793            },
1794            CatalogType::Bool => CatalogType::Bool,
1795            CatalogType::Bytes => CatalogType::Bytes,
1796            CatalogType::Char => CatalogType::Char,
1797            CatalogType::Date => CatalogType::Date,
1798            CatalogType::Float32 => CatalogType::Float32,
1799            CatalogType::Float64 => CatalogType::Float64,
1800            CatalogType::Int16 => CatalogType::Int16,
1801            CatalogType::Int32 => CatalogType::Int32,
1802            CatalogType::Int64 => CatalogType::Int64,
1803            CatalogType::UInt16 => CatalogType::UInt16,
1804            CatalogType::UInt32 => CatalogType::UInt32,
1805            CatalogType::UInt64 => CatalogType::UInt64,
1806            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1807            CatalogType::Interval => CatalogType::Interval,
1808            CatalogType::Jsonb => CatalogType::Jsonb,
1809            CatalogType::Numeric => CatalogType::Numeric,
1810            CatalogType::Oid => CatalogType::Oid,
1811            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1812            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1813            CatalogType::Pseudo => CatalogType::Pseudo,
1814            CatalogType::RegClass => CatalogType::RegClass,
1815            CatalogType::RegProc => CatalogType::RegProc,
1816            CatalogType::RegType => CatalogType::RegType,
1817            CatalogType::String => CatalogType::String,
1818            CatalogType::Time => CatalogType::Time,
1819            CatalogType::Timestamp => CatalogType::Timestamp,
1820            CatalogType::TimestampTz => CatalogType::TimestampTz,
1821            CatalogType::Uuid => CatalogType::Uuid,
1822            CatalogType::VarChar => CatalogType::VarChar,
1823            CatalogType::Int2Vector => CatalogType::Int2Vector,
1824            CatalogType::MzAclItem => CatalogType::MzAclItem,
1825        };
1826
1827        BuiltinType {
1828            name: builtin.name,
1829            schema: builtin.schema,
1830            oid: builtin.oid,
1831            details: CatalogTypeDetails {
1832                array_id: builtin.details.array_id,
1833                typ,
1834                pg_metadata: builtin.details.pg_metadata.clone(),
1835            },
1836        }
1837    }
1838
1839    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1840        &self.config
1841    }
1842
1843    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1844        match self.database_by_name.get(database_name) {
1845            Some(id) => Ok(&self.database_by_id[id]),
1846            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1847        }
1848    }
1849
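    /// Resolves `schema_name` to a schema, looking first in the specified (or
    /// current) database and then falling back to the ambient database.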
1850    pub fn resolve_schema(
1851        &self,
1852        current_database: Option<&DatabaseId>,
1853        database_name: Option<&str>,
1854        schema_name: &str,
1855        conn_id: &ConnectionId,
1856    ) -> Result<&Schema, SqlCatalogError> {
1857        let database_spec = match database_name {
1858            // If a database is explicitly specified, validate it. Note that we
1859            // intentionally do not validate `current_database` to permit
1860            // querying `mz_catalog` with an invalid session database, e.g., so
1861            // that you can run `SHOW DATABASES` to *find* a valid database.
1862            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1863                self.resolve_database(database)?.id().clone(),
1864            )),
1865            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1866        };
1867
1868        // First try to find the schema in the named database.
1869        if let Some(database_spec) = database_spec {
1870            if let Ok(schema) =
1871                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1872            {
1873                return Ok(schema);
1874            }
1875        }
1876
1877        // Then fall back to the ambient database.
1878        if let Ok(schema) = self.resolve_schema_in_database(
1879            &ResolvedDatabaseSpecifier::Ambient,
1880            schema_name,
1881            conn_id,
1882        ) {
1883            return Ok(schema);
1884        }
1885
1886        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1887    }
1888
1889    /// Optimized lookup for a system schema.
1890    ///
1891    /// Panics if the system schema doesn't exist in the catalog.
1892    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1893        self.ambient_schemas_by_name[name]
1894    }
1895
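    /// Resolves the session's search path to concrete database/schema
    /// specifiers, silently dropping entries that fail to resolve.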
1896    pub fn resolve_search_path(
1897        &self,
1898        session: &dyn SessionMetadata,
1899    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1900        let database = self
1901            .database_by_name
1902            .get(session.database())
1903            .cloned();
1904
1905        session
1906            .search_path()
1907            .iter()
1908            .map(|schema| {
1909                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1910            })
1911            .filter_map(|schema| schema.ok())
1912            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1913            .collect()
1914    }
1915
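    /// Returns `search_path` prefixed with the implicitly searched schemas:
    /// the connection's temporary schema (when `include_temp_schema` is set)
    /// followed by `mz_catalog` and `pg_catalog`, skipping any of these that
    /// `search_path` already contains.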
1916    pub fn effective_search_path(
1917        &self,
1918        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1919        include_temp_schema: bool,
1920    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1921        let mut v = Vec::with_capacity(search_path.len() + 3);
1922        // Temp schema is only included for relations and data types, not for functions and operators
1923        let temp_schema = (
1924            ResolvedDatabaseSpecifier::Ambient,
1925            SchemaSpecifier::Temporary,
1926        );
1927        if include_temp_schema && !search_path.contains(&temp_schema) {
1928            v.push(temp_schema);
1929        }
1930        let default_schemas = [
1931            (
1932                ResolvedDatabaseSpecifier::Ambient,
1933                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
1934            ),
1935            (
1936                ResolvedDatabaseSpecifier::Ambient,
1937                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
1938            ),
1939        ];
1940        for schema in default_schemas.into_iter() {
1941            if !search_path.contains(&schema) {
1942                v.push(schema);
1943            }
1944        }
1945        v.extend_from_slice(search_path);
1946        v
1947    }
1948
1949    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
1950        let id = self
1951            .clusters_by_name
1952            .get(name)
1953            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
1954        Ok(&self.clusters_by_id[id])
1955    }
1956
1957    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
1958        let id = self
1959            .clusters_by_name
1960            .get(cluster.name)
1961            .expect("failed to lookup BuiltinCluster by name");
1962        self.clusters_by_id
1963            .get(id)
1964            .expect("failed to lookup BuiltinCluster by ID")
1965    }
1966
1967    pub fn resolve_cluster_replica(
1968        &self,
1969        cluster_replica_name: &QualifiedReplica,
1970    ) -> Result<&ClusterReplica, SqlCatalogError> {
1971        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
1972        let replica_name = cluster_replica_name.replica.as_str();
1973        let replica_id = cluster
1974            .replica_id(replica_name)
1975            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1976        Ok(cluster.replica(replica_id).expect("Must exist"))
1977    }
1978
1979    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
1980    ///
1981    /// If `name` does not specify a database, the `current_database` is used.
1982    /// If `name` does not specify a schema, then the schemas in `search_path`
1983    /// are searched in order.
1984    #[allow(clippy::useless_let_if_seq)]
1985    pub fn resolve(
1986        &self,
1987        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
1988        current_database: Option<&DatabaseId>,
1989        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
1990        name: &PartialItemName,
1991        conn_id: &ConnectionId,
1992        err_gen: fn(String) -> SqlCatalogError,
1993    ) -> Result<&CatalogEntry, SqlCatalogError> {
1994        // If a schema name was specified, just try to find the item in that
1995        // schema. If no schema was specified, try to find the item in the connection's
1996        // temporary schema. If the item is not found, try to find the item in every
1997        // schema in the search path.
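        //
        // For example (illustrative): with search path `[s1, s2]`, a bare
        // name `t` is looked up in the temporary schema, then as `s1.t`, then
        // as `s2.t`, while a qualified name `s2.t` is looked up only in `s2`.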
1998        let schemas = match &name.schema {
1999            Some(schema_name) => {
2000                match self.resolve_schema(
2001                    current_database,
2002                    name.database.as_deref(),
2003                    schema_name,
2004                    conn_id,
2005                ) {
2006                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2007                    Err(e) => return Err(e),
2008                }
2009            }
2010            None => match self
2011                .get_schema(
2012                    &ResolvedDatabaseSpecifier::Ambient,
2013                    &SchemaSpecifier::Temporary,
2014                    conn_id,
2015                )
2016                .items
2017                .get(&name.item)
2018            {
2019                Some(id) => return Ok(self.get_entry(id)),
2020                None => search_path.to_vec(),
2021            },
2022        };
2023
2024        for (database_spec, schema_spec) in &schemas {
2025            let schema = self.get_schema(database_spec, schema_spec, conn_id);
2026
2027            if let Some(id) = get_schema_entries(schema).get(&name.item) {
2028                return Ok(&self.entry_by_id[id]);
2029            }
2030        }
2031
2032        // Some relations that have previously lived in the `mz_internal` schema have been moved to
2033        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
2034        // automatically let uses of the old schema resolve to the new ones as well.
2035        // TODO(database-issues#8173) remove this after sufficient time has passed
2036        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2037        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2038            for schema_id in [
2039                self.get_mz_catalog_unstable_schema_id(),
2040                self.get_mz_introspection_schema_id(),
2041            ] {
2042                let schema = self.get_schema(
2043                    &ResolvedDatabaseSpecifier::Ambient,
2044                    &SchemaSpecifier::Id(schema_id),
2045                    conn_id,
2046                );
2047
2048                if let Some(id) = get_schema_entries(schema).get(&name.item) {
2049                    debug!(
2050                        github_27831 = true,
2051                        "encountered use of outdated schema `mz_internal` for relation: {name}",
2052                    );
2053                    return Ok(&self.entry_by_id[id]);
2054                }
2055            }
2056        }
2057
2058        Err(err_gen(name.to_string()))
2059    }
2060
2061    /// Resolves `name` to a non-function [`CatalogEntry`].
2062    pub fn resolve_entry(
2063        &self,
2064        current_database: Option<&DatabaseId>,
2065        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2066        name: &PartialItemName,
2067        conn_id: &ConnectionId,
2068    ) -> Result<&CatalogEntry, SqlCatalogError> {
2069        self.resolve(
2070            |schema| &schema.items,
2071            current_database,
2072            search_path,
2073            name,
2074            conn_id,
2075            SqlCatalogError::UnknownItem,
2076        )
2077    }
2078
2079    /// Resolves `name` to a function [`CatalogEntry`].
2080    pub fn resolve_function(
2081        &self,
2082        current_database: Option<&DatabaseId>,
2083        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2084        name: &PartialItemName,
2085        conn_id: &ConnectionId,
2086    ) -> Result<&CatalogEntry, SqlCatalogError> {
2087        self.resolve(
2088            |schema| &schema.functions,
2089            current_database,
2090            search_path,
2091            name,
2092            conn_id,
2093            |name| SqlCatalogError::UnknownFunction {
2094                name,
2095                alternative: None,
2096            },
2097        )
2098    }
2099
2100    /// Resolves `name` to a type [`CatalogEntry`].
2101    pub fn resolve_type(
2102        &self,
2103        current_database: Option<&DatabaseId>,
2104        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2105        name: &PartialItemName,
2106        conn_id: &ConnectionId,
2107    ) -> Result<&CatalogEntry, SqlCatalogError> {
2108        static NON_PG_CATALOG_TYPES: LazyLock<
2109            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2110        > = LazyLock::new(|| {
2111            BUILTINS::types()
2112                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2113                .map(|typ| (typ.name, typ))
2114                .collect()
2115        });
2116
2117        let entry = self.resolve(
2118            |schema| &schema.types,
2119            current_database,
2120            search_path,
2121            name,
2122            conn_id,
2123            |name| SqlCatalogError::UnknownType { name },
2124        )?;
2125
2126        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2127            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2128                warn!(
2129                    "user specified an incorrect schema of {} for the type {}, which should be in \
2130                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2131                    PG_CATALOG_SCHEMA.quoted(),
2132                    typ.name.quoted(),
2133                    typ.schema.quoted(),
2134                )
2135            }
2136        }
2137
2138        Ok(entry)
2139    }
2140
2141    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2142    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2143        match object_id {
2144            ObjectId::Item(item_id) => {
2145                let entry = self.get_entry(&item_id);
2146                match entry.item_type() {
2147                    CatalogItemType::Table => CommentObjectId::Table(item_id),
2148                    CatalogItemType::Source => CommentObjectId::Source(item_id),
2149                    CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2150                    CatalogItemType::View => CommentObjectId::View(item_id),
2151                    CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2152                    CatalogItemType::Index => CommentObjectId::Index(item_id),
2153                    CatalogItemType::Func => CommentObjectId::Func(item_id),
2154                    CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2155                    CatalogItemType::Type => CommentObjectId::Type(item_id),
2156                    CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2157                    CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2158                }
2159            }
2160            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2161            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2162            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2163            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2164            ObjectId::ClusterReplica(cluster_replica_id) => {
2165                CommentObjectId::ClusterReplica(cluster_replica_id)
2166            }
2167            ObjectId::NetworkPolicy(network_policy_id) => {
2168                CommentObjectId::NetworkPolicy(network_policy_id)
2169            }
2170        }
2171    }
2172
2173    /// Return current system configuration.
2174    pub fn system_config(&self) -> &SystemVars {
2175        &self.system_configuration
2176    }
2177
2178    /// Return a mutable reference to the current system configuration.
2179    pub fn system_config_mut(&mut self) -> &mut SystemVars {
2180        &mut self.system_configuration
2181    }
2182
2183    /// Serializes the catalog's in-memory state.
2184    ///
2185    /// There are no guarantees about the format of the serialized state, except
2186    /// that the serialized state for two identical catalogs will compare
2187    /// identically.
2188    ///
2189    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2190    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2191    /// argument.
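    ///
    /// A hedged sketch of both call shapes:
    ///
    /// ```ignore
    /// let as_is = state.dump(None)?; // keep `unfinalized_shards` as-is
    /// let scrubbed = state.dump(Some(BTreeSet::new()))?; // pretend none exist
    /// ```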
2192    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2193        // Dump the base catalog.
2194        let mut dump = serde_json::to_value(&self).map_err(|e| {
2195            Error::new(ErrorKind::Unstructured(format!(
2196                // Don't panic here because we don't have compile-time failures for maps with
2197                // non-string keys.
2198                "internal error: could not dump catalog: {}",
2199                e
2200            )))
2201        })?;
2202
2203        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2204        // Stitch in system parameter defaults.
2205        dump_obj.insert(
2206            "system_parameter_defaults".into(),
2207            serde_json::json!(self.system_config().defaults()),
2208        );
2209        // Potentially overwrite unfinalized shards.
2210        if let Some(unfinalized_shards) = unfinalized_shards {
2211            dump_obj
2212                .get_mut("storage_metadata")
2213                .expect("known to exist")
2214                .as_object_mut()
2215                .expect("storage_metadata is an object")
2216                .insert(
2217                    "unfinalized_shards".into(),
2218                    serde_json::json!(unfinalized_shards),
2219                );
2220        }
2221        // Remove GlobalIds for temporary objects from the mapping.
2222        //
2223        // Post-test consistency checks with the durable catalog don't know about temporary items
2224        // since they're kept entirely in memory.
2225        let temporary_gids: Vec<_> = self
2226            .entry_by_global_id
2227            .iter()
2228            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2229            .map(|(gid, _item_id)| *gid)
2230            .collect();
2231        if !temporary_gids.is_empty() {
2232            let gids = dump_obj
2233                .get_mut("entry_by_global_id")
2234                .expect("known to exist")
2235                .as_object_mut()
2236                .expect("entry_by_global_id is an object");
2237            for gid in temporary_gids {
2238                gids.remove(&gid.to_string());
2239            }
2240        }
2241
2242        // Emit as pretty-printed JSON.
2243        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2244    }
2245
2246    pub fn availability_zones(&self) -> &[String] {
2247        &self.availability_zones
2248    }
2249
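    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into a
    /// controller [`ReplicaLocation`], validating the replica size and the
    /// availability-zone constraints along the way.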
2250    pub fn concretize_replica_location(
2251        &self,
2252        location: mz_catalog::durable::ReplicaLocation,
2253        allowed_sizes: &Vec<String>,
2254        allowed_availability_zones: Option<&[String]>,
2255    ) -> Result<ReplicaLocation, Error> {
2256        let location = match location {
2257            mz_catalog::durable::ReplicaLocation::Unmanaged {
2258                storagectl_addrs,
2259                computectl_addrs,
2260            } => {
2261                if allowed_availability_zones.is_some() {
2262                    return Err(Error {
2263                        kind: ErrorKind::Internal(
2264                            "tried to concretize an unmanaged replica with specific availability zones"
2265                                .to_string(),
2266                        ),
2267                    });
2268                }
2269                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2270                    storagectl_addrs,
2271                    computectl_addrs,
2272                })
2273            }
2274            mz_catalog::durable::ReplicaLocation::Managed {
2275                size,
2276                availability_zone,
2277                billed_as,
2278                internal,
2279                pending,
2280            } => {
2281                if allowed_availability_zones.is_some() && availability_zone.is_some() {
2282                    let message = "tried to concretize a managed replica with both allowed availability zones and a specific availability zone";
2283                    return Err(Error {
2284                        kind: ErrorKind::Internal(message.to_string()),
2285                    });
2286                }
2287                self.ensure_valid_replica_size(allowed_sizes, &size)?;
2288                let cluster_replica_sizes = &self.cluster_replica_sizes;
2289
2290                ReplicaLocation::Managed(ManagedReplicaLocation {
2291                    allocation: cluster_replica_sizes
2292                        .0
2293                        .get(&size)
2294                        .expect("catalog out of sync")
2295                        .clone(),
2296                    availability_zones: match (availability_zone, allowed_availability_zones) {
2297                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2298                        (None, Some(azs)) if azs.is_empty() => {
2299                            ManagedReplicaAvailabilityZones::FromCluster(None)
2300                        }
2301                        (None, Some(azs)) => {
2302                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2303                        }
2304                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2305                    },
2306                    size,
2307                    billed_as,
2308                    internal,
2309                    pending,
2310                })
2311            }
2312        };
2313        Ok(location)
2314    }
2315
2316    /// Return whether the given replica size requests a disk.
2317    ///
2318    /// # Panics
2319    ///
2320    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
2321    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2322        let alloc = &self.cluster_replica_sizes.0[size];
2323        alloc.disk_limit != Some(DiskLimit::ZERO)
2324    }
2325
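    /// Ensures that `size` names a known, enabled replica size that is also
    /// contained in `allowed_sizes` (when non-empty); otherwise returns an
    /// error listing the valid sizes, ordered by scale and CPU limit.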
2326    pub(crate) fn ensure_valid_replica_size(
2327        &self,
2328        allowed_sizes: &[String],
2329        size: &String,
2330    ) -> Result<(), Error> {
2331        let cluster_replica_sizes = &self.cluster_replica_sizes;
2332
2333        if !cluster_replica_sizes.0.contains_key(size)
2334            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2335            || cluster_replica_sizes.0[size].disabled
2336        {
2337            let mut entries = cluster_replica_sizes
2338                .enabled_allocations()
2339                .collect::<Vec<_>>();
2340
2341            if !allowed_sizes.is_empty() {
2342                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2343                entries.retain(|(name, _)| allowed_sizes.contains(name));
2344            }
2345
2346            entries.sort_by_key(
2347                |(
2348                    _name,
2349                    ReplicaAllocation {
2350                        scale, cpu_limit, ..
2351                    },
2352                )| (scale, cpu_limit),
2353            );
2354
2355            Err(Error {
2356                kind: ErrorKind::InvalidClusterReplicaSize {
2357                    size: size.to_owned(),
2358                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2359                },
2360            })
2361        } else {
2362            Ok(())
2363        }
2364    }
2365
2366    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2367        if role_id.is_builtin() {
2368            let role = self.get_role(role_id);
2369            Err(Error::new(ErrorKind::ReservedRoleName(
2370                role.name().to_string(),
2371            )))
2372        } else {
2373            Ok(())
2374        }
2375    }
2376
2377    pub fn ensure_not_reserved_network_policy(
2378        &self,
2379        network_policy_id: &NetworkPolicyId,
2380    ) -> Result<(), Error> {
2381        if network_policy_id.is_builtin() {
2382            let policy = self.get_network_policy(network_policy_id);
2383            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2384                policy.name.clone(),
2385            )))
2386        } else {
2387            Ok(())
2388        }
2389    }
2390
2391    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2392        let is_grantable = !role_id.is_public() && !role_id.is_system();
2393        if is_grantable {
2394            Ok(())
2395        } else {
2396            let role = self.get_role(role_id);
2397            Err(Error::new(ErrorKind::UngrantableRoleName(
2398                role.name().to_string(),
2399            )))
2400        }
2401    }
2402
2403    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2404        if role_id.is_system() {
2405            let role = self.get_role(role_id);
2406            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2407                role.name().to_string(),
2408            )))
2409        } else {
2410            Ok(())
2411        }
2412    }
2413
2414    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2415        if role_id.is_predefined() {
2416            let role = self.get_role(role_id);
2417            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2418                role.name().to_string(),
2419            )))
2420        } else {
2421            Ok(())
2422        }
2423    }
2424
2425    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2426    // passing tx, and session?
2427    pub(crate) fn add_to_audit_log(
2428        system_configuration: &SystemVars,
2429        oracle_write_ts: mz_repr::Timestamp,
2430        session: Option<&ConnMeta>,
2431        tx: &mut mz_catalog::durable::Transaction,
2432        audit_events: &mut Vec<VersionedEvent>,
2433        event_type: EventType,
2434        object_type: ObjectType,
2435        details: EventDetails,
2436    ) -> Result<(), Error> {
2437        let user = session.map(|session| session.user().name.to_string());
2438
2439        // `unsafe_mock_audit_event_timestamp` can only be set to `Some` when running in unsafe mode.
2441        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2442            Some(ts) => ts.into(),
2443            _ => oracle_write_ts.into(),
2444        };
2445        let id = tx.allocate_audit_log_id()?;
2446        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2447        audit_events.push(event.clone());
2448        tx.insert_audit_log_event(event);
2449        Ok(())
2450    }
2451
2452    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2453        match id {
2454            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2455            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2456                self.get_cluster_replica(*cluster_id, *replica_id)
2457                    .owner_id(),
2458            ),
2459            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2460            ObjectId::Schema((database_spec, schema_spec)) => Some(
2461                self.get_schema(database_spec, schema_spec, conn_id)
2462                    .owner_id(),
2463            ),
2464            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2465            ObjectId::Role(_) => None,
2466            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2467        }
2468    }
2469
2470    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2471        match object_id {
2472            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2473            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2474            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2475            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2476            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2477            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2478            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2479        }
2480    }
2481
2482    pub(super) fn get_system_object_type(
2483        &self,
2484        id: &SystemObjectId,
2485    ) -> mz_sql::catalog::SystemObjectType {
2486        match id {
2487            SystemObjectId::Object(object_id) => {
2488                SystemObjectType::Object(self.get_object_type(object_id))
2489            }
2490            SystemObjectId::System => SystemObjectType::System,
2491        }
2492    }
2493
2494    /// Returns a read-only view of the current [`StorageMetadata`].
2495    ///
2496    /// To write to this struct, you must use a catalog transaction.
2497    pub fn storage_metadata(&self) -> &StorageMetadata {
2498        &self.storage_metadata
2499    }
2500
2501    /// For the source ids in `ids`, return their compaction windows.
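    ///
    /// An illustrative sketch (ids are hypothetical):
    ///
    /// ```ignore
    /// let windows = state.source_compaction_windows([source_id, table_id]);
    /// for (window, item_ids) in windows {
    ///     // Every item in `item_ids` uses the compaction window `window`.
    /// }
    /// ```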
2502    pub fn source_compaction_windows(
2503        &self,
2504        ids: impl IntoIterator<Item = CatalogItemId>,
2505    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2506        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2507        let mut seen = BTreeSet::new();
2508        for item_id in ids {
2509            if !seen.insert(item_id) {
2510                continue;
2511            }
2512            let entry = self.get_entry(&item_id);
2513            match entry.item() {
2514                CatalogItem::Source(source) => {
2515                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2516                    match source.data_source {
2517                        DataSourceDesc::Ingestion { .. } => {
2518                            cws.entry(source_cw).or_default().insert(item_id);
2519                        }
2520                        DataSourceDesc::IngestionExport { .. } => {
2521                            cws.entry(source_cw).or_default().insert(item_id);
2522                        }
2523                        DataSourceDesc::Introspection(_)
2524                        | DataSourceDesc::Progress
2525                        | DataSourceDesc::Webhook { .. } => {
2526                            cws.entry(source_cw).or_default().insert(item_id);
2527                        }
2528                    }
2529                }
2530                CatalogItem::Table(table) => {
2531                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2532                    match &table.data_source {
2533                        TableDataSource::DataSource {
2534                            desc: DataSourceDesc::IngestionExport { .. },
2535                            timeline: _,
2536                        } => {
2537                            cws.entry(table_cw).or_default().insert(item_id);
2538                        }
2539                        _ => {}
2540                    }
2541                }
2542                _ => {
2543                    // Views could depend on sources, so ignore them if added by used_by above.
2544                    continue;
2545                }
2546            }
2547        }
2548        cws
2549    }
2550
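    /// Maps a [`CommentObjectId`] to the underlying [`CatalogItemId`], if the
    /// commented object is a catalog item; returns `None` for roles,
    /// databases, schemas, clusters, cluster replicas, and network policies.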
2551    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2552        match id {
2553            CommentObjectId::Table(id)
2554            | CommentObjectId::View(id)
2555            | CommentObjectId::MaterializedView(id)
2556            | CommentObjectId::Source(id)
2557            | CommentObjectId::Sink(id)
2558            | CommentObjectId::Index(id)
2559            | CommentObjectId::Func(id)
2560            | CommentObjectId::Connection(id)
2561            | CommentObjectId::Type(id)
2562            | CommentObjectId::Secret(id)
2563            | CommentObjectId::ContinualTask(id) => Some(*id),
2564            CommentObjectId::Role(_)
2565            | CommentObjectId::Database(_)
2566            | CommentObjectId::Schema(_)
2567            | CommentObjectId::Cluster(_)
2568            | CommentObjectId::ClusterReplica(_)
2569            | CommentObjectId::NetworkPolicy(_) => None,
2570        }
2571    }
2572
2573    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2574        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2575    }
2576
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }
}

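// Resolving a connection for the storage layer requires the catalog to
// inline any connections referenced by id (for example, an SSH tunnel
// referenced by a Kafka connection), so `CatalogState` implements
// `ConnectionResolver` directly.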
impl ConnectionResolver for CatalogState {
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            // SSH, AWS, and PrivateLink connections are already
            // self-contained, so there is nothing to inline.
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}

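// `OptimizerCatalog` is implemented for both `CatalogState` and `Catalog`
// (below) by delegating to the inherent `CatalogState` methods, so the
// optimizer can work against either type through the same trait object.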
impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}

impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}

impl Catalog {
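    /// Upcasts this `Arc<Catalog>` into an `Arc<dyn OptimizerCatalog>` trait
    /// object.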
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}