// mz_adapter/catalog/state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-memory metadata storage for the coordinator.
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36    TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40    UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55    UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62    VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{
67    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
68    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
69    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
70    TypeReference,
71};
72use mz_sql::catalog::{CatalogConfig, EnvironmentId};
73use mz_sql::names::{
74    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81    Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91    ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use mz_transform::notice::OptimizerNotice;
94use serde::Serialize;
95use timely::progress::Antichain;
96use tokio::sync::mpsc;
97use tracing::{debug, warn};
98
99// DO NOT add any more imports from `crate` outside of `crate::catalog`.
100use crate::AdapterError;
101use crate::catalog::{Catalog, ConnCatalog};
102use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
103use crate::optimize::{self, Optimize, OptimizerCatalog};
104use crate::session::Session;
105
/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects are partially derived from read-only state.
    //
    // Each `*_by_name` map is a name -> ID index into the corresponding `*_by_id` map; the two
    // are expected to be kept in lockstep by the mutation code in `open.rs`/`apply.rs`.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    // Temporary items are excluded from serialized dumps (see `skip_temp_items`), since they are
    // per-connection and not derived from durable state.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    // Secondary index mapping each GlobalId (collection) back to its owning item.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    pub(super) system_privileges: Arc<PrivilegeMap>,
    pub(super) comments: Arc<CommentsMap>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: Arc<StorageMetadata>,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog. Populated
    // during dataflow bootstrapping (`bootstrap_dataflow_plans`), which
    // doesn't run in Testdrive's read-only consistency check, so this
    // must be `#[serde(skip)]`.
    #[serde(skip)]
    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,

    // Populated by active connections creating temporary objects. The
    // read-only catalog opened by Testdrive's consistency check has no
    // active connections, so this must be `#[serde(skip)]`.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only not derived from the durable catalog.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
182
/// Keeps track of what expressions are cached or not during startup.
/// It's also used during catalog transactions to avoid re-optimizing CREATE VIEW / CREATE MAT VIEW
/// statements when going back and forth between durable catalog operations and in-memory catalog
/// operations.
///
/// The two variants let callers opt out of caching entirely (`Closed`) while keeping a single
/// code path: all cache methods are no-ops on a `Closed` cache.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}
199
200impl LocalExpressionCache {
201    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
202        Self::Open {
203            cached_exprs,
204            uncached_exprs: BTreeMap::new(),
205        }
206    }
207
208    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
209        match self {
210            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
211            LocalExpressionCache::Closed => None,
212        }
213    }
214
215    /// Insert an expression that was cached, back into the cache. This is generally needed when
216    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
217    pub(super) fn insert_cached_expression(
218        &mut self,
219        id: GlobalId,
220        local_expressions: LocalExpressions,
221    ) {
222        match self {
223            LocalExpressionCache::Open { cached_exprs, .. } => {
224                cached_exprs.insert(id, local_expressions);
225            }
226            LocalExpressionCache::Closed => {}
227        }
228    }
229
230    /// Inform the cache that `id` was not found in the cache and that we should add it as
231    /// `local_mir` and `optimizer_features`.
232    pub(super) fn insert_uncached_expression(
233        &mut self,
234        id: GlobalId,
235        local_mir: OptimizedMirRelationExpr,
236        optimizer_features: OptimizerFeatures,
237    ) {
238        match self {
239            LocalExpressionCache::Open { uncached_exprs, .. } => {
240                let local_expr = LocalExpressions {
241                    local_mir,
242                    optimizer_features,
243                };
244                // If we are trying to cache the same item a second time, with a different
245                // expression, then we must be migrating the object or doing something else weird.
246                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
247                // version after a restart. Caching the migrated version may cause us to incorrectly
248                // think that the object has already been migrated. To simplify things, we cache
249                // neither.
250                let prev = uncached_exprs.remove(&id);
251                match prev {
252                    Some(prev) if prev == local_expr => {
253                        uncached_exprs.insert(id, local_expr);
254                    }
255                    None => {
256                        uncached_exprs.insert(id, local_expr);
257                    }
258                    Some(_) => {}
259                }
260            }
261            LocalExpressionCache::Closed => {}
262        }
263    }
264
265    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
266        match self {
267            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
268            LocalExpressionCache::Closed => BTreeMap::new(),
269        }
270    }
271}
272
273fn skip_temp_items<S>(
274    entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
275    serializer: S,
276) -> Result<S::Ok, S::Error>
277where
278    S: serde::Serializer,
279{
280    mz_ore::serde::map_key_to_string(
281        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
282        serializer,
283    )
284}
285
286impl CatalogState {
287    /// Returns an empty [`CatalogState`] that can be used in tests.
288    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
289    // tests are structured in this repository.
290    pub fn empty_test() -> Self {
291        CatalogState {
292            database_by_name: Default::default(),
293            database_by_id: Default::default(),
294            entry_by_id: Default::default(),
295            entry_by_global_id: Default::default(),
296            notices_by_dep_id: Default::default(),
297            ambient_schemas_by_name: Default::default(),
298            ambient_schemas_by_id: Default::default(),
299            temporary_schemas: Default::default(),
300            clusters_by_id: Default::default(),
301            clusters_by_name: Default::default(),
302            network_policies_by_name: Default::default(),
303            roles_by_name: Default::default(),
304            roles_by_id: Default::default(),
305            network_policies_by_id: Default::default(),
306            role_auth_by_id: Default::default(),
307            config: CatalogConfig {
308                start_time: Default::default(),
309                start_instant: Instant::now(),
310                nonce: Default::default(),
311                environment_id: EnvironmentId::for_tests(),
312                session_id: Default::default(),
313                build_info: &DUMMY_BUILD_INFO,
314                now: NOW_ZERO.clone(),
315                connection_context: ConnectionContext::for_tests(Arc::new(
316                    InMemorySecretsController::new(),
317                )),
318                helm_chart_version: None,
319            },
320            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
321            availability_zones: Default::default(),
322            system_configuration: Arc::new(SystemVars::default()),
323            egress_addresses: Default::default(),
324            aws_principal_context: Default::default(),
325            aws_privatelink_availability_zones: Default::default(),
326            http_host_name: Default::default(),
327            default_privileges: Arc::new(DefaultPrivileges::default()),
328            system_privileges: Arc::new(PrivilegeMap::default()),
329            comments: Arc::new(CommentsMap::default()),
330            source_references: Default::default(),
331            storage_metadata: Arc::new(StorageMetadata::default()),
332            license_key: ValidatedLicenseKey::for_tests(),
333            mock_authentication_nonce: Default::default(),
334        }
335    }
336
337    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
338        let search_path = self.resolve_search_path(session);
339        let database = self
340            .database_by_name
341            .get(session.vars().database())
342            .map(|id| id.clone());
343        let state = match session.transaction().catalog_state() {
344            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
345            None => Cow::Borrowed(self),
346        };
347        ConnCatalog {
348            state,
349            unresolvable_ids: BTreeSet::new(),
350            conn_id: session.conn_id().clone(),
351            cluster: session.vars().cluster().into(),
352            database,
353            search_path,
354            role_id: session.current_role_id().clone(),
355            prepared_statements: Some(session.prepared_statements()),
356            portals: Some(session.portals()),
357            notices_tx: session.retain_notice_transmitter(),
358        }
359    }
360
361    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
362        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
363        let cluster = self.system_configuration.default_cluster();
364
365        ConnCatalog {
366            state: Cow::Borrowed(self),
367            unresolvable_ids: BTreeSet::new(),
368            conn_id: SYSTEM_CONN_ID.clone(),
369            cluster,
370            database: self
371                .resolve_database(DEFAULT_DATABASE_NAME)
372                .ok()
373                .map(|db| db.id()),
374            // Leaving the system's search path empty allows us to catch issues
375            // where catalog object names have not been normalized correctly.
376            search_path: Vec::new(),
377            role_id,
378            prepared_statements: None,
379            portals: None,
380            notices_tx,
381        }
382    }
383
    /// Returns a [`ConnCatalog`] for the system user, i.e. a sessionless catalog
    /// (see [`CatalogState::for_sessionless_user`]) with [`MZ_SYSTEM_ROLE_ID`].
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }
387
388    /// Returns an iterator over the deduplicated identifiers of all
389    /// objects this catalog entry transitively depends on (where
390    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
391    /// [`CatalogItem::references`]).
392    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
393        struct I<'a> {
394            queue: VecDeque<CatalogItemId>,
395            seen: BTreeSet<CatalogItemId>,
396            this: &'a CatalogState,
397        }
398        impl<'a> Iterator for I<'a> {
399            type Item = CatalogItemId;
400            fn next(&mut self) -> Option<Self::Item> {
401                if let Some(next) = self.queue.pop_front() {
402                    for child in self.this.get_entry(&next).item().uses() {
403                        if !self.seen.contains(&child) {
404                            self.queue.push_back(child);
405                            self.seen.insert(child);
406                        }
407                    }
408                    Some(next)
409                } else {
410                    None
411                }
412            }
413        }
414
415        I {
416            queue: [id].into_iter().collect(),
417            seen: [id].into_iter().collect(),
418            this: self,
419        }
420    }
421
422    /// Computes the IDs of any log sources this catalog entry transitively
423    /// depends on.
424    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
425        let mut out = Vec::new();
426        self.introspection_dependencies_inner(id, &mut out);
427        out
428    }
429
    /// Recursive helper for [`CatalogState::introspection_dependencies`]: appends to `out` the
    /// IDs of log sources reachable from `id`.
    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            // A log is itself an introspection dependency.
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)) => {
                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            // Sinks and indexes recurse through the single collection they are built on,
            // translating the GlobalId back to its owning item first.
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            // Terminal cases: no recursion performed for these item types.
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }
456
457    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
458    ///
459    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
460    /// earlier in the list than the roots. This is particularly useful for the order to drop
461    /// objects.
462    pub(super) fn object_dependents(
463        &self,
464        object_ids: &Vec<ObjectId>,
465        conn_id: &ConnectionId,
466        seen: &mut BTreeSet<ObjectId>,
467    ) -> Vec<ObjectId> {
468        let mut dependents = Vec::new();
469        for object_id in object_ids {
470            match object_id {
471                ObjectId::Cluster(id) => {
472                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
473                }
474                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
475                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
476                ),
477                ObjectId::Database(id) => {
478                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
479                }
480                ObjectId::Schema((database_spec, schema_spec)) => {
481                    dependents.extend_from_slice(&self.schema_dependents(
482                        database_spec.clone(),
483                        schema_spec.clone(),
484                        conn_id,
485                        seen,
486                    ));
487                }
488                ObjectId::NetworkPolicy(id) => {
489                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
490                }
491                id @ ObjectId::Role(_) => {
492                    let unseen = seen.insert(id.clone());
493                    if unseen {
494                        dependents.push(id.clone());
495                    }
496                }
497                ObjectId::Item(id) => {
498                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
499                }
500            }
501        }
502        dependents
503    }
504
505    /// Returns all the IDs of all objects that depend on `cluster_id`, including `cluster_id`
506    /// itself.
507    ///
508    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
509    /// earlier in the list than the roots. This is particularly useful for the order to drop
510    /// objects.
511    fn cluster_dependents(
512        &self,
513        cluster_id: ClusterId,
514        seen: &mut BTreeSet<ObjectId>,
515    ) -> Vec<ObjectId> {
516        let mut dependents = Vec::new();
517        let object_id = ObjectId::Cluster(cluster_id);
518        if !seen.contains(&object_id) {
519            seen.insert(object_id.clone());
520            let cluster = self.get_cluster(cluster_id);
521            for item_id in cluster.bound_objects() {
522                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
523            }
524            for replica_id in cluster.replica_ids().values() {
525                dependents.extend_from_slice(&self.cluster_replica_dependents(
526                    cluster_id,
527                    *replica_id,
528                    seen,
529                ));
530            }
531            dependents.push(object_id);
532        }
533        dependents
534    }
535
536    /// Returns all the IDs of all objects that depend on `replica_id`, including `replica_id`
537    /// itself.
538    ///
539    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
540    /// earlier in the list than the roots. This is particularly useful for the order to drop
541    /// objects.
542    pub(super) fn cluster_replica_dependents(
543        &self,
544        cluster_id: ClusterId,
545        replica_id: ReplicaId,
546        seen: &mut BTreeSet<ObjectId>,
547    ) -> Vec<ObjectId> {
548        let mut dependents = Vec::new();
549        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
550        if !seen.contains(&object_id) {
551            seen.insert(object_id.clone());
552            // Materialized views that target this replica are implicitly
553            // dropped with it, so cascade to their dependents to avoid leaving
554            // dangling references.
555            let cluster = self.get_cluster(cluster_id);
556            for item_id in cluster.bound_objects() {
557                if let CatalogItem::MaterializedView(mv) = self.get_entry(item_id).item()
558                    && mv.target_replica == Some(replica_id)
559                {
560                    dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
561                }
562            }
563            dependents.push(object_id);
564        }
565        dependents
566    }
567
568    /// Returns all the IDs of all objects that depend on `database_id`, including `database_id`
569    /// itself.
570    ///
571    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
572    /// earlier in the list than the roots. This is particularly useful for the order to drop
573    /// objects.
574    fn database_dependents(
575        &self,
576        database_id: DatabaseId,
577        conn_id: &ConnectionId,
578        seen: &mut BTreeSet<ObjectId>,
579    ) -> Vec<ObjectId> {
580        let mut dependents = Vec::new();
581        let object_id = ObjectId::Database(database_id);
582        if !seen.contains(&object_id) {
583            seen.insert(object_id.clone());
584            let database = self.get_database(&database_id);
585            for schema_id in database.schema_ids().values() {
586                dependents.extend_from_slice(&self.schema_dependents(
587                    ResolvedDatabaseSpecifier::Id(database_id),
588                    SchemaSpecifier::Id(*schema_id),
589                    conn_id,
590                    seen,
591                ));
592            }
593            dependents.push(object_id);
594        }
595        dependents
596    }
597
598    /// Returns all the IDs of all objects that depend on `schema_id`, including `schema_id`
599    /// itself.
600    ///
601    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
602    /// earlier in the list than the roots. This is particularly useful for the order to drop
603    /// objects.
604    fn schema_dependents(
605        &self,
606        database_spec: ResolvedDatabaseSpecifier,
607        schema_spec: SchemaSpecifier,
608        conn_id: &ConnectionId,
609        seen: &mut BTreeSet<ObjectId>,
610    ) -> Vec<ObjectId> {
611        let mut dependents = Vec::new();
612        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
613        if !seen.contains(&object_id) {
614            seen.insert(object_id.clone());
615            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
616            for item_id in schema.item_ids() {
617                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
618            }
619            dependents.push(object_id)
620        }
621        dependents
622    }
623
624    /// Returns all the IDs of all objects that depend on `item_id`, including `item_id`
625    /// itself.
626    ///
627    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
628    /// earlier in the list than the roots. This is particularly useful for the order to drop
629    /// objects.
630    pub(super) fn item_dependents(
631        &self,
632        item_id: CatalogItemId,
633        seen: &mut BTreeSet<ObjectId>,
634    ) -> Vec<ObjectId> {
635        let mut dependents = Vec::new();
636        let object_id = ObjectId::Item(item_id);
637        if !seen.contains(&object_id) {
638            seen.insert(object_id.clone());
639            let entry = self.get_entry(&item_id);
640            for dependent_id in entry.used_by() {
641                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
642            }
643            dependents.push(object_id);
644            // We treat the progress collection as if it depends on the source
645            // for dropping. We have additional code in planning to create a
646            // kind of special-case "CASCADE" for this dependency.
647            if let Some(progress_id) = entry.progress_id() {
648                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
649            }
650        }
651        dependents
652    }
653
654    /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id`
655    /// itself.
656    ///
657    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
658    /// earlier in the list than the roots. This is particularly useful for the order to drop
659    /// objects.
660    pub(super) fn network_policy_dependents(
661        &self,
662        network_policy_id: NetworkPolicyId,
663        _seen: &mut BTreeSet<ObjectId>,
664    ) -> Vec<ObjectId> {
665        let object_id = ObjectId::NetworkPolicy(network_policy_id);
666        // Currently network policies have no dependents
667        // when we add the ability for users or sources/sinks to have policies
668        // this method will need to be updated.
669        vec![object_id]
670    }
671
672    /// Indicates whether the indicated item is considered stable or not.
673    ///
674    /// Only stable items can be used as dependencies of other catalog items.
675    fn is_stable(&self, id: CatalogItemId) -> bool {
676        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
677        !self.is_unstable_schema_specifier(spec)
678    }
679
680    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
681        if self.system_config().unsafe_enable_unstable_dependencies() {
682            return Ok(());
683        }
684
685        let unstable_dependencies: Vec<_> = item
686            .references()
687            .items()
688            .filter(|id| !self.is_stable(**id))
689            .map(|id| self.get_entry(id).name().item.clone())
690            .collect();
691
692        // It's okay to create a temporary object with unstable
693        // dependencies, since we will never need to reboot a catalog
694        // that contains it.
695        if unstable_dependencies.is_empty() || item.is_temporary() {
696            Ok(())
697        } else {
698            let object_type = item.typ().to_string();
699            Err(Error {
700                kind: ErrorKind::UnstableDependency {
701                    object_type,
702                    unstable_dependencies,
703                },
704            })
705        }
706    }
707
708    pub fn resolve_full_name(
709        &self,
710        name: &QualifiedItemName,
711        conn_id: Option<&ConnectionId>,
712    ) -> FullItemName {
713        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
714
715        let database = match &name.qualifiers.database_spec {
716            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
717            ResolvedDatabaseSpecifier::Id(id) => {
718                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
719            }
720        };
721        // For temporary schemas, we know the name is always MZ_TEMP_SCHEMA,
722        // and the schema may not exist yet if no temporary items have been created.
723        let schema = match &name.qualifiers.schema_spec {
724            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
725            SchemaSpecifier::Id(_) => self
726                .get_schema(
727                    &name.qualifiers.database_spec,
728                    &name.qualifiers.schema_spec,
729                    conn_id,
730                )
731                .name()
732                .schema
733                .clone(),
734        };
735        FullItemName {
736            database,
737            schema,
738            item: name.item.clone(),
739        }
740    }
741
742    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
743        let database = match &name.database {
744            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
745            ResolvedDatabaseSpecifier::Id(id) => {
746                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
747            }
748        };
749        FullSchemaName {
750            database,
751            schema: name.schema.clone(),
752        }
753    }
754
755    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
756        self.entry_by_id
757            .get(id)
758            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
759    }
760
761    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
762        let item_id = self
763            .entry_by_global_id
764            .get(id)
765            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
766
767        let entry = self.get_entry(item_id).clone();
768        let version = match entry.item() {
769            CatalogItem::Table(table) => {
770                let (version, _) = table
771                    .collections
772                    .iter()
773                    .find(|(_verison, gid)| *gid == id)
774                    .expect("version to exist");
775                RelationVersionSelector::Specific(*version)
776            }
777            _ => RelationVersionSelector::Latest,
778        };
779        CatalogCollectionEntry { entry, version }
780    }
781
    /// Returns an iterator over every catalog entry together with its [`CatalogItemId`].
    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }
785
786    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
787        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
788        self.temporary_schemas
789            .get(conn)
790            .into_iter()
791            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
792    }
793
    /// Returns true if a temporary schema exists for the given connection.
    ///
    /// Temporary schemas are created lazily when the first temporary object is created
    /// for a connection, so this may return false for connections that haven't created
    /// any temporary objects. See also `get_temp_items` for enumerating a
    /// connection's temporary items.
    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }
802
803    /// Gets a type named `name` from exactly one of the system schemas.
804    ///
805    /// # Panics
806    /// - If `name` is not an entry in any system schema
807    /// - If more than one system schema has an entry named `name`.
808    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
809        let mut res = None;
810        for schema_id in self.system_schema_ids() {
811            let schema = &self.ambient_schemas_by_id[&schema_id];
812            if let Some(global_id) = schema.types.get(name) {
813                match res {
814                    None => res = Some(self.get_entry(global_id)),
815                    Some(_) => panic!(
816                        "only call get_system_type on objects uniquely identifiable in one system schema"
817                    ),
818                }
819            }
820        }
821
822        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
823    }
824
825    pub fn get_item_by_name(
826        &self,
827        name: &QualifiedItemName,
828        conn_id: &ConnectionId,
829    ) -> Option<&CatalogEntry> {
830        self.get_schema(
831            &name.qualifiers.database_spec,
832            &name.qualifiers.schema_spec,
833            conn_id,
834        )
835        .items
836        .get(&name.item)
837        .and_then(|id| self.try_get_entry(id))
838    }
839
840    pub fn get_type_by_name(
841        &self,
842        name: &QualifiedItemName,
843        conn_id: &ConnectionId,
844    ) -> Option<&CatalogEntry> {
845        self.get_schema(
846            &name.qualifiers.database_spec,
847            &name.qualifiers.schema_spec,
848            conn_id,
849        )
850        .types
851        .get(&name.item)
852        .and_then(|id| self.try_get_entry(id))
853    }
854
855    pub(super) fn find_available_name(
856        &self,
857        mut name: QualifiedItemName,
858        conn_id: &ConnectionId,
859    ) -> QualifiedItemName {
860        let mut i = 0;
861        let orig_item_name = name.item.clone();
862        while self.get_item_by_name(&name, conn_id).is_some() {
863            i += 1;
864            name.item = format!("{}{}", orig_item_name, i);
865        }
866        name
867    }
868
    /// Returns the catalog entry for `id`, or `None` if no such entry exists.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }
872
873    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
874        let item_id = self.entry_by_global_id.get(id)?;
875        self.try_get_entry(item_id)
876    }
877
878    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
879    /// object that returns rows.
880    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
881        let entry = self.try_get_entry_by_global_id(id)?;
882        let desc = match entry.item() {
883            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
884            // TODO(alter_table): Support schema evolution on sources.
885            other => other.relation_desc(RelationVersionSelector::Latest)?,
886        };
887        Some(desc)
888    }
889
890    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
891        self.try_get_cluster(cluster_id)
892            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
893    }
894
    /// Returns the cluster for `cluster_id`, or `None` if no such cluster exists.
    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }
898
    /// Returns the role for `id`, or `None` if no such role exists.
    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }
902
903    pub fn get_role(&self, id: &RoleId) -> &Role {
904        self.roles_by_id.get(id).expect("catalog out of sync")
905    }
906
    /// Returns an iterator over the ids of all roles in the catalog.
    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }
910
911    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
912        self.roles_by_name
913            .get(role_name)
914            .map(|id| &self.roles_by_id[id])
915    }
916
917    /// Case-insensitive role lookup for JWT group-to-role sync.
918    ///
919    /// Groups from JWTs are lowercased during extraction. SQL role names are
920    /// stored as-is (unquoted identifiers are lowercased by the parser, but
921    /// quoted identifiers preserve case). This method iterates all roles to
922    /// find a case-insensitive match.
923    pub fn try_get_role_by_name_case_insensitive(&self, role_name: &str) -> Option<&Role> {
924        let lower = role_name.to_lowercase();
925        self.roles_by_name
926            .iter()
927            .find(|(name, _)| name.to_lowercase() == lower)
928            .map(|(_, id)| &self.roles_by_id[id])
929    }
930
931    /// Returns a map from lowercased role name to `Role` for all roles.
932    ///
933    /// Use this when doing multiple case-insensitive lookups (e.g. iterating
934    /// JWT group claims) to avoid O(n) per lookup.
935    pub fn roles_by_lowercase_name(&self) -> BTreeMap<String, &Role> {
936        self.roles_by_name
937            .iter()
938            .map(|(name, id)| (name.to_lowercase(), &self.roles_by_id[id]))
939            .collect()
940    }
941
942    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
943        self.role_auth_by_id
944            .get(id)
945            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
946    }
947
    /// Returns the authentication info for role `id`, or `None` if none exists.
    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }
951
952    pub(super) fn try_get_network_policy_by_name(
953        &self,
954        policy_name: &str,
955    ) -> Option<&NetworkPolicy> {
956        self.network_policies_by_name
957            .get(policy_name)
958            .map(|id| &self.network_policies_by_id[id])
959    }
960
961    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
962        let mut membership = BTreeSet::new();
963        let mut queue = VecDeque::from(vec![id]);
964        while let Some(cur_id) = queue.pop_front() {
965            if !membership.contains(cur_id) {
966                membership.insert(cur_id.clone());
967                let role = self.get_role(cur_id);
968                soft_assert_no_log!(
969                    !role.membership().keys().contains(id),
970                    "circular membership exists in the catalog"
971                );
972                queue.extend(role.membership().keys());
973            }
974        }
975        membership.insert(RoleId::Public);
976        membership
977    }
978
979    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
980        self.network_policies_by_id
981            .get(id)
982            .expect("catalog out of sync")
983    }
984
    /// Returns an iterator over the ids of all network policies in the catalog.
    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }
988
989    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
990    /// source.
991    ///
992    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
993    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
994        let entry = self.try_get_entry(id)?;
995        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
996        let name = self.resolve_full_name(entry.name(), None);
997        let host_name = self
998            .http_host_name
999            .as_ref()
1000            .map(|x| x.as_str())
1001            .unwrap_or_else(|| "HOST");
1002
1003        let RawDatabaseSpecifier::Name(database) = name.database else {
1004            return None;
1005        };
1006
1007        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
1008        url.path_segments_mut()
1009            .ok()?
1010            .push(&database)
1011            .push(&name.schema)
1012            .push(&name.item);
1013
1014        Some(url)
1015    }
1016
1017    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
1018    ///
1019    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
1020    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
1021    ///
1022    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
1023    /// their original state before the method returns.
1024    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1025        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
1026        // caller if this method changed the state of the catalog.
1027        &mut self,
1028        create_sql: &str,
1029        force_if_exists_skip: bool,
1030    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1031        self.with_enable_for_item_parsing(|state| {
1032            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
1033            let pcx = Some(&pcx);
1034            let session_catalog = state.for_system_session();
1035
1036            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1037            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
1038            let plan =
1039                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
1040
1041            Ok((plan, resolved_ids))
1042        })
1043    }
1044
1045    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
1046    #[mz_ore::instrument]
1047    pub(crate) fn parse_plan(
1048        create_sql: &str,
1049        pcx: Option<&PlanContext>,
1050        catalog: &ConnCatalog,
1051    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1052        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1053        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1054        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1055
1056        Ok((plan, resolved_ids))
1057    }
1058
    /// Parses the given SQL string into a [`CatalogItem`] using default settings:
    /// no plan context, not a retained-metrics object, and no custom compaction
    /// window.
    ///
    /// Thin wrapper around `parse_item`; see that method for the meaning of the
    /// shared parameters.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }
1079
1080    /// Parses the given SQL string into a `CatalogItem`.
1081    #[mz_ore::instrument]
1082    pub(crate) fn parse_item(
1083        &self,
1084        global_id: GlobalId,
1085        create_sql: &str,
1086        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1087        pcx: Option<&PlanContext>,
1088        is_retained_metrics_object: bool,
1089        custom_logical_compaction_window: Option<CompactionWindow>,
1090        local_expression_cache: &mut LocalExpressionCache,
1091        previous_item: Option<CatalogItem>,
1092    ) -> Result<CatalogItem, AdapterError> {
1093        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1094        match self.parse_item_inner(
1095            global_id,
1096            create_sql,
1097            extra_versions,
1098            pcx,
1099            is_retained_metrics_object,
1100            custom_logical_compaction_window,
1101            cached_expr,
1102            previous_item,
1103        ) {
1104            Ok((item, uncached_expr)) => {
1105                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1106                    local_expression_cache.insert_uncached_expression(
1107                        global_id,
1108                        uncached_expr,
1109                        optimizer_features,
1110                    );
1111                }
1112                Ok(item)
1113            }
1114            Err((err, cached_expr)) => {
1115                if let Some(local_expr) = cached_expr {
1116                    local_expression_cache.insert_cached_expression(global_id, local_expr);
1117                }
1118                Err(err)
1119            }
1120        }
1121    }
1122
1123    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
1124    ///
1125    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
1126    /// not cached.
1127    ///
1128    /// On failure returns an error and `cached_expr` so it can be used later.
1129    #[mz_ore::instrument]
1130    pub(crate) fn parse_item_inner(
1131        &self,
1132        global_id: GlobalId,
1133        create_sql: &str,
1134        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1135        pcx: Option<&PlanContext>,
1136        is_retained_metrics_object: bool,
1137        custom_logical_compaction_window: Option<CompactionWindow>,
1138        cached_expr: Option<LocalExpressions>,
1139        previous_item: Option<CatalogItem>,
1140    ) -> Result<
1141        (
1142            CatalogItem,
1143            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1144        ),
1145        (AdapterError, Option<LocalExpressions>),
1146    > {
1147        let session_catalog = self.for_system_session();
1148
1149        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1150            Ok((plan, resolved_ids)) => (plan, resolved_ids),
1151            Err(err) => return Err((err, cached_expr)),
1152        };
1153
1154        let mut uncached_expr = None;
1155
1156        // Carry over the plans (`optimized_plan`, `physical_plan`,
1157        // `dataflow_metainfo`) from the previous incarnation of this item when
1158        // re-parsing an existing item (e.g. after a RENAME). These fields live
1159        // on the `CatalogItem` since #35834, but are not reconstructable from
1160        // `create_sql` alone — they are populated by the sequencer `_finish`
1161        // paths at create time, and by the expression-cache / bootstrap
1162        // rendering path on boot. If we don't preserve them here, a RENAME
1163        // silently drops the plans and dataflow metainfo for the affected
1164        // MV/Index/CT.
1165        let previous_plans = previous_item.as_ref().map(|item| {
1166            (
1167                item.optimized_plan().cloned(),
1168                item.physical_plan().cloned(),
1169                item.dataflow_metainfo().cloned(),
1170            )
1171        });
1172
1173        let mut item = match plan {
1174            Plan::CreateTable(CreateTablePlan { table, .. }) => {
1175                let collections = extra_versions
1176                    .iter()
1177                    .map(|(version, gid)| (*version, *gid))
1178                    .chain([(RelationVersion::root(), global_id)].into_iter())
1179                    .collect();
1180
1181                CatalogItem::Table(Table {
1182                    create_sql: Some(table.create_sql),
1183                    desc: table.desc,
1184                    collections,
1185                    conn_id: None,
1186                    resolved_ids,
1187                    custom_logical_compaction_window: custom_logical_compaction_window
1188                        .or(table.compaction_window),
1189                    is_retained_metrics_object,
1190                    data_source: match table.data_source {
1191                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1192                            TableDataSource::TableWrites { defaults }
1193                        }
1194                        mz_sql::plan::TableDataSource::DataSource {
1195                            desc: data_source_desc,
1196                            timeline,
1197                        } => match data_source_desc {
1198                            mz_sql::plan::DataSourceDesc::IngestionExport {
1199                                ingestion_id,
1200                                external_reference,
1201                                details,
1202                                data_config,
1203                            } => TableDataSource::DataSource {
1204                                desc: DataSourceDesc::IngestionExport {
1205                                    ingestion_id,
1206                                    external_reference,
1207                                    details,
1208                                    data_config,
1209                                },
1210                                timeline,
1211                            },
1212                            mz_sql::plan::DataSourceDesc::Webhook {
1213                                validate_using,
1214                                body_format,
1215                                headers,
1216                                cluster_id,
1217                            } => TableDataSource::DataSource {
1218                                desc: DataSourceDesc::Webhook {
1219                                    validate_using,
1220                                    body_format,
1221                                    headers,
1222                                    cluster_id: cluster_id
1223                                        .expect("Webhook Tables must have a cluster_id set"),
1224                                },
1225                                timeline,
1226                            },
1227                            _ => {
1228                                return Err((
1229                                    AdapterError::Unstructured(anyhow::anyhow!(
1230                                        "unsupported data source for table"
1231                                    )),
1232                                    cached_expr,
1233                                ));
1234                            }
1235                        },
1236                    },
1237                })
1238            }
1239            Plan::CreateSource(CreateSourcePlan {
1240                source,
1241                timeline,
1242                in_cluster,
1243                ..
1244            }) => CatalogItem::Source(Source {
1245                create_sql: Some(source.create_sql),
1246                data_source: match source.data_source {
1247                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1248                        desc,
1249                        cluster_id: match in_cluster {
1250                            Some(id) => id,
1251                            None => {
1252                                return Err((
1253                                    AdapterError::Unstructured(anyhow::anyhow!(
1254                                        "ingestion-based sources must have cluster specified"
1255                                    )),
1256                                    cached_expr,
1257                                ));
1258                            }
1259                        },
1260                    },
1261                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1262                        desc,
1263                        progress_subsource,
1264                        data_config,
1265                        details,
1266                    } => DataSourceDesc::OldSyntaxIngestion {
1267                        desc,
1268                        progress_subsource,
1269                        data_config,
1270                        details,
1271                        cluster_id: match in_cluster {
1272                            Some(id) => id,
1273                            None => {
1274                                return Err((
1275                                    AdapterError::Unstructured(anyhow::anyhow!(
1276                                        "ingestion-based sources must have cluster specified"
1277                                    )),
1278                                    cached_expr,
1279                                ));
1280                            }
1281                        },
1282                    },
1283                    mz_sql::plan::DataSourceDesc::IngestionExport {
1284                        ingestion_id,
1285                        external_reference,
1286                        details,
1287                        data_config,
1288                    } => DataSourceDesc::IngestionExport {
1289                        ingestion_id,
1290                        external_reference,
1291                        details,
1292                        data_config,
1293                    },
1294                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1295                    mz_sql::plan::DataSourceDesc::Webhook {
1296                        validate_using,
1297                        body_format,
1298                        headers,
1299                        cluster_id,
1300                    } => {
1301                        mz_ore::soft_assert_or_log!(
1302                            cluster_id.is_none(),
1303                            "cluster_id set at Source level for Webhooks"
1304                        );
1305                        DataSourceDesc::Webhook {
1306                            validate_using,
1307                            body_format,
1308                            headers,
1309                            cluster_id: in_cluster
1310                                .expect("webhook sources must use an existing cluster"),
1311                        }
1312                    }
1313                },
1314                desc: source.desc,
1315                global_id,
1316                timeline,
1317                resolved_ids,
1318                custom_logical_compaction_window: source
1319                    .compaction_window
1320                    .or(custom_logical_compaction_window),
1321                is_retained_metrics_object,
1322            }),
1323            Plan::CreateView(CreateViewPlan { view, .. }) => {
1324                // Collect optimizer parameters.
1325                let optimizer_config =
1326                    optimize::OptimizerConfig::from(session_catalog.system_vars());
1327                let previous_exprs = previous_item.map(|item| match item {
1328                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
1329                    _ => None,
1330                });
1331
1332                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1333                    (Some(local_expr), _)
1334                        if local_expr.optimizer_features == optimizer_config.features =>
1335                    {
1336                        debug!("local expression cache hit for {global_id:?}");
1337                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1338                    }
1339                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1340                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1341                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1342                    }
1343                    (cached_expr, _) => {
1344                        let optimizer_features = optimizer_config.features.clone();
1345                        // Build an optimizer for this VIEW.
1346                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1347
1348                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
1349                        let raw_expr = view.expr;
1350                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1351                            Ok(optimzed_expr) => optimzed_expr,
1352                            Err(err) => return Err((err.into(), cached_expr)),
1353                        };
1354
1355                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1356
1357                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1358                    }
1359                };
1360
1361                // Resolve all item dependencies from the HIR expression.
1362                let dependencies: BTreeSet<_> = raw_expr
1363                    .depends_on()
1364                    .into_iter()
1365                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1366                    .collect();
1367
1368                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1369                CatalogItem::View(View {
1370                    create_sql: view.create_sql,
1371                    global_id,
1372                    raw_expr,
1373                    desc: RelationDesc::new(typ, view.column_names),
1374                    locally_optimized_expr: optimized_expr,
1375                    conn_id: None,
1376                    resolved_ids,
1377                    dependencies: DependencyIds(dependencies),
1378                })
1379            }
1380            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1381                materialized_view, ..
1382            }) => {
1383                let collections = extra_versions
1384                    .iter()
1385                    .map(|(version, gid)| (*version, *gid))
1386                    .chain([(RelationVersion::root(), global_id)].into_iter())
1387                    .collect();
1388
1389                // Collect optimizer parameters.
1390                let system_vars = session_catalog.system_vars();
1391                let overrides = self
1392                    .get_cluster(materialized_view.cluster_id)
1393                    .config
1394                    .features();
1395                let optimizer_config =
1396                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
1397                let previous_exprs = previous_item.map(|item| match item {
1398                    CatalogItem::MaterializedView(materialized_view) => (
1399                        materialized_view.raw_expr,
1400                        materialized_view.locally_optimized_expr,
1401                    ),
1402                    item => unreachable!("expected materialized view, found: {item:#?}"),
1403                });
1404
1405                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1406                    (Some(local_expr), _)
1407                        if local_expr.optimizer_features == optimizer_config.features =>
1408                    {
1409                        debug!("local expression cache hit for {global_id:?}");
1410                        (
1411                            Arc::new(materialized_view.expr),
1412                            Arc::new(local_expr.local_mir),
1413                        )
1414                    }
1415                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1416                    (_, Some((raw_expr, optimized_expr)))
1417                        if *raw_expr == materialized_view.expr =>
1418                    {
1419                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1420                    }
1421                    (cached_expr, _) => {
1422                        let optimizer_features = optimizer_config.features.clone();
1423                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
1424                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1425
1426                        let raw_expr = materialized_view.expr;
1427                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1428                            Ok(optimized_expr) => optimized_expr,
1429                            Err(err) => return Err((err.into(), cached_expr)),
1430                        };
1431
1432                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1433
1434                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1435                    }
1436                };
1437                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1438
1439                for &i in &materialized_view.non_null_assertions {
1440                    typ.column_types[i].nullable = false;
1441                }
1442                let desc = RelationDesc::new(typ, materialized_view.column_names);
1443                let desc = VersionedRelationDesc::new(desc);
1444
1445                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1446
1447                // Resolve all item dependencies from the HIR expression.
1448                let dependencies = raw_expr
1449                    .depends_on()
1450                    .into_iter()
1451                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1452                    .collect();
1453
1454                CatalogItem::MaterializedView(MaterializedView {
1455                    create_sql: materialized_view.create_sql,
1456                    collections,
1457                    raw_expr,
1458                    locally_optimized_expr: optimized_expr,
1459                    desc,
1460                    resolved_ids,
1461                    dependencies,
1462                    replacement_target: materialized_view.replacement_target,
1463                    cluster_id: materialized_view.cluster_id,
1464                    target_replica: materialized_view.target_replica,
1465                    non_null_assertions: materialized_view.non_null_assertions,
1466                    custom_logical_compaction_window: materialized_view.compaction_window,
1467                    refresh_schedule: materialized_view.refresh_schedule,
1468                    initial_as_of,
1469                    optimized_plan: None,
1470                    physical_plan: None,
1471                    dataflow_metainfo: None,
1472                })
1473            }
1474            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1475                create_sql: index.create_sql,
1476                global_id,
1477                on: index.on,
1478                keys: index.keys.into(),
1479                conn_id: None,
1480                resolved_ids,
1481                cluster_id: index.cluster_id,
1482                custom_logical_compaction_window: custom_logical_compaction_window
1483                    .or(index.compaction_window),
1484                is_retained_metrics_object,
1485                optimized_plan: None,
1486                physical_plan: None,
1487                dataflow_metainfo: None,
1488            }),
1489            Plan::CreateSink(CreateSinkPlan {
1490                sink,
1491                with_snapshot,
1492                in_cluster,
1493                ..
1494            }) => CatalogItem::Sink(Sink {
1495                create_sql: sink.create_sql,
1496                global_id,
1497                from: sink.from,
1498                connection: sink.connection,
1499                envelope: sink.envelope,
1500                version: sink.version,
1501                with_snapshot,
1502                resolved_ids,
1503                cluster_id: in_cluster,
1504                commit_interval: sink.commit_interval,
1505            }),
1506            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1507                // Even if we don't need the `RelationDesc` here, error out
1508                // early and eagerly, as a kind of soft assertion that we _can_
1509                // build the `RelationDesc` when needed.
1510                if let Err(err) = typ.inner.desc(&session_catalog) {
1511                    return Err((err.into(), cached_expr));
1512                }
1513                CatalogItem::Type(Type {
1514                    create_sql: Some(typ.create_sql),
1515                    global_id,
1516                    details: CatalogTypeDetails {
1517                        array_id: None,
1518                        typ: typ.inner,
1519                        pg_metadata: None,
1520                    },
1521                    resolved_ids,
1522                })
1523            }
1524            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1525                create_sql: secret.create_sql,
1526                global_id,
1527            }),
1528            Plan::CreateConnection(CreateConnectionPlan {
1529                connection:
1530                    mz_sql::plan::Connection {
1531                        create_sql,
1532                        details,
1533                    },
1534                ..
1535            }) => CatalogItem::Connection(Connection {
1536                create_sql,
1537                global_id,
1538                details,
1539                resolved_ids,
1540            }),
1541            _ => {
1542                return Err((
1543                    Error::new(ErrorKind::Corruption {
1544                        detail: "catalog entry generated inappropriate plan".to_string(),
1545                    })
1546                    .into(),
1547                    cached_expr,
1548                ));
1549            }
1550        };
1551
1552        // Carry over the plans (`optimized_plan`, `physical_plan`,
1553        // `dataflow_metainfo`) from the previous incarnation of this item, if
1554        // any. See the comment on `previous_plans` above.
1555        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
1556            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
1557            {
1558                *optimized_plan = prev_optimized;
1559                *physical_plan = prev_physical;
1560                *dataflow_metainfo = prev_metainfo;
1561            }
1562        }
1563
1564        Ok((item, uncached_expr))
1565    }
1566
1567    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1568    /// Calling this method will not permanently modify any system configuration variables.
1569    ///
1570    /// WARNING:
1571    /// Any modifications made to the system configuration variables in `f`, will be lost.
1572    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1573        // Enable catalog features that might be required during planning existing
1574        // catalog items. Existing catalog items might have been created while
1575        // a specific feature flag was turned on, so we need to ensure that this
1576        // is also the case during catalog rehydration in order to avoid panics.
1577        //
1578        // WARNING / CONTRACT:
1579        // 1. Features used in this method that related to parsing / planning
1580        //    should be `enable_for_item_parsing` set to `true`.
1581        // 2. After this step, feature flag configuration must not be
1582        //    overridden.
1583        let restore = Arc::clone(&self.system_configuration);
1584        Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1585        let res = f(self);
1586        self.system_configuration = restore;
1587        res
1588    }
1589
1590    /// Returns all indexes on the given object and cluster known in the catalog.
1591    pub fn get_indexes_on(
1592        &self,
1593        id: GlobalId,
1594        cluster: ClusterId,
1595    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1596        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1597
1598        self.try_get_entry_by_global_id(&id)
1599            .into_iter()
1600            .map(move |e| {
1601                e.used_by()
1602                    .iter()
1603                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1604                        CatalogItem::Index(index) if index_matches(index) => {
1605                            Some((index.global_id(), index))
1606                        }
1607                        _ => None,
1608                    })
1609            })
1610            .flatten()
1611    }
1612
    /// Gets a reference to the database with the given ID.
    ///
    /// Panics if `database_id` is not present in the catalog.
    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }
1616
1617    /// Gets a reference to the specified replica of the specified cluster.
1618    ///
1619    /// Returns `None` if either the cluster or the replica does not
1620    /// exist.
1621    pub(super) fn try_get_cluster_replica(
1622        &self,
1623        id: ClusterId,
1624        replica_id: ReplicaId,
1625    ) -> Option<&ClusterReplica> {
1626        self.try_get_cluster(id)
1627            .and_then(|cluster| cluster.replica(replica_id))
1628    }
1629
1630    /// Gets a reference to the specified replica of the specified cluster.
1631    ///
1632    /// Panics if either the cluster or the replica does not exist.
1633    pub(crate) fn get_cluster_replica(
1634        &self,
1635        cluster_id: ClusterId,
1636        replica_id: ReplicaId,
1637    ) -> &ClusterReplica {
1638        self.try_get_cluster_replica(cluster_id, replica_id)
1639            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1640    }
1641
1642    pub(super) fn resolve_replica_in_cluster(
1643        &self,
1644        cluster_id: &ClusterId,
1645        replica_name: &str,
1646    ) -> Result<&ClusterReplica, SqlCatalogError> {
1647        let cluster = self.get_cluster(*cluster_id);
1648        let replica_id = cluster
1649            .replica_id_by_name_
1650            .get(replica_name)
1651            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1652        Ok(&cluster.replicas_by_id_[replica_id])
1653    }
1654
1655    /// Get system configuration `name`.
1656    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1657        Ok(self.system_configuration.get(name)?)
1658    }
1659
1660    /// Parse system configuration `name` with `value` int.
1661    ///
1662    /// Returns the parsed value as a string.
1663    pub(super) fn parse_system_configuration(
1664        &self,
1665        name: &str,
1666        value: VarInput,
1667    ) -> Result<String, Error> {
1668        let value = self.system_configuration.parse(name, value)?;
1669        Ok(value.format())
1670    }
1671
1672    /// Gets the schema map for the database matching `database_spec`.
1673    pub(super) fn resolve_schema_in_database(
1674        &self,
1675        database_spec: &ResolvedDatabaseSpecifier,
1676        schema_name: &str,
1677        conn_id: &ConnectionId,
1678    ) -> Result<&Schema, SqlCatalogError> {
1679        let schema = match database_spec {
1680            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1681                self.temporary_schemas.get(conn_id)
1682            }
1683            ResolvedDatabaseSpecifier::Ambient => self
1684                .ambient_schemas_by_name
1685                .get(schema_name)
1686                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1687            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1688                db.schemas_by_name
1689                    .get(schema_name)
1690                    .and_then(|id| db.schemas_by_id.get(id))
1691            }),
1692        };
1693        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1694    }
1695
    /// Try to get a schema, returning `None` if it doesn't exist.
    ///
    /// For temporary schemas, returns `None` if the schema hasn't been created yet
    /// (temporary schemas are created lazily when the first temporary object is created).
    ///
    /// Panics if asked for a temporary schema in a named database; temporary
    /// schemas only ever live in the ambient database.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        // Keep in sync with `get_schema` and `get_schemas_mut`
        match (database_spec, schema_spec) {
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
                // Temporary schemas are keyed by connection, not by name/id.
                self.temporary_schemas.get(conn_id)
            }
            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
                self.ambient_schemas_by_id.get(id)
            }
            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
                .database_by_id
                .get(database_id)
                .and_then(|db| db.schemas_by_id.get(schema_id)),
            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
                unreachable!("temporary schemas are in the ambient database")
            }
        }
    }
1723
1724    pub fn get_schema(
1725        &self,
1726        database_spec: &ResolvedDatabaseSpecifier,
1727        schema_spec: &SchemaSpecifier,
1728        conn_id: &ConnectionId,
1729    ) -> &Schema {
1730        // Keep in sync with `try_get_schema` and `get_schemas_mut`
1731        self.try_get_schema(database_spec, schema_spec, conn_id)
1732            .expect("schema must exist")
1733    }
1734
    /// Finds the non-temporary schema with the given ID, searching every
    /// database's schemas and then the ambient schemas.
    // NOTE(review): `into_first` presumably panics when no schema matches —
    // confirm against `CollectionExt::into_first` in `mz_ore`.
    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.database_by_id
            .values()
            .filter_map(|database| database.schemas_by_id.get(schema_id))
            .chain(self.ambient_schemas_by_id.values())
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }
1743
    /// Finds the temporary schema with the given ID by scanning every
    /// connection's temporary schema.
    // NOTE(review): as with `find_non_temp_schema`, `into_first` presumably
    // panics when nothing matches — confirm.
    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.temporary_schemas
            .values()
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }
1750
    /// Returns the ID of the ambient `mz_catalog` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_catalog_unstable` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }

    /// Returns the ID of the ambient `pg_catalog` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }

    /// Returns the ID of the ambient `information_schema` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_internal` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_introspection` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_unsafe` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }
