Skip to main content

mz_adapter/catalog/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! In-memory metadata storage for the coordinator.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36    TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40    UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55    UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62    VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
67use mz_sql::catalog::{
68    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
69    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
70    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
71    TypeReference,
72};
73use mz_sql::names::{
74    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81    Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91    ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use mz_transform::notice::OptimizerNotice;
94use serde::Serialize;
95use timely::progress::Antichain;
96use tokio::sync::mpsc;
97use tracing::{debug, warn};
98
99// DO NOT add any more imports from `crate` outside of `crate::catalog`.
100use crate::AdapterError;
101use crate::catalog::{Catalog, ConnCatalog};
102use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
103use crate::optimize::{self, Optimize, OptimizerCatalog};
104use crate::session::Session;
105
106/// The in-memory representation of the Catalog. This struct is not directly used to persist
107/// metadata to persistent storage. For persistent metadata see
108/// [`mz_catalog::durable::DurableCatalogState`].
109///
110/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
111/// storing the contents of this struct on disk.
112#[derive(Debug, Clone, Serialize)]
113pub struct CatalogState {
114    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
115    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
116    // include:
117    //  - Temporary items.
118    //  - Certain objects are partially derived from read-only state.
119    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
120    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
121    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
122    #[serde(serialize_with = "skip_temp_items")]
123    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
124    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
125    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
126    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
127    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
128    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
129    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
130    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
131    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
132    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
133    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
134    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
135    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
136    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
137    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
138    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
139    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,
140
141    #[serde(skip)]
142    pub(super) system_configuration: Arc<SystemVars>,
143    pub(super) default_privileges: Arc<DefaultPrivileges>,
144    pub(super) system_privileges: Arc<PrivilegeMap>,
145    pub(super) comments: Arc<CommentsMap>,
146    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
147    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
148    pub(super) storage_metadata: Arc<StorageMetadata>,
149    pub(super) mock_authentication_nonce: Option<String>,
150
151    // Mutable state not derived from the durable catalog. Populated
152    // during dataflow bootstrapping (`bootstrap_dataflow_plans`), which
153    // doesn't run in Testdrive's read-only consistency check, so this
154    // must be `#[serde(skip)]`.
155    #[serde(skip)]
156    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,
157
158    // Populated by active connections creating temporary objects. The
159    // read-only catalog opened by Testdrive's consistency check has no
160    // active connections, so this must be `#[serde(skip)]`.
161    #[serde(skip)]
162    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,
163
164    // Read-only state not derived from the durable catalog.
165    #[serde(skip)]
166    pub(super) config: mz_sql::catalog::CatalogConfig,
167    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
168    #[serde(skip)]
169    pub(crate) availability_zones: Vec<String>,
170
171    // Read-only not derived from the durable catalog.
172    #[serde(skip)]
173    pub(super) egress_addresses: Vec<IpNet>,
174    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
175    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
176    pub(super) http_host_name: Option<String>,
177
178    // Read-only not derived from the durable catalog.
179    #[serde(skip)]
180    pub(super) license_key: ValidatedLicenseKey,
181}
182
183/// Keeps track of what expressions are cached or not during startup.
184/// It's also used during catalog transactions to avoid re-optimizing CREATE VIEW / CREATE MAT VIEW
185/// statements when going back and forth between durable catalog operations and in-memory catalog
186/// operations.
187#[derive(Debug, Clone, Serialize)]
188pub(crate) enum LocalExpressionCache {
189    /// The cache is being used.
190    Open {
191        /// The local expressions that were cached in the expression cache.
192        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
193        /// The local expressions that were NOT cached in the expression cache.
194        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
195    },
196    /// The cache is not being used.
197    Closed,
198}
199
200impl LocalExpressionCache {
201    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
202        Self::Open {
203            cached_exprs,
204            uncached_exprs: BTreeMap::new(),
205        }
206    }
207
208    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
209        match self {
210            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
211            LocalExpressionCache::Closed => None,
212        }
213    }
214
215    /// Insert an expression that was cached, back into the cache. This is generally needed when
216    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
217    pub(super) fn insert_cached_expression(
218        &mut self,
219        id: GlobalId,
220        local_expressions: LocalExpressions,
221    ) {
222        match self {
223            LocalExpressionCache::Open { cached_exprs, .. } => {
224                cached_exprs.insert(id, local_expressions);
225            }
226            LocalExpressionCache::Closed => {}
227        }
228    }
229
230    /// Inform the cache that `id` was not found in the cache and that we should add it as
231    /// `local_mir` and `optimizer_features`.
232    pub(super) fn insert_uncached_expression(
233        &mut self,
234        id: GlobalId,
235        local_mir: OptimizedMirRelationExpr,
236        optimizer_features: OptimizerFeatures,
237    ) {
238        match self {
239            LocalExpressionCache::Open { uncached_exprs, .. } => {
240                let local_expr = LocalExpressions {
241                    local_mir,
242                    optimizer_features,
243                };
244                // If we are trying to cache the same item a second time, with a different
245                // expression, then we must be migrating the object or doing something else weird.
246                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
247                // version after a restart. Caching the migrated version may cause us to incorrectly
248                // think that the object has already been migrated. To simplify things, we cache
249                // neither.
250                let prev = uncached_exprs.remove(&id);
251                match prev {
252                    Some(prev) if prev == local_expr => {
253                        uncached_exprs.insert(id, local_expr);
254                    }
255                    None => {
256                        uncached_exprs.insert(id, local_expr);
257                    }
258                    Some(_) => {}
259                }
260            }
261            LocalExpressionCache::Closed => {}
262        }
263    }
264
265    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
266        match self {
267            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
268            LocalExpressionCache::Closed => BTreeMap::new(),
269        }
270    }
271}
272
273fn skip_temp_items<S>(
274    entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
275    serializer: S,
276) -> Result<S::Ok, S::Error>
277where
278    S: serde::Serializer,
279{
280    mz_ore::serde::map_key_to_string(
281        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
282        serializer,
283    )
284}
285
286impl CatalogState {
287    /// Returns an empty [`CatalogState`] that can be used in tests.
288    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
289    // tests are structured in this repository.
290    pub fn empty_test() -> Self {
291        CatalogState {
292            database_by_name: Default::default(),
293            database_by_id: Default::default(),
294            entry_by_id: Default::default(),
295            entry_by_global_id: Default::default(),
296            notices_by_dep_id: Default::default(),
297            ambient_schemas_by_name: Default::default(),
298            ambient_schemas_by_id: Default::default(),
299            temporary_schemas: Default::default(),
300            clusters_by_id: Default::default(),
301            clusters_by_name: Default::default(),
302            network_policies_by_name: Default::default(),
303            roles_by_name: Default::default(),
304            roles_by_id: Default::default(),
305            network_policies_by_id: Default::default(),
306            role_auth_by_id: Default::default(),
307            config: CatalogConfig {
308                start_time: Default::default(),
309                start_instant: Instant::now(),
310                nonce: Default::default(),
311                environment_id: EnvironmentId::for_tests(),
312                session_id: Default::default(),
313                build_info: &DUMMY_BUILD_INFO,
314                now: NOW_ZERO.clone(),
315                connection_context: ConnectionContext::for_tests(Arc::new(
316                    InMemorySecretsController::new(),
317                )),
318                builtins_cfg: BuiltinsConfig {
319                    include_continual_tasks: true,
320                },
321                helm_chart_version: None,
322            },
323            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
324            availability_zones: Default::default(),
325            system_configuration: Arc::new(SystemVars::default()),
326            egress_addresses: Default::default(),
327            aws_principal_context: Default::default(),
328            aws_privatelink_availability_zones: Default::default(),
329            http_host_name: Default::default(),
330            default_privileges: Arc::new(DefaultPrivileges::default()),
331            system_privileges: Arc::new(PrivilegeMap::default()),
332            comments: Arc::new(CommentsMap::default()),
333            source_references: Default::default(),
334            storage_metadata: Arc::new(StorageMetadata::default()),
335            license_key: ValidatedLicenseKey::for_tests(),
336            mock_authentication_nonce: Default::default(),
337        }
338    }
339
340    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
341        let search_path = self.resolve_search_path(session);
342        let database = self
343            .database_by_name
344            .get(session.vars().database())
345            .map(|id| id.clone());
346        let state = match session.transaction().catalog_state() {
347            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
348            None => Cow::Borrowed(self),
349        };
350        ConnCatalog {
351            state,
352            unresolvable_ids: BTreeSet::new(),
353            conn_id: session.conn_id().clone(),
354            cluster: session.vars().cluster().into(),
355            database,
356            search_path,
357            role_id: session.current_role_id().clone(),
358            prepared_statements: Some(session.prepared_statements()),
359            portals: Some(session.portals()),
360            notices_tx: session.retain_notice_transmitter(),
361        }
362    }
363
364    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
365        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
366        let cluster = self.system_configuration.default_cluster();
367
368        ConnCatalog {
369            state: Cow::Borrowed(self),
370            unresolvable_ids: BTreeSet::new(),
371            conn_id: SYSTEM_CONN_ID.clone(),
372            cluster,
373            database: self
374                .resolve_database(DEFAULT_DATABASE_NAME)
375                .ok()
376                .map(|db| db.id()),
377            // Leaving the system's search path empty allows us to catch issues
378            // where catalog object names have not been normalized correctly.
379            search_path: Vec::new(),
380            role_id,
381            prepared_statements: None,
382            portals: None,
383            notices_tx,
384        }
385    }
386
387    pub fn for_system_session(&self) -> ConnCatalog<'_> {
388        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
389    }
390
391    /// Returns an iterator over the deduplicated identifiers of all
392    /// objects this catalog entry transitively depends on (where
393    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
394    /// [`CatalogItem::references`]).
395    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
396        struct I<'a> {
397            queue: VecDeque<CatalogItemId>,
398            seen: BTreeSet<CatalogItemId>,
399            this: &'a CatalogState,
400        }
401        impl<'a> Iterator for I<'a> {
402            type Item = CatalogItemId;
403            fn next(&mut self) -> Option<Self::Item> {
404                if let Some(next) = self.queue.pop_front() {
405                    for child in self.this.get_entry(&next).item().uses() {
406                        if !self.seen.contains(&child) {
407                            self.queue.push_back(child);
408                            self.seen.insert(child);
409                        }
410                    }
411                    Some(next)
412                } else {
413                    None
414                }
415            }
416        }
417
418        I {
419            queue: [id].into_iter().collect(),
420            seen: [id].into_iter().collect(),
421            this: self,
422        }
423    }
424
425    /// Computes the IDs of any log sources this catalog entry transitively
426    /// depends on.
427    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
428        let mut out = Vec::new();
429        self.introspection_dependencies_inner(id, &mut out);
430        out
431    }
432
433    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
434        match self.get_entry(&id).item() {
435            CatalogItem::Log(_) => out.push(id),
436            item @ (CatalogItem::View(_)
437            | CatalogItem::MaterializedView(_)
438            | CatalogItem::Connection(_)
439            | CatalogItem::ContinualTask(_)) => {
440                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
441                for item_id in item.references().items() {
442                    self.introspection_dependencies_inner(*item_id, out);
443                }
444            }
445            CatalogItem::Sink(sink) => {
446                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
447                self.introspection_dependencies_inner(from_item_id, out)
448            }
449            CatalogItem::Index(idx) => {
450                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
451                self.introspection_dependencies_inner(on_item_id, out)
452            }
453            CatalogItem::Table(_)
454            | CatalogItem::Source(_)
455            | CatalogItem::Type(_)
456            | CatalogItem::Func(_)
457            | CatalogItem::Secret(_) => (),
458        }
459    }
460
461    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
462    ///
463    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
464    /// earlier in the list than the roots. This is particularly useful for the order to drop
465    /// objects.
466    pub(super) fn object_dependents(
467        &self,
468        object_ids: &Vec<ObjectId>,
469        conn_id: &ConnectionId,
470        seen: &mut BTreeSet<ObjectId>,
471    ) -> Vec<ObjectId> {
472        let mut dependents = Vec::new();
473        for object_id in object_ids {
474            match object_id {
475                ObjectId::Cluster(id) => {
476                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
477                }
478                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
479                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
480                ),
481                ObjectId::Database(id) => {
482                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
483                }
484                ObjectId::Schema((database_spec, schema_spec)) => {
485                    dependents.extend_from_slice(&self.schema_dependents(
486                        database_spec.clone(),
487                        schema_spec.clone(),
488                        conn_id,
489                        seen,
490                    ));
491                }
492                ObjectId::NetworkPolicy(id) => {
493                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
494                }
495                id @ ObjectId::Role(_) => {
496                    let unseen = seen.insert(id.clone());
497                    if unseen {
498                        dependents.push(id.clone());
499                    }
500                }
501                ObjectId::Item(id) => {
502                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
503                }
504            }
505        }
506        dependents
507    }
508
509    /// Returns all the IDs of all objects that depend on `cluster_id`, including `cluster_id`
510    /// itself.
511    ///
512    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
513    /// earlier in the list than the roots. This is particularly useful for the order to drop
514    /// objects.
515    fn cluster_dependents(
516        &self,
517        cluster_id: ClusterId,
518        seen: &mut BTreeSet<ObjectId>,
519    ) -> Vec<ObjectId> {
520        let mut dependents = Vec::new();
521        let object_id = ObjectId::Cluster(cluster_id);
522        if !seen.contains(&object_id) {
523            seen.insert(object_id.clone());
524            let cluster = self.get_cluster(cluster_id);
525            for item_id in cluster.bound_objects() {
526                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
527            }
528            for replica_id in cluster.replica_ids().values() {
529                dependents.extend_from_slice(&self.cluster_replica_dependents(
530                    cluster_id,
531                    *replica_id,
532                    seen,
533                ));
534            }
535            dependents.push(object_id);
536        }
537        dependents
538    }
539
540    /// Returns all the IDs of all objects that depend on `replica_id`, including `replica_id`
541    /// itself.
542    ///
543    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
544    /// earlier in the list than the roots. This is particularly useful for the order to drop
545    /// objects.
546    pub(super) fn cluster_replica_dependents(
547        &self,
548        cluster_id: ClusterId,
549        replica_id: ReplicaId,
550        seen: &mut BTreeSet<ObjectId>,
551    ) -> Vec<ObjectId> {
552        let mut dependents = Vec::new();
553        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
554        if !seen.contains(&object_id) {
555            seen.insert(object_id.clone());
556            dependents.push(object_id);
557        }
558        dependents
559    }
560
561    /// Returns all the IDs of all objects that depend on `database_id`, including `database_id`
562    /// itself.
563    ///
564    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
565    /// earlier in the list than the roots. This is particularly useful for the order to drop
566    /// objects.
567    fn database_dependents(
568        &self,
569        database_id: DatabaseId,
570        conn_id: &ConnectionId,
571        seen: &mut BTreeSet<ObjectId>,
572    ) -> Vec<ObjectId> {
573        let mut dependents = Vec::new();
574        let object_id = ObjectId::Database(database_id);
575        if !seen.contains(&object_id) {
576            seen.insert(object_id.clone());
577            let database = self.get_database(&database_id);
578            for schema_id in database.schema_ids().values() {
579                dependents.extend_from_slice(&self.schema_dependents(
580                    ResolvedDatabaseSpecifier::Id(database_id),
581                    SchemaSpecifier::Id(*schema_id),
582                    conn_id,
583                    seen,
584                ));
585            }
586            dependents.push(object_id);
587        }
588        dependents
589    }
590
591    /// Returns all the IDs of all objects that depend on `schema_id`, including `schema_id`
592    /// itself.
593    ///
594    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
595    /// earlier in the list than the roots. This is particularly useful for the order to drop
596    /// objects.
597    fn schema_dependents(
598        &self,
599        database_spec: ResolvedDatabaseSpecifier,
600        schema_spec: SchemaSpecifier,
601        conn_id: &ConnectionId,
602        seen: &mut BTreeSet<ObjectId>,
603    ) -> Vec<ObjectId> {
604        let mut dependents = Vec::new();
605        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
606        if !seen.contains(&object_id) {
607            seen.insert(object_id.clone());
608            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
609            for item_id in schema.item_ids() {
610                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
611            }
612            dependents.push(object_id)
613        }
614        dependents
615    }
616
617    /// Returns all the IDs of all objects that depend on `item_id`, including `item_id`
618    /// itself.
619    ///
620    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
621    /// earlier in the list than the roots. This is particularly useful for the order to drop
622    /// objects.
623    pub(super) fn item_dependents(
624        &self,
625        item_id: CatalogItemId,
626        seen: &mut BTreeSet<ObjectId>,
627    ) -> Vec<ObjectId> {
628        let mut dependents = Vec::new();
629        let object_id = ObjectId::Item(item_id);
630        if !seen.contains(&object_id) {
631            seen.insert(object_id.clone());
632            let entry = self.get_entry(&item_id);
633            for dependent_id in entry.used_by() {
634                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
635            }
636            dependents.push(object_id);
637            // We treat the progress collection as if it depends on the source
638            // for dropping. We have additional code in planning to create a
639            // kind of special-case "CASCADE" for this dependency.
640            if let Some(progress_id) = entry.progress_id() {
641                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
642            }
643        }
644        dependents
645    }
646
647    /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id`
648    /// itself.
649    ///
650    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
651    /// earlier in the list than the roots. This is particularly useful for the order to drop
652    /// objects.
653    pub(super) fn network_policy_dependents(
654        &self,
655        network_policy_id: NetworkPolicyId,
656        _seen: &mut BTreeSet<ObjectId>,
657    ) -> Vec<ObjectId> {
658        let object_id = ObjectId::NetworkPolicy(network_policy_id);
659        // Currently network policies have no dependents
660        // when we add the ability for users or sources/sinks to have policies
661        // this method will need to be updated.
662        vec![object_id]
663    }
664
665    /// Indicates whether the indicated item is considered stable or not.
666    ///
667    /// Only stable items can be used as dependencies of other catalog items.
668    fn is_stable(&self, id: CatalogItemId) -> bool {
669        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
670        !self.is_unstable_schema_specifier(spec)
671    }
672
673    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
674        if self.system_config().unsafe_enable_unstable_dependencies() {
675            return Ok(());
676        }
677
678        let unstable_dependencies: Vec<_> = item
679            .references()
680            .items()
681            .filter(|id| !self.is_stable(**id))
682            .map(|id| self.get_entry(id).name().item.clone())
683            .collect();
684
685        // It's okay to create a temporary object with unstable
686        // dependencies, since we will never need to reboot a catalog
687        // that contains it.
688        if unstable_dependencies.is_empty() || item.is_temporary() {
689            Ok(())
690        } else {
691            let object_type = item.typ().to_string();
692            Err(Error {
693                kind: ErrorKind::UnstableDependency {
694                    object_type,
695                    unstable_dependencies,
696                },
697            })
698        }
699    }
700
701    pub fn resolve_full_name(
702        &self,
703        name: &QualifiedItemName,
704        conn_id: Option<&ConnectionId>,
705    ) -> FullItemName {
706        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
707
708        let database = match &name.qualifiers.database_spec {
709            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
710            ResolvedDatabaseSpecifier::Id(id) => {
711                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
712            }
713        };
714        // For temporary schemas, we know the name is always MZ_TEMP_SCHEMA,
715        // and the schema may not exist yet if no temporary items have been created.
716        let schema = match &name.qualifiers.schema_spec {
717            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
718            SchemaSpecifier::Id(_) => self
719                .get_schema(
720                    &name.qualifiers.database_spec,
721                    &name.qualifiers.schema_spec,
722                    conn_id,
723                )
724                .name()
725                .schema
726                .clone(),
727        };
728        FullItemName {
729            database,
730            schema,
731            item: name.item.clone(),
732        }
733    }
734
735    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
736        let database = match &name.database {
737            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
738            ResolvedDatabaseSpecifier::Id(id) => {
739                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
740            }
741        };
742        FullSchemaName {
743            database,
744            schema: name.schema.clone(),
745        }
746    }
747
748    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
749        self.entry_by_id
750            .get(id)
751            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
752    }
753
754    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
755        let item_id = self
756            .entry_by_global_id
757            .get(id)
758            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
759
760        let entry = self.get_entry(item_id).clone();
761        let version = match entry.item() {
762            CatalogItem::Table(table) => {
763                let (version, _) = table
764                    .collections
765                    .iter()
766                    .find(|(_verison, gid)| *gid == id)
767                    .expect("version to exist");
768                RelationVersionSelector::Specific(*version)
769            }
770            _ => RelationVersionSelector::Latest,
771        };
772        CatalogCollectionEntry { entry, version }
773    }
774
775    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
776        self.entry_by_id.iter()
777    }
778
779    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
780        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
781        self.temporary_schemas
782            .get(conn)
783            .into_iter()
784            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
785    }
786
787    /// Returns true if a temporary schema exists for the given connection.
788    ///
789    /// Temporary schemas are created lazily when the first temporary object is created
790    /// for a connection, so this may return false for connections that haven't created
791    /// any temporary objects.
792    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
793        self.temporary_schemas.contains_key(conn)
794    }
795
796    /// Gets a type named `name` from exactly one of the system schemas.
797    ///
798    /// # Panics
799    /// - If `name` is not an entry in any system schema
800    /// - If more than one system schema has an entry named `name`.
801    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
802        let mut res = None;
803        for schema_id in self.system_schema_ids() {
804            let schema = &self.ambient_schemas_by_id[&schema_id];
805            if let Some(global_id) = schema.types.get(name) {
806                match res {
807                    None => res = Some(self.get_entry(global_id)),
808                    Some(_) => panic!(
809                        "only call get_system_type on objects uniquely identifiable in one system schema"
810                    ),
811                }
812            }
813        }
814
815        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
816    }
817
818    pub fn get_item_by_name(
819        &self,
820        name: &QualifiedItemName,
821        conn_id: &ConnectionId,
822    ) -> Option<&CatalogEntry> {
823        self.get_schema(
824            &name.qualifiers.database_spec,
825            &name.qualifiers.schema_spec,
826            conn_id,
827        )
828        .items
829        .get(&name.item)
830        .and_then(|id| self.try_get_entry(id))
831    }
832
833    pub fn get_type_by_name(
834        &self,
835        name: &QualifiedItemName,
836        conn_id: &ConnectionId,
837    ) -> Option<&CatalogEntry> {
838        self.get_schema(
839            &name.qualifiers.database_spec,
840            &name.qualifiers.schema_spec,
841            conn_id,
842        )
843        .types
844        .get(&name.item)
845        .and_then(|id| self.try_get_entry(id))
846    }
847
848    pub(super) fn find_available_name(
849        &self,
850        mut name: QualifiedItemName,
851        conn_id: &ConnectionId,
852    ) -> QualifiedItemName {
853        let mut i = 0;
854        let orig_item_name = name.item.clone();
855        while self.get_item_by_name(&name, conn_id).is_some() {
856            i += 1;
857            name.item = format!("{}{}", orig_item_name, i);
858        }
859        name
860    }
861
862    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
863        self.entry_by_id.get(id)
864    }
865
866    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
867        let item_id = self.entry_by_global_id.get(id)?;
868        self.try_get_entry(item_id)
869    }
870
871    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
872    /// object that returns rows.
873    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
874        let entry = self.try_get_entry_by_global_id(id)?;
875        let desc = match entry.item() {
876            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
877            // TODO(alter_table): Support schema evolution on sources.
878            other => other.relation_desc(RelationVersionSelector::Latest)?,
879        };
880        Some(desc)
881    }
882
883    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
884        self.try_get_cluster(cluster_id)
885            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
886    }
887
888    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
889        self.clusters_by_id.get(&cluster_id)
890    }
891
892    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
893        self.roles_by_id.get(id)
894    }
895
896    pub fn get_role(&self, id: &RoleId) -> &Role {
897        self.roles_by_id.get(id).expect("catalog out of sync")
898    }
899
900    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
901        self.roles_by_id.keys()
902    }
903
904    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
905        self.roles_by_name
906            .get(role_name)
907            .map(|id| &self.roles_by_id[id])
908    }
909
910    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
911        self.role_auth_by_id
912            .get(id)
913            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
914    }
915
916    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
917        self.role_auth_by_id.get(id)
918    }
919
920    pub(super) fn try_get_network_policy_by_name(
921        &self,
922        policy_name: &str,
923    ) -> Option<&NetworkPolicy> {
924        self.network_policies_by_name
925            .get(policy_name)
926            .map(|id| &self.network_policies_by_id[id])
927    }
928
929    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
930        let mut membership = BTreeSet::new();
931        let mut queue = VecDeque::from(vec![id]);
932        while let Some(cur_id) = queue.pop_front() {
933            if !membership.contains(cur_id) {
934                membership.insert(cur_id.clone());
935                let role = self.get_role(cur_id);
936                soft_assert_no_log!(
937                    !role.membership().keys().contains(id),
938                    "circular membership exists in the catalog"
939                );
940                queue.extend(role.membership().keys());
941            }
942        }
943        membership.insert(RoleId::Public);
944        membership
945    }
946
947    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
948        self.network_policies_by_id
949            .get(id)
950            .expect("catalog out of sync")
951    }
952
953    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
954        self.network_policies_by_id.keys()
955    }
956
957    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
958    /// source.
959    ///
960    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
961    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
962        let entry = self.try_get_entry(id)?;
963        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
964        let name = self.resolve_full_name(entry.name(), None);
965        let host_name = self
966            .http_host_name
967            .as_ref()
968            .map(|x| x.as_str())
969            .unwrap_or_else(|| "HOST");
970
971        let RawDatabaseSpecifier::Name(database) = name.database else {
972            return None;
973        };
974
975        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
976        url.path_segments_mut()
977            .ok()?
978            .push(&database)
979            .push(&name.schema)
980            .push(&name.item);
981
982        Some(url)
983    }
984
    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
    ///
    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
    ///
    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
    /// their original state before the method returns.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
        // caller if this method changed the state of the catalog.
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            // `force_if_exists_skip` configures the planner to ignore
            // IF EXISTS-style errors when requested.
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            // Plan as the system session: this is catalog-internal
            // deserialization, not a user request.
            let session_catalog = state.for_system_session();

            // Parse -> resolve names -> plan, mirroring `parse_plan`.
            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }
1012
1013    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
1014    #[mz_ore::instrument]
1015    pub(crate) fn parse_plan(
1016        create_sql: &str,
1017        pcx: Option<&PlanContext>,
1018        catalog: &ConnCatalog,
1019    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1020        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1021        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1022        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1023
1024        Ok((plan, resolved_ids))
1025    }
1026
    /// Parses the given SQL string into a [`CatalogItem`], reusing cached local
    /// expressions from `local_expression_cache` where possible.
    ///
    /// `previous_item` is the prior incarnation of this item (if any), which
    /// the parsing machinery may consult instead of recomputing state.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        // Delegate to `parse_item` with no plan context, retained-metrics flag
        // off, and no custom compaction window.
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }
1047
    /// Parses the given SQL string into a `CatalogItem`.
    ///
    /// Consults `local_expression_cache` for a previously optimized local
    /// expression for `global_id`. On success, any freshly optimized
    /// expression is inserted into the cache; on failure, the entry removed at
    /// the start is restored so the cache is left unchanged.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        // Take the cached expression out of the cache; `parse_item_inner`
        // hands it back on error so it isn't lost.
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                // `Some` means the expression had to be optimized from
                // scratch; record it so future parses can skip that work.
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                // Planning failed: put the cached expression back so this
                // failed attempt leaves the cache untouched.
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }
1090
1091    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
1092    ///
1093    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
1094    /// not cached.
1095    ///
1096    /// On failure returns an error and `cached_expr` so it can be used later.
1097    #[mz_ore::instrument]
1098    pub(crate) fn parse_item_inner(
1099        &self,
1100        global_id: GlobalId,
1101        create_sql: &str,
1102        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1103        pcx: Option<&PlanContext>,
1104        is_retained_metrics_object: bool,
1105        custom_logical_compaction_window: Option<CompactionWindow>,
1106        cached_expr: Option<LocalExpressions>,
1107        previous_item: Option<CatalogItem>,
1108    ) -> Result<
1109        (
1110            CatalogItem,
1111            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1112        ),
1113        (AdapterError, Option<LocalExpressions>),
1114    > {
1115        let session_catalog = self.for_system_session();
1116
1117        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1118            Ok((plan, resolved_ids)) => (plan, resolved_ids),
1119            Err(err) => return Err((err, cached_expr)),
1120        };
1121
1122        let mut uncached_expr = None;
1123
1124        // Carry over the plans (`optimized_plan`, `physical_plan`,
1125        // `dataflow_metainfo`) from the previous incarnation of this item when
1126        // re-parsing an existing item (e.g. after a RENAME). These fields live
1127        // on the `CatalogItem` since #35834, but are not reconstructable from
1128        // `create_sql` alone — they are populated by the sequencer `_finish`
1129        // paths at create time, and by the expression-cache / bootstrap
1130        // rendering path on boot. If we don't preserve them here, a RENAME
1131        // silently drops the plans and dataflow metainfo for the affected
1132        // MV/Index/CT.
1133        let previous_plans = previous_item.as_ref().map(|item| {
1134            (
1135                item.optimized_plan().cloned(),
1136                item.physical_plan().cloned(),
1137                item.dataflow_metainfo().cloned(),
1138            )
1139        });
1140
1141        let mut item = match plan {
1142            Plan::CreateTable(CreateTablePlan { table, .. }) => {
1143                let collections = extra_versions
1144                    .iter()
1145                    .map(|(version, gid)| (*version, *gid))
1146                    .chain([(RelationVersion::root(), global_id)].into_iter())
1147                    .collect();
1148
1149                CatalogItem::Table(Table {
1150                    create_sql: Some(table.create_sql),
1151                    desc: table.desc,
1152                    collections,
1153                    conn_id: None,
1154                    resolved_ids,
1155                    custom_logical_compaction_window: custom_logical_compaction_window
1156                        .or(table.compaction_window),
1157                    is_retained_metrics_object,
1158                    data_source: match table.data_source {
1159                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1160                            TableDataSource::TableWrites { defaults }
1161                        }
1162                        mz_sql::plan::TableDataSource::DataSource {
1163                            desc: data_source_desc,
1164                            timeline,
1165                        } => match data_source_desc {
1166                            mz_sql::plan::DataSourceDesc::IngestionExport {
1167                                ingestion_id,
1168                                external_reference,
1169                                details,
1170                                data_config,
1171                            } => TableDataSource::DataSource {
1172                                desc: DataSourceDesc::IngestionExport {
1173                                    ingestion_id,
1174                                    external_reference,
1175                                    details,
1176                                    data_config,
1177                                },
1178                                timeline,
1179                            },
1180                            mz_sql::plan::DataSourceDesc::Webhook {
1181                                validate_using,
1182                                body_format,
1183                                headers,
1184                                cluster_id,
1185                            } => TableDataSource::DataSource {
1186                                desc: DataSourceDesc::Webhook {
1187                                    validate_using,
1188                                    body_format,
1189                                    headers,
1190                                    cluster_id: cluster_id
1191                                        .expect("Webhook Tables must have a cluster_id set"),
1192                                },
1193                                timeline,
1194                            },
1195                            _ => {
1196                                return Err((
1197                                    AdapterError::Unstructured(anyhow::anyhow!(
1198                                        "unsupported data source for table"
1199                                    )),
1200                                    cached_expr,
1201                                ));
1202                            }
1203                        },
1204                    },
1205                })
1206            }
1207            Plan::CreateSource(CreateSourcePlan {
1208                source,
1209                timeline,
1210                in_cluster,
1211                ..
1212            }) => CatalogItem::Source(Source {
1213                create_sql: Some(source.create_sql),
1214                data_source: match source.data_source {
1215                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1216                        desc,
1217                        cluster_id: match in_cluster {
1218                            Some(id) => id,
1219                            None => {
1220                                return Err((
1221                                    AdapterError::Unstructured(anyhow::anyhow!(
1222                                        "ingestion-based sources must have cluster specified"
1223                                    )),
1224                                    cached_expr,
1225                                ));
1226                            }
1227                        },
1228                    },
1229                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1230                        desc,
1231                        progress_subsource,
1232                        data_config,
1233                        details,
1234                    } => DataSourceDesc::OldSyntaxIngestion {
1235                        desc,
1236                        progress_subsource,
1237                        data_config,
1238                        details,
1239                        cluster_id: match in_cluster {
1240                            Some(id) => id,
1241                            None => {
1242                                return Err((
1243                                    AdapterError::Unstructured(anyhow::anyhow!(
1244                                        "ingestion-based sources must have cluster specified"
1245                                    )),
1246                                    cached_expr,
1247                                ));
1248                            }
1249                        },
1250                    },
1251                    mz_sql::plan::DataSourceDesc::IngestionExport {
1252                        ingestion_id,
1253                        external_reference,
1254                        details,
1255                        data_config,
1256                    } => DataSourceDesc::IngestionExport {
1257                        ingestion_id,
1258                        external_reference,
1259                        details,
1260                        data_config,
1261                    },
1262                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1263                    mz_sql::plan::DataSourceDesc::Webhook {
1264                        validate_using,
1265                        body_format,
1266                        headers,
1267                        cluster_id,
1268                    } => {
1269                        mz_ore::soft_assert_or_log!(
1270                            cluster_id.is_none(),
1271                            "cluster_id set at Source level for Webhooks"
1272                        );
1273                        DataSourceDesc::Webhook {
1274                            validate_using,
1275                            body_format,
1276                            headers,
1277                            cluster_id: in_cluster
1278                                .expect("webhook sources must use an existing cluster"),
1279                        }
1280                    }
1281                },
1282                desc: source.desc,
1283                global_id,
1284                timeline,
1285                resolved_ids,
1286                custom_logical_compaction_window: source
1287                    .compaction_window
1288                    .or(custom_logical_compaction_window),
1289                is_retained_metrics_object,
1290            }),
1291            Plan::CreateView(CreateViewPlan { view, .. }) => {
1292                // Collect optimizer parameters.
1293                let optimizer_config =
1294                    optimize::OptimizerConfig::from(session_catalog.system_vars());
1295                let previous_exprs = previous_item.map(|item| match item {
1296                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
1297                    _ => None,
1298                });
1299
1300                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1301                    (Some(local_expr), _)
1302                        if local_expr.optimizer_features == optimizer_config.features =>
1303                    {
1304                        debug!("local expression cache hit for {global_id:?}");
1305                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1306                    }
1307                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1308                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1309                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1310                    }
1311                    (cached_expr, _) => {
1312                        let optimizer_features = optimizer_config.features.clone();
1313                        // Build an optimizer for this VIEW.
1314                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1315
1316                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
1317                        let raw_expr = view.expr;
1318                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1319                            Ok(optimzed_expr) => optimzed_expr,
1320                            Err(err) => return Err((err.into(), cached_expr)),
1321                        };
1322
1323                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1324
1325                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1326                    }
1327                };
1328
1329                // Resolve all item dependencies from the HIR expression.
1330                let dependencies: BTreeSet<_> = raw_expr
1331                    .depends_on()
1332                    .into_iter()
1333                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1334                    .collect();
1335
1336                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1337                CatalogItem::View(View {
1338                    create_sql: view.create_sql,
1339                    global_id,
1340                    raw_expr,
1341                    desc: RelationDesc::new(typ, view.column_names),
1342                    locally_optimized_expr: optimized_expr,
1343                    conn_id: None,
1344                    resolved_ids,
1345                    dependencies: DependencyIds(dependencies),
1346                })
1347            }
1348            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1349                materialized_view, ..
1350            }) => {
1351                let collections = extra_versions
1352                    .iter()
1353                    .map(|(version, gid)| (*version, *gid))
1354                    .chain([(RelationVersion::root(), global_id)].into_iter())
1355                    .collect();
1356
1357                // Collect optimizer parameters.
1358                let system_vars = session_catalog.system_vars();
1359                let overrides = self
1360                    .get_cluster(materialized_view.cluster_id)
1361                    .config
1362                    .features();
1363                let optimizer_config =
1364                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
1365                let previous_exprs = previous_item.map(|item| match item {
1366                    CatalogItem::MaterializedView(materialized_view) => (
1367                        materialized_view.raw_expr,
1368                        materialized_view.locally_optimized_expr,
1369                    ),
1370                    item => unreachable!("expected materialized view, found: {item:#?}"),
1371                });
1372
1373                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1374                    (Some(local_expr), _)
1375                        if local_expr.optimizer_features == optimizer_config.features =>
1376                    {
1377                        debug!("local expression cache hit for {global_id:?}");
1378                        (
1379                            Arc::new(materialized_view.expr),
1380                            Arc::new(local_expr.local_mir),
1381                        )
1382                    }
1383                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1384                    (_, Some((raw_expr, optimized_expr)))
1385                        if *raw_expr == materialized_view.expr =>
1386                    {
1387                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1388                    }
1389                    (cached_expr, _) => {
1390                        let optimizer_features = optimizer_config.features.clone();
1391                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
1392                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1393
1394                        let raw_expr = materialized_view.expr;
1395                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1396                            Ok(optimized_expr) => optimized_expr,
1397                            Err(err) => return Err((err.into(), cached_expr)),
1398                        };
1399
1400                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1401
1402                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1403                    }
1404                };
1405                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1406
1407                for &i in &materialized_view.non_null_assertions {
1408                    typ.column_types[i].nullable = false;
1409                }
1410                let desc = RelationDesc::new(typ, materialized_view.column_names);
1411                let desc = VersionedRelationDesc::new(desc);
1412
1413                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1414
1415                // Resolve all item dependencies from the HIR expression.
1416                let dependencies = raw_expr
1417                    .depends_on()
1418                    .into_iter()
1419                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1420                    .collect();
1421
1422                CatalogItem::MaterializedView(MaterializedView {
1423                    create_sql: materialized_view.create_sql,
1424                    collections,
1425                    raw_expr,
1426                    locally_optimized_expr: optimized_expr,
1427                    desc,
1428                    resolved_ids,
1429                    dependencies,
1430                    replacement_target: materialized_view.replacement_target,
1431                    cluster_id: materialized_view.cluster_id,
1432                    target_replica: materialized_view.target_replica,
1433                    non_null_assertions: materialized_view.non_null_assertions,
1434                    custom_logical_compaction_window: materialized_view.compaction_window,
1435                    refresh_schedule: materialized_view.refresh_schedule,
1436                    initial_as_of,
1437                    optimized_plan: None,
1438                    physical_plan: None,
1439                    dataflow_metainfo: None,
1440                })
1441            }
1442            Plan::CreateContinualTask(plan) => {
1443                let ct =
1444                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1445                        Ok(ct) => ct,
1446                        Err(err) => return Err((err, cached_expr)),
1447                    };
1448                CatalogItem::ContinualTask(ct)
1449            }
1450            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1451                create_sql: index.create_sql,
1452                global_id,
1453                on: index.on,
1454                keys: index.keys.into(),
1455                conn_id: None,
1456                resolved_ids,
1457                cluster_id: index.cluster_id,
1458                custom_logical_compaction_window: custom_logical_compaction_window
1459                    .or(index.compaction_window),
1460                is_retained_metrics_object,
1461                optimized_plan: None,
1462                physical_plan: None,
1463                dataflow_metainfo: None,
1464            }),
1465            Plan::CreateSink(CreateSinkPlan {
1466                sink,
1467                with_snapshot,
1468                in_cluster,
1469                ..
1470            }) => CatalogItem::Sink(Sink {
1471                create_sql: sink.create_sql,
1472                global_id,
1473                from: sink.from,
1474                connection: sink.connection,
1475                envelope: sink.envelope,
1476                version: sink.version,
1477                with_snapshot,
1478                resolved_ids,
1479                cluster_id: in_cluster,
1480                commit_interval: sink.commit_interval,
1481            }),
1482            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1483                // Even if we don't need the `RelationDesc` here, error out
1484                // early and eagerly, as a kind of soft assertion that we _can_
1485                // build the `RelationDesc` when needed.
1486                if let Err(err) = typ.inner.desc(&session_catalog) {
1487                    return Err((err.into(), cached_expr));
1488                }
1489                CatalogItem::Type(Type {
1490                    create_sql: Some(typ.create_sql),
1491                    global_id,
1492                    details: CatalogTypeDetails {
1493                        array_id: None,
1494                        typ: typ.inner,
1495                        pg_metadata: None,
1496                    },
1497                    resolved_ids,
1498                })
1499            }
1500            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1501                create_sql: secret.create_sql,
1502                global_id,
1503            }),
1504            Plan::CreateConnection(CreateConnectionPlan {
1505                connection:
1506                    mz_sql::plan::Connection {
1507                        create_sql,
1508                        details,
1509                    },
1510                ..
1511            }) => CatalogItem::Connection(Connection {
1512                create_sql,
1513                global_id,
1514                details,
1515                resolved_ids,
1516            }),
1517            _ => {
1518                return Err((
1519                    Error::new(ErrorKind::Corruption {
1520                        detail: "catalog entry generated inappropriate plan".to_string(),
1521                    })
1522                    .into(),
1523                    cached_expr,
1524                ));
1525            }
1526        };
1527
1528        // Carry over the plans (`optimized_plan`, `physical_plan`,
1529        // `dataflow_metainfo`) from the previous incarnation of this item, if
1530        // any. See the comment on `previous_plans` above.
1531        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
1532            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
1533            {
1534                *optimized_plan = prev_optimized;
1535                *physical_plan = prev_physical;
1536                *dataflow_metainfo = prev_metainfo;
1537            }
1538        }
1539
1540        Ok((item, uncached_expr))
1541    }
1542
1543    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1544    /// Calling this method will not permanently modify any system configuration variables.
1545    ///
1546    /// WARNING:
1547    /// Any modifications made to the system configuration variables in `f`, will be lost.
1548    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1549        // Enable catalog features that might be required during planning existing
1550        // catalog items. Existing catalog items might have been created while
1551        // a specific feature flag was turned on, so we need to ensure that this
1552        // is also the case during catalog rehydration in order to avoid panics.
1553        //
1554        // WARNING / CONTRACT:
1555        // 1. Features used in this method that related to parsing / planning
1556        //    should be `enable_for_item_parsing` set to `true`.
1557        // 2. After this step, feature flag configuration must not be
1558        //    overridden.
1559        let restore = Arc::clone(&self.system_configuration);
1560        Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1561        let res = f(self);
1562        self.system_configuration = restore;
1563        res
1564    }
1565
1566    /// Returns all indexes on the given object and cluster known in the catalog.
1567    pub fn get_indexes_on(
1568        &self,
1569        id: GlobalId,
1570        cluster: ClusterId,
1571    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1572        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1573
1574        self.try_get_entry_by_global_id(&id)
1575            .into_iter()
1576            .map(move |e| {
1577                e.used_by()
1578                    .iter()
1579                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1580                        CatalogItem::Index(index) if index_matches(index) => {
1581                            Some((index.global_id(), index))
1582                        }
1583                        _ => None,
1584                    })
1585            })
1586            .flatten()
1587    }
1588
    /// Gets a reference to the database with the given ID.
    ///
    /// Panics if the database does not exist.
    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }
1592
1593    /// Gets a reference to the specified replica of the specified cluster.
1594    ///
1595    /// Returns `None` if either the cluster or the replica does not
1596    /// exist.
1597    pub(super) fn try_get_cluster_replica(
1598        &self,
1599        id: ClusterId,
1600        replica_id: ReplicaId,
1601    ) -> Option<&ClusterReplica> {
1602        self.try_get_cluster(id)
1603            .and_then(|cluster| cluster.replica(replica_id))
1604    }
1605
1606    /// Gets a reference to the specified replica of the specified cluster.
1607    ///
1608    /// Panics if either the cluster or the replica does not exist.
1609    pub(crate) fn get_cluster_replica(
1610        &self,
1611        cluster_id: ClusterId,
1612        replica_id: ReplicaId,
1613    ) -> &ClusterReplica {
1614        self.try_get_cluster_replica(cluster_id, replica_id)
1615            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1616    }
1617
1618    pub(super) fn resolve_replica_in_cluster(
1619        &self,
1620        cluster_id: &ClusterId,
1621        replica_name: &str,
1622    ) -> Result<&ClusterReplica, SqlCatalogError> {
1623        let cluster = self.get_cluster(*cluster_id);
1624        let replica_id = cluster
1625            .replica_id_by_name_
1626            .get(replica_name)
1627            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1628        Ok(&cluster.replicas_by_id_[replica_id])
1629    }
1630
1631    /// Get system configuration `name`.
1632    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1633        Ok(self.system_configuration.get(name)?)
1634    }
1635
1636    /// Parse system configuration `name` with `value` int.
1637    ///
1638    /// Returns the parsed value as a string.
1639    pub(super) fn parse_system_configuration(
1640        &self,
1641        name: &str,
1642        value: VarInput,
1643    ) -> Result<String, Error> {
1644        let value = self.system_configuration.parse(name, value)?;
1645        Ok(value.format())
1646    }
1647
1648    /// Gets the schema map for the database matching `database_spec`.
1649    pub(super) fn resolve_schema_in_database(
1650        &self,
1651        database_spec: &ResolvedDatabaseSpecifier,
1652        schema_name: &str,
1653        conn_id: &ConnectionId,
1654    ) -> Result<&Schema, SqlCatalogError> {
1655        let schema = match database_spec {
1656            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1657                self.temporary_schemas.get(conn_id)
1658            }
1659            ResolvedDatabaseSpecifier::Ambient => self
1660                .ambient_schemas_by_name
1661                .get(schema_name)
1662                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1663            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1664                db.schemas_by_name
1665                    .get(schema_name)
1666                    .and_then(|id| db.schemas_by_id.get(id))
1667            }),
1668        };
1669        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1670    }
1671
1672    /// Try to get a schema, returning `None` if it doesn't exist.
1673    ///
1674    /// For temporary schemas, returns `None` if the schema hasn't been created yet
1675    /// (temporary schemas are created lazily when the first temporary object is created).
1676    pub fn try_get_schema(
1677        &self,
1678        database_spec: &ResolvedDatabaseSpecifier,
1679        schema_spec: &SchemaSpecifier,
1680        conn_id: &ConnectionId,
1681    ) -> Option<&Schema> {
1682        // Keep in sync with `get_schema` and `get_schemas_mut`
1683        match (database_spec, schema_spec) {
1684            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1685                self.temporary_schemas.get(conn_id)
1686            }
1687            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1688                self.ambient_schemas_by_id.get(id)
1689            }
1690            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1691                .database_by_id
1692                .get(database_id)
1693                .and_then(|db| db.schemas_by_id.get(schema_id)),
1694            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1695                unreachable!("temporary schemas are in the ambient database")
1696            }
1697        }
1698    }
1699
1700    pub fn get_schema(
1701        &self,
1702        database_spec: &ResolvedDatabaseSpecifier,
1703        schema_spec: &SchemaSpecifier,
1704        conn_id: &ConnectionId,
1705    ) -> &Schema {
1706        // Keep in sync with `try_get_schema` and `get_schemas_mut`
1707        self.try_get_schema(database_spec, schema_spec, conn_id)
1708            .expect("schema must exist")
1709    }
1710
    /// Finds the non-temporary schema with the given ID, searching every
    /// database's schemas and then the ambient schemas.
    ///
    /// Panics (in `into_first`) if no matching schema exists.
    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.database_by_id
            .values()
            .filter_map(|database| database.schemas_by_id.get(schema_id))
            .chain(self.ambient_schemas_by_id.values())
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }
1719
    /// Finds the temporary schema with the given ID, searching all
    /// per-connection temporary schemas.
    ///
    /// Panics (in `into_first`) if no matching schema exists.
    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.temporary_schemas
            .values()
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }
1726
    /// Returns the ID of the ambient `mz_catalog` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_catalog_unstable` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }

    /// Returns the ID of the ambient `pg_catalog` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }

    /// Returns the ID of the ambient `information_schema` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_internal` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_introspection` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }

    /// Returns the ID of the ambient `mz_unsafe` schema.
    /// Panics if the schema is not in the catalog.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }
