Skip to main content

mz_adapter/catalog/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! In-memory metadata storage for the coordinator.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36    TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation, UnmanagedReplicaLocation,
40};
41use mz_controller_types::{ClusterId, ReplicaId};
42use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
43use mz_license_keys::ValidatedLicenseKey;
44use mz_ore::collections::CollectionExt;
45use mz_ore::now::NOW_ZERO;
46use mz_ore::soft_assert_no_log;
47use mz_ore::str::StrExt;
48use mz_pgrepr::oid::INVALID_OID;
49use mz_repr::adt::mz_acl_item::PrivilegeMap;
50use mz_repr::namespaces::{
51    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
52    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
53    UNSTABLE_SCHEMAS,
54};
55use mz_repr::network_policy_id::NetworkPolicyId;
56use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
57use mz_repr::role_id::RoleId;
58use mz_repr::{
59    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
60    VersionedRelationDesc,
61};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::ast::Ident;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
67    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
68    TypeReference,
69};
70use mz_sql::catalog::{CatalogConfig, EnvironmentId};
71use mz_sql::names::{
72    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
73    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
74    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
75};
76use mz_sql::plan::{
77    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
78    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
79    Plan, PlanContext,
80};
81use mz_sql::rbac;
82use mz_sql::session::metadata::SessionMetadata;
83use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
84use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
85use mz_sql_parser::ast::QualifiedReplica;
86use mz_storage_client::controller::StorageMetadata;
87use mz_storage_types::connections::ConnectionContext;
88use mz_storage_types::connections::inline::{
89    ConnectionResolver, InlinedConnection, IntoInlineConnection,
90};
91use mz_transform::notice::OptimizerNotice;
92use serde::Serialize;
93use timely::progress::Antichain;
94use tokio::sync::mpsc;
95use tracing::{debug, warn};
96
97// DO NOT add any more imports from `crate` outside of `crate::catalog`.
98use crate::AdapterError;
99use crate::catalog::{Catalog, ConnCatalog};
100use crate::config::ScopedParameters;
101use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
102use crate::optimize::{self, Optimize, OptimizerCatalog};
103use crate::session::Session;
104
/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk. Fields annotated with `#[serde(skip)]` are left
/// out of those dumps, and `entry_by_id` is written through `skip_temp_items`, which omits
/// entries that have a connection ID (i.e. temporary items).
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects are partially derived from read-only state.
    //
    // Each `*_by_name` map resolves a name to the ID key of the corresponding `*_by_id` map.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    // Temporary items are excluded from serialized dumps by `skip_temp_items`.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    // Maps a `GlobalId` to the `CatalogItemId` used as the key in `entry_by_id`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    /// In-memory mirror of the durable scoped (per-cluster and per-replica)
    /// system-parameter cache, maintained by `apply.rs` from the durable
    /// collections. Resolution reads from here: the optimizer's per-cluster
    /// feature overrides (`cluster_scoped_optimizer_overrides`) and the
    /// coordinator's per-replica dyncfg push. See the scoped feature flags
    /// design. Skipped in the consistency-check snapshot because it is fully
    /// derived from the durable catalog.
    #[serde(skip)]
    pub(super) scoped_system_parameters: ScopedParameters,
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    pub(super) system_privileges: Arc<PrivilegeMap>,
    pub(super) comments: Arc<CommentsMap>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: Arc<StorageMetadata>,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog. Populated
    // during dataflow bootstrapping (`bootstrap_dataflow_plans`), which
    // doesn't run in Testdrive's read-only consistency check, so this
    // must be `#[serde(skip)]`.
    #[serde(skip)]
    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,

    // Populated by active connections creating temporary objects. The
    // read-only catalog opened by Testdrive's consistency check has no
    // active connections, so this must be `#[serde(skip)]`.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog. Note that `#[serde(skip)]` applies
    // only to the field directly below it: `config` and `availability_zones` are skipped, while
    // `cluster_replica_sizes` is included in serialized dumps.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only state not derived from the durable catalog. Only `egress_addresses` is skipped;
    // the AWS and HTTP host fields below it are included in serialized dumps.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