1778
1779    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1780        SYSTEM_SCHEMAS
1781            .iter()
1782            .map(|name| self.ambient_schemas_by_name[*name])
1783    }
1784
1785    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1786        self.system_schema_ids().contains(&id)
1787    }
1788
1789    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1790        match spec {
1791            SchemaSpecifier::Temporary => false,
1792            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1793        }
1794    }
1795
1796    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1797        UNSTABLE_SCHEMAS
1798            .iter()
1799            .map(|name| self.ambient_schemas_by_name[*name])
1800    }
1801
1802    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1803        self.unstable_schema_ids().contains(&id)
1804    }
1805
1806    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1807        match spec {
1808            SchemaSpecifier::Temporary => false,
1809            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1810        }
1811    }
1812
1813    /// Creates a new schema in the `Catalog` for temporary items
1814    /// indicated by the TEMPORARY or TEMP keywords.
1815    pub fn create_temporary_schema(
1816        &mut self,
1817        conn_id: &ConnectionId,
1818        owner_id: RoleId,
1819    ) -> Result<(), Error> {
1820        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1821        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1822        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1823        // all -1.
1824        let oid = INVALID_OID;
1825        self.temporary_schemas.insert(
1826            conn_id.clone(),
1827            Schema {
1828                name: QualifiedSchemaName {
1829                    database: ResolvedDatabaseSpecifier::Ambient,
1830                    schema: MZ_TEMP_SCHEMA.into(),
1831                },
1832                id: SchemaSpecifier::Temporary,
1833                oid,
1834                items: BTreeMap::new(),
1835                functions: BTreeMap::new(),
1836                types: BTreeMap::new(),
1837                owner_id,
1838                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1839                    mz_sql::catalog::ObjectType::Schema,
1840                    owner_id,
1841                )]),
1842            },
1843        );
1844        Ok(())
1845    }
1846
1847    /// Return all OIDs that are allocated to temporary objects.
1848    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1849        std::iter::empty()
1850            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1851                if schema.id.is_temporary() {
1852                    Some(schema.oid)
1853                } else {
1854                    None
1855                }
1856            }))
1857            .chain(self.entry_by_id.values().filter_map(|entry| {
1858                if entry.item().is_temporary() {
1859                    Some(entry.oid)
1860                } else {
1861                    None
1862                }
1863            }))
1864    }
1865
    /// Optimized lookup for a builtin table.
    ///
    /// Panics if the builtin table doesn't exist in the catalog.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        // Thin wrapper over the generic builtin-object lookup.
        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
    }