1754
1755    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1756        SYSTEM_SCHEMAS
1757            .iter()
1758            .map(|name| self.ambient_schemas_by_name[*name])
1759    }
1760
1761    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1762        self.system_schema_ids().contains(&id)
1763    }
1764
1765    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1766        match spec {
1767            SchemaSpecifier::Temporary => false,
1768            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1769        }
1770    }
1771
1772    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1773        UNSTABLE_SCHEMAS
1774            .iter()
1775            .map(|name| self.ambient_schemas_by_name[*name])
1776    }
1777
1778    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1779        self.unstable_schema_ids().contains(&id)
1780    }
1781
1782    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1783        match spec {
1784            SchemaSpecifier::Temporary => false,
1785            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1786        }
1787    }
1788
1789    /// Creates a new schema in the `Catalog` for temporary items
1790    /// indicated by the TEMPORARY or TEMP keywords.
1791    pub fn create_temporary_schema(
1792        &mut self,
1793        conn_id: &ConnectionId,
1794        owner_id: RoleId,
1795    ) -> Result<(), Error> {
1796        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1797        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1798        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1799        // all -1.
1800        let oid = INVALID_OID;
1801        self.temporary_schemas.insert(
1802            conn_id.clone(),
1803            Schema {
1804                name: QualifiedSchemaName {
1805                    database: ResolvedDatabaseSpecifier::Ambient,
1806                    schema: MZ_TEMP_SCHEMA.into(),
1807                },
1808                id: SchemaSpecifier::Temporary,
1809                oid,
1810                items: BTreeMap::new(),
1811                functions: BTreeMap::new(),
1812                types: BTreeMap::new(),
1813                owner_id,
1814                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1815                    mz_sql::catalog::ObjectType::Schema,
1816                    owner_id,
1817                )]),
1818            },
1819        );
1820        Ok(())
1821    }
1822
1823    /// Return all OIDs that are allocated to temporary objects.
1824    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1825        std::iter::empty()
1826            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1827                if schema.id.is_temporary() {
1828                    Some(schema.oid)
1829                } else {
1830                    None
1831                }
1832            }))
1833            .chain(self.entry_by_id.values().filter_map(|entry| {
1834                if entry.item().is_temporary() {
1835                    Some(entry.oid)
1836                } else {
1837                    None
1838                }
1839            }))
1840    }
1841
1842    /// Optimized lookup for a builtin table.
1843    ///
1844    /// Panics if the builtin table doesn't exist in the catalog.
1845    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1846        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1847    }
1848
1849    /// Optimized lookup for a builtin log.
1850    ///
1851    /// Panics if the builtin log doesn't exist in the catalog.
1852    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1853        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1854        let log = match self.get_entry(&item_id).item() {
1855            CatalogItem::Log(log) => log,
1856            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1857        };
1858        (item_id, log.global_id)
1859    }
1860
1861    /// Optimized lookup for a builtin storage collection.
1862    ///
1863    /// Panics if the builtin storage collection doesn't exist in the catalog.
1864    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1865        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1866    }
1867
1868    /// Optimized lookup for a builtin object.
1869    ///
1870    /// Panics if the builtin object doesn't exist in the catalog.
1871    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1872        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1873        let schema = &self.ambient_schemas_by_id[schema_id];
1874        match builtin.catalog_item_type() {
1875            CatalogItemType::Type => schema.types[builtin.name()],
1876            CatalogItemType::Func => schema.functions[builtin.name()],
1877            CatalogItemType::Table
1878            | CatalogItemType::Source
1879            | CatalogItemType::Sink
1880            | CatalogItemType::View
1881            | CatalogItemType::MaterializedView
1882            | CatalogItemType::Index
1883            | CatalogItemType::Secret
1884            | CatalogItemType::Connection
1885            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1886        }
1887    }
1888
    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
    ///
    /// Translates every system type referenced by name inside the builtin
    /// type's details into the corresponding catalog type ID; all other
    /// details are copied through unchanged.
    pub fn resolve_builtin_type_references(
        &self,
        builtin: &BuiltinType<NameReference>,
    ) -> BuiltinType<IdReference> {
        let typ: CatalogType<IdReference> = match &builtin.details.typ {
            CatalogType::AclItem => CatalogType::AclItem,
            // Composite types: resolve each referenced element/key/value/field
            // type by name, preserving any type modifiers verbatim.
            CatalogType::Array { element_reference } => CatalogType::Array {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::List {
                element_reference,
                element_modifiers,
            } => CatalogType::List {
                element_reference: self.get_system_type(element_reference).id,
                element_modifiers: element_modifiers.clone(),
            },
            CatalogType::Map {
                key_reference,
                value_reference,
                key_modifiers,
                value_modifiers,
            } => CatalogType::Map {
                key_reference: self.get_system_type(key_reference).id,
                value_reference: self.get_system_type(value_reference).id,
                key_modifiers: key_modifiers.clone(),
                value_modifiers: value_modifiers.clone(),
            },
            CatalogType::Range { element_reference } => CatalogType::Range {
                element_reference: self.get_system_type(element_reference).id,
            },
            CatalogType::Record { fields } => CatalogType::Record {
                fields: fields
                    .into_iter()
                    .map(|f| CatalogRecordField {
                        name: f.name.clone(),
                        type_reference: self.get_system_type(f.type_reference).id,
                        type_modifiers: f.type_modifiers.clone(),
                    })
                    .collect(),
            },
            // Scalar types carry no type references and pass through as-is.
            CatalogType::Bool => CatalogType::Bool,
            CatalogType::Bytes => CatalogType::Bytes,
            CatalogType::Char => CatalogType::Char,
            CatalogType::Date => CatalogType::Date,
            CatalogType::Float32 => CatalogType::Float32,
            CatalogType::Float64 => CatalogType::Float64,
            CatalogType::Int16 => CatalogType::Int16,
            CatalogType::Int32 => CatalogType::Int32,
            CatalogType::Int64 => CatalogType::Int64,
            CatalogType::UInt16 => CatalogType::UInt16,
            CatalogType::UInt32 => CatalogType::UInt32,
            CatalogType::UInt64 => CatalogType::UInt64,
            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
            CatalogType::Interval => CatalogType::Interval,
            CatalogType::Jsonb => CatalogType::Jsonb,
            CatalogType::Numeric => CatalogType::Numeric,
            CatalogType::Oid => CatalogType::Oid,
            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
            CatalogType::Pseudo => CatalogType::Pseudo,
            CatalogType::RegClass => CatalogType::RegClass,
            CatalogType::RegProc => CatalogType::RegProc,
            CatalogType::RegType => CatalogType::RegType,
            CatalogType::String => CatalogType::String,
            CatalogType::Time => CatalogType::Time,
            CatalogType::Timestamp => CatalogType::Timestamp,
            CatalogType::TimestampTz => CatalogType::TimestampTz,
            CatalogType::Uuid => CatalogType::Uuid,
            CatalogType::VarChar => CatalogType::VarChar,
            CatalogType::Int2Vector => CatalogType::Int2Vector,
            CatalogType::MzAclItem => CatalogType::MzAclItem,
        };

        // Name, schema, OID, and the remaining details are preserved verbatim.
        BuiltinType {
            name: builtin.name,
            schema: builtin.schema,
            oid: builtin.oid,
            details: CatalogTypeDetails {
                array_id: builtin.details.array_id,
                typ,
                pg_metadata: builtin.details.pg_metadata.clone(),
            },
        }
    }
1974
    /// Returns the configuration of the catalog.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
1978
1979    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1980        match self.database_by_name.get(database_name) {
1981            Some(id) => Ok(&self.database_by_id[id]),
1982            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1983        }
1984    }
1985
1986    pub fn resolve_schema(
1987        &self,
1988        current_database: Option<&DatabaseId>,
1989        database_name: Option<&str>,
1990        schema_name: &str,
1991        conn_id: &ConnectionId,
1992    ) -> Result<&Schema, SqlCatalogError> {
1993        let database_spec = match database_name {
1994            // If a database is explicitly specified, validate it. Note that we
1995            // intentionally do not validate `current_database` to permit
1996            // querying `mz_catalog` with an invalid session database, e.g., so
1997            // that you can run `SHOW DATABASES` to *find* a valid database.
1998            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1999                self.resolve_database(database)?.id().clone(),
2000            )),
2001            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2002        };
2003
2004        // First try to find the schema in the named database.
2005        if let Some(database_spec) = database_spec {
2006            if let Ok(schema) =
2007                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2008            {
2009                return Ok(schema);
2010            }
2011        }
2012
2013        // Then fall back to the ambient database.
2014        if let Ok(schema) = self.resolve_schema_in_database(
2015            &ResolvedDatabaseSpecifier::Ambient,
2016            schema_name,
2017            conn_id,
2018        ) {
2019            return Ok(schema);
2020        }
2021
2022        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2023    }
2024
    /// Optimized lookup for a system schema.
    ///
    /// Panics if the system schema doesn't exist in the catalog.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }
2031
2032    pub fn resolve_search_path(
2033        &self,
2034        session: &dyn SessionMetadata,
2035    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2036        let database = self
2037            .database_by_name
2038            .get(session.database())
2039            .map(|id| id.clone());
2040
2041        session
2042            .search_path()
2043            .iter()
2044            .map(|schema| {
2045                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2046            })
2047            .filter_map(|schema| schema.ok())
2048            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2049            .collect()
2050    }
2051
2052    pub fn effective_search_path(
2053        &self,
2054        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2055        include_temp_schema: bool,
2056    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2057        let mut v = Vec::with_capacity(search_path.len() + 3);
2058        // Temp schema is only included for relations and data types, not for functions and operators
2059        let temp_schema = (
2060            ResolvedDatabaseSpecifier::Ambient,
2061            SchemaSpecifier::Temporary,
2062        );
2063        if include_temp_schema && !search_path.contains(&temp_schema) {
2064            v.push(temp_schema);
2065        }
2066        let default_schemas = [
2067            (
2068                ResolvedDatabaseSpecifier::Ambient,
2069                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2070            ),
2071            (
2072                ResolvedDatabaseSpecifier::Ambient,
2073                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2074            ),
2075        ];
2076        for schema in default_schemas.into_iter() {
2077            if !search_path.contains(&schema) {
2078                v.push(schema);
2079            }
2080        }
2081        v.extend_from_slice(search_path);
2082        v
2083    }
2084
2085    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2086        let id = self
2087            .clusters_by_name
2088            .get(name)
2089            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2090        Ok(&self.clusters_by_id[id])
2091    }
2092
2093    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2094        let id = self
2095            .clusters_by_name
2096            .get(cluster.name)
2097            .expect("failed to lookup BuiltinCluster by name");
2098        self.clusters_by_id
2099            .get(id)
2100            .expect("failed to lookup BuiltinCluster by ID")
2101    }
2102
2103    pub fn resolve_cluster_replica(
2104        &self,
2105        cluster_replica_name: &QualifiedReplica,
2106    ) -> Result<&ClusterReplica, SqlCatalogError> {
2107        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2108        let replica_name = cluster_replica_name.replica.as_str();
2109        let replica_id = cluster
2110            .replica_id(replica_name)
2111            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2112        Ok(cluster.replica(replica_id).expect("Must exist"))
2113    }
2114
    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
    ///
    /// If `name` does not specify a database, the `current_database` is used.
    /// If `name` does not specify a schema, then the schemas in `search_path`
    /// are searched in order.
    ///
    /// `get_schema_entries` selects which namespace of a schema to search
    /// (e.g. items vs. functions); `err_gen` constructs the "not found" error
    /// returned when resolution fails everywhere.
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // If a schema name was specified, just try to find the item in that
        // schema. If no schema was specified, try to find the item in the connection's
        // temporary schema. If the item is not found, try to find the item in every
        // schema in the search path.
        let schemas = match &name.schema {
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            None => match self
                .try_get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .and_then(|schema| schema.items.get(&name.item))
            {
                // A temporary item shadows everything on the search path.
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            // Use try_get_schema because the temp schema might not exist yet
            // (it's created lazily when the first temp object is created).
            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
                continue;
            };

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Some relations that have previously lived in the `mz_internal` schema have been moved to
        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
        // automatically let uses of the old schema resolve to the new ones as well.
        // TODO(database-issues#8173) remove this after sufficient time has passed
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    // Log the use of the outdated schema so operators can track
                    // how often the transitional fallback is still exercised.
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }
2199
2200    /// Resolves `name` to a non-function [`CatalogEntry`].
2201    pub fn resolve_entry(
2202        &self,
2203        current_database: Option<&DatabaseId>,
2204        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2205        name: &PartialItemName,
2206        conn_id: &ConnectionId,
2207    ) -> Result<&CatalogEntry, SqlCatalogError> {
2208        self.resolve(
2209            |schema| &schema.items,
2210            current_database,
2211            search_path,
2212            name,
2213            conn_id,
2214            SqlCatalogError::UnknownItem,
2215        )
2216    }
2217
2218    /// Resolves `name` to a function [`CatalogEntry`].
2219    pub fn resolve_function(
2220        &self,
2221        current_database: Option<&DatabaseId>,
2222        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2223        name: &PartialItemName,
2224        conn_id: &ConnectionId,
2225    ) -> Result<&CatalogEntry, SqlCatalogError> {
2226        self.resolve(
2227            |schema| &schema.functions,
2228            current_database,
2229            search_path,
2230            name,
2231            conn_id,
2232            |name| SqlCatalogError::UnknownFunction {
2233                name,
2234                alternative: None,
2235            },
2236        )
2237    }
2238
2239    /// Resolves `name` to a type [`CatalogEntry`].
2240    pub fn resolve_type(
2241        &self,
2242        current_database: Option<&DatabaseId>,
2243        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2244        name: &PartialItemName,
2245        conn_id: &ConnectionId,
2246    ) -> Result<&CatalogEntry, SqlCatalogError> {
2247        static NON_PG_CATALOG_TYPES: LazyLock<
2248            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2249        > = LazyLock::new(|| {
2250            BUILTINS::types()
2251                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2252                .map(|typ| (typ.name, typ))
2253                .collect()
2254        });
2255
2256        let entry = self.resolve(
2257            |schema| &schema.types,
2258            current_database,
2259            search_path,
2260            name,
2261            conn_id,
2262            |name| SqlCatalogError::UnknownType { name },
2263        )?;
2264
2265        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2266            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2267                warn!(
2268                    "user specified an incorrect schema of {} for the type {}, which should be in \
2269                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2270                    PG_CATALOG_SCHEMA.quoted(),
2271                    typ.name.quoted(),
2272                    typ.schema.quoted(),
2273                )
2274            }
2275        }
2276
2277        Ok(entry)
2278    }
2279
2280    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2281    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2282        match object_id {
2283            ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2284            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2285            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2286            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2287            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2288            ObjectId::ClusterReplica(cluster_replica_id) => {
2289                CommentObjectId::ClusterReplica(cluster_replica_id)
2290            }
2291            ObjectId::NetworkPolicy(network_policy_id) => {
2292                CommentObjectId::NetworkPolicy(network_policy_id)
2293            }
2294        }
2295    }
2296
    /// Return current system configuration.
    ///
    /// Read-only; use [`Self::system_config_mut`] to modify it.
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }
2301
    /// Return a mutable reference to the current system configuration.
    ///
    /// Uses [`Arc::make_mut`], so if other `Arc` references to the
    /// configuration exist, the underlying `SystemVars` are cloned first
    /// (copy-on-write).
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        Arc::make_mut(&mut self.system_configuration)
    }