190
/// Keeps track of what expressions are cached or not during startup.
/// It's also used during catalog transactions to avoid re-optimizing CREATE VIEW / CREATE MAT VIEW
/// statements when going back and forth between durable catalog operations and in-memory catalog
/// operations.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used: lookups are served from `cached_exprs`, and newly optimized
    /// expressions are recorded in `uncached_exprs`.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used. Every lookup misses and every insertion is ignored.
    Closed,
}
207
208impl LocalExpressionCache {
209    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
210        Self::Open {
211            cached_exprs,
212            uncached_exprs: BTreeMap::new(),
213        }
214    }
215
216    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
217        match self {
218            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
219            LocalExpressionCache::Closed => None,
220        }
221    }
222
223    /// Insert an expression that was cached, back into the cache. This is generally needed when
224    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
225    pub(super) fn insert_cached_expression(
226        &mut self,
227        id: GlobalId,
228        local_expressions: LocalExpressions,
229    ) {
230        match self {
231            LocalExpressionCache::Open { cached_exprs, .. } => {
232                cached_exprs.insert(id, local_expressions);
233            }
234            LocalExpressionCache::Closed => {}
235        }
236    }
237
238    /// Inform the cache that `id` was not found in the cache and that we should add it as
239    /// `local_mir` and `optimizer_features`.
240    pub(super) fn insert_uncached_expression(
241        &mut self,
242        id: GlobalId,
243        local_mir: OptimizedMirRelationExpr,
244        optimizer_features: OptimizerFeatures,
245    ) {
246        match self {
247            LocalExpressionCache::Open { uncached_exprs, .. } => {
248                let local_expr = LocalExpressions {
249                    local_mir,
250                    optimizer_features,
251                };
252                // If we are trying to cache the same item a second time, with a different
253                // expression, then we must be migrating the object or doing something else weird.
254                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
255                // version after a restart. Caching the migrated version may cause us to incorrectly
256                // think that the object has already been migrated. To simplify things, we cache
257                // neither.
258                let prev = uncached_exprs.remove(&id);
259                match prev {
260                    Some(prev) if prev == local_expr => {
261                        uncached_exprs.insert(id, local_expr);
262                    }
263                    None => {
264                        uncached_exprs.insert(id, local_expr);
265                    }
266                    Some(_) => {}
267                }
268            }
269            LocalExpressionCache::Closed => {}
270        }
271    }
272
273    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
274        match self {
275            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
276            LocalExpressionCache::Closed => BTreeMap::new(),
277        }
278    }
279}
280
281fn skip_temp_items<S>(
282    entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
283    serializer: S,
284) -> Result<S::Ok, S::Error>
285where
286    S: serde::Serializer,
287{
288    mz_ore::serde::map_key_to_string(
289        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
290        serializer,
291    )
292}
293
294impl CatalogState {
295    /// Returns an empty [`CatalogState`] that can be used in tests.
296    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
297    // tests are structured in this repository.
298    pub fn empty_test() -> Self {
299        CatalogState {
300            database_by_name: Default::default(),
301            database_by_id: Default::default(),
302            entry_by_id: Default::default(),
303            entry_by_global_id: Default::default(),
304            notices_by_dep_id: Default::default(),
305            ambient_schemas_by_name: Default::default(),
306            ambient_schemas_by_id: Default::default(),
307            temporary_schemas: Default::default(),
308            clusters_by_id: Default::default(),
309            clusters_by_name: Default::default(),
310            network_policies_by_name: Default::default(),
311            roles_by_name: Default::default(),
312            roles_by_id: Default::default(),
313            network_policies_by_id: Default::default(),
314            role_auth_by_id: Default::default(),
315            config: CatalogConfig {
316                start_time: Default::default(),
317                start_instant: Instant::now(),
318                nonce: Default::default(),
319                environment_id: EnvironmentId::for_tests(),
320                session_id: Default::default(),
321                build_info: &DUMMY_BUILD_INFO,
322                now: NOW_ZERO.clone(),
323                connection_context: ConnectionContext::for_tests(Arc::new(
324                    InMemorySecretsController::new(),
325                )),
326                helm_chart_version: None,
327            },
328            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
329            availability_zones: Default::default(),
330            system_configuration: Arc::new(SystemVars::default()),
331            scoped_system_parameters: Default::default(),
332            egress_addresses: Default::default(),
333            aws_principal_context: Default::default(),
334            aws_privatelink_availability_zones: Default::default(),
335            http_host_name: Default::default(),
336            default_privileges: Arc::new(DefaultPrivileges::default()),
337            system_privileges: Arc::new(PrivilegeMap::default()),
338            comments: Arc::new(CommentsMap::default()),
339            source_references: Default::default(),
340            storage_metadata: Arc::new(StorageMetadata::default()),
341            license_key: ValidatedLicenseKey::for_tests(),
342            mock_authentication_nonce: Default::default(),
343        }
344    }
345
346    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
347        let search_path = self.resolve_search_path(session);
348        let database = self
349            .database_by_name
350            .get(session.vars().database())
351            .map(|id| id.clone());
352        let state = match session.transaction().catalog_state() {
353            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
354            None => Cow::Borrowed(self),
355        };
356        ConnCatalog {
357            state,
358            unresolvable_ids: BTreeSet::new(),
359            conn_id: session.conn_id().clone(),
360            cluster: session.vars().cluster().into(),
361            database,
362            search_path,
363            role_id: session.current_role_id().clone(),
364            prepared_statements: Some(session.prepared_statements()),
365            portals: Some(session.portals()),
366            notices_tx: session.retain_notice_transmitter(),
367            restrict_to_user_objects: session.vars().restrict_to_user_objects(),
368        }
369    }
370
371    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
372        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
373        let cluster = self.system_configuration.default_cluster();
374
375        ConnCatalog {
376            state: Cow::Borrowed(self),
377            unresolvable_ids: BTreeSet::new(),
378            conn_id: SYSTEM_CONN_ID.clone(),
379            cluster,
380            database: self
381                .resolve_database(DEFAULT_DATABASE_NAME)
382                .ok()
383                .map(|db| db.id()),
384            // Leaving the system's search path empty allows us to catch issues
385            // where catalog object names have not been normalized correctly.
386            search_path: Vec::new(),
387            role_id,
388            prepared_statements: None,
389            portals: None,
390            notices_tx,
391            restrict_to_user_objects: false,
392        }
393    }
394
395    pub fn for_system_session(&self) -> ConnCatalog<'_> {
396        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
397    }
398
399    /// Returns an iterator over the deduplicated identifiers of all
400    /// objects this catalog entry transitively depends on (where
401    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
402    /// [`CatalogItem::references`]).
403    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
404        struct I<'a> {
405            queue: VecDeque<CatalogItemId>,
406            seen: BTreeSet<CatalogItemId>,
407            this: &'a CatalogState,
408        }
409        impl<'a> Iterator for I<'a> {
410            type Item = CatalogItemId;
411            fn next(&mut self) -> Option<Self::Item> {
412                if let Some(next) = self.queue.pop_front() {
413                    for child in self.this.get_entry(&next).item().uses() {
414                        if !self.seen.contains(&child) {
415                            self.queue.push_back(child);
416                            self.seen.insert(child);
417                        }
418                    }
419                    Some(next)
420                } else {
421                    None
422                }
423            }
424        }
425
426        I {
427            queue: [id].into_iter().collect(),
428            seen: [id].into_iter().collect(),
429            this: self,
430        }
431    }
432
433    /// Computes the IDs of any log sources this catalog entry transitively
434    /// depends on.
435    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
436        let mut out = Vec::new();
437        self.introspection_dependencies_inner(id, &mut out);
438        out
439    }
440
441    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
442        match self.get_entry(&id).item() {
443            CatalogItem::Log(_) => out.push(id),
444            item @ (CatalogItem::View(_)
445            | CatalogItem::MaterializedView(_)
446            | CatalogItem::Connection(_)) => {
447                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
448                for item_id in item.references().items() {
449                    self.introspection_dependencies_inner(*item_id, out);
450                }
451            }
452            CatalogItem::Sink(sink) => {
453                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
454                self.introspection_dependencies_inner(from_item_id, out)
455            }
456            CatalogItem::Index(idx) => {
457                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
458                self.introspection_dependencies_inner(on_item_id, out)
459            }
460            CatalogItem::Table(_)
461            | CatalogItem::Source(_)
462            | CatalogItem::Type(_)
463            | CatalogItem::Func(_)
464            | CatalogItem::Secret(_) => (),
465        }
466    }
467
468    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
469    ///
470    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
471    /// earlier in the list than the roots. This is particularly useful for the order to drop
472    /// objects.
473    pub(super) fn object_dependents(
474        &self,
475        object_ids: &Vec<ObjectId>,
476        conn_id: &ConnectionId,
477        seen: &mut BTreeSet<ObjectId>,
478    ) -> Vec<ObjectId> {
479        let mut dependents = Vec::new();
480        for object_id in object_ids {
481            match object_id {
482                ObjectId::Cluster(id) => {
483                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
484                }
485                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
486                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
487                ),
488                ObjectId::Database(id) => {
489                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
490                }
491                ObjectId::Schema((database_spec, schema_spec)) => {
492                    dependents.extend_from_slice(&self.schema_dependents(
493                        database_spec.clone(),
494                        schema_spec.clone(),
495                        conn_id,
496                        seen,
497                    ));
498                }
499                ObjectId::NetworkPolicy(id) => {
500                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
501                }
502                id @ ObjectId::Role(_) => {
503                    let unseen = seen.insert(id.clone());
504                    if unseen {
505                        dependents.push(id.clone());
506                    }
507                }
508                ObjectId::Item(id) => {
509                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
510                }
511            }
512        }
513        dependents
514    }
515
516    /// Returns all the IDs of all objects that depend on `cluster_id`, including `cluster_id`
517    /// itself.
518    ///
519    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
520    /// earlier in the list than the roots. This is particularly useful for the order to drop
521    /// objects.
522    fn cluster_dependents(
523        &self,
524        cluster_id: ClusterId,
525        seen: &mut BTreeSet<ObjectId>,
526    ) -> Vec<ObjectId> {
527        let mut dependents = Vec::new();
528        let object_id = ObjectId::Cluster(cluster_id);
529        if !seen.contains(&object_id) {
530            seen.insert(object_id.clone());
531            let cluster = self.get_cluster(cluster_id);
532            for item_id in cluster.bound_objects() {
533                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
534            }
535            for replica_id in cluster.replica_ids().values() {
536                dependents.extend_from_slice(&self.cluster_replica_dependents(
537                    cluster_id,
538                    *replica_id,
539                    seen,
540                ));
541            }
542            dependents.push(object_id);
543        }
544        dependents
545    }
546
547    /// Returns all the IDs of all objects that depend on `replica_id`, including `replica_id`
548    /// itself.
549    ///
550    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
551    /// earlier in the list than the roots. This is particularly useful for the order to drop
552    /// objects.
553    pub(super) fn cluster_replica_dependents(
554        &self,
555        cluster_id: ClusterId,
556        replica_id: ReplicaId,
557        seen: &mut BTreeSet<ObjectId>,
558    ) -> Vec<ObjectId> {
559        let mut dependents = Vec::new();
560        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
561        if !seen.contains(&object_id) {
562            seen.insert(object_id.clone());
563            // Materialized views that target this replica are implicitly
564            // dropped with it, so cascade to their dependents to avoid leaving
565            // dangling references.
566            let cluster = self.get_cluster(cluster_id);
567            for item_id in cluster.bound_objects() {
568                if let CatalogItem::MaterializedView(mv) = self.get_entry(item_id).item()
569                    && mv.target_replica == Some(replica_id)
570                {
571                    dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
572                }
573            }
574            dependents.push(object_id);
575        }
576        dependents
577    }
578
579    /// Returns all the IDs of all objects that depend on `database_id`, including `database_id`
580    /// itself.
581    ///
582    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
583    /// earlier in the list than the roots. This is particularly useful for the order to drop
584    /// objects.
585    fn database_dependents(
586        &self,
587        database_id: DatabaseId,
588        conn_id: &ConnectionId,
589        seen: &mut BTreeSet<ObjectId>,
590    ) -> Vec<ObjectId> {
591        let mut dependents = Vec::new();
592        let object_id = ObjectId::Database(database_id);
593        if !seen.contains(&object_id) {
594            seen.insert(object_id.clone());
595            let database = self.get_database(&database_id);
596            for schema_id in database.schema_ids().values() {
597                dependents.extend_from_slice(&self.schema_dependents(
598                    ResolvedDatabaseSpecifier::Id(database_id),
599                    SchemaSpecifier::Id(*schema_id),
600                    conn_id,
601                    seen,
602                ));
603            }
604            dependents.push(object_id);
605        }
606        dependents
607    }
608
609    /// Returns all the IDs of all objects that depend on `schema_id`, including `schema_id`
610    /// itself.
611    ///
612    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
613    /// earlier in the list than the roots. This is particularly useful for the order to drop
614    /// objects.
615    fn schema_dependents(
616        &self,
617        database_spec: ResolvedDatabaseSpecifier,
618        schema_spec: SchemaSpecifier,
619        conn_id: &ConnectionId,
620        seen: &mut BTreeSet<ObjectId>,
621    ) -> Vec<ObjectId> {
622        let mut dependents = Vec::new();
623        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
624        if !seen.contains(&object_id) {
625            seen.insert(object_id.clone());
626            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
627            for item_id in schema.item_ids() {
628                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
629            }
630            dependents.push(object_id)
631        }
632        dependents
633    }
634
635    /// Returns all the IDs of all objects that depend on `item_id`, including `item_id`
636    /// itself.
637    ///
638    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
639    /// earlier in the list than the roots. This is particularly useful for the order to drop
640    /// objects.
641    pub(super) fn item_dependents(
642        &self,
643        item_id: CatalogItemId,
644        seen: &mut BTreeSet<ObjectId>,
645    ) -> Vec<ObjectId> {
646        let mut dependents = Vec::new();
647        let object_id = ObjectId::Item(item_id);
648        if !seen.contains(&object_id) {
649            seen.insert(object_id.clone());
650            let entry = self.get_entry(&item_id);
651            for dependent_id in entry.used_by() {
652                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
653            }
654            dependents.push(object_id);
655            // We treat the progress collection as if it depends on the source
656            // for dropping. We have additional code in planning to create a
657            // kind of special-case "CASCADE" for this dependency.
658            if let Some(progress_id) = entry.progress_id() {
659                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
660            }
661        }
662        dependents
663    }
664
665    /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id`
666    /// itself.
667    ///
668    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
669    /// earlier in the list than the roots. This is particularly useful for the order to drop
670    /// objects.
671    pub(super) fn network_policy_dependents(
672        &self,
673        network_policy_id: NetworkPolicyId,
674        _seen: &mut BTreeSet<ObjectId>,
675    ) -> Vec<ObjectId> {
676        let object_id = ObjectId::NetworkPolicy(network_policy_id);
677        // Currently network policies have no dependents
678        // when we add the ability for users or sources/sinks to have policies
679        // this method will need to be updated.
680        vec![object_id]
681    }
682
683    /// Indicates whether the indicated item is considered stable or not.
684    ///
685    /// Only stable items can be used as dependencies of other catalog items.
686    fn is_stable(&self, id: CatalogItemId) -> bool {
687        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
688        !self.is_unstable_schema_specifier(spec)
689    }
690
691    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
692        if self.system_config().unsafe_enable_unstable_dependencies() {
693            return Ok(());
694        }
695
696        let unstable_dependencies: Vec<_> = item
697            .references()
698            .items()
699            .filter(|id| !self.is_stable(**id))
700            .map(|id| self.get_entry(id).name().item.clone())
701            .collect();
702
703        // It's okay to create a temporary object with unstable
704        // dependencies, since we will never need to reboot a catalog
705        // that contains it.
706        if unstable_dependencies.is_empty() || item.is_temporary() {
707            Ok(())
708        } else {
709            let object_type = item.typ().to_string();
710            Err(Error {
711                kind: ErrorKind::UnstableDependency {
712                    object_type,
713                    unstable_dependencies,
714                },
715            })
716        }
717    }
718
719    pub fn resolve_full_name(
720        &self,
721        name: &QualifiedItemName,
722        conn_id: Option<&ConnectionId>,
723    ) -> FullItemName {
724        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
725
726        let database = match &name.qualifiers.database_spec {
727            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
728            ResolvedDatabaseSpecifier::Id(id) => {
729                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
730            }
731        };
732        // For temporary schemas, we know the name is always MZ_TEMP_SCHEMA,
733        // and the schema may not exist yet if no temporary items have been created.
734        let schema = match &name.qualifiers.schema_spec {
735            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
736            SchemaSpecifier::Id(_) => self
737                .get_schema(
738                    &name.qualifiers.database_spec,
739                    &name.qualifiers.schema_spec,
740                    conn_id,
741                )
742                .name()
743                .schema
744                .clone(),
745        };
746        FullItemName {
747            database,
748            schema,
749            item: name.item.clone(),
750        }
751    }
752
753    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
754        let database = match &name.database {
755            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
756            ResolvedDatabaseSpecifier::Id(id) => {
757                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
758            }
759        };
760        FullSchemaName {
761            database,
762            schema: name.schema.clone(),
763        }
764    }
765
766    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
767        self.entry_by_id
768            .get(id)
769            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
770    }
771
772    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
773        let item_id = self
774            .entry_by_global_id
775            .get(id)
776            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
777
778        let entry = self.get_entry(item_id).clone();
779        let version = match entry.item() {
780            CatalogItem::Table(table) => {
781                let (version, _) = table
782                    .collections
783                    .iter()
784                    .find(|(_verison, gid)| *gid == id)
785                    .expect("version to exist");
786                RelationVersionSelector::Specific(*version)
787            }
788            _ => RelationVersionSelector::Latest,
789        };
790        CatalogCollectionEntry { entry, version }
791    }
792
    /// Returns an iterator over all catalog entries, each paired with its [`CatalogItemId`].
    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }
796
797    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
798        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
799        self.temporary_schemas
800            .get(conn)
801            .into_iter()
802            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
803    }
804
    /// Returns true if a temporary schema exists for the given connection.
    ///
    /// Temporary schemas are created lazily when the first temporary object is created
    /// for a connection. This may therefore return false for connections that have not created
    /// any temporary objects.
    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }
813
814    /// Gets a type named `name` from exactly one of the system schemas.
815    ///
816    /// # Panics
817    /// - If `name` is not an entry in any system schema
818    /// - If more than one system schema has an entry named `name`.
819    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
820        let mut res = None;
821        for schema_id in self.system_schema_ids() {
822            let schema = &self.ambient_schemas_by_id[&schema_id];
823            if let Some(global_id) = schema.types.get(name) {
824                match res {
825                    None => res = Some(self.get_entry(global_id)),
826                    Some(_) => panic!(
827                        "only call get_system_type on objects uniquely identifiable in one system schema"
828                    ),
829                }
830            }
831        }
832
833        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
834    }
835
836    pub fn get_item_by_name(
837        &self,
838        name: &QualifiedItemName,
839        conn_id: &ConnectionId,
840    ) -> Option<&CatalogEntry> {
841        self.get_schema(
842            &name.qualifiers.database_spec,
843            &name.qualifiers.schema_spec,
844            conn_id,
845        )
846        .items
847        .get(&name.item)
848        .and_then(|id| self.try_get_entry(id))
849    }
850
851    pub fn get_type_by_name(
852        &self,
853        name: &QualifiedItemName,
854        conn_id: &ConnectionId,
855    ) -> Option<&CatalogEntry> {
856        self.get_schema(
857            &name.qualifiers.database_spec,
858            &name.qualifiers.schema_spec,
859            conn_id,
860        )
861        .types
862        .get(&name.item)
863        .and_then(|id| self.try_get_entry(id))
864    }
865
866    pub(super) fn find_available_name(
867        &self,
868        mut name: QualifiedItemName,
869        conn_id: &ConnectionId,
870    ) -> QualifiedItemName {
871        let mut i = 0;
872        let orig_item_name = name.item.clone();
873        while self.get_item_by_name(&name, conn_id).is_some() {
874            i += 1;
875            name.item = format!("{}{}", orig_item_name, i);
876        }
877        name
878    }
879
    /// Returns the catalog entry for `id`, or `None` if there is no such entry.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }
883
884    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
885        let item_id = self.entry_by_global_id.get(id)?;
886        self.try_get_entry(item_id)
887    }
888
889    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
890    /// object that returns rows.
891    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
892        let entry = self.try_get_entry_by_global_id(id)?;
893        let desc = match entry.item() {
894            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
895            // TODO(alter_table): Support schema evolution on sources.
896            other => other.relation_desc(RelationVersionSelector::Latest)?,
897        };
898        Some(desc)
899    }
900
901    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
902        self.try_get_cluster(cluster_id)
903            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
904    }
905
    /// Returns the cluster with ID `cluster_id`, or `None` if it is unknown.
    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }
909
    /// Returns the role with ID `id`, or `None` if it is unknown.
    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }
913
914    pub fn get_role(&self, id: &RoleId) -> &Role {
915        self.roles_by_id.get(id).expect("catalog out of sync")
916    }
917
    /// Returns an iterator over the IDs of all roles in the catalog.
    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }
921
922    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
923        self.roles_by_name
924            .get(role_name)
925            .map(|id| &self.roles_by_id[id])
926    }
927
928    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
929        self.role_auth_by_id
930            .get(id)
931            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
932    }
933
    /// Returns the authentication info stored for role `id`, or `None` if there is none.
    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }
937
938    pub(super) fn try_get_network_policy_by_name(
939        &self,
940        policy_name: &str,
941    ) -> Option<&NetworkPolicy> {
942        self.network_policies_by_name
943            .get(policy_name)
944            .map(|id| &self.network_policies_by_id[id])
945    }
946
947    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
948        let mut membership = BTreeSet::new();
949        let mut queue = VecDeque::from(vec![id]);
950        while let Some(cur_id) = queue.pop_front() {
951            if !membership.contains(cur_id) {
952                membership.insert(cur_id.clone());
953                let role = self.get_role(cur_id);
954                soft_assert_no_log!(
955                    !role.membership().keys().contains(id),
956                    "circular membership exists in the catalog"
957                );
958                queue.extend(role.membership().keys());
959            }
960        }
961        membership.insert(RoleId::Public);
962        membership
963    }
964
    /// Returns the network policy with ID `id`.
    ///
    /// # Panics
    /// Panics with "catalog out of sync" if no such policy exists.
    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }
970
    /// Returns an iterator over the IDs of all network policies in the catalog.
    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }
974
975    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
976    /// source.
977    ///
978    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
979    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
980        let entry = self.try_get_entry(id)?;
981        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
982        let name = self.resolve_full_name(entry.name(), None);
983        let host_name = self
984            .http_host_name
985            .as_ref()
986            .map(|x| x.as_str())
987            .unwrap_or_else(|| "HOST");
988
989        let RawDatabaseSpecifier::Name(database) = name.database else {
990            return None;
991        };
992
993        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
994        url.path_segments_mut()
995            .ok()?
996            .push(&database)
997            .push(&name.schema)
998            .push(&name.item);
999
1000        Some(url)
1001    }
1002
1003    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
1004    ///
1005    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
1006    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
1007    ///
1008    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
1009    /// their original state before the method returns.
1010    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1011        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
1012        // caller if this method changed the state of the catalog.
1013        &mut self,
1014        create_sql: &str,
1015        force_if_exists_skip: bool,
1016    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1017        self.with_enable_for_item_parsing(|state| {
1018            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
1019            let pcx = Some(&pcx);
1020            let session_catalog = state.for_system_session();
1021
1022            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1023            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
1024            let (plan, _sql_impl_ids) =
1025                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
1026
1027            Ok((plan, resolved_ids))
1028        })
1029    }
1030
1031    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
1032    #[mz_ore::instrument]
1033    pub(crate) fn parse_plan(
1034        create_sql: &str,
1035        pcx: Option<&PlanContext>,
1036        catalog: &ConnCatalog,
1037    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1038        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1039        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1040        let (plan, _sql_impl_ids) =
1041            mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1042
1043        Ok((plan, resolved_ids))
1044    }
1045
1046    /// Parses the given SQL string into a pair of [`CatalogItem`].
1047    pub(crate) fn deserialize_item(
1048        &self,
1049        global_id: GlobalId,
1050        create_sql: &str,
1051        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1052        local_expression_cache: &mut LocalExpressionCache,
1053        previous_item: Option<CatalogItem>,
1054    ) -> Result<CatalogItem, AdapterError> {
1055        self.parse_item(
1056            global_id,
1057            create_sql,
1058            extra_versions,
1059            None,
1060            false,
1061            None,
1062            local_expression_cache,
1063            previous_item,
1064        )
1065    }
1066
1067    /// Parses the given SQL string into a `CatalogItem`.
1068    #[mz_ore::instrument]
1069    pub(crate) fn parse_item(
1070        &self,
1071        global_id: GlobalId,
1072        create_sql: &str,
1073        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1074        pcx: Option<&PlanContext>,
1075        is_retained_metrics_object: bool,
1076        custom_logical_compaction_window: Option<CompactionWindow>,
1077        local_expression_cache: &mut LocalExpressionCache,
1078        previous_item: Option<CatalogItem>,
1079    ) -> Result<CatalogItem, AdapterError> {
1080        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1081        match self.parse_item_inner(
1082            global_id,
1083            create_sql,
1084            extra_versions,
1085            pcx,
1086            is_retained_metrics_object,
1087            custom_logical_compaction_window,
1088            cached_expr,
1089            previous_item,
1090        ) {
1091            Ok((item, uncached_expr)) => {
1092                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1093                    local_expression_cache.insert_uncached_expression(
1094                        global_id,
1095                        uncached_expr,
1096                        optimizer_features,
1097                    );
1098                }
1099                Ok(item)
1100            }
1101            Err((err, cached_expr)) => {
1102                if let Some(local_expr) = cached_expr {
1103                    local_expression_cache.insert_cached_expression(global_id, local_expr);
1104                }
1105                Err(err)
1106            }
1107        }
1108    }
1109
1110    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
1111    ///
1112    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
1113    /// not cached.
1114    ///
1115    /// On failure returns an error and `cached_expr` so it can be used later.
1116    #[mz_ore::instrument]
1117    pub(crate) fn parse_item_inner(
1118        &self,
1119        global_id: GlobalId,
1120        create_sql: &str,
1121        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1122        pcx: Option<&PlanContext>,
1123        is_retained_metrics_object: bool,
1124        custom_logical_compaction_window: Option<CompactionWindow>,
1125        cached_expr: Option<LocalExpressions>,
1126        previous_item: Option<CatalogItem>,
1127    ) -> Result<
1128        (
1129            CatalogItem,
1130            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1131        ),
1132        (AdapterError, Option<LocalExpressions>),
1133    > {
1134        let session_catalog = self.for_system_session();
1135
1136        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1137            Ok((plan, resolved_ids)) => (plan, resolved_ids),
1138            Err(err) => return Err((err, cached_expr)),
1139        };
1140
1141        let mut uncached_expr = None;
1142
1143        // Carry over the plans (`optimized_plan`, `physical_plan`,
1144        // `dataflow_metainfo`) from the previous incarnation of this item when
1145        // re-parsing an existing item (e.g. after a RENAME). These fields live
1146        // on the `CatalogItem` since #35834, but are not reconstructable from
1147        // `create_sql` alone — they are populated by the sequencer `_finish`
1148        // paths at create time, and by the expression-cache / bootstrap
1149        // rendering path on boot. If we don't preserve them here, a RENAME
1150        // silently drops the plans and dataflow metainfo for the affected
1151        // MV/Index/CT.
1152        let previous_plans = previous_item.as_ref().map(|item| {
1153            (
1154                item.optimized_plan().cloned(),
1155                item.physical_plan().cloned(),
1156                item.dataflow_metainfo().cloned(),
1157            )
1158        });
1159
1160        let mut item = match plan {
1161            Plan::CreateTable(CreateTablePlan { table, .. }) => {
1162                let collections = extra_versions
1163                    .iter()
1164                    .map(|(version, gid)| (*version, *gid))
1165                    .chain([(RelationVersion::root(), global_id)].into_iter())
1166                    .collect();
1167
1168                CatalogItem::Table(Table {
1169                    create_sql: Some(table.create_sql),
1170                    desc: table.desc,
1171                    collections,
1172                    conn_id: None,
1173                    resolved_ids,
1174                    custom_logical_compaction_window: custom_logical_compaction_window
1175                        .or(table.compaction_window),
1176                    is_retained_metrics_object,
1177                    data_source: match table.data_source {
1178                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1179                            TableDataSource::TableWrites { defaults }
1180                        }
1181                        mz_sql::plan::TableDataSource::DataSource {
1182                            desc: data_source_desc,
1183                            timeline,
1184                        } => match data_source_desc {
1185                            mz_sql::plan::DataSourceDesc::IngestionExport {
1186                                ingestion_id,
1187                                external_reference,
1188                                details,
1189                                data_config,
1190                            } => TableDataSource::DataSource {
1191                                desc: DataSourceDesc::IngestionExport {
1192                                    ingestion_id,
1193                                    external_reference,
1194                                    details,
1195                                    data_config,
1196                                },
1197                                timeline,
1198                            },
1199                            mz_sql::plan::DataSourceDesc::Webhook {
1200                                validate_using,
1201                                body_format,
1202                                headers,
1203                                cluster_id,
1204                            } => TableDataSource::DataSource {
1205                                desc: DataSourceDesc::Webhook {
1206                                    validate_using,
1207                                    body_format,
1208                                    headers,
1209                                    cluster_id: cluster_id
1210                                        .expect("Webhook Tables must have a cluster_id set"),
1211                                },
1212                                timeline,
1213                            },
1214                            _ => {
1215                                return Err((
1216                                    AdapterError::Unstructured(anyhow::anyhow!(
1217                                        "unsupported data source for table"
1218                                    )),
1219                                    cached_expr,
1220                                ));
1221                            }
1222                        },
1223                    },
1224                })
1225            }
1226            Plan::CreateSource(CreateSourcePlan {
1227                source,
1228                timeline,
1229                in_cluster,
1230                ..
1231            }) => CatalogItem::Source(Source {
1232                create_sql: Some(source.create_sql),
1233                data_source: match source.data_source {
1234                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1235                        desc,
1236                        cluster_id: match in_cluster {
1237                            Some(id) => id,
1238                            None => {
1239                                return Err((
1240                                    AdapterError::Unstructured(anyhow::anyhow!(
1241                                        "ingestion-based sources must have cluster specified"
1242                                    )),
1243                                    cached_expr,
1244                                ));
1245                            }
1246                        },
1247                    },
1248                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1249                        desc,
1250                        progress_subsource,
1251                        data_config,
1252                        details,
1253                    } => DataSourceDesc::OldSyntaxIngestion {
1254                        desc,
1255                        progress_subsource,
1256                        data_config,
1257                        details,
1258                        cluster_id: match in_cluster {
1259                            Some(id) => id,
1260                            None => {
1261                                return Err((
1262                                    AdapterError::Unstructured(anyhow::anyhow!(
1263                                        "ingestion-based sources must have cluster specified"
1264                                    )),
1265                                    cached_expr,
1266                                ));
1267                            }
1268                        },
1269                    },
1270                    mz_sql::plan::DataSourceDesc::IngestionExport {
1271                        ingestion_id,
1272                        external_reference,
1273                        details,
1274                        data_config,
1275                    } => DataSourceDesc::IngestionExport {
1276                        ingestion_id,
1277                        external_reference,
1278                        details,
1279                        data_config,
1280                    },
1281                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1282                    mz_sql::plan::DataSourceDesc::Webhook {
1283                        validate_using,
1284                        body_format,
1285                        headers,
1286                        cluster_id,
1287                    } => {
1288                        mz_ore::soft_assert_or_log!(
1289                            cluster_id.is_none(),
1290                            "cluster_id set at Source level for Webhooks"
1291                        );
1292                        DataSourceDesc::Webhook {
1293                            validate_using,
1294                            body_format,
1295                            headers,
1296                            cluster_id: in_cluster
1297                                .expect("webhook sources must use an existing cluster"),
1298                        }
1299                    }
1300                },
1301                desc: source.desc,
1302                global_id,
1303                timeline,
1304                resolved_ids,
1305                custom_logical_compaction_window: source
1306                    .compaction_window
1307                    .or(custom_logical_compaction_window),
1308                is_retained_metrics_object,
1309            }),
1310            Plan::CreateView(CreateViewPlan { view, .. }) => {
1311                // Collect optimizer parameters.
1312                let optimizer_config =
1313                    optimize::OptimizerConfig::from(session_catalog.system_vars());
1314                let previous_exprs = previous_item.map(|item| match item {
1315                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
1316                    _ => None,
1317                });
1318
1319                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1320                    (Some(local_expr), _)
1321                        if local_expr.optimizer_features == optimizer_config.features =>
1322                    {
1323                        debug!("local expression cache hit for {global_id:?}");
1324                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1325                    }
1326                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1327                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1328                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1329                    }
1330                    (cached_expr, _) => {
1331                        let optimizer_features = optimizer_config.features.clone();
1332                        // Build an optimizer for this VIEW.
1333                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1334
1335                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
1336                        let raw_expr = view.expr;
1337                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1338                            Ok(optimzed_expr) => optimzed_expr,
1339                            Err(err) => return Err((err.into(), cached_expr)),
1340                        };
1341
1342                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1343
1344                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1345                    }
1346                };
1347
1348                // Resolve all item dependencies from the HIR expression.
1349                let dependencies: BTreeSet<_> = raw_expr
1350                    .depends_on()
1351                    .into_iter()
1352                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1353                    .collect();
1354
1355                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1356                CatalogItem::View(View {
1357                    create_sql: view.create_sql,
1358                    global_id,
1359                    raw_expr,
1360                    desc: RelationDesc::new(typ, view.column_names),
1361                    locally_optimized_expr: optimized_expr,
1362                    conn_id: None,
1363                    resolved_ids,
1364                    dependencies: DependencyIds(dependencies),
1365                })
1366            }
1367            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1368                materialized_view, ..
1369            }) => {
1370                let collections = extra_versions
1371                    .iter()
1372                    .map(|(version, gid)| (*version, *gid))
1373                    .chain([(RelationVersion::root(), global_id)].into_iter())
1374                    .collect();
1375
1376                // Collect optimizer parameters.
1377                let system_vars = session_catalog.system_vars();
1378                let overrides = self
1379                    .get_cluster(materialized_view.cluster_id)
1380                    .config
1381                    .features();
1382                let optimizer_config =
1383                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
1384                let previous_exprs = previous_item.map(|item| match item {
1385                    CatalogItem::MaterializedView(materialized_view) => (
1386                        materialized_view.raw_expr,
1387                        materialized_view.locally_optimized_expr,
1388                    ),
1389                    item => unreachable!("expected materialized view, found: {item:#?}"),
1390                });
1391
1392                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1393                    (Some(local_expr), _)
1394                        if local_expr.optimizer_features == optimizer_config.features =>
1395                    {
1396                        debug!("local expression cache hit for {global_id:?}");
1397                        (
1398                            Arc::new(materialized_view.expr),
1399                            Arc::new(local_expr.local_mir),
1400                        )
1401                    }
1402                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1403                    (_, Some((raw_expr, optimized_expr)))
1404                        if *raw_expr == materialized_view.expr =>
1405                    {
1406                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1407                    }
1408                    (cached_expr, _) => {
1409                        let optimizer_features = optimizer_config.features.clone();
1410                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
1411                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1412
1413                        let raw_expr = materialized_view.expr;
1414                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1415                            Ok(optimized_expr) => optimized_expr,
1416                            Err(err) => return Err((err.into(), cached_expr)),
1417                        };
1418
1419                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1420
1421                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1422                    }
1423                };
1424                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1425
1426                for &i in &materialized_view.non_null_assertions {
1427                    typ.column_types[i].nullable = false;
1428                }
1429                let desc = RelationDesc::new(typ, materialized_view.column_names);
1430                let desc = VersionedRelationDesc::new(desc);
1431
1432                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1433
1434                // Resolve all item dependencies from the HIR expression.
1435                let dependencies = raw_expr
1436                    .depends_on()
1437                    .into_iter()
1438                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1439                    .collect();
1440
1441                CatalogItem::MaterializedView(MaterializedView {
1442                    create_sql: materialized_view.create_sql,
1443                    collections,
1444                    raw_expr,
1445                    locally_optimized_expr: optimized_expr,
1446                    desc,
1447                    resolved_ids,
1448                    dependencies,
1449                    replacement_target: materialized_view.replacement_target,
1450                    cluster_id: materialized_view.cluster_id,
1451                    target_replica: materialized_view.target_replica,
1452                    non_null_assertions: materialized_view.non_null_assertions,
1453                    custom_logical_compaction_window: materialized_view.compaction_window,
1454                    refresh_schedule: materialized_view.refresh_schedule,
1455                    initial_as_of,
1456                    optimized_plan: None,
1457                    physical_plan: None,
1458                    dataflow_metainfo: None,
1459                })
1460            }
1461            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1462                create_sql: index.create_sql,
1463                global_id,
1464                on: index.on,
1465                keys: index.keys.into(),
1466                conn_id: None,
1467                resolved_ids,
1468                cluster_id: index.cluster_id,
1469                custom_logical_compaction_window: custom_logical_compaction_window
1470                    .or(index.compaction_window),
1471                is_retained_metrics_object,
1472                optimized_plan: None,
1473                physical_plan: None,
1474                dataflow_metainfo: None,
1475            }),
1476            Plan::CreateSink(CreateSinkPlan {
1477                sink,
1478                with_snapshot,
1479                in_cluster,
1480                ..
1481            }) => CatalogItem::Sink(Sink {
1482                create_sql: sink.create_sql,
1483                global_id,
1484                from: sink.from,
1485                connection: sink.connection,
1486                envelope: sink.envelope,
1487                version: sink.version,
1488                with_snapshot,
1489                resolved_ids,
1490                cluster_id: in_cluster,
1491                commit_interval: sink.commit_interval,
1492            }),
1493            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1494                // Even if we don't need the `RelationDesc` here, error out
1495                // early and eagerly, as a kind of soft assertion that we _can_
1496                // build the `RelationDesc` when needed.
1497                if let Err(err) = typ.inner.desc(&session_catalog) {
1498                    return Err((err.into(), cached_expr));
1499                }
1500                CatalogItem::Type(Type {
1501                    create_sql: Some(typ.create_sql),
1502                    global_id,
1503                    details: CatalogTypeDetails {
1504                        array_id: None,
1505                        typ: typ.inner,
1506                        pg_metadata: None,
1507                    },
1508                    resolved_ids,
1509                })
1510            }
1511            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1512                create_sql: secret.create_sql,
1513                global_id,
1514            }),
1515            Plan::CreateConnection(CreateConnectionPlan {
1516                connection:
1517                    mz_sql::plan::Connection {
1518                        create_sql,
1519                        details,
1520                    },
1521                ..
1522            }) => CatalogItem::Connection(Connection {
1523                create_sql,
1524                global_id,
1525                details,
1526                resolved_ids,
1527            }),
1528            _ => {
1529                return Err((
1530                    Error::new(ErrorKind::Corruption {
1531                        detail: "catalog entry generated inappropriate plan".to_string(),
1532                    })
1533                    .into(),
1534                    cached_expr,
1535                ));
1536            }
1537        };
1538
1539        // Carry over the plans (`optimized_plan`, `physical_plan`,
1540        // `dataflow_metainfo`) from the previous incarnation of this item, if
1541        // any. See the comment on `previous_plans` above.
1542        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
1543            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
1544            {
1545                *optimized_plan = prev_optimized;
1546                *physical_plan = prev_physical;
1547                *dataflow_metainfo = prev_metainfo;
1548            }
1549        }
1550
1551        Ok((item, uncached_expr))
1552    }
1553
1554    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1555    /// Calling this method will not permanently modify any system configuration variables.
1556    ///
1557    /// WARNING:
1558    /// Any modifications made to the system configuration variables in `f`, will be lost.
1559    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1560        // Enable catalog features that might be required during planning existing
1561        // catalog items. Existing catalog items might have been created while
1562        // a specific feature flag was turned on, so we need to ensure that this
1563        // is also the case during catalog rehydration in order to avoid panics.
1564        //
1565        // WARNING / CONTRACT:
1566        // 1. Features used in this method that related to parsing / planning
1567        //    should be `enable_for_item_parsing` set to `true`.
1568        // 2. After this step, feature flag configuration must not be
1569        //    overridden.
1570        let restore = Arc::clone(&self.system_configuration);
1571        Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1572        let res = f(self);
1573        self.system_configuration = restore;
1574        res
1575    }
1576
1577    /// Returns all indexes on the given object and cluster known in the catalog.
1578    pub fn get_indexes_on(
1579        &self,
1580        id: GlobalId,
1581        cluster: ClusterId,
1582    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1583        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1584
1585        self.try_get_entry_by_global_id(&id)
1586            .into_iter()
1587            .map(move |e| {
1588                e.used_by()
1589                    .iter()
1590                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1591                        CatalogItem::Index(index) if index_matches(index) => {
1592                            Some((index.global_id(), index))
1593                        }
1594                        _ => None,
1595                    })
1596            })
1597            .flatten()
1598    }
1599
1600    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1601        &self.database_by_id[database_id]
1602    }
1603
1604    /// Gets a reference to the specified replica of the specified cluster.
1605    ///
1606    /// Returns `None` if either the cluster or the replica does not
1607    /// exist.
1608    pub(super) fn try_get_cluster_replica(
1609        &self,
1610        id: ClusterId,
1611        replica_id: ReplicaId,
1612    ) -> Option<&ClusterReplica> {
1613        self.try_get_cluster(id)
1614            .and_then(|cluster| cluster.replica(replica_id))
1615    }
1616
1617    /// Gets a reference to the specified replica of the specified cluster.
1618    ///
1619    /// Panics if either the cluster or the replica does not exist.
1620    pub(crate) fn get_cluster_replica(
1621        &self,
1622        cluster_id: ClusterId,
1623        replica_id: ReplicaId,
1624    ) -> &ClusterReplica {
1625        self.try_get_cluster_replica(cluster_id, replica_id)
1626            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1627    }
1628
1629    pub(super) fn resolve_replica_in_cluster(
1630        &self,
1631        cluster_id: &ClusterId,
1632        replica_name: &str,
1633    ) -> Result<&ClusterReplica, SqlCatalogError> {
1634        let cluster = self.get_cluster(*cluster_id);
1635        let replica_id = cluster
1636            .replica_id_by_name_
1637            .get(replica_name)
1638            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1639        Ok(&cluster.replicas_by_id_[replica_id])
1640    }
1641
1642    /// Get system configuration `name`.
1643    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1644        Ok(self.system_configuration.get(name)?)
1645    }
1646
1647    /// Parse system configuration `name` with `value` int.
1648    ///
1649    /// Returns the parsed value as a string.
1650    pub(super) fn parse_system_configuration(
1651        &self,
1652        name: &str,
1653        value: VarInput,
1654    ) -> Result<String, Error> {
1655        let value = self.system_configuration.parse(name, value)?;
1656        Ok(value.format())
1657    }
1658
1659    /// Gets the schema map for the database matching `database_spec`.
1660    pub(super) fn resolve_schema_in_database(
1661        &self,
1662        database_spec: &ResolvedDatabaseSpecifier,
1663        schema_name: &str,
1664        conn_id: &ConnectionId,
1665    ) -> Result<&Schema, SqlCatalogError> {
1666        let schema = match database_spec {
1667            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1668                self.temporary_schemas.get(conn_id)
1669            }
1670            ResolvedDatabaseSpecifier::Ambient => self
1671                .ambient_schemas_by_name
1672                .get(schema_name)
1673                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1674            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1675                db.schemas_by_name
1676                    .get(schema_name)
1677                    .and_then(|id| db.schemas_by_id.get(id))
1678            }),
1679        };
1680        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1681    }
1682
1683    /// Try to get a schema, returning `None` if it doesn't exist.
1684    ///
1685    /// For temporary schemas, returns `None` if the schema hasn't been created yet
1686    /// (temporary schemas are created lazily when the first temporary object is created).
1687    pub fn try_get_schema(
1688        &self,
1689        database_spec: &ResolvedDatabaseSpecifier,
1690        schema_spec: &SchemaSpecifier,
1691        conn_id: &ConnectionId,
1692    ) -> Option<&Schema> {
1693        // Keep in sync with `get_schema` and `get_schemas_mut`
1694        match (database_spec, schema_spec) {
1695            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1696                self.temporary_schemas.get(conn_id)
1697            }
1698            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1699                self.ambient_schemas_by_id.get(id)
1700            }
1701            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1702                .database_by_id
1703                .get(database_id)
1704                .and_then(|db| db.schemas_by_id.get(schema_id)),
1705            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1706                unreachable!("temporary schemas are in the ambient database")
1707            }
1708        }
1709    }
1710
1711    pub fn get_schema(
1712        &self,
1713        database_spec: &ResolvedDatabaseSpecifier,
1714        schema_spec: &SchemaSpecifier,
1715        conn_id: &ConnectionId,
1716    ) -> &Schema {
1717        // Keep in sync with `try_get_schema` and `get_schemas_mut`
1718        self.try_get_schema(database_spec, schema_spec, conn_id)
1719            .expect("schema must exist")
1720    }
1721
1722    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1723        self.database_by_id
1724            .values()
1725            .filter_map(|database| database.schemas_by_id.get(schema_id))
1726            .chain(self.ambient_schemas_by_id.values())
1727            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1728            .into_first()
1729    }
1730
1731    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1732        self.temporary_schemas
1733            .values()
1734            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1735            .into_first()
1736    }
1737
1738    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1739        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1740    }
1741
1742    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1743        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1744    }
1745
1746    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1747        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1748    }
1749
1750    pub fn get_information_schema_id(&self) -> SchemaId {
1751        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1752    }
1753
1754    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1755        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1756    }
1757
1758    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1759        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1760    }
1761
1762    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1763        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1764    }
1765
1766    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1767        SYSTEM_SCHEMAS
1768            .iter()
1769            .map(|name| self.ambient_schemas_by_name[*name])
1770    }
1771
1772    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1773        self.system_schema_ids().contains(&id)
1774    }
1775
1776    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1777        match spec {
1778            SchemaSpecifier::Temporary => false,
1779            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1780        }
1781    }
1782
1783    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1784        UNSTABLE_SCHEMAS
1785            .iter()
1786            .map(|name| self.ambient_schemas_by_name[*name])
1787    }
1788
1789    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1790        self.unstable_schema_ids().contains(&id)
1791    }
1792
1793    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1794        match spec {
1795            SchemaSpecifier::Temporary => false,
1796            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1797        }
1798    }
1799
1800    /// Creates a new schema in the `Catalog` for temporary items
1801    /// indicated by the TEMPORARY or TEMP keywords.
1802    pub fn create_temporary_schema(
1803        &mut self,
1804        conn_id: &ConnectionId,
1805        owner_id: RoleId,
1806    ) -> Result<(), Error> {
1807        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1808        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1809        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1810        // all -1.
1811        let oid = INVALID_OID;
1812        self.temporary_schemas.insert(
1813            conn_id.clone(),
1814            Schema {
1815                name: QualifiedSchemaName {
1816                    database: ResolvedDatabaseSpecifier::Ambient,
1817                    schema: MZ_TEMP_SCHEMA.into(),
1818                },
1819                id: SchemaSpecifier::Temporary,
1820                oid,
1821                items: BTreeMap::new(),
1822                functions: BTreeMap::new(),
1823                types: BTreeMap::new(),
1824                owner_id,
1825                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1826                    mz_sql::catalog::ObjectType::Schema,
1827                    owner_id,
1828                )]),
1829            },
1830        );
1831        Ok(())
1832    }
1833
1834    /// Return all OIDs that are allocated to temporary objects.
1835    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1836        std::iter::empty()
1837            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1838                if schema.id.is_temporary() {
1839                    Some(schema.oid)
1840                } else {
1841                    None
1842                }
1843            }))
1844            .chain(self.entry_by_id.values().filter_map(|entry| {
1845                if entry.item().is_temporary() {
1846                    Some(entry.oid)
1847                } else {
1848                    None
1849                }
1850            }))
1851    }
1852
1853    /// Optimized lookup for a builtin table.
1854    ///
1855    /// Panics if the builtin table doesn't exist in the catalog.
1856    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1857        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1858    }
1859
1860    /// Optimized lookup for a builtin log.
1861    ///
1862    /// Panics if the builtin log doesn't exist in the catalog.
1863    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1864        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1865        let log = match self.get_entry(&item_id).item() {
1866            CatalogItem::Log(log) => log,
1867            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1868        };
1869        (item_id, log.global_id)
1870    }
1871
1872    /// Optimized lookup for a builtin storage collection.
1873    ///
1874    /// Panics if the builtin storage collection doesn't exist in the catalog.
1875    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1876        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1877    }
1878
1879    /// Optimized lookup for a builtin object.
1880    ///
1881    /// Panics if the builtin object doesn't exist in the catalog.
1882    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1883        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1884        let schema = &self.ambient_schemas_by_id[schema_id];
1885        match builtin.catalog_item_type() {
1886            CatalogItemType::Type => schema.types[builtin.name()],
1887            CatalogItemType::Func => schema.functions[builtin.name()],
1888            CatalogItemType::Table
1889            | CatalogItemType::Source
1890            | CatalogItemType::Sink
1891            | CatalogItemType::View
1892            | CatalogItemType::MaterializedView
1893            | CatalogItemType::Index
1894            | CatalogItemType::Secret
1895            | CatalogItemType::Connection => schema.items[builtin.name()],
1896        }
1897    }
1898
    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
    ///
    /// Every by-name type reference nested in the builtin's details is
    /// replaced by the `id` of the system type returned by `get_system_type`.
    /// This covers array, list and range element types, map key/value types,
    /// and record field types. Type modifiers and record field names are
    /// cloned unchanged. Leaf types carry no references and map one-to-one.
    /// The builtin's `name`, `schema`, `oid`, `array_id`, and `pg_metadata`
    /// are copied over as-is.
    ///
    /// NOTE(review): behavior for an unknown referenced type depends on
    /// `get_system_type` (not visible here). It presumably panics — confirm.
    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            // Container types: translate each nested name reference into the
            // id of the referenced system type.
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            // Leaf types carry no type references and map one-to-one.
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        // Everything other than the type itself is carried over verbatim.
        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }
1984
1985    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1986        &self.config
1987    }
1988
1989    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1990        match self.database_by_name.get(database_name) {
1991            Some(id) => Ok(&self.database_by_id[id]),
1992            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1993        }
1994    }
1995
1996    pub fn resolve_schema(
1997        &self,
1998        current_database: Option<&DatabaseId>,
1999        database_name: Option<&str>,
2000        schema_name: &str,
2001        conn_id: &ConnectionId,
2002    ) -> Result<&Schema, SqlCatalogError> {
2003        let database_spec = match database_name {
2004            // If a database is explicitly specified, validate it. Note that we
2005            // intentionally do not validate `current_database` to permit
2006            // querying `mz_catalog` with an invalid session database, e.g., so
2007            // that you can run `SHOW DATABASES` to *find* a valid database.
2008            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
2009                self.resolve_database(database)?.id().clone(),
2010            )),
2011            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2012        };
2013
2014        // First try to find the schema in the named database.
2015        if let Some(database_spec) = database_spec {
2016            if let Ok(schema) =
2017                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2018            {
2019                return Ok(schema);
2020            }
2021        }
2022
2023        // Then fall back to the ambient database.
2024        if let Ok(schema) = self.resolve_schema_in_database(
2025            &ResolvedDatabaseSpecifier::Ambient,
2026            schema_name,
2027            conn_id,
2028        ) {
2029            return Ok(schema);
2030        }
2031
2032        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2033    }
2034
2035    /// Optimized lookup for a system schema.
2036    ///
2037    /// Panics if the system schema doesn't exist in the catalog.
2038    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
2039        self.ambient_schemas_by_name[name]
2040    }
2041
2042    pub fn resolve_search_path(
2043        &self,
2044        session: &dyn SessionMetadata,
2045    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2046        let database = self
2047            .database_by_name
2048            .get(session.database())
2049            .map(|id| id.clone());
2050
2051        session
2052            .search_path()
2053            .iter()
2054            .map(|schema| {
2055                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2056            })
2057            .filter_map(|schema| schema.ok())
2058            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2059            .collect()
2060    }
2061
2062    pub fn effective_search_path(
2063        &self,
2064        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2065        include_temp_schema: bool,
2066    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2067        let mut v = Vec::with_capacity(search_path.len() + 3);
2068        // Temp schema is only included for relations and data types, not for functions and operators
2069        let temp_schema = (
2070            ResolvedDatabaseSpecifier::Ambient,
2071            SchemaSpecifier::Temporary,
2072        );
2073        if include_temp_schema && !search_path.contains(&temp_schema) {
2074            v.push(temp_schema);
2075        }
2076        let default_schemas = [
2077            (
2078                ResolvedDatabaseSpecifier::Ambient,
2079                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2080            ),
2081            (
2082                ResolvedDatabaseSpecifier::Ambient,
2083                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2084            ),
2085        ];
2086        for schema in default_schemas.into_iter() {
2087            if !search_path.contains(&schema) {
2088                v.push(schema);
2089            }
2090        }
2091        v.extend_from_slice(search_path);
2092        v
2093    }
2094
2095    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2096        let id = self
2097            .clusters_by_name
2098            .get(name)
2099            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2100        Ok(&self.clusters_by_id[id])
2101    }
2102
2103    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2104        let id = self
2105            .clusters_by_name
2106            .get(cluster.name)
2107            .expect("failed to lookup BuiltinCluster by name");
2108        self.clusters_by_id
2109            .get(id)
2110            .expect("failed to lookup BuiltinCluster by ID")
2111    }
2112
2113    pub fn resolve_cluster_replica(
2114        &self,
2115        cluster_replica_name: &QualifiedReplica,
2116    ) -> Result<&ClusterReplica, SqlCatalogError> {
2117        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2118        let replica_name = cluster_replica_name.replica.as_str();
2119        let replica_id = cluster
2120            .replica_id(replica_name)
2121            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2122        Ok(cluster.replica(replica_id).expect("Must exist"))
2123    }
2124
2125    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
2126    ///
2127    /// If `name` does not specify a database, the `current_database` is used.
2128    /// If `name` does not specify a schema, then the schemas in `search_path`
2129    /// are searched in order.
2130    #[allow(clippy::useless_let_if_seq)]
2131    pub fn resolve(
2132        &self,
2133        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2134        current_database: Option<&DatabaseId>,
2135        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2136        name: &PartialItemName,
2137        conn_id: &ConnectionId,
2138        err_gen: fn(String) -> SqlCatalogError,
2139    ) -> Result<&CatalogEntry, SqlCatalogError> {
2140        // If a schema name was specified, just try to find the item in that
2141        // schema. If no schema was specified, try to find the item in the connection's
2142        // temporary schema. If the item is not found, try to find the item in every
2143        // schema in the search path.
2144        let schemas = match &name.schema {
2145            Some(schema_name) => {
2146                match self.resolve_schema(
2147                    current_database,
2148                    name.database.as_deref(),
2149                    schema_name,
2150                    conn_id,
2151                ) {
2152                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2153                    Err(e) => return Err(e),
2154                }
2155            }
2156            None => match self
2157                .try_get_schema(
2158                    &ResolvedDatabaseSpecifier::Ambient,
2159                    &SchemaSpecifier::Temporary,
2160                    conn_id,
2161                )
2162                .and_then(|schema| schema.items.get(&name.item))
2163            {
2164                Some(id) => return Ok(self.get_entry(id)),
2165                None => search_path.to_vec(),
2166            },
2167        };
2168
2169        for (database_spec, schema_spec) in &schemas {
2170            // Use try_get_schema because the temp schema might not exist yet
2171            // (it's created lazily when the first temp object is created).
2172            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
2173                continue;
2174            };
2175
2176            if let Some(id) = get_schema_entries(schema).get(&name.item) {
2177                return Ok(&self.entry_by_id[id]);
2178            }
2179        }
2180
2181        // Some relations that have previously lived in the `mz_internal` schema have been moved to
2182        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
2183        // automatically let uses of the old schema resolve to the new ones as well.
2184        // TODO(database-issues#8173) remove this after sufficient time has passed
2185        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2186        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2187            for schema_id in [
2188                self.get_mz_catalog_unstable_schema_id(),
2189                self.get_mz_introspection_schema_id(),
2190            ] {
2191                let schema = self.get_schema(
2192                    &ResolvedDatabaseSpecifier::Ambient,
2193                    &SchemaSpecifier::Id(schema_id),
2194                    conn_id,
2195                );
2196
2197                if let Some(id) = get_schema_entries(schema).get(&name.item) {
2198                    debug!(
2199                        github_27831 = true,
2200                        "encountered use of outdated schema `mz_internal` for relation: {name}",
2201                    );
2202                    return Ok(&self.entry_by_id[id]);
2203                }
2204            }
2205        }
2206
2207        Err(err_gen(name.to_string()))
2208    }
2209
2210    /// Resolves `name` to a non-function [`CatalogEntry`].
2211    pub fn resolve_entry(
2212        &self,
2213        current_database: Option<&DatabaseId>,
2214        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2215        name: &PartialItemName,
2216        conn_id: &ConnectionId,
2217    ) -> Result<&CatalogEntry, SqlCatalogError> {
2218        self.resolve(
2219            |schema| &schema.items,
2220            current_database,
2221            search_path,
2222            name,
2223            conn_id,
2224            SqlCatalogError::UnknownItem,
2225        )
2226    }
2227
2228    /// Resolves `name` to a function [`CatalogEntry`].
2229    pub fn resolve_function(
2230        &self,
2231        current_database: Option<&DatabaseId>,
2232        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2233        name: &PartialItemName,
2234        conn_id: &ConnectionId,
2235    ) -> Result<&CatalogEntry, SqlCatalogError> {
2236        self.resolve(
2237            |schema| &schema.functions,
2238            current_database,
2239            search_path,
2240            name,
2241            conn_id,
2242            |name| SqlCatalogError::UnknownFunction {
2243                name,
2244                alternative: None,
2245            },
2246        )
2247    }
2248
2249    /// Resolves `name` to a type [`CatalogEntry`].
2250    pub fn resolve_type(
2251        &self,
2252        current_database: Option<&DatabaseId>,
2253        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2254        name: &PartialItemName,
2255        conn_id: &ConnectionId,
2256    ) -> Result<&CatalogEntry, SqlCatalogError> {
2257        static NON_PG_CATALOG_TYPES: LazyLock<
2258            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2259        > = LazyLock::new(|| {
2260            BUILTINS::types()
2261                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2262                .map(|typ| (typ.name, typ))
2263                .collect()
2264        });
2265
2266        let entry = self.resolve(
2267            |schema| &schema.types,
2268            current_database,
2269            search_path,
2270            name,
2271            conn_id,
2272            |name| SqlCatalogError::UnknownType { name },
2273        )?;
2274
2275        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2276            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2277                warn!(
2278                    "user specified an incorrect schema of {} for the type {}, which should be in \
2279                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2280                    PG_CATALOG_SCHEMA.quoted(),
2281                    typ.name.quoted(),
2282                    typ.schema.quoted(),
2283                )
2284            }
2285        }
2286
2287        Ok(entry)
2288    }
2289
2290    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2291    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2292        match object_id {
2293            ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2294            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2295            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2296            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2297            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2298            ObjectId::ClusterReplica(cluster_replica_id) => {
2299                CommentObjectId::ClusterReplica(cluster_replica_id)
2300            }
2301            ObjectId::NetworkPolicy(network_policy_id) => {
2302                CommentObjectId::NetworkPolicy(network_policy_id)
2303            }
2304        }
2305    }
2306
2307    /// Return current system configuration.
2308    pub fn system_config(&self) -> &SystemVars {
2309        &self.system_configuration
2310    }
2311
2312    /// Returns the cluster-coherent scoped optimizer-feature overrides for
2313    /// `cluster_id` from the in-memory scoped-parameter working copy, or empty
2314    /// if the cluster has none.
2315    pub fn cluster_scoped_optimizer_overrides(
2316        &self,
2317        cluster_id: ClusterId,
2318    ) -> OptimizerFeatureOverrides {
2319        self.scoped_system_parameters
2320            .cluster
2321            .get(&cluster_id)
2322            .cloned()
2323            .map(OptimizerFeatureOverrides::from)
2324            .unwrap_or_default()
2325    }
2326
    /// Returns the entire scoped system-parameter working copy, read by the
    /// coordinator's scoped-parameter reconcile path.
    ///
    /// This returns a borrow of the in-memory copy; nothing is cloned.
    pub fn scoped_system_parameters(&self) -> &ScopedParameters {
        &self.scoped_system_parameters
    }
2332
    /// Return a mutable reference to the current system configuration.
    ///
    /// The configuration is stored in an `Arc`. `Arc::make_mut` clones the
    /// inner value first if other handles to it exist, so mutations through
    /// the returned reference are not observed by those other handles.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        Arc::make_mut(&mut self.system_configuration)
    }
2337
2338    /// Serializes the catalog's in-memory state.
2339    ///
2340    /// There are no guarantees about the format of the serialized state, except
2341    /// that the serialized state for two identical catalogs will compare
2342    /// identically.
2343    ///
2344    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2345    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2346    /// argument.
2347    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2348        // Dump the base catalog.
2349        let mut dump = serde_json::to_value(&self).map_err(|e| {
2350            Error::new(ErrorKind::Unstructured(format!(
2351                // Don't panic here because we don't have compile-time failures for maps with
2352                // non-string keys.
2353                "internal error: could not dump catalog: {}",
2354                e
2355            )))
2356        })?;
2357
2358        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2359        // Stitch in system parameter defaults.
2360        dump_obj.insert(
2361            "system_parameter_defaults".into(),
2362            serde_json::json!(self.system_config().defaults()),
2363        );
2364        // Potentially overwrite unfinalized shards.
2365        if let Some(unfinalized_shards) = unfinalized_shards {
2366            dump_obj
2367                .get_mut("storage_metadata")
2368                .expect("known to exist")
2369                .as_object_mut()
2370                .expect("storage_metadata is an object")
2371                .insert(
2372                    "unfinalized_shards".into(),
2373                    serde_json::json!(unfinalized_shards),
2374                );
2375        }
2376        // Remove GlobalIds for temporary objects from the mapping.
2377        //
2378        // Post-test consistency checks with the durable catalog don't know about temporary items
2379        // since they're kept entirely in memory.
2380        let temporary_gids: Vec<_> = self
2381            .entry_by_global_id
2382            .iter()
2383            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2384            .map(|(gid, _item_id)| *gid)
2385            .collect();
2386        if !temporary_gids.is_empty() {
2387            let gids = dump_obj
2388                .get_mut("entry_by_global_id")
2389                .expect("known_to_exist")
2390                .as_object_mut()
2391                .expect("entry_by_global_id is an object");
2392            for gid in temporary_gids {
2393                gids.remove(&gid.to_string());
2394            }
2395        }
2396        // We exclude role_auth_by_id because it contains password information
2397        // which should not be included in the dump.
2398        dump_obj.remove("role_auth_by_id");
2399
2400        // Emit as pretty-printed JSON.
2401        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2402    }
2403
2404    pub fn availability_zones(&self) -> &[String] {
2405        &self.availability_zones
2406    }
2407
2408    pub fn concretize_replica_location(
2409        &self,
2410        location: mz_catalog::durable::ReplicaLocation,
2411        allowed_sizes: &Vec<String>,
2412        allowed_availability_zones: Option<&[String]>,
2413        allow_disabled: bool,
2414    ) -> Result<ReplicaLocation, Error> {
2415        let location = match location {
2416            mz_catalog::durable::ReplicaLocation::Unmanaged {
2417                storagectl_addrs,
2418                computectl_addrs,
2419            } => {
2420                if allowed_availability_zones.is_some() {
2421                    return Err(Error {
2422                        kind: ErrorKind::Internal(
2423                            "tried concretize unmanaged replica with specific availability_zones"
2424                                .to_string(),
2425                        ),
2426                    });
2427                }
2428                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2429                    storagectl_addrs,
2430                    computectl_addrs,
2431                })
2432            }
2433            mz_catalog::durable::ReplicaLocation::Managed {
2434                size,
2435                // The AZ list the replica was provisioned under: provisioning
2436                // paths pass the cluster's pool as `allowed_availability_zones`
2437                // to stamp it, while rebuilds from durable state pass `None` to
2438                // keep it. For an unmanaged cluster's replica it is the single
2439                // user-pinned AZ.
2440                availability_zones,
2441                billed_as,
2442                internal,
2443                pending,
2444            } => {
2445                self.ensure_valid_replica_size(allowed_sizes, &size, allow_disabled)?;
2446                let cluster_replica_sizes = &self.cluster_replica_sizes;
2447
2448                ReplicaLocation::Managed(ManagedReplicaLocation {
2449                    allocation: cluster_replica_sizes
2450                        .0
2451                        .get(&size)
2452                        .expect("catalog out of sync")
2453                        .clone(),
2454                    availability_zones: match allowed_availability_zones {
2455                        Some(azs) => azs.to_vec(),
2456                        None => availability_zones,
2457                    },
2458                    size,
2459                    billed_as,
2460                    internal,
2461                    pending,
2462                })
2463            }
2464        };
2465        Ok(location)
2466    }
2467
2468    pub(crate) fn ensure_valid_replica_size(
2469        &self,
2470        allowed_sizes: &[String],
2471        size: &String,
2472        allow_disabled: bool,
2473    ) -> Result<(), Error> {
2474        let cluster_replica_sizes = &self.cluster_replica_sizes;
2475
2476        if !cluster_replica_sizes.0.contains_key(size)
2477            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2478            || (!allow_disabled && cluster_replica_sizes.0[size].disabled)
2479        {
2480            let mut entries = cluster_replica_sizes
2481                .enabled_allocations()
2482                .collect::<Vec<_>>();
2483
2484            if !allowed_sizes.is_empty() {
2485                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2486                entries.retain(|(name, _)| allowed_sizes.contains(name));
2487            }
2488
2489            entries.sort_by_key(
2490                |(
2491                    _name,
2492                    ReplicaAllocation {
2493                        scale, cpu_limit, ..
2494                    },
2495                )| (scale, cpu_limit),
2496            );
2497
2498            Err(Error {
2499                kind: ErrorKind::InvalidClusterReplicaSize {
2500                    size: size.to_owned(),
2501                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2502                },
2503            })
2504        } else {
2505            Ok(())
2506        }
2507    }
2508
2509    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2510        if role_id.is_builtin() {
2511            let role = self.get_role(role_id);
2512            Err(Error::new(ErrorKind::ReservedRoleName(
2513                role.name().to_string(),
2514            )))
2515        } else {
2516            Ok(())
2517        }
2518    }
2519
2520    pub fn ensure_not_reserved_network_policy(
2521        &self,
2522        network_policy_id: &NetworkPolicyId,
2523    ) -> Result<(), Error> {
2524        if network_policy_id.is_builtin() {
2525            let policy = self.get_network_policy(network_policy_id);
2526            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2527                policy.name.clone(),
2528            )))
2529        } else {
2530            Ok(())
2531        }
2532    }
2533
2534    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2535        let is_grantable = !role_id.is_public() && !role_id.is_system();
2536        if is_grantable {
2537            Ok(())
2538        } else {
2539            let role = self.get_role(role_id);
2540            Err(Error::new(ErrorKind::UngrantableRoleName(
2541                role.name().to_string(),
2542            )))
2543        }
2544    }
2545
2546    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2547        if role_id.is_system() {
2548            let role = self.get_role(role_id);
2549            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2550                role.name().to_string(),
2551            )))
2552        } else {
2553            Ok(())
2554        }
2555    }
2556
2557    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2558        if role_id.is_predefined() {
2559            let role = self.get_role(role_id);
2560            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2561                role.name().to_string(),
2562            )))
2563        } else {
2564            Ok(())
2565        }
2566    }
2567
2568    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2569    // passing tx, and session?
2570    pub(crate) fn add_to_audit_log(
2571        system_configuration: &SystemVars,
2572        oracle_write_ts: mz_repr::Timestamp,
2573        session: Option<&ConnMeta>,
2574        tx: &mut mz_catalog::durable::Transaction,
2575        audit_events: &mut Vec<VersionedEvent>,
2576        event_type: EventType,
2577        object_type: ObjectType,
2578        details: EventDetails,
2579    ) -> Result<(), Error> {
2580        let user = session.map(|session| session.user().name.to_string());
2581
2582        // unsafe_mock_audit_event_timestamp can only be set to Some when running in unsafe mode.
2583
2584        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2585            Some(ts) => ts.into(),
2586            _ => oracle_write_ts.into(),
2587        };
2588        let id = tx.allocate_audit_log_id()?;
2589        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2590        audit_events.push(event.clone());
2591        tx.insert_audit_log_event(event);
2592        Ok(())
2593    }
2594
2595    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2596        match id {
2597            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2598            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2599                self.get_cluster_replica(*cluster_id, *replica_id)
2600                    .owner_id(),
2601            ),
2602            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2603            ObjectId::Schema((database_spec, schema_spec)) => Some(
2604                self.get_schema(database_spec, schema_spec, conn_id)
2605                    .owner_id(),
2606            ),
2607            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2608            ObjectId::Role(_) => None,
2609            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2610        }
2611    }
2612
2613    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2614        match object_id {
2615            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2616            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2617            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2618            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2619            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2620            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2621            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2622        }
2623    }
2624
2625    pub(super) fn get_system_object_type(
2626        &self,
2627        id: &SystemObjectId,
2628    ) -> mz_sql::catalog::SystemObjectType {
2629        match id {
2630            SystemObjectId::Object(object_id) => {
2631                SystemObjectType::Object(self.get_object_type(object_id))
2632            }
2633            SystemObjectId::System => SystemObjectType::System,
2634        }
2635    }
2636
    /// Returns a read-only view of the current [`StorageMetadata`].
    ///
    /// To write to this struct, you must use a catalog transaction; this
    /// accessor only hands out a shared borrow.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2643
2644    /// For the Sources ids in `ids`, return their compaction windows.
2645    pub fn source_compaction_windows(
2646        &self,
2647        ids: impl IntoIterator<Item = CatalogItemId>,
2648    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2649        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2650        let mut seen = BTreeSet::new();
2651        for item_id in ids {
2652            if !seen.insert(item_id) {
2653                continue;
2654            }
2655            let entry = self.get_entry(&item_id);
2656            match entry.item() {
2657                CatalogItem::Source(source) => {
2658                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2659                    cws.entry(source_cw).or_default().insert(item_id);
2660                }
2661                CatalogItem::Table(table) => {
2662                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2663                    match &table.data_source {
2664                        TableDataSource::DataSource {
2665                            desc:
2666                                DataSourceDesc::IngestionExport { .. }
2667                                // Also match webhook tables (source-to-table migration).
2668                                | DataSourceDesc::Webhook { .. },
2669                            timeline: _,
2670                        } => {
2671                            cws.entry(table_cw).or_default().insert(item_id);
2672                        }
2673                        // Regular tables handle compaction directly in
2674                        // catalog_implications, not through this function.
2675                        TableDataSource::TableWrites { .. } => {}
2676                        TableDataSource::DataSource {
2677                            desc:
2678                                DataSourceDesc::Ingestion { .. }
2679                                | DataSourceDesc::OldSyntaxIngestion { .. }
2680                                | DataSourceDesc::Introspection(_)
2681                                | DataSourceDesc::Progress
2682                                | DataSourceDesc::Catalog,
2683                            ..
2684                        } => {
2685                            unreachable!(
2686                                "unexpected DataSourceDesc for table {item_id}: {:?}",
2687                                table.data_source
2688                            )
2689                        }
2690                    }
2691                }
2692                _ => {
2693                    // Views could depend on sources, so ignore them if added by used_by above.
2694                    continue;
2695                }
2696            }
2697        }
2698        cws
2699    }
2700
2701    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2702        match id {
2703            CommentObjectId::Table(id)
2704            | CommentObjectId::View(id)
2705            | CommentObjectId::MaterializedView(id)
2706            | CommentObjectId::Source(id)
2707            | CommentObjectId::Sink(id)
2708            | CommentObjectId::Index(id)
2709            | CommentObjectId::Func(id)
2710            | CommentObjectId::Connection(id)
2711            | CommentObjectId::Type(id)
2712            | CommentObjectId::Secret(id) => Some(*id),
2713            CommentObjectId::Role(_)
2714            | CommentObjectId::Database(_)
2715            | CommentObjectId::Schema(_)
2716            | CommentObjectId::Cluster(_)
2717            | CommentObjectId::ClusterReplica(_)
2718            | CommentObjectId::NetworkPolicy(_) => None,
2719        }
2720    }
2721
2722    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2723        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2724    }
2725
2726    pub fn comment_id_to_audit_log_name(
2727        &self,
2728        id: CommentObjectId,
2729        conn_id: &ConnectionId,
2730    ) -> String {
2731        match id {
2732            CommentObjectId::Table(id)
2733            | CommentObjectId::View(id)
2734            | CommentObjectId::MaterializedView(id)
2735            | CommentObjectId::Source(id)
2736            | CommentObjectId::Sink(id)
2737            | CommentObjectId::Index(id)
2738            | CommentObjectId::Func(id)
2739            | CommentObjectId::Connection(id)
2740            | CommentObjectId::Type(id)
2741            | CommentObjectId::Secret(id) => {
2742                let item = self.get_entry(&id);
2743                let name = self.resolve_full_name(item.name(), Some(conn_id));
2744                name.to_string()
2745            }
2746            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2747            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2748            CommentObjectId::Schema((spec, schema_id)) => {
2749                let schema = self.get_schema(&spec, &schema_id, conn_id);
2750                self.resolve_full_schema_name(&schema.name).to_string()
2751            }
2752            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2753            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2754                let cluster = self.get_cluster(cluster_id);
2755                let replica = self.get_cluster_replica(cluster_id, replica_id);
2756                QualifiedReplica {
2757                    cluster: Ident::new_unchecked(cluster.name.clone()),
2758                    replica: Ident::new_unchecked(replica.name.clone()),
2759                }
2760                .to_string()
2761            }
2762            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2763        }
2764    }
2765
2766    pub fn mock_authentication_nonce(&self) -> String {
2767        self.mock_authentication_nonce.clone().unwrap_or_default()
2768    }
2769}
2770
2771impl ConnectionResolver for CatalogState {
2772    fn resolve_connection(
2773        &self,
2774        id: CatalogItemId,
2775    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2776        use mz_storage_types::connections::Connection::*;
2777        match self
2778            .get_entry(&id)
2779            .connection()
2780            .expect("catalog out of sync")
2781            .details
2782            .to_connection()
2783        {
2784            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2785            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2786            Csr(conn) => Csr(conn.into_inline_connection(self)),
2787            GlueSchemaRegistry(conn) => GlueSchemaRegistry(conn.into_inline_connection(self)),
2788            Ssh(conn) => Ssh(conn),
2789            Aws(conn) => Aws(conn),
2790            AwsPrivatelink(conn) => AwsPrivatelink(conn),
2791            Gcp(conn) => Gcp(conn),
2792            MySql(conn) => MySql(conn.into_inline_connection(self)),
2793            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2794            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2795        }
2796    }
2797}
2798
2799impl OptimizerCatalog for CatalogState {
2800    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2801        CatalogState::get_entry_by_global_id(self, id)
2802    }
2803    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2804        CatalogState::get_entry(self, id)
2805    }
2806    fn resolve_full_name(
2807        &self,
2808        name: &QualifiedItemName,
2809        conn_id: Option<&ConnectionId>,
2810    ) -> FullItemName {
2811        CatalogState::resolve_full_name(self, name, conn_id)
2812    }
2813    fn get_indexes_on(
2814        &self,
2815        id: GlobalId,
2816        cluster: ClusterId,
2817    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2818        Box::new(CatalogState::get_indexes_on(self, id, cluster))
2819    }
2820}
2821
2822impl OptimizerCatalog for Catalog {
2823    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2824        self.state.get_entry_by_global_id(id)
2825    }
2826
2827    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2828        self.state.get_entry(id)
2829    }
2830
2831    fn resolve_full_name(
2832        &self,
2833        name: &QualifiedItemName,
2834        conn_id: Option<&ConnectionId>,
2835    ) -> FullItemName {
2836        self.state.resolve_full_name(name, conn_id)
2837    }
2838
2839    fn get_indexes_on(
2840        &self,
2841        id: GlobalId,
2842        cluster: ClusterId,
2843    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2844        Box::new(self.state.get_indexes_on(id, cluster))
2845    }
2846}
2847
2848impl Catalog {
2849    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2850        self
2851    }
2852}