1872
1873    /// Optimized lookup for a builtin log.
1874    ///
1875    /// Panics if the builtin log doesn't exist in the catalog.
1876    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1877        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1878        let log = match self.get_entry(&item_id).item() {
1879            CatalogItem::Log(log) => log,
1880            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1881        };
1882        (item_id, log.global_id)
1883    }
1884
    /// Optimized lookup for a builtin storage collection.
    ///
    /// Panics if the builtin storage collection doesn't exist in the catalog.
    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
        // Thin wrapper over the generic builtin-object lookup.
        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
    }
1891
1892    /// Optimized lookup for a builtin object.
1893    ///
1894    /// Panics if the builtin object doesn't exist in the catalog.
1895    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1896        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1897        let schema = &self.ambient_schemas_by_id[schema_id];
1898        match builtin.catalog_item_type() {
1899            CatalogItemType::Type => schema.types[builtin.name()],
1900            CatalogItemType::Func => schema.functions[builtin.name()],
1901            CatalogItemType::Table
1902            | CatalogItemType::Source
1903            | CatalogItemType::Sink
1904            | CatalogItemType::View
1905            | CatalogItemType::MaterializedView
1906            | CatalogItemType::Index
1907            | CatalogItemType::Secret
1908            | CatalogItemType::Connection => schema.items[builtin.name()],
1909        }
1910    }
1911
    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
    ///
    /// Composite types (arrays, lists, maps, ranges, records) have each named
    /// element type replaced by the ID of the corresponding system type;
    /// scalar types are passed through unchanged.
    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            // Composite variants: swap every name reference for the system
            // type's ID, preserving any type modifiers.
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            // Scalar variants carry no references and map through unchanged.
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        // Everything except the type details carries over verbatim.
        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }
1997
    /// Returns the catalog's configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
2001
2002    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
2003        match self.database_by_name.get(database_name) {
2004            Some(id) => Ok(&self.database_by_id[id]),
2005            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
2006        }
2007    }
2008
2009    pub fn resolve_schema(
2010        &self,
2011        current_database: Option<&DatabaseId>,
2012        database_name: Option<&str>,
2013        schema_name: &str,
2014        conn_id: &ConnectionId,
2015    ) -> Result<&Schema, SqlCatalogError> {
2016        let database_spec = match database_name {
2017            // If a database is explicitly specified, validate it. Note that we
2018            // intentionally do not validate `current_database` to permit
2019            // querying `mz_catalog` with an invalid session database, e.g., so
2020            // that you can run `SHOW DATABASES` to *find* a valid database.
2021            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
2022                self.resolve_database(database)?.id().clone(),
2023            )),
2024            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2025        };
2026
2027        // First try to find the schema in the named database.
2028        if let Some(database_spec) = database_spec {
2029            if let Ok(schema) =
2030                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2031            {
2032                return Ok(schema);
2033            }
2034        }
2035
2036        // Then fall back to the ambient database.
2037        if let Ok(schema) = self.resolve_schema_in_database(
2038            &ResolvedDatabaseSpecifier::Ambient,
2039            schema_name,
2040            conn_id,
2041        ) {
2042            return Ok(schema);
2043        }
2044
2045        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2046    }
2047
    /// Optimized lookup for a system schema.
    ///
    /// Panics if the system schema doesn't exist in the catalog.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }
2054
2055    pub fn resolve_search_path(
2056        &self,
2057        session: &dyn SessionMetadata,
2058    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2059        let database = self
2060            .database_by_name
2061            .get(session.database())
2062            .map(|id| id.clone());
2063
2064        session
2065            .search_path()
2066            .iter()
2067            .map(|schema| {
2068                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2069            })
2070            .filter_map(|schema| schema.ok())
2071            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2072            .collect()
2073    }
2074
2075    pub fn effective_search_path(
2076        &self,
2077        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2078        include_temp_schema: bool,
2079    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2080        let mut v = Vec::with_capacity(search_path.len() + 3);
2081        // Temp schema is only included for relations and data types, not for functions and operators
2082        let temp_schema = (
2083            ResolvedDatabaseSpecifier::Ambient,
2084            SchemaSpecifier::Temporary,
2085        );
2086        if include_temp_schema && !search_path.contains(&temp_schema) {
2087            v.push(temp_schema);
2088        }
2089        let default_schemas = [
2090            (
2091                ResolvedDatabaseSpecifier::Ambient,
2092                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2093            ),
2094            (
2095                ResolvedDatabaseSpecifier::Ambient,
2096                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2097            ),
2098        ];
2099        for schema in default_schemas.into_iter() {
2100            if !search_path.contains(&schema) {
2101                v.push(schema);
2102            }
2103        }
2104        v.extend_from_slice(search_path);
2105        v
2106    }
2107
2108    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2109        let id = self
2110            .clusters_by_name
2111            .get(name)
2112            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2113        Ok(&self.clusters_by_id[id])
2114    }
2115
2116    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2117        let id = self
2118            .clusters_by_name
2119            .get(cluster.name)
2120            .expect("failed to lookup BuiltinCluster by name");
2121        self.clusters_by_id
2122            .get(id)
2123            .expect("failed to lookup BuiltinCluster by ID")
2124    }
2125
2126    pub fn resolve_cluster_replica(
2127        &self,
2128        cluster_replica_name: &QualifiedReplica,
2129    ) -> Result<&ClusterReplica, SqlCatalogError> {
2130        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2131        let replica_name = cluster_replica_name.replica.as_str();
2132        let replica_id = cluster
2133            .replica_id(replica_name)
2134            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2135        Ok(cluster.replica(replica_id).expect("Must exist"))
2136    }
2137
    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
    ///
    /// If `name` does not specify a database, the `current_database` is used.
    /// If `name` does not specify a schema, then the schemas in `search_path`
    /// are searched in order.
    ///
    /// `get_schema_entries` selects which namespace of a schema is searched
    /// (e.g. items, functions, or types); `err_gen` builds the error returned
    /// when the item is not found in any candidate schema.
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // If a schema name was specified, just try to find the item in that
        // schema. If no schema was specified, try to find the item in the connection's
        // temporary schema. If the item is not found, try to find the item in every
        // schema in the search path.
        let schemas = match &name.schema {
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            // No schema specified: first probe the connection's temporary
            // schema. Note that this consults the schema's `items` directly
            // rather than going through `get_schema_entries`.
            None => match self
                .try_get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .and_then(|schema| schema.items.get(&name.item))
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            // Use try_get_schema because the temp schema might not exist yet
            // (it's created lazily when the first temp object is created).
            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
                continue;
            };

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Some relations that have previously lived in the `mz_internal` schema have been moved to
        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
        // automatically let uses of the old schema resolve to the new ones as well.
        // TODO(database-issues#8173) remove this after sufficient time has passed
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        // Not found in any candidate schema.
        Err(err_gen(name.to_string()))
    }