2306
2307    /// Serializes the catalog's in-memory state.
2308    ///
2309    /// There are no guarantees about the format of the serialized state, except
2310    /// that the serialized state for two identical catalogs will compare
2311    /// identically.
2312    ///
2313    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2314    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2315    /// argument.
2316    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2317        // Dump the base catalog.
2318        let mut dump = serde_json::to_value(&self).map_err(|e| {
2319            Error::new(ErrorKind::Unstructured(format!(
2320                // Don't panic here because we don't have compile-time failures for maps with
2321                // non-string keys.
2322                "internal error: could not dump catalog: {}",
2323                e
2324            )))
2325        })?;
2326
2327        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2328        // Stitch in system parameter defaults.
2329        dump_obj.insert(
2330            "system_parameter_defaults".into(),
2331            serde_json::json!(self.system_config().defaults()),
2332        );
2333        // Potentially overwrite unfinalized shards.
2334        if let Some(unfinalized_shards) = unfinalized_shards {
2335            dump_obj
2336                .get_mut("storage_metadata")
2337                .expect("known to exist")
2338                .as_object_mut()
2339                .expect("storage_metadata is an object")
2340                .insert(
2341                    "unfinalized_shards".into(),
2342                    serde_json::json!(unfinalized_shards),
2343                );
2344        }
2345        // Remove GlobalIds for temporary objects from the mapping.
2346        //
2347        // Post-test consistency checks with the durable catalog don't know about temporary items
2348        // since they're kept entirely in memory.
2349        let temporary_gids: Vec<_> = self
2350            .entry_by_global_id
2351            .iter()
2352            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2353            .map(|(gid, _item_id)| *gid)
2354            .collect();
2355        if !temporary_gids.is_empty() {
2356            let gids = dump_obj
2357                .get_mut("entry_by_global_id")
2358                .expect("known_to_exist")
2359                .as_object_mut()
2360                .expect("entry_by_global_id is an object");
2361            for gid in temporary_gids {
2362                gids.remove(&gid.to_string());
2363            }
2364        }
2365        // We exclude role_auth_by_id because it contains password information
2366        // which should not be included in the dump.
2367        dump_obj.remove("role_auth_by_id");
2368
2369        // Emit as pretty-printed JSON.
2370        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2371    }
2372
    /// Returns the availability zones configured for this catalog.
    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }
2376
    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into a
    /// controller [`ReplicaLocation`], validating managed sizes against this
    /// catalog's replica size map.
    ///
    /// `allowed_sizes` restricts which managed sizes are accepted (empty means
    /// no restriction). `allowed_availability_zones` supplies cluster-level
    /// availability zones; it must be `None` for unmanaged replicas and for
    /// managed replicas that already carry their own availability zone.
    ///
    /// # Errors
    ///
    /// Returns an internal error for the invalid `allowed_availability_zones`
    /// combinations above, and an error when the managed size is unknown,
    /// disabled, or not in `allowed_sizes`.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                // Unmanaged replicas are pinned to explicit addresses, so
                // availability-zone scheduling makes no sense for them.
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried concretize unmanaged replica with specific availability_zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                // A replica-specific AZ and cluster-level AZs are mutually
                // exclusive inputs.
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried concretize managed replica with specific availability zones and availability zone";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    // `ensure_valid_replica_size` verified the size exists in
                    // the map, so the lookup cannot fail.
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some([])) => ManagedReplicaAvailabilityZones::FromCluster(None),
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }
2440
2441    /// Return whether the given replica size requests a disk.
2442    ///
2443    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
2444    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
2445    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
2446    /// not something we can know at this point.
2447    ///
2448    /// # Panics
2449    ///
2450    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
2451    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2452        let alloc = &self.cluster_replica_sizes.0[size];
2453        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2454    }
2455
2456    pub(crate) fn ensure_valid_replica_size(
2457        &self,
2458        allowed_sizes: &[String],
2459        size: &String,
2460    ) -> Result<(), Error> {
2461        let cluster_replica_sizes = &self.cluster_replica_sizes;
2462
2463        if !cluster_replica_sizes.0.contains_key(size)
2464            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2465            || cluster_replica_sizes.0[size].disabled
2466        {
2467            let mut entries = cluster_replica_sizes
2468                .enabled_allocations()
2469                .collect::<Vec<_>>();
2470
2471            if !allowed_sizes.is_empty() {
2472                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2473                entries.retain(|(name, _)| allowed_sizes.contains(name));
2474            }
2475
2476            entries.sort_by_key(
2477                |(
2478                    _name,
2479                    ReplicaAllocation {
2480                        scale, cpu_limit, ..
2481                    },
2482                )| (scale, cpu_limit),
2483            );
2484
2485            Err(Error {
2486                kind: ErrorKind::InvalidClusterReplicaSize {
2487                    size: size.to_owned(),
2488                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2489                },
2490            })
2491        } else {
2492            Ok(())
2493        }
2494    }
2495
2496    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2497        if role_id.is_builtin() {
2498            let role = self.get_role(role_id);
2499            Err(Error::new(ErrorKind::ReservedRoleName(
2500                role.name().to_string(),
2501            )))
2502        } else {
2503            Ok(())
2504        }
2505    }
2506
2507    pub fn ensure_not_reserved_network_policy(
2508        &self,
2509        network_policy_id: &NetworkPolicyId,
2510    ) -> Result<(), Error> {
2511        if network_policy_id.is_builtin() {
2512            let policy = self.get_network_policy(network_policy_id);
2513            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2514                policy.name.clone(),
2515            )))
2516        } else {
2517            Ok(())
2518        }
2519    }
2520
2521    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2522        let is_grantable = !role_id.is_public() && !role_id.is_system();
2523        if is_grantable {
2524            Ok(())
2525        } else {
2526            let role = self.get_role(role_id);
2527            Err(Error::new(ErrorKind::UngrantableRoleName(
2528                role.name().to_string(),
2529            )))
2530        }
2531    }
2532
2533    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2534        if role_id.is_system() {
2535            let role = self.get_role(role_id);
2536            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2537                role.name().to_string(),
2538            )))
2539        } else {
2540            Ok(())
2541        }
2542    }
2543
2544    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2545        if role_id.is_predefined() {
2546            let role = self.get_role(role_id);
2547            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2548                role.name().to_string(),
2549            )))
2550        } else {
2551            Ok(())
2552        }
2553    }
2554
2555    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2556    // passing tx, and session?
2557    pub(crate) fn add_to_audit_log(
2558        system_configuration: &SystemVars,
2559        oracle_write_ts: mz_repr::Timestamp,
2560        session: Option<&ConnMeta>,
2561        tx: &mut mz_catalog::durable::Transaction,
2562        audit_events: &mut Vec<VersionedEvent>,
2563        event_type: EventType,
2564        object_type: ObjectType,
2565        details: EventDetails,
2566    ) -> Result<(), Error> {
2567        let user = session.map(|session| session.user().name.to_string());
2568
2569        // unsafe_mock_audit_event_timestamp can only be set to Some when running in unsafe mode.
2570
2571        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2572            Some(ts) => ts.into(),
2573            _ => oracle_write_ts.into(),
2574        };
2575        let id = tx.allocate_audit_log_id()?;
2576        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2577        audit_events.push(event.clone());
2578        tx.insert_audit_log_event(event);
2579        Ok(())
2580    }
2581
2582    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2583        match id {
2584            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2585            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2586                self.get_cluster_replica(*cluster_id, *replica_id)
2587                    .owner_id(),
2588            ),
2589            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2590            ObjectId::Schema((database_spec, schema_spec)) => Some(
2591                self.get_schema(database_spec, schema_spec, conn_id)
2592                    .owner_id(),
2593            ),
2594            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2595            ObjectId::Role(_) => None,
2596            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2597        }
2598    }
2599
2600    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2601        match object_id {
2602            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2603            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2604            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2605            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2606            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2607            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2608            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2609        }
2610    }
2611
2612    pub(super) fn get_system_object_type(
2613        &self,
2614        id: &SystemObjectId,
2615    ) -> mz_sql::catalog::SystemObjectType {
2616        match id {
2617            SystemObjectId::Object(object_id) => {
2618                SystemObjectType::Object(self.get_object_type(object_id))
2619            }
2620            SystemObjectId::System => SystemObjectType::System,
2621        }
2622    }
2623
    /// Returns a read-only view of the current [`StorageMetadata`].
    ///
    /// To write to this struct, you must use a catalog transaction; there is
    /// no mutable accessor on `CatalogState` itself.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2630
    /// For the Sources ids in `ids`, return their compaction windows.
    ///
    /// Duplicate ids are processed once. Each source — and each table backed
    /// by an ingestion-export or webhook data source — is grouped under its
    /// custom compaction window, falling back to the default window when none
    /// is set. Items of any other kind are skipped.
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        // Tracks ids already processed so duplicates in `ids` are ignored.
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    cws.entry(source_cw).or_default().insert(item_id);
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::IngestionExport { .. }
                                // Also match webhook tables (source-to-table migration).
                                | DataSourceDesc::Webhook { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        // Regular tables handle compaction directly in
                        // catalog_implications, not through this function.
                        TableDataSource::TableWrites { .. } => {}
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::Ingestion { .. }
                                | DataSourceDesc::OldSyntaxIngestion { .. }
                                | DataSourceDesc::Introspection(_)
                                | DataSourceDesc::Progress
                                | DataSourceDesc::Catalog,
                            ..
                        } => {
                            unreachable!(
                                "unexpected DataSourceDesc for table {item_id}: {:?}",
                                table.data_source
                            )
                        }
                    }
                }
                _ => {
                    // Views could depend on sources, so ignore them if added by used_by above.
                    continue;
                }
            }
        }
        cws
    }
