mz_adapter/catalog/state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-memory metadata storage for the coordinator.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
};
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
    TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
    UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::DiskLimit;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
use mz_ore::str::StrExt;
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::{
    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
    UNSTABLE_SCHEMAS,
};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
    VersionedRelationDesc,
};
use mz_secrets::InMemorySecretsController;
use mz_sql::ast::Ident;
use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
    TypeReference,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
    Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageMetadata;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{
    ConnectionResolver, InlinedConnection, IntoInlineConnection,
};
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{debug, warn};

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::AdapterError;
use crate::catalog::{Catalog, ConnCatalog};
use crate::coord::ConnMeta;
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::session::Session;

/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human-readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects that are partially derived from read-only state.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}

/// Keeps track of what expressions are cached or not during startup.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}

impl LocalExpressionCache {
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    /// Insert an expression that was cached back into the cache. This is generally needed when
    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    /// Inform the cache that `id` was not found in the cache and should be added to it with the
    /// given `local_mir` and `optimizer_features`.
    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                // If we are trying to cache the same item a second time, with a different
                // expression, then we must be migrating the object or doing something else weird.
                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
                // version after a restart. Caching the migrated version may cause us to incorrectly
                // think that the object has already been migrated. To simplify things, we cache
                // neither.
                let prev = uncached_exprs.remove(&id);
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}
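
// A minimal usage sketch of the intended `LocalExpressionCache` lifecycle during
// startup. `gid`, `exprs`, `local_mir`, and `features` are hypothetical values,
// not names defined in this module:
//
//     let mut cache = LocalExpressionCache::new(exprs);
//     // Planning an item first consumes any cached expression for its id...
//     match cache.remove_cached_expression(&gid) {
//         // ...and if planning later fails, the expression is put back so it
//         // is not lost:
//         Some(expr) => cache.insert_cached_expression(gid, expr),
//         // ...while a cache miss records the freshly optimized expression:
//         None => cache.insert_uncached_expression(gid, local_mir, features),
//     }
//     // At the end of startup the uncached expressions are persisted.
//     let to_persist = cache.into_uncached_exprs();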

fn skip_temp_items<S>(
    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    mz_ore::serde::map_key_to_string(
        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
        serializer,
    )
}

impl CatalogState {
    /// Returns an empty [`CatalogState`] that can be used in tests.
    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
    // tests are structured in this repository.
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                timestamp_interval: Default::default(),
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Default::default(),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Default::default(),
            system_privileges: Default::default(),
            comments: Default::default(),
            source_references: Default::default(),
            storage_metadata: Default::default(),
            license_key: ValidatedLicenseKey::for_tests(),
            mock_authentication_nonce: Default::default(),
        }
    }

    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        let search_path = self.resolve_search_path(session);
        let database = self
            .database_by_name
            .get(session.vars().database())
            .cloned();
        let state = match session.transaction().catalog_state() {
            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
            None => Cow::Borrowed(self),
        };
        ConnCatalog {
            state,
            unresolvable_ids: BTreeSet::new(),
            conn_id: session.conn_id().clone(),
            cluster: session.vars().cluster().into(),
            database,
            search_path,
            role_id: session.current_role_id().clone(),
            prepared_statements: Some(session.prepared_statements()),
            portals: Some(session.portals()),
            notices_tx: session.retain_notice_transmitter(),
        }
    }

    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            // Leaving the system's search path empty allows us to catch issues
            // where catalog object names have not been normalized correctly.
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            portals: None,
            notices_tx,
        }
    }

    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }

    /// Returns an iterator over the deduplicated identifiers of all
    /// objects this catalog entry transitively depends on (where
    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
    /// [`CatalogItem::references`]).
    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
        struct I<'a> {
            queue: VecDeque<CatalogItemId>,
            seen: BTreeSet<CatalogItemId>,
            this: &'a CatalogState,
        }
        impl<'a> Iterator for I<'a> {
            type Item = CatalogItemId;
            fn next(&mut self) -> Option<Self::Item> {
                if let Some(next) = self.queue.pop_front() {
                    for child in self.this.get_entry(&next).item().uses() {
                        if !self.seen.contains(&child) {
                            self.queue.push_back(child);
                            self.seen.insert(child);
                        }
                    }
                    Some(next)
                } else {
                    None
                }
            }
        }

        I {
            queue: [id].into_iter().collect(),
            seen: [id].into_iter().collect(),
            this: self,
        }
    }
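
    // A minimal usage sketch: `transitive_uses` performs a breadth-first walk
    // that yields each dependency exactly once, starting with `id` itself.
    // `state` and `mv_id` are hypothetical values, not names defined here:
    //
    //     // For a materialized view on an index on a table, this prints the
    //     // materialized view's id, then the index's, then the table's, each
    //     // once even if multiple paths reach the same object.
    //     for used_id in state.transitive_uses(mv_id) {
    //         println!("{used_id:?}");
    //     }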

    /// Computes the IDs of any log sources this catalog entry transitively
    /// depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }

    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }

    /// Returns the IDs of all objects that depend on `object_ids`, including `object_ids`
    /// themselves.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful for determining the order
    /// in which to drop objects.
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }
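
    // A minimal usage sketch of computing a drop order. `state`, `conn_id`, and
    // `schema_oid` are hypothetical values, not names defined here:
    //
    //     let mut seen = BTreeSet::new();
    //     let to_drop = state.object_dependents(&vec![schema_oid], &conn_id, &mut seen);
    //     // `to_drop` lists dependents before the objects they depend on, so
    //     // dropping in list order never removes an object while something
    //     // that uses it still exists.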

    /// Returns the IDs of all objects that depend on `cluster_id`, including `cluster_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful for determining the order
    /// in which to drop objects.
    fn cluster_dependents(
        &self,
        cluster_id: ClusterId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Cluster(cluster_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let cluster = self.get_cluster(cluster_id);
            for item_id in cluster.bound_objects() {
                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
            }
            for replica_id in cluster.replica_ids().values() {
                dependents.extend_from_slice(&self.cluster_replica_dependents(
                    cluster_id,
                    *replica_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `replica_id`, including `replica_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful for determining the order
    /// in which to drop objects.
    pub(super) fn cluster_replica_dependents(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `database_id`, including `database_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful for determining the order
    /// in which to drop objects.
    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `schema_id`, including `schema_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful for determining the order
    /// in which to drop objects.
    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `item_id`, including `item_id`
    /// itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful for determining the order
    /// in which to drop objects.
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            // We treat the progress collection as if it depends on the source
            // for dropping. We have additional code in planning to create a
            // kind of special-case "CASCADE" for this dependency.
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `network_policy_id`, including
    /// `network_policy_id` itself.
    ///
    /// The order is guaranteed to be in reverse dependency order, i.e. the leaves will appear
    /// earlier in the list than the roots. This is particularly useful for determining the order
    /// in which to drop objects.
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        // Currently, network policies have no dependents. When we add the ability for users or
        // sources/sinks to have policies, this method will need to be updated.
        vec![object_id]
    }

    /// Indicates whether the specified item is considered stable or not.
    ///
    /// Only stable items can be used as dependencies of other catalog items.
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }

    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
        if self.system_config().unsafe_enable_unstable_dependencies() {
            return Ok(());
        }

        let unstable_dependencies: Vec<_> = item
            .references()
            .items()
            .filter(|id| !self.is_stable(**id))
            .map(|id| self.get_entry(id).name().item.clone())
            .collect();

        // It's okay to create a temporary object with unstable
        // dependencies, since we will never need to reboot a catalog
        // that contains it.
        if unstable_dependencies.is_empty() || item.is_temporary() {
            Ok(())
        } else {
            let object_type = item.typ().to_string();
            Err(Error {
                kind: ErrorKind::UnstableDependency {
                    object_type,
                    unstable_dependencies,
                },
            })
        }
    }
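
    // A minimal usage sketch of the stability check. `state` and `item` are
    // hypothetical values, not names defined here:
    //
    //     // A non-temporary view referencing an object in an unstable schema
    //     // (e.g. one in `mz_internal`) is rejected unless the
    //     // `unsafe_enable_unstable_dependencies` system variable is set.
    //     match state.check_unstable_dependencies(&item) {
    //         Ok(()) => { /* fine to create the item */ }
    //         Err(err) => { /* ErrorKind::UnstableDependency names the offenders */ }
    //     }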

    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        // For temporary schemas, we know the name is always MZ_TEMP_SCHEMA,
        // and the schema may not exist yet if no temporary items have been created.
        let schema = match &name.qualifiers.schema_spec {
            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
            _ => self
                .get_schema(
                    &name.qualifiers.database_spec,
                    &name.qualifiers.schema_spec,
                    conn_id,
                )
                .name()
                .schema
                .clone(),
        };
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }
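
    // A minimal usage sketch of name resolution. `state` and `name` are
    // hypothetical values, with `name` a `QualifiedItemName` whose database and
    // schema ids resolve to `materialize` and `public`:
    //
    //     let full = state.resolve_full_name(&name, None);
    //     // Yields the human-readable qualified name: database `materialize`,
    //     // schema `public`, item `t`. Temporary items resolve their schema to
    //     // MZ_TEMP_SCHEMA even before the temporary schema has been created.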

    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        let database = match &name.database {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        FullSchemaName {
            database,
            schema: name.schema.clone(),
        }
    }

    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }

    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        let item_id = self
            .entry_by_global_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));

        let entry = self.get_entry(item_id).clone();
        let version = match entry.item() {
            CatalogItem::Table(table) => {
                let (version, _) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("version to exist");
                RelationVersionSelector::Specific(*version)
            }
            _ => RelationVersionSelector::Latest,
        };
        CatalogCollectionEntry { entry, version }
    }

    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }

    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
        self.temporary_schemas
            .get(conn)
            .into_iter()
            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
    }

    /// Returns true if a temporary schema exists for the given connection.
    ///
    /// Temporary schemas are created lazily when the first temporary object is created
    /// for a connection, so this may return false for connections that haven't created
    /// any temporary objects.
    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }

    /// Gets a type named `name` from exactly one of the system schemas.
    ///
    /// # Panics
    /// - If `name` is not an entry in any system schema
    /// - If more than one system schema has an entry named `name`.
    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }

    pub fn get_item_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .items
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub(super) fn find_available_name(
        &self,
        mut name: QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> QualifiedItemName {
        let mut i = 0;
        let orig_item_name = name.item.clone();
        while self.get_item_by_name(&name, conn_id).is_some() {
            i += 1;
            name.item = format!("{}{}", orig_item_name, i);
        }
        name
    }
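
    // A minimal usage sketch of collision avoidance. `state`, `conn_id`, and
    // `name` are hypothetical values, with `name.item == "t"` and both "t" and
    // "t1" already taken in the target schema:
    //
    //     let available = state.find_available_name(name, &conn_id);
    //     // Appends an increasing integer suffix until the name is free:
    //     // "t" -> "t1" -> "t2", so here `available.item` is "t2".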

    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }

    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        let item_id = self.entry_by_global_id.get(id)?;
        self.try_get_entry(item_id)
    }

    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
    /// object that returns rows.
    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            // TODO(alter_table): Support schema evolution on sources.
            other => other.relation_desc(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }
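
    // A minimal usage sketch of version-aware descriptor lookup. `state` and
    // `gid` are hypothetical values, with `gid` naming one specific version of
    // a table that has since been altered:
    //
    //     // Tables resolve to the `RelationDesc` of the exact version that
    //     // `gid` names; every other relation resolves to its latest version.
    //     if let Some(desc) = state.try_get_desc_by_global_id(&gid) {
    //         println!("{} columns", desc.arity());
    //     }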

    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }

    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }

    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }

    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }

    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }

    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }

    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
        self.role_auth_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
    }

    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }

    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }

    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        membership.insert(RoleId::Public);
        membership
    }
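
    // A minimal usage sketch of transitive role membership. `state` and `r1`
    // are hypothetical values, with role r1 a member of r2 and r2 of r3:
    //
    //     let membership = state.collect_role_membership(&r1);
    //     // Contains r1, r2, and r3, plus RoleId::Public, which every role
    //     // implicitly belongs to.
    //     assert!(membership.contains(&RoleId::Public));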

    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }

    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
    /// source.
    ///
    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or("HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }
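
    // A minimal usage sketch of the resulting URL shape. `state` and `id` are
    // hypothetical values, with `id` naming a webhook source at
    // `materialize.public.my_webhook`:
    //
    //     let url = state.try_get_webhook_url(&id);
    //     // With no `http_host_name` configured, the placeholder "HOST" is used:
    //     // https://HOST/api/webhook/materialize/public/my_webhook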

    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
    ///
    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
    ///
    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
    /// their original state before the method returns.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
        // caller if this method changed the state of the catalog.
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }

    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }
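
    // A minimal usage sketch of planning stored DDL. `state` and `create_sql`
    // are hypothetical values, not names defined here:
    //
    //     let catalog = state.for_system_session();
    //     let pcx = PlanContext::zero();
    //     let (plan, resolved_ids) =
    //         CatalogState::parse_plan(create_sql, Some(&pcx), &catalog)?;
    //     // `resolved_ids` records every catalog object the statement references.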

    /// Parses the given SQL string into a [`CatalogItem`].
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }

    /// Parses the given SQL string into a `CatalogItem`.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }

    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it is `Some`.
    ///
    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
    /// not cached.
    ///
    /// On failure returns an error and `cached_expr` so it can be used later.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // Build an optimizer for this VIEW.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Resolve all item dependencies from the HIR expression.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = optimized_expr.typ();
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
1371                }
1372                let desc = RelationDesc::new(typ, materialized_view.column_names);
1373                let desc = VersionedRelationDesc::new(desc);
1374
1375                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1376
1377                // Resolve all item dependencies from the HIR expression.
1378                let dependencies = raw_expr
1379                    .depends_on()
1380                    .into_iter()
1381                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1382                    .collect();
1383
1384                CatalogItem::MaterializedView(MaterializedView {
1385                    create_sql: materialized_view.create_sql,
1386                    collections,
1387                    raw_expr,
1388                    optimized_expr,
1389                    desc,
1390                    resolved_ids,
1391                    dependencies,
1392                    replacement_target: materialized_view.replacement_target,
1393                    cluster_id: materialized_view.cluster_id,
1394                    non_null_assertions: materialized_view.non_null_assertions,
1395                    custom_logical_compaction_window: materialized_view.compaction_window,
1396                    refresh_schedule: materialized_view.refresh_schedule,
1397                    initial_as_of,
1398                })
1399            }
1400            Plan::CreateContinualTask(plan) => {
1401                let ct =
1402                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1403                        Ok(ct) => ct,
1404                        Err(err) => return Err((err, cached_expr)),
1405                    };
1406                CatalogItem::ContinualTask(ct)
1407            }
1408            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1409                create_sql: index.create_sql,
1410                global_id,
1411                on: index.on,
1412                keys: index.keys.into(),
1413                conn_id: None,
1414                resolved_ids,
1415                cluster_id: index.cluster_id,
1416                custom_logical_compaction_window: custom_logical_compaction_window
1417                    .or(index.compaction_window),
1418                is_retained_metrics_object,
1419            }),
1420            Plan::CreateSink(CreateSinkPlan {
1421                sink,
1422                with_snapshot,
1423                in_cluster,
1424                ..
1425            }) => CatalogItem::Sink(Sink {
1426                create_sql: sink.create_sql,
1427                global_id,
1428                from: sink.from,
1429                connection: sink.connection,
1430                envelope: sink.envelope,
1431                version: sink.version,
1432                with_snapshot,
1433                resolved_ids,
1434                cluster_id: in_cluster,
1435                commit_interval: sink.commit_interval,
1436            }),
1437            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1438                // Even if we don't need the `RelationDesc` here, error out
1439                // early and eagerly, as a kind of soft assertion that we _can_
1440                // build the `RelationDesc` when needed.
1441                if let Err(err) = typ.inner.desc(&session_catalog) {
1442                    return Err((err.into(), cached_expr));
1443                }
1444                CatalogItem::Type(Type {
1445                    create_sql: Some(typ.create_sql),
1446                    global_id,
1447                    details: CatalogTypeDetails {
1448                        array_id: None,
1449                        typ: typ.inner,
1450                        pg_metadata: None,
1451                    },
1452                    resolved_ids,
1453                })
1454            }
1455            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1456                create_sql: secret.create_sql,
1457                global_id,
1458            }),
1459            Plan::CreateConnection(CreateConnectionPlan {
1460                connection:
1461                    mz_sql::plan::Connection {
1462                        create_sql,
1463                        details,
1464                    },
1465                ..
1466            }) => CatalogItem::Connection(Connection {
1467                create_sql,
1468                global_id,
1469                details,
1470                resolved_ids,
1471            }),
1472            _ => {
1473                return Err((
1474                    Error::new(ErrorKind::Corruption {
1475                        detail: "catalog entry generated inappropriate plan".to_string(),
1476                    })
1477                    .into(),
1478                    cached_expr,
1479                ));
1480            }
1481        };
1482
1483        Ok((item, uncached_expr))
1484    }
1485
1486    /// Executes function `f` on `self` with all "enable_for_item_parsing" feature flags enabled.
1487    /// Calling this method does not permanently modify any system configuration variables.
1488    ///
1489    /// WARNING:
1490    /// Any modifications that `f` makes to the system configuration variables will be lost.
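        ///
        /// A minimal usage sketch (`state` and `reparse_item` are hypothetical
        /// caller-side names):
        ///
        /// ```ignore
        /// let result = state.with_enable_for_item_parsing(|state| {
        ///     // All `enable_for_item_parsing` feature flags are on in here; any
        ///     // flag changes made by the closure are discarded on return.
        ///     reparse_item(state, create_sql)
        /// });
        /// ```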
1491    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1492        // Enable catalog features that might be required during planning existing
1493        // catalog items. Existing catalog items might have been created while
1494        // a specific feature flag was turned on, so we need to ensure that this
1495        // is also the case during catalog rehydration in order to avoid panics.
1496        //
1497        // WARNING / CONTRACT:
1498        // 1. Features used in this method that relate to parsing / planning
1499        //    should have `enable_for_item_parsing` set to `true`.
1500        // 2. After this step, feature flag configuration must not be
1501        //    overridden.
1502        let restore = self.system_configuration.clone();
1503        self.system_configuration.enable_for_item_parsing();
1504        let res = f(self);
1505        self.system_configuration = restore;
1506        res
1507    }
1508
1509    /// Returns all indexes on the given object and cluster known in the catalog.
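        ///
        /// A minimal sketch of consuming the iterator (hypothetical `state`,
        /// `view_gid`, and `cluster_id` values):
        ///
        /// ```ignore
        /// for (index_gid, index) in state.get_indexes_on(view_gid, cluster_id) {
        ///     // Every yielded index is on `view_gid` and lives in `cluster_id`.
        ///     println!("{index_gid}: keys {:?}", index.keys);
        /// }
        /// ```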
1510    pub fn get_indexes_on(
1511        &self,
1512        id: GlobalId,
1513        cluster: ClusterId,
1514    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1515        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1516
1517        self.try_get_entry_by_global_id(&id)
1518            .into_iter()
1519            .map(move |e| {
1520                e.used_by()
1521                    .iter()
1522                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1523                        CatalogItem::Index(index) if index_matches(index) => {
1524                            Some((index.global_id(), index))
1525                        }
1526                        _ => None,
1527                    })
1528            })
1529            .flatten()
1530    }
1531
1532    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1533        &self.database_by_id[database_id]
1534    }
1535
1536    /// Gets a reference to the specified replica of the specified cluster.
1537    ///
1538    /// Returns `None` if either the cluster or the replica does not
1539    /// exist.
1540    pub(super) fn try_get_cluster_replica(
1541        &self,
1542        id: ClusterId,
1543        replica_id: ReplicaId,
1544    ) -> Option<&ClusterReplica> {
1545        self.try_get_cluster(id)
1546            .and_then(|cluster| cluster.replica(replica_id))
1547    }
1548
1549    /// Gets a reference to the specified replica of the specified cluster.
1550    ///
1551    /// Panics if either the cluster or the replica does not exist.
1552    pub(crate) fn get_cluster_replica(
1553        &self,
1554        cluster_id: ClusterId,
1555        replica_id: ReplicaId,
1556    ) -> &ClusterReplica {
1557        self.try_get_cluster_replica(cluster_id, replica_id)
1558            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1559    }
1560
1561    pub(super) fn resolve_replica_in_cluster(
1562        &self,
1563        cluster_id: &ClusterId,
1564        replica_name: &str,
1565    ) -> Result<&ClusterReplica, SqlCatalogError> {
1566        let cluster = self.get_cluster(*cluster_id);
1567        let replica_id = cluster
1568            .replica_id_by_name_
1569            .get(replica_name)
1570            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1571        Ok(&cluster.replicas_by_id_[replica_id])
1572    }
1573
1574    /// Get system configuration `name`.
1575    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1576        Ok(self.system_configuration.get(name)?)
1577    }
1578
1579    /// Parses system configuration `name` with `value`.
1580    ///
1581    /// Returns the parsed value as a string.
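        ///
        /// A minimal sketch (hypothetical values; assumes `max_connections` is a
        /// known system variable):
        ///
        /// ```ignore
        /// let formatted =
        ///     state.parse_system_configuration("max_connections", VarInput::Flat("100"))?;
        /// assert_eq!(formatted, "100");
        /// ```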
1582    pub(super) fn parse_system_configuration(
1583        &self,
1584        name: &str,
1585        value: VarInput,
1586    ) -> Result<String, Error> {
1587        let value = self.system_configuration.parse(name, value)?;
1588        Ok(value.format())
1589    }
1590
1591    /// Gets the schema named `schema_name` from the database matching `database_spec`.
1592    pub(super) fn resolve_schema_in_database(
1593        &self,
1594        database_spec: &ResolvedDatabaseSpecifier,
1595        schema_name: &str,
1596        conn_id: &ConnectionId,
1597    ) -> Result<&Schema, SqlCatalogError> {
1598        let schema = match database_spec {
1599            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1600                self.temporary_schemas.get(conn_id)
1601            }
1602            ResolvedDatabaseSpecifier::Ambient => self
1603                .ambient_schemas_by_name
1604                .get(schema_name)
1605                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1606            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1607                db.schemas_by_name
1608                    .get(schema_name)
1609                    .and_then(|id| db.schemas_by_id.get(id))
1610            }),
1611        };
1612        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1613    }
1614
1615    /// Try to get a schema, returning `None` if it doesn't exist.
1616    ///
1617    /// For temporary schemas, returns `None` if the schema hasn't been created yet
1618    /// (temporary schemas are created lazily when the first temporary object is created).
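        ///
        /// A minimal sketch (hypothetical `state` and `conn_id` values):
        ///
        /// ```ignore
        /// let schema = state.try_get_schema(
        ///     &ResolvedDatabaseSpecifier::Ambient,
        ///     &SchemaSpecifier::Temporary,
        ///     &conn_id,
        /// );
        /// // `None` until the connection creates its first temporary object.
        /// ```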
1619    pub fn try_get_schema(
1620        &self,
1621        database_spec: &ResolvedDatabaseSpecifier,
1622        schema_spec: &SchemaSpecifier,
1623        conn_id: &ConnectionId,
1624    ) -> Option<&Schema> {
1625        // Keep in sync with `get_schema` and `get_schemas_mut`
1626        match (database_spec, schema_spec) {
1627            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1628                self.temporary_schemas.get(conn_id)
1629            }
1630            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1631                self.ambient_schemas_by_id.get(id)
1632            }
1633            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1634                .database_by_id
1635                .get(database_id)
1636                .and_then(|db| db.schemas_by_id.get(schema_id)),
1637            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1638                unreachable!("temporary schemas are in the ambient database")
1639            }
1640        }
1641    }
1642
1643    pub fn get_schema(
1644        &self,
1645        database_spec: &ResolvedDatabaseSpecifier,
1646        schema_spec: &SchemaSpecifier,
1647        conn_id: &ConnectionId,
1648    ) -> &Schema {
1649        // Keep in sync with `try_get_schema` and `get_schemas_mut`
1650        self.try_get_schema(database_spec, schema_spec, conn_id)
1651            .expect("schema must exist")
1652    }
1653
1654    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1655        self.database_by_id
1656            .values()
1657            .filter_map(|database| database.schemas_by_id.get(schema_id))
1658            .chain(self.ambient_schemas_by_id.values())
1659            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1660            .into_first()
1661    }
1662
1663    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1664        self.temporary_schemas
1665            .values()
1666            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1667            .into_first()
1668    }
1669
1670    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1671        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1672    }
1673
1674    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1675        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1676    }
1677
1678    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1679        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1680    }
1681
1682    pub fn get_information_schema_id(&self) -> SchemaId {
1683        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1684    }
1685
1686    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1687        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1688    }
1689
1690    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1691        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1692    }
1693
1694    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1695        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1696    }
1697
1698    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1699        SYSTEM_SCHEMAS
1700            .iter()
1701            .map(|name| self.ambient_schemas_by_name[*name])
1702    }
1703
1704    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1705        self.system_schema_ids().contains(&id)
1706    }
1707
1708    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1709        match spec {
1710            SchemaSpecifier::Temporary => false,
1711            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1712        }
1713    }
1714
1715    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1716        UNSTABLE_SCHEMAS
1717            .iter()
1718            .map(|name| self.ambient_schemas_by_name[*name])
1719    }
1720
1721    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1722        self.unstable_schema_ids().contains(&id)
1723    }
1724
1725    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1726        match spec {
1727            SchemaSpecifier::Temporary => false,
1728            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1729        }
1730    }
1731
1732    /// Creates a new schema in the `Catalog` for temporary items
1733    /// indicated by the TEMPORARY or TEMP keywords.
1734    pub fn create_temporary_schema(
1735        &mut self,
1736        conn_id: &ConnectionId,
1737        owner_id: RoleId,
1738    ) -> Result<(), Error> {
1739        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1740        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1741        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1742        // all -1.
1743        let oid = INVALID_OID;
1744        self.temporary_schemas.insert(
1745            conn_id.clone(),
1746            Schema {
1747                name: QualifiedSchemaName {
1748                    database: ResolvedDatabaseSpecifier::Ambient,
1749                    schema: MZ_TEMP_SCHEMA.into(),
1750                },
1751                id: SchemaSpecifier::Temporary,
1752                oid,
1753                items: BTreeMap::new(),
1754                functions: BTreeMap::new(),
1755                types: BTreeMap::new(),
1756                owner_id,
1757                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1758                    mz_sql::catalog::ObjectType::Schema,
1759                    owner_id,
1760                )]),
1761            },
1762        );
1763        Ok(())
1764    }
1765
1766    /// Return all OIDs that are allocated to temporary objects.
1767    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1768        std::iter::empty()
1769            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1770                if schema.id.is_temporary() {
1771                    Some(schema.oid)
1772                } else {
1773                    None
1774                }
1775            }))
1776            .chain(self.entry_by_id.values().filter_map(|entry| {
1777                if entry.item().is_temporary() {
1778                    Some(entry.oid)
1779                } else {
1780                    None
1781                }
1782            }))
1783    }
1784
1785    /// Optimized lookup for a builtin table.
1786    ///
1787    /// Panics if the builtin table doesn't exist in the catalog.
1788    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1789        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1790    }
1791
1792    /// Optimized lookup for a builtin log.
1793    ///
1794    /// Panics if the builtin log doesn't exist in the catalog.
1795    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1796        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1797        let log = match self.get_entry(&item_id).item() {
1798            CatalogItem::Log(log) => log,
1799            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1800        };
1801        (item_id, log.global_id)
1802    }
1803
1804    /// Optimized lookup for a builtin storage collection.
1805    ///
1806    /// Panics if the builtin storage collection doesn't exist in the catalog.
1807    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1808        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1809    }
1810
1811    /// Optimized lookup for a builtin object.
1812    ///
1813    /// Panics if the builtin object doesn't exist in the catalog.
1814    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1815        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1816        let schema = &self.ambient_schemas_by_id[schema_id];
1817        match builtin.catalog_item_type() {
1818            CatalogItemType::Type => schema.types[builtin.name()],
1819            CatalogItemType::Func => schema.functions[builtin.name()],
1820            CatalogItemType::Table
1821            | CatalogItemType::Source
1822            | CatalogItemType::Sink
1823            | CatalogItemType::View
1824            | CatalogItemType::MaterializedView
1825            | CatalogItemType::Index
1826            | CatalogItemType::Secret
1827            | CatalogItemType::Connection
1828            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1829        }
1830    }
1831
1832    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
1833    pub fn resolve_builtin_type_references(
1834        &self,
1835        builtin: &BuiltinType<NameReference>,
1836    ) -> BuiltinType<IdReference> {
1837        let typ: CatalogType<IdReference> = match &builtin.details.typ {
1838            CatalogType::AclItem => CatalogType::AclItem,
1839            CatalogType::Array { element_reference } => CatalogType::Array {
1840                element_reference: self.get_system_type(element_reference).id,
1841            },
1842            CatalogType::List {
1843                element_reference,
1844                element_modifiers,
1845            } => CatalogType::List {
1846                element_reference: self.get_system_type(element_reference).id,
1847                element_modifiers: element_modifiers.clone(),
1848            },
1849            CatalogType::Map {
1850                key_reference,
1851                value_reference,
1852                key_modifiers,
1853                value_modifiers,
1854            } => CatalogType::Map {
1855                key_reference: self.get_system_type(key_reference).id,
1856                value_reference: self.get_system_type(value_reference).id,
1857                key_modifiers: key_modifiers.clone(),
1858                value_modifiers: value_modifiers.clone(),
1859            },
1860            CatalogType::Range { element_reference } => CatalogType::Range {
1861                element_reference: self.get_system_type(element_reference).id,
1862            },
1863            CatalogType::Record { fields } => CatalogType::Record {
1864                fields: fields
1865                    .into_iter()
1866                    .map(|f| CatalogRecordField {
1867                        name: f.name.clone(),
1868                        type_reference: self.get_system_type(f.type_reference).id,
1869                        type_modifiers: f.type_modifiers.clone(),
1870                    })
1871                    .collect(),
1872            },
1873            CatalogType::Bool => CatalogType::Bool,
1874            CatalogType::Bytes => CatalogType::Bytes,
1875            CatalogType::Char => CatalogType::Char,
1876            CatalogType::Date => CatalogType::Date,
1877            CatalogType::Float32 => CatalogType::Float32,
1878            CatalogType::Float64 => CatalogType::Float64,
1879            CatalogType::Int16 => CatalogType::Int16,
1880            CatalogType::Int32 => CatalogType::Int32,
1881            CatalogType::Int64 => CatalogType::Int64,
1882            CatalogType::UInt16 => CatalogType::UInt16,
1883            CatalogType::UInt32 => CatalogType::UInt32,
1884            CatalogType::UInt64 => CatalogType::UInt64,
1885            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1886            CatalogType::Interval => CatalogType::Interval,
1887            CatalogType::Jsonb => CatalogType::Jsonb,
1888            CatalogType::Numeric => CatalogType::Numeric,
1889            CatalogType::Oid => CatalogType::Oid,
1890            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1891            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1892            CatalogType::Pseudo => CatalogType::Pseudo,
1893            CatalogType::RegClass => CatalogType::RegClass,
1894            CatalogType::RegProc => CatalogType::RegProc,
1895            CatalogType::RegType => CatalogType::RegType,
1896            CatalogType::String => CatalogType::String,
1897            CatalogType::Time => CatalogType::Time,
1898            CatalogType::Timestamp => CatalogType::Timestamp,
1899            CatalogType::TimestampTz => CatalogType::TimestampTz,
1900            CatalogType::Uuid => CatalogType::Uuid,
1901            CatalogType::VarChar => CatalogType::VarChar,
1902            CatalogType::Int2Vector => CatalogType::Int2Vector,
1903            CatalogType::MzAclItem => CatalogType::MzAclItem,
1904        };
1905
1906        BuiltinType {
1907            name: builtin.name,
1908            schema: builtin.schema,
1909            oid: builtin.oid,
1910            details: CatalogTypeDetails {
1911                array_id: builtin.details.array_id,
1912                typ,
1913                pg_metadata: builtin.details.pg_metadata.clone(),
1914            },
1915        }
1916    }
1917
1918    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1919        &self.config
1920    }
1921
1922    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1923        match self.database_by_name.get(database_name) {
1924            Some(id) => Ok(&self.database_by_id[id]),
1925            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1926        }
1927    }
1928
1929    pub fn resolve_schema(
1930        &self,
1931        current_database: Option<&DatabaseId>,
1932        database_name: Option<&str>,
1933        schema_name: &str,
1934        conn_id: &ConnectionId,
1935    ) -> Result<&Schema, SqlCatalogError> {
1936        let database_spec = match database_name {
1937            // If a database is explicitly specified, validate it. Note that we
1938            // intentionally do not validate `current_database` to permit
1939            // querying `mz_catalog` with an invalid session database, e.g., so
1940            // that you can run `SHOW DATABASES` to *find* a valid database.
1941            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1942                self.resolve_database(database)?.id().clone(),
1943            )),
1944            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1945        };
1946
1947        // First try to find the schema in the named database.
1948        if let Some(database_spec) = database_spec {
1949            if let Ok(schema) =
1950                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1951            {
1952                return Ok(schema);
1953            }
1954        }
1955
1956        // Then fall back to the ambient database.
1957        if let Ok(schema) = self.resolve_schema_in_database(
1958            &ResolvedDatabaseSpecifier::Ambient,
1959            schema_name,
1960            conn_id,
1961        ) {
1962            return Ok(schema);
1963        }
1964
1965        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1966    }
1967
1968    /// Optimized lookup for a system schema.
1969    ///
1970    /// Panics if the system schema doesn't exist in the catalog.
1971    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1972        self.ambient_schemas_by_name[name]
1973    }
1974
1975    pub fn resolve_search_path(
1976        &self,
1977        session: &dyn SessionMetadata,
1978    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1979        let database = self
1980            .database_by_name
1981            .get(session.database())
1982            .cloned();
1983
1984        session
1985            .search_path()
1986            .iter()
1987            .map(|schema| {
1988                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1989            })
1990            .filter_map(|schema| schema.ok())
1991            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1992            .collect()
1993    }
1994
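        /// Prepends the temporary schema (when `include_temp_schema` is set) and the
        /// ambient `mz_catalog` and `pg_catalog` schemas to `search_path`, skipping
        /// any that the caller already listed.
        ///
        /// A minimal sketch (hypothetical `state` and `session` values):
        ///
        /// ```ignore
        /// let search_path = state.resolve_search_path(&session);
        /// // Include the temp schema for relations and types; pass `false` when
        /// // resolving functions and operators.
        /// let effective = state.effective_search_path(&search_path, true);
        /// ```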
1995    pub fn effective_search_path(
1996        &self,
1997        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1998        include_temp_schema: bool,
1999    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2000        let mut v = Vec::with_capacity(search_path.len() + 3);
2001        // Temp schema is only included for relations and data types, not for functions and operators
2002        let temp_schema = (
2003            ResolvedDatabaseSpecifier::Ambient,
2004            SchemaSpecifier::Temporary,
2005        );
2006        if include_temp_schema && !search_path.contains(&temp_schema) {
2007            v.push(temp_schema);
2008        }
2009        let default_schemas = [
2010            (
2011                ResolvedDatabaseSpecifier::Ambient,
2012                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2013            ),
2014            (
2015                ResolvedDatabaseSpecifier::Ambient,
2016                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2017            ),
2018        ];
2019        for schema in default_schemas.into_iter() {
2020            if !search_path.contains(&schema) {
2021                v.push(schema);
2022            }
2023        }
2024        v.extend_from_slice(search_path);
2025        v
2026    }
2027
2028    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2029        let id = self
2030            .clusters_by_name
2031            .get(name)
2032            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2033        Ok(&self.clusters_by_id[id])
2034    }
2035
2036    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2037        let id = self
2038            .clusters_by_name
2039            .get(cluster.name)
2040            .expect("failed to lookup BuiltinCluster by name");
2041        self.clusters_by_id
2042            .get(id)
2043            .expect("failed to lookup BuiltinCluster by ID")
2044    }
2045
2046    pub fn resolve_cluster_replica(
2047        &self,
2048        cluster_replica_name: &QualifiedReplica,
2049    ) -> Result<&ClusterReplica, SqlCatalogError> {
2050        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2051        let replica_name = cluster_replica_name.replica.as_str();
2052        let replica_id = cluster
2053            .replica_id(replica_name)
2054            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2055        Ok(cluster.replica(replica_id).expect("Must exist"))
2056    }
2057
2058    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
2059    ///
2060    /// If `name` does not specify a database, the `current_database` is used.
2061    /// If `name` does not specify a schema, then the schemas in `search_path`
2062    /// are searched in order.
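        ///
        /// A minimal sketch of the lookup order for a name with no schema
        /// (hypothetical caller-side values):
        ///
        /// ```ignore
        /// // Resolution tries, in order: the connection's temporary schema,
        /// // each schema in `search_path`, and finally (for `mz_internal`
        /// // lookups) the relocated unstable schemas.
        /// let entry = state.resolve(
        ///     |schema| &schema.items,
        ///     current_database,
        ///     &search_path,
        ///     &name,
        ///     &conn_id,
        ///     SqlCatalogError::UnknownItem,
        /// )?;
        /// ```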
2063    #[allow(clippy::useless_let_if_seq)]
2064    pub fn resolve(
2065        &self,
2066        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2067        current_database: Option<&DatabaseId>,
2068        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2069        name: &PartialItemName,
2070        conn_id: &ConnectionId,
2071        err_gen: fn(String) -> SqlCatalogError,
2072    ) -> Result<&CatalogEntry, SqlCatalogError> {
2073        // If a schema name was specified, just try to find the item in that
2074        // schema. If no schema was specified, try to find the item in the connection's
2075        // temporary schema. If the item is not found, try to find the item in every
2076        // schema in the search path.
2077        let schemas = match &name.schema {
2078            Some(schema_name) => {
2079                match self.resolve_schema(
2080                    current_database,
2081                    name.database.as_deref(),
2082                    schema_name,
2083                    conn_id,
2084                ) {
2085                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2086                    Err(e) => return Err(e),
2087                }
2088            }
2089            None => match self
2090                .try_get_schema(
2091                    &ResolvedDatabaseSpecifier::Ambient,
2092                    &SchemaSpecifier::Temporary,
2093                    conn_id,
2094                )
2095                .and_then(|schema| schema.items.get(&name.item))
2096            {
2097                Some(id) => return Ok(self.get_entry(id)),
2098                None => search_path.to_vec(),
2099            },
2100        };
2101
2102        for (database_spec, schema_spec) in &schemas {
2103            // Use try_get_schema because the temp schema might not exist yet
2104            // (it's created lazily when the first temp object is created).
2105            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
2106                continue;
2107            };
2108
2109            if let Some(id) = get_schema_entries(schema).get(&name.item) {
2110                return Ok(&self.entry_by_id[id]);
2111            }
2112        }
2113
2114        // Some relations that have previously lived in the `mz_internal` schema have been moved to
2115        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
2116        // automatically let uses of the old schema resolve to the new ones as well.
2117        // TODO(database-issues#8173) remove this after sufficient time has passed
2118        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2119        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2120            for schema_id in [
2121                self.get_mz_catalog_unstable_schema_id(),
2122                self.get_mz_introspection_schema_id(),
2123            ] {
2124                let schema = self.get_schema(
2125                    &ResolvedDatabaseSpecifier::Ambient,
2126                    &SchemaSpecifier::Id(schema_id),
2127                    conn_id,
2128                );
2129
2130                if let Some(id) = get_schema_entries(schema).get(&name.item) {
2131                    debug!(
2132                        github_27831 = true,
2133                        "encountered use of outdated schema `mz_internal` for relation: {name}",
2134                    );
2135                    return Ok(&self.entry_by_id[id]);
2136                }
2137            }
2138        }
2139
2140        Err(err_gen(name.to_string()))
2141    }
2142
2143    /// Resolves `name` to a non-function [`CatalogEntry`].
2144    pub fn resolve_entry(
2145        &self,
2146        current_database: Option<&DatabaseId>,
2147        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2148        name: &PartialItemName,
2149        conn_id: &ConnectionId,
2150    ) -> Result<&CatalogEntry, SqlCatalogError> {
2151        self.resolve(
2152            |schema| &schema.items,
2153            current_database,
2154            search_path,
2155            name,
2156            conn_id,
2157            SqlCatalogError::UnknownItem,
2158        )
2159    }
2160
2161    /// Resolves `name` to a function [`CatalogEntry`].
2162    pub fn resolve_function(
2163        &self,
2164        current_database: Option<&DatabaseId>,
2165        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2166        name: &PartialItemName,
2167        conn_id: &ConnectionId,
2168    ) -> Result<&CatalogEntry, SqlCatalogError> {
2169        self.resolve(
2170            |schema| &schema.functions,
2171            current_database,
2172            search_path,
2173            name,
2174            conn_id,
2175            |name| SqlCatalogError::UnknownFunction {
2176                name,
2177                alternative: None,
2178            },
2179        )
2180    }
2181
2182    /// Resolves `name` to a type [`CatalogEntry`].
2183    pub fn resolve_type(
2184        &self,
2185        current_database: Option<&DatabaseId>,
2186        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2187        name: &PartialItemName,
2188        conn_id: &ConnectionId,
2189    ) -> Result<&CatalogEntry, SqlCatalogError> {
2190        static NON_PG_CATALOG_TYPES: LazyLock<
2191            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2192        > = LazyLock::new(|| {
2193            BUILTINS::types()
2194                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2195                .map(|typ| (typ.name, typ))
2196                .collect()
2197        });
2198
2199        let entry = self.resolve(
2200            |schema| &schema.types,
2201            current_database,
2202            search_path,
2203            name,
2204            conn_id,
2205            |name| SqlCatalogError::UnknownType { name },
2206        )?;
2207
2208        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2209            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2210                warn!(
2211                    "user specified an incorrect schema of {} for the type {}, which should be in \
2212                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2213                    PG_CATALOG_SCHEMA.quoted(),
2214                    typ.name.quoted(),
2215                    typ.schema.quoted(),
2216                )
2217            }
2218        }
2219
2220        Ok(entry)
2221    }
2222
2223    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2224    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2225        match object_id {
2226            ObjectId::Item(item_id) => {
2227                let entry = self.get_entry(&item_id);
2228                match entry.item_type() {
2229                    CatalogItemType::Table => CommentObjectId::Table(item_id),
2230                    CatalogItemType::Source => CommentObjectId::Source(item_id),
2231                    CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2232                    CatalogItemType::View => CommentObjectId::View(item_id),
2233                    CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2234                    CatalogItemType::Index => CommentObjectId::Index(item_id),
2235                    CatalogItemType::Func => CommentObjectId::Func(item_id),
2236                    CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2237                    CatalogItemType::Type => CommentObjectId::Type(item_id),
2238                    CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2239                    CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2240                }
2241            }
2242            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2243            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2244            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2245            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2246            ObjectId::ClusterReplica(cluster_replica_id) => {
2247                CommentObjectId::ClusterReplica(cluster_replica_id)
2248            }
2249            ObjectId::NetworkPolicy(network_policy_id) => {
2250                CommentObjectId::NetworkPolicy(network_policy_id)
2251            }
2252        }
2253    }
2254
2255    /// Return current system configuration.
2256    pub fn system_config(&self) -> &SystemVars {
2257        &self.system_configuration
2258    }
2259
2260    /// Return a mutable reference to the current system configuration.
2261    pub fn system_config_mut(&mut self) -> &mut SystemVars {
2262        &mut self.system_configuration
2263    }
2264
2265    /// Serializes the catalog's in-memory state.
2266    ///
2267    /// There are no guarantees about the format of the serialized state, except
2268    /// that the serialized state for two identical catalogs will compare
2269    /// identically.
2270    ///
2271    /// Consumers that want to overwrite the `unfinalized_shards` catalog field can
2272    /// do so by passing `Some` for the `unfinalized_shards` argument.
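        ///
        /// A minimal sketch (hypothetical `state` value):
        ///
        /// ```ignore
        /// // Dump with the `unfinalized_shards` field left as-is.
        /// let json = state.dump(None)?;
        /// // Dump with `unfinalized_shards` overwritten by an empty set.
        /// let json = state.dump(Some(BTreeSet::new()))?;
        /// ```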
2274    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2275        // Dump the base catalog.
2276        let mut dump = serde_json::to_value(&self).map_err(|e| {
2277            Error::new(ErrorKind::Unstructured(format!(
2278                // Don't panic here because we don't have compile-time failures for maps with
2279                // non-string keys.
2280                "internal error: could not dump catalog: {}",
2281                e
2282            )))
2283        })?;
2284
2285        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2286        // Stitch in system parameter defaults.
2287        dump_obj.insert(
2288            "system_parameter_defaults".into(),
2289            serde_json::json!(self.system_config().defaults()),
2290        );
2291        // Potentially overwrite unfinalized shards.
2292        if let Some(unfinalized_shards) = unfinalized_shards {
2293            dump_obj
2294                .get_mut("storage_metadata")
2295                .expect("known to exist")
2296                .as_object_mut()
2297                .expect("storage_metadata is an object")
2298                .insert(
2299                    "unfinalized_shards".into(),
2300                    serde_json::json!(unfinalized_shards),
2301                );
2302        }
2303        // Remove GlobalIds for temporary objects from the mapping.
2304        //
2305        // Post-test consistency checks with the durable catalog don't know about temporary items
2306        // since they're kept entirely in memory.
2307        let temporary_gids: Vec<_> = self
2308            .entry_by_global_id
2309            .iter()
2310            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2311            .map(|(gid, _item_id)| *gid)
2312            .collect();
2313        if !temporary_gids.is_empty() {
2314            let gids = dump_obj
2315                .get_mut("entry_by_global_id")
2316                .expect("known to exist")
2317                .as_object_mut()
2318                .expect("entry_by_global_id is an object");
2319            for gid in temporary_gids {
2320                gids.remove(&gid.to_string());
2321            }
2322        }
2323        // We exclude role_auth_by_id because it contains password information
2324        // which should not be included in the dump.
2325        dump_obj.remove("role_auth_by_id");
2326
2327        // Emit as pretty-printed JSON.
2328        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2329    }
2330
2331    pub fn availability_zones(&self) -> &[String] {
2332        &self.availability_zones
2333    }
2334
2335    pub fn concretize_replica_location(
2336        &self,
2337        location: mz_catalog::durable::ReplicaLocation,
2338        allowed_sizes: &Vec<String>,
2339        allowed_availability_zones: Option<&[String]>,
2340    ) -> Result<ReplicaLocation, Error> {
2341        let location = match location {
2342            mz_catalog::durable::ReplicaLocation::Unmanaged {
2343                storagectl_addrs,
2344                computectl_addrs,
2345            } => {
2346                if allowed_availability_zones.is_some() {
2347                    return Err(Error {
2348                        kind: ErrorKind::Internal(
2349                            "tried to concretize an unmanaged replica with specific availability zones"
2350                                .to_string(),
2351                        ),
2352                    });
2353                }
2354                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2355                    storagectl_addrs,
2356                    computectl_addrs,
2357                })
2358            }
2359            mz_catalog::durable::ReplicaLocation::Managed {
2360                size,
2361                availability_zone,
2362                billed_as,
2363                internal,
2364                pending,
2365            } => {
2366                if allowed_availability_zones.is_some() && availability_zone.is_some() {
2367                    let message = "tried to concretize a managed replica with both cluster-level availability zones and a replica-level availability zone";
2368                    return Err(Error {
2369                        kind: ErrorKind::Internal(message.to_string()),
2370                    });
2371                }
2372                self.ensure_valid_replica_size(allowed_sizes, &size)?;
2373                let cluster_replica_sizes = &self.cluster_replica_sizes;
2374
2375                ReplicaLocation::Managed(ManagedReplicaLocation {
2376                    allocation: cluster_replica_sizes
2377                        .0
2378                        .get(&size)
2379                        .expect("catalog out of sync")
2380                        .clone(),
2381                    availability_zones: match (availability_zone, allowed_availability_zones) {
2382                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2383                        (None, Some(azs)) if azs.is_empty() => {
2384                            ManagedReplicaAvailabilityZones::FromCluster(None)
2385                        }
2386                        (None, Some(azs)) => {
2387                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2388                        }
2389                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2390                    },
2391                    size,
2392                    billed_as,
2393                    internal,
2394                    pending,
2395                })
2396            }
2397        };
2398        Ok(location)
2399    }
2400
2401    /// Return whether the given replica size requests a disk.
2402    ///
2403    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
2404    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
2405    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
2406    /// not something we can know at this point.
2407    ///
2408    /// # Panics
2409    ///
2410    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
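        ///
        /// A minimal sketch (the size name is hypothetical; real names come from
        /// `cluster_replica_sizes`):
        ///
        /// ```ignore
        /// // True only when the size neither enables swap nor sets a zero disk
        /// // limit.
        /// if state.cluster_replica_size_has_disk("scale=1,disk") {
        ///     // Provision the replica with an attached disk.
        /// }
        /// ```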
2411    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2412        let alloc = &self.cluster_replica_sizes.0[size];
2413        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2414    }
2415
2416    pub(crate) fn ensure_valid_replica_size(
2417        &self,
2418        allowed_sizes: &[String],
2419        size: &String,
2420    ) -> Result<(), Error> {
2421        let cluster_replica_sizes = &self.cluster_replica_sizes;
2422
2423        if !cluster_replica_sizes.0.contains_key(size)
2424            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2425            || cluster_replica_sizes.0[size].disabled
2426        {
2427            let mut entries = cluster_replica_sizes
2428                .enabled_allocations()
2429                .collect::<Vec<_>>();
2430
2431            if !allowed_sizes.is_empty() {
2432                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2433                entries.retain(|(name, _)| allowed_sizes.contains(name));
2434            }
2435
2436            entries.sort_by_key(
2437                |(
2438                    _name,
2439                    ReplicaAllocation {
2440                        scale, cpu_limit, ..
2441                    },
2442                )| (scale, cpu_limit),
2443            );
2444
2445            Err(Error {
2446                kind: ErrorKind::InvalidClusterReplicaSize {
2447                    size: size.to_owned(),
2448                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2449                },
2450            })
2451        } else {
2452            Ok(())
2453        }
2454    }
2455
2456    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2457        if role_id.is_builtin() {
2458            let role = self.get_role(role_id);
2459            Err(Error::new(ErrorKind::ReservedRoleName(
2460                role.name().to_string(),
2461            )))
2462        } else {
2463            Ok(())
2464        }
2465    }
2466
2467    pub fn ensure_not_reserved_network_policy(
2468        &self,
2469        network_policy_id: &NetworkPolicyId,
2470    ) -> Result<(), Error> {
2471        if network_policy_id.is_builtin() {
2472            let policy = self.get_network_policy(network_policy_id);
2473            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2474                policy.name.clone(),
2475            )))
2476        } else {
2477            Ok(())
2478        }
2479    }
2480
2481    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2482        let is_grantable = !role_id.is_public() && !role_id.is_system();
2483        if is_grantable {
2484            Ok(())
2485        } else {
2486            let role = self.get_role(role_id);
2487            Err(Error::new(ErrorKind::UngrantableRoleName(
2488                role.name().to_string(),
2489            )))
2490        }
2491    }
2492
2493    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2494        if role_id.is_system() {
2495            let role = self.get_role(role_id);
2496            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2497                role.name().to_string(),
2498            )))
2499        } else {
2500            Ok(())
2501        }
2502    }
2503
2504    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2505        if role_id.is_predefined() {
2506            let role = self.get_role(role_id);
2507            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2508                role.name().to_string(),
2509            )))
2510        } else {
2511            Ok(())
2512        }
2513    }
2514
2515    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2516    // passing tx, and session?
2517    pub(crate) fn add_to_audit_log(
2518        system_configuration: &SystemVars,
2519        oracle_write_ts: mz_repr::Timestamp,
2520        session: Option<&ConnMeta>,
2521        tx: &mut mz_catalog::durable::Transaction,
2522        audit_events: &mut Vec<VersionedEvent>,
2523        event_type: EventType,
2524        object_type: ObjectType,
2525        details: EventDetails,
2526    ) -> Result<(), Error> {
2527        let user = session.map(|session| session.user().name.to_string());
2528
2529        // `unsafe_mock_audit_event_timestamp` can only be set to `Some` when running in unsafe mode.
2531        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2532            Some(ts) => ts.into(),
2533            _ => oracle_write_ts.into(),
2534        };
2535        let id = tx.allocate_audit_log_id()?;
2536        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2537        audit_events.push(event.clone());
2538        tx.insert_audit_log_event(event);
2539        Ok(())
2540    }
2541
    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
        match id {
            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
                self.get_cluster_replica(*cluster_id, *replica_id)
                    .owner_id(),
            ),
            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
            ObjectId::Schema((database_spec, schema_spec)) => Some(
                self.get_schema(database_spec, schema_spec, conn_id)
                    .owner_id(),
            ),
            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
            ObjectId::Role(_) => None,
            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
        }
    }

    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        match object_id {
            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
        }
    }

    pub(super) fn get_system_object_type(
        &self,
        id: &SystemObjectId,
    ) -> mz_sql::catalog::SystemObjectType {
        match id {
            SystemObjectId::Object(object_id) => {
                SystemObjectType::Object(self.get_object_type(object_id))
            }
            SystemObjectId::System => SystemObjectType::System,
        }
    }

    /// Returns a read-only view of the current [`StorageMetadata`].
    ///
    /// To write to this struct, you must use a catalog transaction.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }

    /// For the source IDs in `ids`, returns their compaction windows.
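    ///
    /// The result is grouped by window, so several sources may share one
    /// entry. Tables backed by ingestion exports are included as well; other
    /// item kinds are ignored.
    ///
    /// # Example
    ///
    /// A minimal usage sketch (illustrative only; `state` and `source_id` are
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// let windows = state.source_compaction_windows([source_id]);
    /// for (window, item_ids) in windows {
    ///     println!("{window:?} applies to {item_ids:?}");
    /// }
    /// ```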
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    match source.data_source {
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::IngestionExport { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                        DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                    }
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc: DataSourceDesc::IngestionExport { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        _ => {}
                    }
                }
                _ => {
                    // Views could depend on sources, so ignore them if they
                    // were added via `used_by` above.
                    continue;
                }
            }
        }
        cws
    }

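    /// Maps a [`CommentObjectId`] to the [`CatalogItemId`] of the item it
    /// refers to, or `None` for comments on non-item objects (roles,
    /// databases, schemas, clusters, cluster replicas, and network policies).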
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }

    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
    }

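    /// Renders the human-readable name of the commented-on object for use in
    /// audit log events, e.g. a fully qualified item name for items or a
    /// `cluster.replica` pair for cluster replicas.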
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }

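    /// Returns the mock authentication nonce, or the empty string if none is
    /// set.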
    pub fn mock_authentication_nonce(&self) -> String {
        self.mock_authentication_nonce.clone().unwrap_or_default()
    }
}

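/// Resolves a connection by ID and inlines any connections it references
/// (e.g. an SSH tunnel connection used by a Kafka connection).
///
/// A minimal usage sketch (illustrative only; `state` is a `CatalogState` and
/// `connection_id` is a hypothetical `CatalogItemId` naming an existing
/// connection):
///
/// ```ignore
/// use mz_storage_types::connections::Connection;
///
/// match state.resolve_connection(connection_id) {
///     Connection::Kafka(kafka) => { /* all referenced connections are inlined */ }
///     other => { /* handle the remaining connection kinds */ }
/// }
/// ```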
impl ConnectionResolver for CatalogState {
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}

impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}

impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}

impl Catalog {
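    /// Upcasts an `Arc<Catalog>` to an `Arc<dyn OptimizerCatalog>`.
    ///
    /// A minimal usage sketch (illustrative only; `catalog` is assumed to be
    /// an `Arc<Catalog>`):
    ///
    /// ```ignore
    /// let catalog: Arc<dyn OptimizerCatalog> = catalog.as_optimizer_catalog();
    /// ```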
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}