2222
2223    /// Resolves `name` to a non-function [`CatalogEntry`].
2224    pub fn resolve_entry(
2225        &self,
2226        current_database: Option<&DatabaseId>,
2227        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2228        name: &PartialItemName,
2229        conn_id: &ConnectionId,
2230    ) -> Result<&CatalogEntry, SqlCatalogError> {
2231        self.resolve(
2232            |schema| &schema.items,
2233            current_database,
2234            search_path,
2235            name,
2236            conn_id,
2237            SqlCatalogError::UnknownItem,
2238        )
2239    }
2240
2241    /// Resolves `name` to a function [`CatalogEntry`].
2242    pub fn resolve_function(
2243        &self,
2244        current_database: Option<&DatabaseId>,
2245        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2246        name: &PartialItemName,
2247        conn_id: &ConnectionId,
2248    ) -> Result<&CatalogEntry, SqlCatalogError> {
2249        self.resolve(
2250            |schema| &schema.functions,
2251            current_database,
2252            search_path,
2253            name,
2254            conn_id,
2255            |name| SqlCatalogError::UnknownFunction {
2256                name,
2257                alternative: None,
2258            },
2259        )
2260    }
2261
    /// Resolves `name` to a type [`CatalogEntry`].
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // Builtin types that do *not* live in `pg_catalog`, keyed by name.
        // Used below to detect lookups that qualify such a type with the
        // `pg_catalog` schema anyway.
        static NON_PG_CATALOG_TYPES: LazyLock<
            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
        > = LazyLock::new(|| {
            BUILTINS::types()
                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
                .map(|typ| (typ.name, typ))
                .collect()
        });

        let entry = self.resolve(
            |schema| &schema.types,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownType { name },
        )?;

        // Warn (but still succeed) when a non-system connection explicitly
        // qualifies with `pg_catalog` a type that lives in another schema.
        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
                warn!(
                    "user specified an incorrect schema of {} for the type {}, which should be in \
                    the {} schema. This works now due to a bug but will be fixed in a later release.",
                    PG_CATALOG_SCHEMA.quoted(),
                    typ.name.quoted(),
                    typ.schema.quoted(),
                )
            }
        }

        Ok(entry)
    }