2687
2688    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2689        match id {
2690            CommentObjectId::Table(id)
2691            | CommentObjectId::View(id)
2692            | CommentObjectId::MaterializedView(id)
2693            | CommentObjectId::Source(id)
2694            | CommentObjectId::Sink(id)
2695            | CommentObjectId::Index(id)
2696            | CommentObjectId::Func(id)
2697            | CommentObjectId::Connection(id)
2698            | CommentObjectId::Type(id)
2699            | CommentObjectId::Secret(id)
2700            | CommentObjectId::ContinualTask(id) => Some(*id),
2701            CommentObjectId::Role(_)
2702            | CommentObjectId::Database(_)
2703            | CommentObjectId::Schema(_)
2704            | CommentObjectId::Cluster(_)
2705            | CommentObjectId::ClusterReplica(_)
2706            | CommentObjectId::NetworkPolicy(_) => None,
2707        }
2708    }
2709
2710    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2711        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2712    }
2713
2714    pub fn comment_id_to_audit_log_name(
2715        &self,
2716        id: CommentObjectId,
2717        conn_id: &ConnectionId,
2718    ) -> String {
2719        match id {
2720            CommentObjectId::Table(id)
2721            | CommentObjectId::View(id)
2722            | CommentObjectId::MaterializedView(id)
2723            | CommentObjectId::Source(id)
2724            | CommentObjectId::Sink(id)
2725            | CommentObjectId::Index(id)
2726            | CommentObjectId::Func(id)
2727            | CommentObjectId::Connection(id)
2728            | CommentObjectId::Type(id)
2729            | CommentObjectId::Secret(id)
2730            | CommentObjectId::ContinualTask(id) => {
2731                let item = self.get_entry(&id);
2732                let name = self.resolve_full_name(item.name(), Some(conn_id));
2733                name.to_string()
2734            }
2735            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2736            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2737            CommentObjectId::Schema((spec, schema_id)) => {
2738                let schema = self.get_schema(&spec, &schema_id, conn_id);
2739                self.resolve_full_schema_name(&schema.name).to_string()
2740            }
2741            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2742            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2743                let cluster = self.get_cluster(cluster_id);
2744                let replica = self.get_cluster_replica(cluster_id, replica_id);
2745                QualifiedReplica {
2746                    cluster: Ident::new_unchecked(cluster.name.clone()),
2747                    replica: Ident::new_unchecked(replica.name.clone()),
2748                }
2749                .to_string()
2750            }
2751            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2752        }
2753    }
2754
2755    pub fn mock_authentication_nonce(&self) -> String {
2756        self.mock_authentication_nonce.clone().unwrap_or_default()
2757    }
2758}
2759
2760impl ConnectionResolver for CatalogState {
2761    fn resolve_connection(
2762        &self,
2763        id: CatalogItemId,
2764    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2765        use mz_storage_types::connections::Connection::*;
2766        match self
2767            .get_entry(&id)
2768            .connection()
2769            .expect("catalog out of sync")
2770            .details
2771            .to_connection()
2772        {
2773            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2774            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2775            Csr(conn) => Csr(conn.into_inline_connection(self)),
2776            Ssh(conn) => Ssh(conn),
2777            Aws(conn) => Aws(conn),
2778            AwsPrivatelink(conn) => AwsPrivatelink(conn),
2779            MySql(conn) => MySql(conn.into_inline_connection(self)),
2780            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2781            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2782        }
2783    }
2784}
2785
2786impl OptimizerCatalog for CatalogState {
2787    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2788        CatalogState::get_entry_by_global_id(self, id)
2789    }
2790    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2791        CatalogState::get_entry(self, id)
2792    }
2793    fn resolve_full_name(
2794        &self,
2795        name: &QualifiedItemName,
2796        conn_id: Option<&ConnectionId>,
2797    ) -> FullItemName {
2798        CatalogState::resolve_full_name(self, name, conn_id)
2799    }
2800    fn get_indexes_on(
2801        &self,
2802        id: GlobalId,
2803        cluster: ClusterId,
2804    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2805        Box::new(CatalogState::get_indexes_on(self, id, cluster))
2806    }
2807}
2808
2809impl OptimizerCatalog for Catalog {
2810    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2811        self.state.get_entry_by_global_id(id)
2812    }
2813
2814    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2815        self.state.get_entry(id)
2816    }
2817
2818    fn resolve_full_name(
2819        &self,
2820        name: &QualifiedItemName,
2821        conn_id: Option<&ConnectionId>,
2822    ) -> FullItemName {
2823        self.state.resolve_full_name(name, conn_id)
2824    }
2825
2826    fn get_indexes_on(
2827        &self,
2828        id: GlobalId,
2829        cluster: ClusterId,
2830    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2831        Box::new(self.state.get_indexes_on(id, cluster))
2832    }
2833}
2834
impl Catalog {
    /// Upcasts this catalog handle to a dynamically-typed
    /// [`OptimizerCatalog`] trait object. This is an unsizing coercion of the
    /// `Arc`; no catalog data is cloned.
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}