2302
    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
    ///
    /// Items derive their specific comment variant from the catalog entry via
    /// `comment_object_id`; every other object kind maps structurally onto
    /// its `CommentObjectId` counterpart.
    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
        match object_id {
            ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
            ObjectId::ClusterReplica(cluster_replica_id) => {
                CommentObjectId::ClusterReplica(cluster_replica_id)
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                CommentObjectId::NetworkPolicy(network_policy_id)
            }
        }
    }
2319
2320    /// Return current system configuration.
2321    pub fn system_config(&self) -> &SystemVars {
2322        &self.system_configuration
2323    }
2324
2325    /// Return a mutable reference to the current system configuration.
2326    pub fn system_config_mut(&mut self) -> &mut SystemVars {
2327        Arc::make_mut(&mut self.system_configuration)
2328    }
2329
2330    /// Serializes the catalog's in-memory state.
2331    ///
2332    /// There are no guarantees about the format of the serialized state, except
2333    /// that the serialized state for two identical catalogs will compare
2334    /// identically.
2335    ///
2336    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2337    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2338    /// argument.
2339    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2340        // Dump the base catalog.
2341        let mut dump = serde_json::to_value(&self).map_err(|e| {
2342            Error::new(ErrorKind::Unstructured(format!(
2343                // Don't panic here because we don't have compile-time failures for maps with
2344                // non-string keys.
2345                "internal error: could not dump catalog: {}",
2346                e
2347            )))
2348        })?;
2349
2350        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2351        // Stitch in system parameter defaults.
2352        dump_obj.insert(
2353            "system_parameter_defaults".into(),
2354            serde_json::json!(self.system_config().defaults()),
2355        );
2356        // Potentially overwrite unfinalized shards.
2357        if let Some(unfinalized_shards) = unfinalized_shards {
2358            dump_obj
2359                .get_mut("storage_metadata")
2360                .expect("known to exist")
2361                .as_object_mut()
2362                .expect("storage_metadata is an object")
2363                .insert(
2364                    "unfinalized_shards".into(),
2365                    serde_json::json!(unfinalized_shards),
2366                );
2367        }
2368        // Remove GlobalIds for temporary objects from the mapping.
2369        //
2370        // Post-test consistency checks with the durable catalog don't know about temporary items
2371        // since they're kept entirely in memory.
2372        let temporary_gids: Vec<_> = self
2373            .entry_by_global_id
2374            .iter()
2375            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2376            .map(|(gid, _item_id)| *gid)
2377            .collect();
2378        if !temporary_gids.is_empty() {
2379            let gids = dump_obj
2380                .get_mut("entry_by_global_id")
2381                .expect("known_to_exist")
2382                .as_object_mut()
2383                .expect("entry_by_global_id is an object");
2384            for gid in temporary_gids {
2385                gids.remove(&gid.to_string());
2386            }
2387        }
2388        // We exclude role_auth_by_id because it contains password information
2389        // which should not be included in the dump.
2390        dump_obj.remove("role_auth_by_id");
2391
2392        // Emit as pretty-printed JSON.
2393        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2394    }
2395
2396    pub fn availability_zones(&self) -> &[String] {
2397        &self.availability_zones
2398    }
2399
    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into an
    /// in-memory [`ReplicaLocation`], validating the replica size and
    /// resolving availability-zone constraints.
    ///
    /// `allowed_sizes` restricts which managed sizes are acceptable (an empty
    /// list allows all sizes); `allowed_availability_zones` carries a
    /// cluster-level AZ constraint, if any.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                // AZ constraints make no sense for unmanaged replicas; their
                // presence indicates a caller bug.
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried concretize unmanaged replica with specific availability_zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                // A replica-specific AZ and a cluster-level AZ constraint are
                // mutually exclusive.
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried concretize managed replica with specific availability zones and availability zone";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                // Validate the size before the infallible map lookup below.
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    // Precedence: an explicit replica AZ wins; otherwise the
                    // cluster constraint applies, with an empty list mapping
                    // to `FromCluster(None)`.
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some([])) => ManagedReplicaAvailabilityZones::FromCluster(None),
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }
2463
2464    /// Return whether the given replica size requests a disk.
2465    ///
2466    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
2467    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
2468    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
2469    /// not something we can know at this point.
2470    ///
2471    /// # Panics
2472    ///
2473    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
2474    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2475        let alloc = &self.cluster_replica_sizes.0[size];
2476        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2477    }
2478
2479    pub(crate) fn ensure_valid_replica_size(
2480        &self,
2481        allowed_sizes: &[String],
2482        size: &String,
2483    ) -> Result<(), Error> {
2484        let cluster_replica_sizes = &self.cluster_replica_sizes;
2485
2486        if !cluster_replica_sizes.0.contains_key(size)
2487            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2488            || cluster_replica_sizes.0[size].disabled
2489        {
2490            let mut entries = cluster_replica_sizes
2491                .enabled_allocations()
2492                .collect::<Vec<_>>();
2493
2494            if !allowed_sizes.is_empty() {
2495                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2496                entries.retain(|(name, _)| allowed_sizes.contains(name));
2497            }
2498
2499            entries.sort_by_key(
2500                |(
2501                    _name,
2502                    ReplicaAllocation {
2503                        scale, cpu_limit, ..
2504                    },
2505                )| (scale, cpu_limit),
2506            );
2507
2508            Err(Error {
2509                kind: ErrorKind::InvalidClusterReplicaSize {
2510                    size: size.to_owned(),
2511                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2512                },
2513            })
2514        } else {
2515            Ok(())
2516        }
2517    }
2518
2519    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2520        if role_id.is_builtin() {
2521            let role = self.get_role(role_id);
2522            Err(Error::new(ErrorKind::ReservedRoleName(
2523                role.name().to_string(),
2524            )))
2525        } else {
2526            Ok(())
2527        }
2528    }
2529
2530    pub fn ensure_not_reserved_network_policy(
2531        &self,
2532        network_policy_id: &NetworkPolicyId,
2533    ) -> Result<(), Error> {
2534        if network_policy_id.is_builtin() {
2535            let policy = self.get_network_policy(network_policy_id);
2536            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2537                policy.name.clone(),
2538            )))
2539        } else {
2540            Ok(())
2541        }
2542    }
2543
2544    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2545        let is_grantable = !role_id.is_public() && !role_id.is_system();
2546        if is_grantable {
2547            Ok(())
2548        } else {
2549            let role = self.get_role(role_id);
2550            Err(Error::new(ErrorKind::UngrantableRoleName(
2551                role.name().to_string(),
2552            )))
2553        }
2554    }
2555
2556    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2557        if role_id.is_system() {
2558            let role = self.get_role(role_id);
2559            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2560                role.name().to_string(),
2561            )))
2562        } else {
2563            Ok(())
2564        }
2565    }
2566
2567    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2568        if role_id.is_predefined() {
2569            let role = self.get_role(role_id);
2570            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2571                role.name().to_string(),
2572            )))
2573        } else {
2574            Ok(())
2575        }
2576    }
2577
2578    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2579    // passing tx, and session?
2580    pub(crate) fn add_to_audit_log(
2581        system_configuration: &SystemVars,
2582        oracle_write_ts: mz_repr::Timestamp,
2583        session: Option<&ConnMeta>,
2584        tx: &mut mz_catalog::durable::Transaction,
2585        audit_events: &mut Vec<VersionedEvent>,
2586        event_type: EventType,
2587        object_type: ObjectType,
2588        details: EventDetails,
2589    ) -> Result<(), Error> {
2590        let user = session.map(|session| session.user().name.to_string());
2591
2592        // unsafe_mock_audit_event_timestamp can only be set to Some when running in unsafe mode.
2593
2594        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2595            Some(ts) => ts.into(),
2596            _ => oracle_write_ts.into(),
2597        };
2598        let id = tx.allocate_audit_log_id()?;
2599        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2600        audit_events.push(event.clone());
2601        tx.insert_audit_log_event(event);
2602        Ok(())
2603    }
2604
    /// Returns the owning [`RoleId`] for the given object, or `None` for
    /// object kinds that have no owner (roles).
    ///
    /// `conn_id` is needed to resolve connection-specific (temporary)
    /// schemas.
    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
        match id {
            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
                self.get_cluster_replica(*cluster_id, *replica_id)
                    .owner_id(),
            ),
            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
            ObjectId::Schema((database_spec, schema_spec)) => Some(
                self.get_schema(database_spec, schema_spec, conn_id)
                    .owner_id(),
            ),
            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
            // Roles are not owned.
            ObjectId::Role(_) => None,
            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
        }
    }
2622
    /// Maps an [`ObjectId`] to its [`mz_sql::catalog::ObjectType`].
    ///
    /// For items, the concrete type (table, view, etc.) is read from the
    /// catalog entry; all other variants map one-to-one.
    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        match object_id {
            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
        }
    }
2634
    /// Maps a [`SystemObjectId`] to a [`SystemObjectType`].
    ///
    /// Concrete objects resolve through [`Self::get_object_type`]; the
    /// `System` marker maps to `SystemObjectType::System`.
    pub(super) fn get_system_object_type(
        &self,
        id: &SystemObjectId,
    ) -> mz_sql::catalog::SystemObjectType {
        match id {
            SystemObjectId::Object(object_id) => {
                SystemObjectType::Object(self.get_object_type(object_id))
            }
            SystemObjectId::System => SystemObjectType::System,
        }
    }
2646
2647    /// Returns a read-only view of the current [`StorageMetadata`].
2648    ///
2649    /// To write to this struct, you must use a catalog transaction.
2650    pub fn storage_metadata(&self) -> &StorageMetadata {
2651        &self.storage_metadata
2652    }
2653
    /// For the Sources ids in `ids`, return their compaction windows.
    ///
    /// Groups item IDs by compaction window; items without a custom window
    /// fall into the default window's bucket. Duplicate input ids and items
    /// that are neither sources nor source-fed tables are skipped.
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        // Tracks ids already processed so each item is considered once.
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    // `unwrap_or_default` yields the default compaction window
                    // when no custom one is configured.
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    cws.entry(source_cw).or_default().insert(item_id);
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::IngestionExport { .. }
                                // Also match webhook tables (source-to-table migration).
                                | DataSourceDesc::Webhook { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        // Regular tables handle compaction directly in
                        // catalog_implications, not through this function.
                        TableDataSource::TableWrites { .. } => {}
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::Ingestion { .. }
                                | DataSourceDesc::OldSyntaxIngestion { .. }
                                | DataSourceDesc::Introspection(_)
                                | DataSourceDesc::Progress
                                | DataSourceDesc::Catalog,
                            ..
                        } => {
                            unreachable!(
                                "unexpected DataSourceDesc for table {item_id}: {:?}",
                                table.data_source
                            )
                        }
                    }
                }
                _ => {
                    // Views could depend on sources, so ignore them if added by used_by above.
                    continue;
                }
            }
        }
        cws
    }
2710
2711    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2712        match id {
2713            CommentObjectId::Table(id)
2714            | CommentObjectId::View(id)
2715            | CommentObjectId::MaterializedView(id)
2716            | CommentObjectId::Source(id)
2717            | CommentObjectId::Sink(id)
2718            | CommentObjectId::Index(id)
2719            | CommentObjectId::Func(id)
2720            | CommentObjectId::Connection(id)
2721            | CommentObjectId::Type(id)
2722            | CommentObjectId::Secret(id) => Some(*id),
2723            CommentObjectId::Role(_)
2724            | CommentObjectId::Database(_)
2725            | CommentObjectId::Schema(_)
2726            | CommentObjectId::Cluster(_)
2727            | CommentObjectId::ClusterReplica(_)
2728            | CommentObjectId::NetworkPolicy(_) => None,
2729        }
2730    }
2731
2732    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2733        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2734    }
2735
    /// Renders the name of a commented-on object for use in the audit log.
    ///
    /// Items get their fully resolved name (resolved against `conn_id`);
    /// schemas use their full schema name; cluster replicas render as
    /// `cluster.replica`; other objects use their bare name.
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                // Render as `cluster.replica` via the qualified-replica AST
                // node's `Display` impl.
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }
2775
2776    pub fn mock_authentication_nonce(&self) -> String {
2777        self.mock_authentication_nonce.clone().unwrap_or_default()
2778    }
2779}
2780
impl ConnectionResolver for CatalogState {
    // Resolves a connection item to a self-contained connection value,
    // inlining any other catalog connections it references.
    //
    // Panics with "catalog out of sync" if `id` does not refer to a
    // connection item.
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            // Ssh, Aws, and AwsPrivatelink pass through without inlining.
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}
2806
// Adapts `CatalogState` to the optimizer's catalog interface by delegating to
// the corresponding inherent methods.
impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        // Fully-qualified call: the inherent `CatalogState::get_entry` takes a
        // `CatalogItemId`, so plain method syntax would resolve to it instead.
        CatalogState::get_entry_by_global_id(self, id)
    }
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        // Box the concrete iterator to match the trait's erased return type.
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}
2829
// Adapts `Catalog` to the optimizer's catalog interface by delegating every
// method to the inner `CatalogState`.
impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        // Box the concrete iterator to match the trait's erased return type.
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}
2855
impl Catalog {
    /// Upcasts a shared `Catalog` into an `Arc<dyn OptimizerCatalog>` trait
    /// object, relying on `Catalog`'s `OptimizerCatalog` implementation.
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}