mz_adapter/catalog/state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! In-memory metadata storage for the coordinator.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
};
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
    TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
    UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::DiskLimit;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
use mz_ore::str::StrExt;
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::{
    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
    UNSTABLE_SCHEMAS,
};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
    VersionedRelationDesc,
};
use mz_secrets::InMemorySecretsController;
use mz_sql::ast::Ident;
use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
    TypeReference,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
    Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageMetadata;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{
    ConnectionResolver, InlinedConnection, IntoInlineConnection,
};
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{debug, warn};

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::AdapterError;
use crate::catalog::{Catalog, ConnCatalog};
use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::session::Session;

/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human-readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects are partially derived from read-only state.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}

/// Keeps track of which expressions are cached or not during startup.
/// It's also used during catalog transactions to avoid re-optimizing CREATE VIEW /
/// CREATE MATERIALIZED VIEW statements when going back and forth between durable catalog
/// operations and in-memory catalog operations.
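///
/// A minimal lifecycle sketch (illustrative only; `cached`, `id`, `mir`, and `features` are
/// hypothetical values):
///
/// ```ignore
/// // Open the cache with the expressions read from the durable expression cache.
/// let mut cache = LocalExpressionCache::new(cached);
/// // On a hit, reuse the optimized expression instead of re-optimizing.
/// let hit = cache.remove_cached_expression(&id);
/// // On a miss, record the freshly optimized expression...
/// cache.insert_uncached_expression(id, mir, features);
/// // ...and later drain the misses, e.g. to write them back to the cache.
/// let misses = cache.into_uncached_exprs();
/// ```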
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}

impl LocalExpressionCache {
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    /// Inserts an expression that was previously cached back into the cache. This is generally
    /// needed when parsing/planning an expression fails, but we don't want to lose the cached
    /// expression.
    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    /// Informs the cache that `id` was not found in the cache and that it should be added as
    /// `local_mir`, optimized under `optimizer_features`.
    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                // If we are trying to cache the same item a second time, with a different
                // expression, then we must be migrating the object or doing something else weird.
                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
                // version after a restart. Caching the migrated version may cause us to incorrectly
                // think that the object has already been migrated. To simplify things, we cache
                // neither.
                let prev = uncached_exprs.remove(&id);
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}

fn skip_temp_items<S>(
    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    mz_ore::serde::map_key_to_string(
        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
        serializer,
    )
}

impl CatalogState {
    /// Returns an empty [`CatalogState`] that can be used in tests.
    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
    // tests are structured in this repository.
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                timestamp_interval: Default::default(),
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Default::default(),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Default::default(),
            system_privileges: Default::default(),
            comments: Default::default(),
            source_references: Default::default(),
            storage_metadata: Default::default(),
            license_key: ValidatedLicenseKey::for_tests(),
            mock_authentication_nonce: Default::default(),
        }
    }

    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        let search_path = self.resolve_search_path(session);
        let database = self
            .database_by_name
            .get(session.vars().database())
            .map(|id| id.clone());
        let state = match session.transaction().catalog_state() {
            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
            None => Cow::Borrowed(self),
        };
        ConnCatalog {
            state,
            unresolvable_ids: BTreeSet::new(),
            conn_id: session.conn_id().clone(),
            cluster: session.vars().cluster().into(),
            database,
            search_path,
            role_id: session.current_role_id().clone(),
            prepared_statements: Some(session.prepared_statements()),
            portals: Some(session.portals()),
            notices_tx: session.retain_notice_transmitter(),
        }
    }

    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            // Leaving the system's search path empty allows us to catch issues
            // where catalog object names have not been normalized correctly.
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            portals: None,
            notices_tx,
        }
    }

    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }

    /// Returns an iterator over the deduplicated identifiers of all
    /// objects this catalog entry transitively depends on (where
    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
    /// [`CatalogItem::references`]).
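    ///
    /// A minimal usage sketch (illustrative only; `state` and `view_id` are hypothetical):
    ///
    /// ```ignore
    /// // Collect everything `view_id` depends on, including `view_id` itself,
    /// // in breadth-first order and without duplicates.
    /// let deps: Vec<CatalogItemId> = state.transitive_uses(view_id).collect();
    /// assert!(deps.contains(&view_id));
    /// ```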
    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
        struct I<'a> {
            queue: VecDeque<CatalogItemId>,
            seen: BTreeSet<CatalogItemId>,
            this: &'a CatalogState,
        }
        impl<'a> Iterator for I<'a> {
            type Item = CatalogItemId;
            fn next(&mut self) -> Option<Self::Item> {
                if let Some(next) = self.queue.pop_front() {
                    for child in self.this.get_entry(&next).item().uses() {
                        if !self.seen.contains(&child) {
                            self.queue.push_back(child);
                            self.seen.insert(child);
                        }
                    }
                    Some(next)
                } else {
                    None
                }
            }
        }

        I {
            queue: [id].into_iter().collect(),
            seen: [id].into_iter().collect(),
            this: self,
        }
    }

    /// Computes the IDs of any log sources this catalog entry transitively
    /// depends on.
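    ///
    /// A minimal usage sketch (illustrative only; `state` and `view_id` are hypothetical):
    ///
    /// ```ignore
    /// // If `view_id` (transitively) reads from introspection logs, the returned
    /// // vector names those log sources; otherwise it is empty.
    /// let log_ids = state.introspection_dependencies(view_id);
    /// ```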
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }

    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }

    /// Returns the IDs of all objects that depend on `ids`, including `ids` themselves.
    ///
    /// The result is guaranteed to be in reverse dependency order, i.e. the leaves appear
    /// earlier in the list than the roots. This is particularly useful for determining the
    /// order in which to drop objects.
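    ///
    /// A minimal usage sketch (illustrative only; `state`, `ids`, and `conn_id` are
    /// hypothetical):
    ///
    /// ```ignore
    /// let mut seen = BTreeSet::new();
    /// // Dropping the results front-to-back drops dependents before the
    /// // objects they depend on.
    /// let drop_order = state.object_dependents(&ids, &conn_id, &mut seen);
    /// ```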
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `cluster_id`, including `cluster_id`
    /// itself.
    ///
    /// The result is guaranteed to be in reverse dependency order, i.e. the leaves appear
    /// earlier in the list than the roots. This is particularly useful for determining the
    /// order in which to drop objects.
    fn cluster_dependents(
        &self,
        cluster_id: ClusterId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Cluster(cluster_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let cluster = self.get_cluster(cluster_id);
            for item_id in cluster.bound_objects() {
                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
            }
            for replica_id in cluster.replica_ids().values() {
                dependents.extend_from_slice(&self.cluster_replica_dependents(
                    cluster_id,
                    *replica_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `replica_id`, including `replica_id`
    /// itself.
    ///
    /// The result is guaranteed to be in reverse dependency order, i.e. the leaves appear
    /// earlier in the list than the roots. This is particularly useful for determining the
    /// order in which to drop objects.
    pub(super) fn cluster_replica_dependents(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `database_id`, including `database_id`
    /// itself.
    ///
    /// The result is guaranteed to be in reverse dependency order, i.e. the leaves appear
    /// earlier in the list than the roots. This is particularly useful for determining the
    /// order in which to drop objects.
    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `schema_id`, including `schema_id`
    /// itself.
    ///
    /// The result is guaranteed to be in reverse dependency order, i.e. the leaves appear
    /// earlier in the list than the roots. This is particularly useful for determining the
    /// order in which to drop objects.
    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            dependents.push(object_id);
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `item_id`, including `item_id`
    /// itself.
    ///
    /// The result is guaranteed to be in reverse dependency order, i.e. the leaves appear
    /// earlier in the list than the roots. This is particularly useful for determining the
    /// order in which to drop objects.
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            // We treat the progress collection as if it depends on the source
            // for dropping. We have additional code in planning to create a
            // kind of special-case "CASCADE" for this dependency.
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }

    /// Returns the IDs of all objects that depend on `network_policy_id`, including
    /// `network_policy_id` itself.
    ///
    /// The result is guaranteed to be in reverse dependency order, i.e. the leaves appear
    /// earlier in the list than the roots. This is particularly useful for determining the
    /// order in which to drop objects.
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        // Currently, network policies have no dependents. When we add the ability for users
        // or sources/sinks to have policies, this method will need to be updated.
        vec![object_id]
    }

    /// Indicates whether the indicated item is considered stable or not.
    ///
    /// Only stable items can be used as dependencies of other catalog items.
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }

    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
        if self.system_config().unsafe_enable_unstable_dependencies() {
            return Ok(());
        }

        let unstable_dependencies: Vec<_> = item
            .references()
            .items()
            .filter(|id| !self.is_stable(**id))
            .map(|id| self.get_entry(id).name().item.clone())
            .collect();

        // It's okay to create a temporary object with unstable
        // dependencies, since we will never need to reboot a catalog
        // that contains it.
        if unstable_dependencies.is_empty() || item.is_temporary() {
            Ok(())
        } else {
            let object_type = item.typ().to_string();
            Err(Error {
                kind: ErrorKind::UnstableDependency {
                    object_type,
                    unstable_dependencies,
                },
            })
        }
    }

    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        // For temporary schemas, we know the name is always MZ_TEMP_SCHEMA,
        // and the schema may not exist yet if no temporary items have been created.
        let schema = match &name.qualifiers.schema_spec {
            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
            _ => self
                .get_schema(
                    &name.qualifiers.database_spec,
                    &name.qualifiers.schema_spec,
                    conn_id,
                )
                .name()
                .schema
                .clone(),
        };
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }

    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        let database = match &name.database {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        FullSchemaName {
            database,
            schema: name.schema.clone(),
        }
    }

    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }
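
    /// Looks up the [`CatalogCollectionEntry`] for a [`GlobalId`].
    ///
    /// Tables can have multiple [`GlobalId`]s, one per [`RelationVersion`]; for them the
    /// returned entry pins the specific version that `id` refers to, while all other items
    /// use the latest version.
    ///
    /// # Panics
    /// Panics if `id` is not in the catalog.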
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        let item_id = self
            .entry_by_global_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));

        let entry = self.get_entry(item_id).clone();
        let version = match entry.item() {
            CatalogItem::Table(table) => {
                let (version, _) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("version to exist");
                RelationVersionSelector::Specific(*version)
            }
            _ => RelationVersionSelector::Latest,
        };
        CatalogCollectionEntry { entry, version }
    }

    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }

    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
        self.temporary_schemas
            .get(conn)
            .into_iter()
            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
    }

    /// Returns true if a temporary schema exists for the given connection.
    ///
    /// Temporary schemas are created lazily when the first temporary object is created
    /// for a connection, so this may return false for connections that haven't created
    /// any temporary objects.
    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }

    /// Gets a type named `name` from exactly one of the system schemas.
    ///
    /// # Panics
    /// - If `name` is not an entry in any system schema
    /// - If more than one system schema has an entry named `name`.
    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }

    pub fn get_item_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .items
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }
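
    /// Returns an unused variant of `name`: if `name` itself is taken, appends an increasing
    /// integer suffix until the result is free, e.g. `foo` becomes `foo1`, then `foo2`, and
    /// so on.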
    pub(super) fn find_available_name(
        &self,
        mut name: QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> QualifiedItemName {
        let mut i = 0;
        let orig_item_name = name.item.clone();
        while self.get_item_by_name(&name, conn_id).is_some() {
            i += 1;
            name.item = format!("{}{}", orig_item_name, i);
        }
        name
    }

    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }

    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        let item_id = self.entry_by_global_id.get(id)?;
        self.try_get_entry(item_id)
    }

    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
    /// object that returns rows.
    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            // TODO(alter_table): Support schema evolution on sources.
            other => other.relation_desc(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }

    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }

    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }

    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }

    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }

    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }

    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }

    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
        self.role_auth_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
    }

    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }

    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }
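
    /// Returns the set of all roles that `id` is transitively a member of, including `id`
    /// itself and the implicit `RoleId::Public` membership held by every role.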
    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        membership.insert(RoleId::Public);
        membership
    }

    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }

    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
    /// source.
    ///
    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
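    ///
    /// A sketch of the resulting shape (illustrative; `HOST` is the fallback used when no
    /// HTTP host name is configured):
    ///
    /// ```text
    /// https://<host>/api/webhook/<database>/<schema>/<item>
    /// ```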
    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or_else(|| "HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }

    /// Parses the given SQL string into a pair of [`Plan`] and [`ResolvedIds`].
    ///
    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
    ///
    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
    /// their original state before the method returns.
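    ///
    /// A minimal usage sketch (illustrative only; the SQL string is hypothetical):
    ///
    /// ```ignore
    /// let (plan, resolved_ids) = state
    ///     .deserialize_plan_with_enable_for_item_parsing("CREATE VIEW v AS SELECT 1", false)?;
    /// ```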
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
        // caller if this method changed the state of the catalog.
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }

    /// Parses the given SQL string into a pair of [`Plan`] and [`ResolvedIds`].
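    ///
    /// A minimal usage sketch (illustrative only; `state` is hypothetical):
    ///
    /// ```ignore
    /// let catalog = state.for_system_session();
    /// let (plan, resolved_ids) =
    ///     CatalogState::parse_plan("CREATE VIEW v AS SELECT 1", None, &catalog)?;
    /// ```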
    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }

    /// Parses the given SQL string into a [`CatalogItem`].
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }

    /// Parses the given SQL string into a `CatalogItem`.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }

    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
    ///
    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
    /// not cached.
    ///
    /// On failure returns an error and `cached_expr` so it can be used later.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // Build an optimizer for this VIEW.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Resolve all item dependencies from the HIR expression.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(typ, view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                // Collect optimizer parameters.
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1368
1369                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1370                    }
1371                };
1372                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1373
1374                for &i in &materialized_view.non_null_assertions {
1375                    typ.column_types[i].nullable = false;
1376                }
1377                let desc = RelationDesc::new(typ, materialized_view.column_names);
1378                let desc = VersionedRelationDesc::new(desc);
1379
1380                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1381
1382                // Resolve all item dependencies from the HIR expression.
1383                let dependencies = raw_expr
1384                    .depends_on()
1385                    .into_iter()
1386                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1387                    .collect();
1388
1389                CatalogItem::MaterializedView(MaterializedView {
1390                    create_sql: materialized_view.create_sql,
1391                    collections,
1392                    raw_expr,
1393                    optimized_expr,
1394                    desc,
1395                    resolved_ids,
1396                    dependencies,
1397                    replacement_target: materialized_view.replacement_target,
1398                    cluster_id: materialized_view.cluster_id,
1399                    non_null_assertions: materialized_view.non_null_assertions,
1400                    custom_logical_compaction_window: materialized_view.compaction_window,
1401                    refresh_schedule: materialized_view.refresh_schedule,
1402                    initial_as_of,
1403                })
1404            }
1405            Plan::CreateContinualTask(plan) => {
1406                let ct =
1407                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1408                        Ok(ct) => ct,
1409                        Err(err) => return Err((err, cached_expr)),
1410                    };
1411                CatalogItem::ContinualTask(ct)
1412            }
1413            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1414                create_sql: index.create_sql,
1415                global_id,
1416                on: index.on,
1417                keys: index.keys.into(),
1418                conn_id: None,
1419                resolved_ids,
1420                cluster_id: index.cluster_id,
1421                custom_logical_compaction_window: custom_logical_compaction_window
1422                    .or(index.compaction_window),
1423                is_retained_metrics_object,
1424            }),
1425            Plan::CreateSink(CreateSinkPlan {
1426                sink,
1427                with_snapshot,
1428                in_cluster,
1429                ..
1430            }) => CatalogItem::Sink(Sink {
1431                create_sql: sink.create_sql,
1432                global_id,
1433                from: sink.from,
1434                connection: sink.connection,
1435                envelope: sink.envelope,
1436                version: sink.version,
1437                with_snapshot,
1438                resolved_ids,
1439                cluster_id: in_cluster,
1440                commit_interval: sink.commit_interval,
1441            }),
1442            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1443                // Even if we don't need the `RelationDesc` here, error out
1444                // early and eagerly, as a kind of soft assertion that we _can_
1445                // build the `RelationDesc` when needed.
1446                if let Err(err) = typ.inner.desc(&session_catalog) {
1447                    return Err((err.into(), cached_expr));
1448                }
1449                CatalogItem::Type(Type {
1450                    create_sql: Some(typ.create_sql),
1451                    global_id,
1452                    details: CatalogTypeDetails {
1453                        array_id: None,
1454                        typ: typ.inner,
1455                        pg_metadata: None,
1456                    },
1457                    resolved_ids,
1458                })
1459            }
1460            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1461                create_sql: secret.create_sql,
1462                global_id,
1463            }),
1464            Plan::CreateConnection(CreateConnectionPlan {
1465                connection:
1466                    mz_sql::plan::Connection {
1467                        create_sql,
1468                        details,
1469                    },
1470                ..
1471            }) => CatalogItem::Connection(Connection {
1472                create_sql,
1473                global_id,
1474                details,
1475                resolved_ids,
1476            }),
1477            _ => {
1478                return Err((
1479                    Error::new(ErrorKind::Corruption {
1480                        detail: "catalog entry generated inappropriate plan".to_string(),
1481                    })
1482                    .into(),
1483                    cached_expr,
1484                ));
1485            }
1486        };
1487
1488        Ok((item, uncached_expr))
1489    }
1490
1491    /// Executes function `f` on `self` with all `enable_for_item_parsing` feature flags enabled.
1492    /// Calling this method will not permanently modify any system configuration variables.
1493    ///
1494    /// WARNING:
1495    /// Any modifications made to the system configuration variables in `f` will be lost.
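    ///
    /// A minimal sketch of the rollback behavior (not compiled; `state` is a
    /// placeholder for a catalog state and `some_flag` a placeholder variable
    /// name):
    ///
    /// ```ignore
    /// let out = state.with_enable_for_item_parsing(|state| {
    ///     // Feature flags gated behind `enable_for_item_parsing` are on here.
    ///     state.get_system_configuration("some_flag").is_ok()
    /// });
    /// // The system configuration is now restored to its previous values.
    /// ```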
1496    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1497        // Enable catalog features that might be required during planning existing
1498        // catalog items. Existing catalog items might have been created while
1499        // a specific feature flag was turned on, so we need to ensure that this
1500        // is also the case during catalog rehydration in order to avoid panics.
1501        //
1502        // WARNING / CONTRACT:
1503        // 1. Features used in this method that relate to parsing / planning
1504        //    should have `enable_for_item_parsing` set to `true`.
1505        // 2. After this step, feature flag configuration must not be
1506        //    overridden.
1507        let restore = self.system_configuration.clone();
1508        self.system_configuration.enable_for_item_parsing();
1509        let res = f(self);
1510        self.system_configuration = restore;
1511        res
1512    }
1513
1514    /// Returns all indexes on the given object and cluster known in the catalog.
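    ///
    /// A sketch of iterating the result (not compiled; `gid` and `cluster_id`
    /// are placeholders):
    ///
    /// ```ignore
    /// for (index_gid, index) in state.get_indexes_on(gid, cluster_id) {
    ///     // Each yielded index lives on `cluster_id` and indexes `gid`.
    ///     println!("{index_gid}: keys {:?}", index.keys);
    /// }
    /// ```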
1515    pub fn get_indexes_on(
1516        &self,
1517        id: GlobalId,
1518        cluster: ClusterId,
1519    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1520        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1521
1522        self.try_get_entry_by_global_id(&id)
1523            .into_iter()
1524            .map(move |e| {
1525                e.used_by()
1526                    .iter()
1527                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1528                        CatalogItem::Index(index) if index_matches(index) => {
1529                            Some((index.global_id(), index))
1530                        }
1531                        _ => None,
1532                    })
1533            })
1534            .flatten()
1535    }
1536
1537    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1538        &self.database_by_id[database_id]
1539    }
1540
1541    /// Gets a reference to the specified replica of the specified cluster.
1542    ///
1543    /// Returns `None` if either the cluster or the replica does not
1544    /// exist.
1545    pub(super) fn try_get_cluster_replica(
1546        &self,
1547        id: ClusterId,
1548        replica_id: ReplicaId,
1549    ) -> Option<&ClusterReplica> {
1550        self.try_get_cluster(id)
1551            .and_then(|cluster| cluster.replica(replica_id))
1552    }
1553
1554    /// Gets a reference to the specified replica of the specified cluster.
1555    ///
1556    /// Panics if either the cluster or the replica does not exist.
1557    pub(crate) fn get_cluster_replica(
1558        &self,
1559        cluster_id: ClusterId,
1560        replica_id: ReplicaId,
1561    ) -> &ClusterReplica {
1562        self.try_get_cluster_replica(cluster_id, replica_id)
1563            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1564    }
1565
1566    pub(super) fn resolve_replica_in_cluster(
1567        &self,
1568        cluster_id: &ClusterId,
1569        replica_name: &str,
1570    ) -> Result<&ClusterReplica, SqlCatalogError> {
1571        let cluster = self.get_cluster(*cluster_id);
1572        let replica_id = cluster
1573            .replica_id_by_name_
1574            .get(replica_name)
1575            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1576        Ok(&cluster.replicas_by_id_[replica_id])
1577    }
1578
1579    /// Gets the system configuration variable `name`.
1580    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1581        Ok(self.system_configuration.get(name)?)
1582    }
1583
1584    /// Parses the system configuration `name` with the given `value`.
1585    ///
1586    /// Returns the parsed value as a string.
1587    pub(super) fn parse_system_configuration(
1588        &self,
1589        name: &str,
1590        value: VarInput,
1591    ) -> Result<String, Error> {
1592        let value = self.system_configuration.parse(name, value)?;
1593        Ok(value.format())
1594    }
1595
1596    /// Gets the schema named `schema_name` in the database matching `database_spec`.
1597    pub(super) fn resolve_schema_in_database(
1598        &self,
1599        database_spec: &ResolvedDatabaseSpecifier,
1600        schema_name: &str,
1601        conn_id: &ConnectionId,
1602    ) -> Result<&Schema, SqlCatalogError> {
1603        let schema = match database_spec {
1604            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1605                self.temporary_schemas.get(conn_id)
1606            }
1607            ResolvedDatabaseSpecifier::Ambient => self
1608                .ambient_schemas_by_name
1609                .get(schema_name)
1610                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1611            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1612                db.schemas_by_name
1613                    .get(schema_name)
1614                    .and_then(|id| db.schemas_by_id.get(id))
1615            }),
1616        };
1617        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1618    }
1619
1620    /// Try to get a schema, returning `None` if it doesn't exist.
1621    ///
1622    /// For temporary schemas, returns `None` if the schema hasn't been created yet
1623    /// (temporary schemas are created lazily when the first temporary object is created).
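    ///
    /// A sketch of probing for a connection's temporary schema (not compiled;
    /// `state` and `conn_id` are placeholders):
    ///
    /// ```ignore
    /// let temp = state.try_get_schema(
    ///     &ResolvedDatabaseSpecifier::Ambient,
    ///     &SchemaSpecifier::Temporary,
    ///     conn_id,
    /// );
    /// if temp.is_none() {
    ///     // This connection has not created a temporary object yet.
    /// }
    /// ```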
1624    pub fn try_get_schema(
1625        &self,
1626        database_spec: &ResolvedDatabaseSpecifier,
1627        schema_spec: &SchemaSpecifier,
1628        conn_id: &ConnectionId,
1629    ) -> Option<&Schema> {
1630        // Keep in sync with `get_schema` and `get_schemas_mut`
1631        match (database_spec, schema_spec) {
1632            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1633                self.temporary_schemas.get(conn_id)
1634            }
1635            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1636                self.ambient_schemas_by_id.get(id)
1637            }
1638            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1639                .database_by_id
1640                .get(database_id)
1641                .and_then(|db| db.schemas_by_id.get(schema_id)),
1642            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1643                unreachable!("temporary schemas are in the ambient database")
1644            }
1645        }
1646    }
1647
1648    pub fn get_schema(
1649        &self,
1650        database_spec: &ResolvedDatabaseSpecifier,
1651        schema_spec: &SchemaSpecifier,
1652        conn_id: &ConnectionId,
1653    ) -> &Schema {
1654        // Keep in sync with `try_get_schema` and `get_schemas_mut`
1655        self.try_get_schema(database_spec, schema_spec, conn_id)
1656            .expect("schema must exist")
1657    }
1658
1659    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1660        self.database_by_id
1661            .values()
1662            .filter_map(|database| database.schemas_by_id.get(schema_id))
1663            .chain(self.ambient_schemas_by_id.values())
1664            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1665            .into_first()
1666    }
1667
1668    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1669        self.temporary_schemas
1670            .values()
1671            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1672            .into_first()
1673    }
1674
1675    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1676        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1677    }
1678
1679    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1680        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1681    }
1682
1683    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1684        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1685    }
1686
1687    pub fn get_information_schema_id(&self) -> SchemaId {
1688        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1689    }
1690
1691    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1692        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1693    }
1694
1695    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1696        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1697    }
1698
1699    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1700        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1701    }
1702
1703    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1704        SYSTEM_SCHEMAS
1705            .iter()
1706            .map(|name| self.ambient_schemas_by_name[*name])
1707    }
1708
1709    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1710        self.system_schema_ids().contains(&id)
1711    }
1712
1713    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1714        match spec {
1715            SchemaSpecifier::Temporary => false,
1716            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1717        }
1718    }
1719
1720    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1721        UNSTABLE_SCHEMAS
1722            .iter()
1723            .map(|name| self.ambient_schemas_by_name[*name])
1724    }
1725
1726    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1727        self.unstable_schema_ids().contains(&id)
1728    }
1729
1730    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1731        match spec {
1732            SchemaSpecifier::Temporary => false,
1733            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1734        }
1735    }
1736
1737    /// Creates a new schema in the `Catalog` for temporary items
1738    /// indicated by the `TEMPORARY` or `TEMP` keywords.
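    ///
    /// A sketch (not compiled; `conn_id` and `owner_id` are placeholders):
    ///
    /// ```ignore
    /// state.create_temporary_schema(&conn_id, owner_id)?;
    /// // Items can now resolve against this connection's `mz_temp` schema.
    /// ```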
1739    pub fn create_temporary_schema(
1740        &mut self,
1741        conn_id: &ConnectionId,
1742        owner_id: RoleId,
1743    ) -> Result<(), Error> {
1744        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1745        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1746        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1747        // all -1.
1748        let oid = INVALID_OID;
1749        self.temporary_schemas.insert(
1750            conn_id.clone(),
1751            Schema {
1752                name: QualifiedSchemaName {
1753                    database: ResolvedDatabaseSpecifier::Ambient,
1754                    schema: MZ_TEMP_SCHEMA.into(),
1755                },
1756                id: SchemaSpecifier::Temporary,
1757                oid,
1758                items: BTreeMap::new(),
1759                functions: BTreeMap::new(),
1760                types: BTreeMap::new(),
1761                owner_id,
1762                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1763                    mz_sql::catalog::ObjectType::Schema,
1764                    owner_id,
1765                )]),
1766            },
1767        );
1768        Ok(())
1769    }
1770
1771    /// Return all OIDs that are allocated to temporary objects.
1772    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1773        std::iter::empty()
1774            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1775                if schema.id.is_temporary() {
1776                    Some(schema.oid)
1777                } else {
1778                    None
1779                }
1780            }))
1781            .chain(self.entry_by_id.values().filter_map(|entry| {
1782                if entry.item().is_temporary() {
1783                    Some(entry.oid)
1784                } else {
1785                    None
1786                }
1787            }))
1788    }
1789
1790    /// Optimized lookup for a builtin table.
1791    ///
1792    /// Panics if the builtin table doesn't exist in the catalog.
1793    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1794        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1795    }
1796
1797    /// Optimized lookup for a builtin log.
1798    ///
1799    /// Panics if the builtin log doesn't exist in the catalog.
1800    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1801        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1802        let log = match self.get_entry(&item_id).item() {
1803            CatalogItem::Log(log) => log,
1804            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1805        };
1806        (item_id, log.global_id)
1807    }
1808
1809    /// Optimized lookup for a builtin storage collection.
1810    ///
1811    /// Panics if the builtin storage collection doesn't exist in the catalog.
1812    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1813        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1814    }
1815
1816    /// Optimized lookup for a builtin object.
1817    ///
1818    /// Panics if the builtin object doesn't exist in the catalog.
1819    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1820        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1821        let schema = &self.ambient_schemas_by_id[schema_id];
1822        match builtin.catalog_item_type() {
1823            CatalogItemType::Type => schema.types[builtin.name()],
1824            CatalogItemType::Func => schema.functions[builtin.name()],
1825            CatalogItemType::Table
1826            | CatalogItemType::Source
1827            | CatalogItemType::Sink
1828            | CatalogItemType::View
1829            | CatalogItemType::MaterializedView
1830            | CatalogItemType::Index
1831            | CatalogItemType::Secret
1832            | CatalogItemType::Connection
1833            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1834        }
1835    }
1836
1837    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
1838    pub fn resolve_builtin_type_references(
1839        &self,
1840        builtin: &BuiltinType<NameReference>,
1841    ) -> BuiltinType<IdReference> {
1842        let typ: CatalogType<IdReference> = match &builtin.details.typ {
1843            CatalogType::AclItem => CatalogType::AclItem,
1844            CatalogType::Array { element_reference } => CatalogType::Array {
1845                element_reference: self.get_system_type(element_reference).id,
1846            },
1847            CatalogType::List {
1848                element_reference,
1849                element_modifiers,
1850            } => CatalogType::List {
1851                element_reference: self.get_system_type(element_reference).id,
1852                element_modifiers: element_modifiers.clone(),
1853            },
1854            CatalogType::Map {
1855                key_reference,
1856                value_reference,
1857                key_modifiers,
1858                value_modifiers,
1859            } => CatalogType::Map {
1860                key_reference: self.get_system_type(key_reference).id,
1861                value_reference: self.get_system_type(value_reference).id,
1862                key_modifiers: key_modifiers.clone(),
1863                value_modifiers: value_modifiers.clone(),
1864            },
1865            CatalogType::Range { element_reference } => CatalogType::Range {
1866                element_reference: self.get_system_type(element_reference).id,
1867            },
1868            CatalogType::Record { fields } => CatalogType::Record {
1869                fields: fields
1870                    .into_iter()
1871                    .map(|f| CatalogRecordField {
1872                        name: f.name.clone(),
1873                        type_reference: self.get_system_type(f.type_reference).id,
1874                        type_modifiers: f.type_modifiers.clone(),
1875                    })
1876                    .collect(),
1877            },
1878            CatalogType::Bool => CatalogType::Bool,
1879            CatalogType::Bytes => CatalogType::Bytes,
1880            CatalogType::Char => CatalogType::Char,
1881            CatalogType::Date => CatalogType::Date,
1882            CatalogType::Float32 => CatalogType::Float32,
1883            CatalogType::Float64 => CatalogType::Float64,
1884            CatalogType::Int16 => CatalogType::Int16,
1885            CatalogType::Int32 => CatalogType::Int32,
1886            CatalogType::Int64 => CatalogType::Int64,
1887            CatalogType::UInt16 => CatalogType::UInt16,
1888            CatalogType::UInt32 => CatalogType::UInt32,
1889            CatalogType::UInt64 => CatalogType::UInt64,
1890            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1891            CatalogType::Interval => CatalogType::Interval,
1892            CatalogType::Jsonb => CatalogType::Jsonb,
1893            CatalogType::Numeric => CatalogType::Numeric,
1894            CatalogType::Oid => CatalogType::Oid,
1895            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1896            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1897            CatalogType::Pseudo => CatalogType::Pseudo,
1898            CatalogType::RegClass => CatalogType::RegClass,
1899            CatalogType::RegProc => CatalogType::RegProc,
1900            CatalogType::RegType => CatalogType::RegType,
1901            CatalogType::String => CatalogType::String,
1902            CatalogType::Time => CatalogType::Time,
1903            CatalogType::Timestamp => CatalogType::Timestamp,
1904            CatalogType::TimestampTz => CatalogType::TimestampTz,
1905            CatalogType::Uuid => CatalogType::Uuid,
1906            CatalogType::VarChar => CatalogType::VarChar,
1907            CatalogType::Int2Vector => CatalogType::Int2Vector,
1908            CatalogType::MzAclItem => CatalogType::MzAclItem,
1909        };
1910
1911        BuiltinType {
1912            name: builtin.name,
1913            schema: builtin.schema,
1914            oid: builtin.oid,
1915            details: CatalogTypeDetails {
1916                array_id: builtin.details.array_id,
1917                typ,
1918                pg_metadata: builtin.details.pg_metadata.clone(),
1919            },
1920        }
1921    }
1922
1923    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1924        &self.config
1925    }
1926
1927    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1928        match self.database_by_name.get(database_name) {
1929            Some(id) => Ok(&self.database_by_id[id]),
1930            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1931        }
1932    }
1933
1934    pub fn resolve_schema(
1935        &self,
1936        current_database: Option<&DatabaseId>,
1937        database_name: Option<&str>,
1938        schema_name: &str,
1939        conn_id: &ConnectionId,
1940    ) -> Result<&Schema, SqlCatalogError> {
1941        let database_spec = match database_name {
1942            // If a database is explicitly specified, validate it. Note that we
1943            // intentionally do not validate `current_database` to permit
1944            // querying `mz_catalog` with an invalid session database, e.g., so
1945            // that you can run `SHOW DATABASES` to *find* a valid database.
1946            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1947                self.resolve_database(database)?.id().clone(),
1948            )),
1949            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1950        };
1951
1952        // First try to find the schema in the named database.
1953        if let Some(database_spec) = database_spec {
1954            if let Ok(schema) =
1955                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1956            {
1957                return Ok(schema);
1958            }
1959        }
1960
1961        // Then fall back to the ambient database.
1962        if let Ok(schema) = self.resolve_schema_in_database(
1963            &ResolvedDatabaseSpecifier::Ambient,
1964            schema_name,
1965            conn_id,
1966        ) {
1967            return Ok(schema);
1968        }
1969
1970        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1971    }
1972
1973    /// Optimized lookup for a system schema.
1974    ///
1975    /// Panics if the system schema doesn't exist in the catalog.
1976    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1977        self.ambient_schemas_by_name[name]
1978    }
1979
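    /// Resolves the session's `search_path` setting against the catalog,
    /// silently dropping any schemas that cannot be resolved.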
1980    pub fn resolve_search_path(
1981        &self,
1982        session: &dyn SessionMetadata,
1983    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1984        let database = self
1985            .database_by_name
1986            .get(session.database())
1987            .map(|id| id.clone());
1988
1989        session
1990            .search_path()
1991            .iter()
1992            .map(|schema| {
1993                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1994            })
1995            .filter_map(|schema| schema.ok())
1996            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1997            .collect()
1998    }
1999
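    /// Returns `search_path` with the implicit schemas prepended: the
    /// temporary schema (when `include_temp_schema` is set), then `mz_catalog`
    /// and `pg_catalog`, each skipped if it already appears in `search_path`.
    ///
    /// A sketch of the resulting order (not compiled; names are illustrative):
    ///
    /// ```ignore
    /// // search_path = [materialize.public], include_temp_schema = true
    /// // yields: [mz_temp, mz_catalog, pg_catalog, materialize.public]
    /// let path = state.effective_search_path(&search_path, true);
    /// ```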
2000    pub fn effective_search_path(
2001        &self,
2002        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2003        include_temp_schema: bool,
2004    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2005        let mut v = Vec::with_capacity(search_path.len() + 3);
2006        // The temp schema is only included for relations and data types, not for functions and operators.
2007        let temp_schema = (
2008            ResolvedDatabaseSpecifier::Ambient,
2009            SchemaSpecifier::Temporary,
2010        );
2011        if include_temp_schema && !search_path.contains(&temp_schema) {
2012            v.push(temp_schema);
2013        }
2014        let default_schemas = [
2015            (
2016                ResolvedDatabaseSpecifier::Ambient,
2017                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2018            ),
2019            (
2020                ResolvedDatabaseSpecifier::Ambient,
2021                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2022            ),
2023        ];
2024        for schema in default_schemas.into_iter() {
2025            if !search_path.contains(&schema) {
2026                v.push(schema);
2027            }
2028        }
2029        v.extend_from_slice(search_path);
2030        v
2031    }
2032
2033    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2034        let id = self
2035            .clusters_by_name
2036            .get(name)
2037            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2038        Ok(&self.clusters_by_id[id])
2039    }
2040
2041    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2042        let id = self
2043            .clusters_by_name
2044            .get(cluster.name)
2045            .expect("failed to lookup BuiltinCluster by name");
2046        self.clusters_by_id
2047            .get(id)
2048            .expect("failed to lookup BuiltinCluster by ID")
2049    }
2050
2051    pub fn resolve_cluster_replica(
2052        &self,
2053        cluster_replica_name: &QualifiedReplica,
2054    ) -> Result<&ClusterReplica, SqlCatalogError> {
2055        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2056        let replica_name = cluster_replica_name.replica.as_str();
2057        let replica_id = cluster
2058            .replica_id(replica_name)
2059            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2060        Ok(cluster.replica(replica_id).expect("must exist"))
2061    }
2062
2063    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
2064    ///
2065    /// If `name` does not specify a database, the `current_database` is used.
2066    /// If `name` does not specify a schema, then the schemas in `search_path`
2067    /// are searched in order.
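    ///
    /// A sketch of the lookup order for an unqualified `name` (not compiled;
    /// `state`, `db_id`, and `path` are placeholders):
    ///
    /// ```ignore
    /// // 1. The connection's temporary schema, if it exists.
    /// // 2. Each schema in `path`, in order.
    /// // 3. As a legacy fallback, `mz_catalog_unstable` and
    /// //    `mz_introspection` when `mz_internal` was among the schemas
    /// //    searched.
    /// let entry = state.resolve_entry(Some(&db_id), &path, &name, conn_id)?;
    /// ```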
2068    #[allow(clippy::useless_let_if_seq)]
2069    pub fn resolve(
2070        &self,
2071        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2072        current_database: Option<&DatabaseId>,
2073        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2074        name: &PartialItemName,
2075        conn_id: &ConnectionId,
2076        err_gen: fn(String) -> SqlCatalogError,
2077    ) -> Result<&CatalogEntry, SqlCatalogError> {
2078        // If a schema name was specified, just try to find the item in that
2079        // schema. If no schema was specified, try to find the item in the connection's
2080        // temporary schema. If the item is not found, try to find the item in every
2081        // schema in the search path.
2082        let schemas = match &name.schema {
2083            Some(schema_name) => {
2084                match self.resolve_schema(
2085                    current_database,
2086                    name.database.as_deref(),
2087                    schema_name,
2088                    conn_id,
2089                ) {
2090                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2091                    Err(e) => return Err(e),
2092                }
2093            }
2094            None => match self
2095                .try_get_schema(
2096                    &ResolvedDatabaseSpecifier::Ambient,
2097                    &SchemaSpecifier::Temporary,
2098                    conn_id,
2099                )
2100                .and_then(|schema| schema.items.get(&name.item))
2101            {
2102                Some(id) => return Ok(self.get_entry(id)),
2103                None => search_path.to_vec(),
2104            },
2105        };
2106
2107        for (database_spec, schema_spec) in &schemas {
2108            // Use try_get_schema because the temp schema might not exist yet
2109            // (it's created lazily when the first temp object is created).
2110            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
2111                continue;
2112            };
2113
2114            if let Some(id) = get_schema_entries(schema).get(&name.item) {
2115                return Ok(&self.entry_by_id[id]);
2116            }
2117        }
2118
2119        // Some relations that have previously lived in the `mz_internal` schema have been moved to
2120        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
2121        // automatically let uses of the old schema resolve to the new ones as well.
2122        // TODO(database-issues#8173) remove this after sufficient time has passed
2123        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2124        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2125            for schema_id in [
2126                self.get_mz_catalog_unstable_schema_id(),
2127                self.get_mz_introspection_schema_id(),
2128            ] {
2129                let schema = self.get_schema(
2130                    &ResolvedDatabaseSpecifier::Ambient,
2131                    &SchemaSpecifier::Id(schema_id),
2132                    conn_id,
2133                );
2134
2135                if let Some(id) = get_schema_entries(schema).get(&name.item) {
2136                    debug!(
2137                        github_27831 = true,
2138                        "encountered use of outdated schema `mz_internal` for relation: {name}",
2139                    );
2140                    return Ok(&self.entry_by_id[id]);
2141                }
2142            }
2143        }
2144
2145        Err(err_gen(name.to_string()))
2146    }
2147
2148    /// Resolves `name` to a non-function [`CatalogEntry`].
2149    pub fn resolve_entry(
2150        &self,
2151        current_database: Option<&DatabaseId>,
2152        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2153        name: &PartialItemName,
2154        conn_id: &ConnectionId,
2155    ) -> Result<&CatalogEntry, SqlCatalogError> {
2156        self.resolve(
2157            |schema| &schema.items,
2158            current_database,
2159            search_path,
2160            name,
2161            conn_id,
2162            SqlCatalogError::UnknownItem,
2163        )
2164    }
2165
2166    /// Resolves `name` to a function [`CatalogEntry`].
2167    pub fn resolve_function(
2168        &self,
2169        current_database: Option<&DatabaseId>,
2170        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2171        name: &PartialItemName,
2172        conn_id: &ConnectionId,
2173    ) -> Result<&CatalogEntry, SqlCatalogError> {
2174        self.resolve(
2175            |schema| &schema.functions,
2176            current_database,
2177            search_path,
2178            name,
2179            conn_id,
2180            |name| SqlCatalogError::UnknownFunction {
2181                name,
2182                alternative: None,
2183            },
2184        )
2185    }
2186
2187    /// Resolves `name` to a type [`CatalogEntry`].
2188    pub fn resolve_type(
2189        &self,
2190        current_database: Option<&DatabaseId>,
2191        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2192        name: &PartialItemName,
2193        conn_id: &ConnectionId,
2194    ) -> Result<&CatalogEntry, SqlCatalogError> {
2195        static NON_PG_CATALOG_TYPES: LazyLock<
2196            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2197        > = LazyLock::new(|| {
2198            BUILTINS::types()
2199                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2200                .map(|typ| (typ.name, typ))
2201                .collect()
2202        });
2203
2204        let entry = self.resolve(
2205            |schema| &schema.types,
2206            current_database,
2207            search_path,
2208            name,
2209            conn_id,
2210            |name| SqlCatalogError::UnknownType { name },
2211        )?;
2212
2213        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2214            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2215                warn!(
2216                    "user specified an incorrect schema of {} for the type {}, which should be in \
2217                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2218                    PG_CATALOG_SCHEMA.quoted(),
2219                    typ.name.quoted(),
2220                    typ.schema.quoted(),
2221                )
2222            }
2223        }
2224
2225        Ok(entry)
2226    }
2227
2228    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2229    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2230        match object_id {
2231            ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2232            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2233            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2234            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2235            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2236            ObjectId::ClusterReplica(cluster_replica_id) => {
2237                CommentObjectId::ClusterReplica(cluster_replica_id)
2238            }
2239            ObjectId::NetworkPolicy(network_policy_id) => {
2240                CommentObjectId::NetworkPolicy(network_policy_id)
2241            }
2242        }
2243    }
2244
2245    /// Return current system configuration.
2246    pub fn system_config(&self) -> &SystemVars {
2247        &self.system_configuration
2248    }
2249
2250    /// Return a mutable reference to the current system configuration.
2251    pub fn system_config_mut(&mut self) -> &mut SystemVars {
2252        &mut self.system_configuration
2253    }
2254
2255    /// Serializes the catalog's in-memory state.
2256    ///
2257    /// There are no guarantees about the format of the serialized state, except
2258    /// that the serialized state for two identical catalogs will compare
2259    /// identically.
2260    ///
2261    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2262    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2263    /// argument.
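    ///
    /// A sketch (not compiled; `state` is a placeholder):
    ///
    /// ```ignore
    /// // Dump as-is, keeping the recorded unfinalized shards.
    /// let json = state.dump(None)?;
    /// // Dump while pretending there are no unfinalized shards.
    /// let json = state.dump(Some(BTreeSet::new()))?;
    /// ```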
2264    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2265        // Dump the base catalog.
2266        let mut dump = serde_json::to_value(&self).map_err(|e| {
2267            Error::new(ErrorKind::Unstructured(format!(
2268                // Don't panic here because we don't have compile-time failures for maps with
2269                // non-string keys.
2270                "internal error: could not dump catalog: {}",
2271                e
2272            )))
2273        })?;
2274
2275        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2276        // Stitch in system parameter defaults.
2277        dump_obj.insert(
2278            "system_parameter_defaults".into(),
2279            serde_json::json!(self.system_config().defaults()),
2280        );
2281        // Potentially overwrite unfinalized shards.
2282        if let Some(unfinalized_shards) = unfinalized_shards {
2283            dump_obj
2284                .get_mut("storage_metadata")
2285                .expect("known to exist")
2286                .as_object_mut()
2287                .expect("storage_metadata is an object")
2288                .insert(
2289                    "unfinalized_shards".into(),
2290                    serde_json::json!(unfinalized_shards),
2291                );
2292        }
2293        // Remove GlobalIds for temporary objects from the mapping.
2294        //
2295        // Post-test consistency checks with the durable catalog don't know about temporary items
2296        // since they're kept entirely in memory.
2297        let temporary_gids: Vec<_> = self
2298            .entry_by_global_id
2299            .iter()
2300            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2301            .map(|(gid, _item_id)| *gid)
2302            .collect();
2303        if !temporary_gids.is_empty() {
2304            let gids = dump_obj
2305                .get_mut("entry_by_global_id")
2306                .expect("known to exist")
2307                .as_object_mut()
2308                .expect("entry_by_global_id is an object");
2309            for gid in temporary_gids {
2310                gids.remove(&gid.to_string());
2311            }
2312        }
2313        // We exclude role_auth_by_id because it contains password information
2314        // which should not be included in the dump.
2315        dump_obj.remove("role_auth_by_id");
2316
2317        // Emit as pretty-printed JSON.
2318        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2319    }
2320
2321    pub fn availability_zones(&self) -> &[String] {
2322        &self.availability_zones
2323    }
2324
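    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into a
    /// controller [`ReplicaLocation`], validating the replica size against
    /// `allowed_sizes` and reconciling the replica's own availability zone
    /// with the cluster-level `allowed_availability_zones`.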
2325    pub fn concretize_replica_location(
2326        &self,
2327        location: mz_catalog::durable::ReplicaLocation,
2328        allowed_sizes: &Vec<String>,
2329        allowed_availability_zones: Option<&[String]>,
2330    ) -> Result<ReplicaLocation, Error> {
2331        let location = match location {
2332            mz_catalog::durable::ReplicaLocation::Unmanaged {
2333                storagectl_addrs,
2334                computectl_addrs,
2335            } => {
2336                if allowed_availability_zones.is_some() {
2337                    return Err(Error {
2338                        kind: ErrorKind::Internal(
2339                            "tried to concretize an unmanaged replica with specific availability zones"
2340                                .to_string(),
2341                        ),
2342                    });
2343                }
2344                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2345                    storagectl_addrs,
2346                    computectl_addrs,
2347                })
2348            }
2349            mz_catalog::durable::ReplicaLocation::Managed {
2350                size,
2351                availability_zone,
2352                billed_as,
2353                internal,
2354                pending,
2355            } => {
2356                if allowed_availability_zones.is_some() && availability_zone.is_some() {
2357                    let message = "tried to concretize a managed replica with both allowed availability zones and a specific availability zone";
2358                    return Err(Error {
2359                        kind: ErrorKind::Internal(message.to_string()),
2360                    });
2361                }
2362                self.ensure_valid_replica_size(allowed_sizes, &size)?;
2363                let cluster_replica_sizes = &self.cluster_replica_sizes;
2364
2365                ReplicaLocation::Managed(ManagedReplicaLocation {
2366                    allocation: cluster_replica_sizes
2367                        .0
2368                        .get(&size)
2369                        .expect("catalog out of sync")
2370                        .clone(),
2371                    availability_zones: match (availability_zone, allowed_availability_zones) {
2372                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2373                        (None, Some(azs)) if azs.is_empty() => {
2374                            ManagedReplicaAvailabilityZones::FromCluster(None)
2375                        }
2376                        (None, Some(azs)) => {
2377                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2378                        }
2379                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2380                    },
2381                    size,
2382                    billed_as,
2383                    internal,
2384                    pending,
2385                })
2386            }
2387        };
2388        Ok(location)
2389    }
2390
2391    /// Return whether the given replica size requests a disk.
2392    ///
2393    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
2394    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
2395    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
2396    /// not something we can know at this point.
2397    ///
2398    /// # Panics
2399    ///
2400    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
2401    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2402        let alloc = &self.cluster_replica_sizes.0[size];
2403        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2404    }
2405
2406    pub(crate) fn ensure_valid_replica_size(
2407        &self,
2408        allowed_sizes: &[String],
2409        size: &String,
2410    ) -> Result<(), Error> {
2411        let cluster_replica_sizes = &self.cluster_replica_sizes;
2412
2413        if !cluster_replica_sizes.0.contains_key(size)
2414            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2415            || cluster_replica_sizes.0[size].disabled
2416        {
2417            let mut entries = cluster_replica_sizes
2418                .enabled_allocations()
2419                .collect::<Vec<_>>();
2420
2421            if !allowed_sizes.is_empty() {
2422                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2423                entries.retain(|(name, _)| allowed_sizes.contains(name));
2424            }
2425
2426            entries.sort_by_key(
2427                |(
2428                    _name,
2429                    ReplicaAllocation {
2430                        scale, cpu_limit, ..
2431                    },
2432                )| (scale, cpu_limit),
2433            );
2434
2435            Err(Error {
2436                kind: ErrorKind::InvalidClusterReplicaSize {
2437                    size: size.to_owned(),
2438                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2439                },
2440            })
2441        } else {
2442            Ok(())
2443        }
2444    }
2445
2446    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2447        if role_id.is_builtin() {
2448            let role = self.get_role(role_id);
2449            Err(Error::new(ErrorKind::ReservedRoleName(
2450                role.name().to_string(),
2451            )))
2452        } else {
2453            Ok(())
2454        }
2455    }
2456
2457    pub fn ensure_not_reserved_network_policy(
2458        &self,
2459        network_policy_id: &NetworkPolicyId,
2460    ) -> Result<(), Error> {
2461        if network_policy_id.is_builtin() {
2462            let policy = self.get_network_policy(network_policy_id);
2463            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2464                policy.name.clone(),
2465            )))
2466        } else {
2467            Ok(())
2468        }
2469    }
2470
2471    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2472        let is_grantable = !role_id.is_public() && !role_id.is_system();
2473        if is_grantable {
2474            Ok(())
2475        } else {
2476            let role = self.get_role(role_id);
2477            Err(Error::new(ErrorKind::UngrantableRoleName(
2478                role.name().to_string(),
2479            )))
2480        }
2481    }
2482
2483    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2484        if role_id.is_system() {
2485            let role = self.get_role(role_id);
2486            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2487                role.name().to_string(),
2488            )))
2489        } else {
2490            Ok(())
2491        }
2492    }
2493
2494    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2495        if role_id.is_predefined() {
2496            let role = self.get_role(role_id);
2497            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2498                role.name().to_string(),
2499            )))
2500        } else {
2501            Ok(())
2502        }
2503    }
2504
2505    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2506    // passing `tx` and `session`?
2507    pub(crate) fn add_to_audit_log(
2508        system_configuration: &SystemVars,
2509        oracle_write_ts: mz_repr::Timestamp,
2510        session: Option<&ConnMeta>,
2511        tx: &mut mz_catalog::durable::Transaction,
2512        audit_events: &mut Vec<VersionedEvent>,
2513        event_type: EventType,
2514        object_type: ObjectType,
2515        details: EventDetails,
2516    ) -> Result<(), Error> {
2517        let user = session.map(|session| session.user().name.to_string());
2518
2519        // `unsafe_mock_audit_event_timestamp` can only be set to `Some`
2520        // when running in unsafe mode.
2521        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2522            Some(ts) => ts.into(),
2523            _ => oracle_write_ts.into(),
2524        };
2525        let id = tx.allocate_audit_log_id()?;
2526        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2527        audit_events.push(event.clone());
2528        tx.insert_audit_log_event(event);
2529        Ok(())
2530    }
2531
2532    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2533        match id {
2534            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2535            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2536                self.get_cluster_replica(*cluster_id, *replica_id)
2537                    .owner_id(),
2538            ),
2539            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2540            ObjectId::Schema((database_spec, schema_spec)) => Some(
2541                self.get_schema(database_spec, schema_spec, conn_id)
2542                    .owner_id(),
2543            ),
2544            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2545            ObjectId::Role(_) => None,
2546            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2547        }
2548    }
2549
2550    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2551        match object_id {
2552            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2553            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2554            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2555            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2556            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2557            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2558            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2559        }
2560    }
2561
2562    pub(super) fn get_system_object_type(
2563        &self,
2564        id: &SystemObjectId,
2565    ) -> mz_sql::catalog::SystemObjectType {
2566        match id {
2567            SystemObjectId::Object(object_id) => {
2568                SystemObjectType::Object(self.get_object_type(object_id))
2569            }
2570            SystemObjectId::System => SystemObjectType::System,
2571        }
2572    }
2573
2574    /// Returns a read-only view of the current [`StorageMetadata`].
2575    ///
2576    /// To write to this struct, you must use a catalog transaction.
2577    pub fn storage_metadata(&self) -> &StorageMetadata {
2578        &self.storage_metadata
2579    }
2580
2581    /// For the source IDs in `ids`, returns their compaction windows.
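    ///
    /// A sketch of consuming the grouping (not compiled; `ids` is a
    /// placeholder):
    ///
    /// ```ignore
    /// for (window, items) in state.source_compaction_windows(ids) {
    ///     // All collections in `items` share the compaction window `window`.
    ///     println!("{window:?}: {} collections", items.len());
    /// }
    /// ```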
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    match source.data_source {
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::IngestionExport { .. }
                        | DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                    }
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc: DataSourceDesc::IngestionExport { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        _ => {}
                    }
                }
                _ => {
                    // Views can depend on sources, so ignore any other item
                    // kinds that may have been added via `used_by` above.
                    continue;
                }
            }
        }
        cws
    }

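    /// Converts a [`CommentObjectId`] into the [`CatalogItemId`] it refers to,
    /// returning `None` for object kinds (roles, databases, schemas, clusters,
    /// cluster replicas, and network policies) that are not catalog items.
    ///
    /// For example (hypothetical `table_id` and `database_id`; not a doctest):
    ///
    /// ```ignore
    /// assert_eq!(
    ///     CatalogState::comment_id_to_item_id(&CommentObjectId::Table(table_id)),
    ///     Some(table_id),
    /// );
    /// assert_eq!(
    ///     CatalogState::comment_id_to_item_id(&CommentObjectId::Database(database_id)),
    ///     None,
    /// );
    /// ```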
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }

    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
    }

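    /// Renders the human-readable name of the object behind `id` for use in
    /// audit log events: items get their fully qualified name, schemas and
    /// cluster replicas get qualified names, and everything else uses its
    /// bare name.
    ///
    /// A usage sketch (hypothetical `state`, `cluster_id`, and `conn_id`;
    /// not a doctest):
    ///
    /// ```ignore
    /// let name = state.comment_id_to_audit_log_name(
    ///     CommentObjectId::Cluster(cluster_id),
    ///     conn_id,
    /// );
    /// ```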
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }

    pub fn mock_authentication_nonce(&self) -> String {
        self.mock_authentication_nonce.clone().unwrap_or_default()
    }
}

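/// Resolving a connection replaces any references to other catalog
/// connections (e.g. an SSH tunnel used by a Kafka connection) with their
/// inlined definitions, yielding a self-contained
/// [`mz_storage_types::connections::Connection`].
///
/// A usage sketch (hypothetical `state` and `connection_id`; not a doctest):
///
/// ```ignore
/// let conn = state.resolve_connection(connection_id);
/// ```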
impl ConnectionResolver for CatalogState {
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}

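/// Both [`CatalogState`] and [`Catalog`] implement [`OptimizerCatalog`] by
/// delegating to the same underlying state, so the optimizer can be handed
/// either one behind a `dyn OptimizerCatalog` without caring which it got.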
impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}

impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}

impl Catalog {
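    /// Converts an `Arc<Catalog>` into an `Arc<dyn OptimizerCatalog>`.
    ///
    /// Because `Catalog` implements [`OptimizerCatalog`], the `Arc` coerces
    /// to the trait object directly; this method just makes the conversion
    /// explicit at call sites. A usage sketch (hypothetical
    /// `catalog: Arc<Catalog>`; not a doctest):
    ///
    /// ```ignore
    /// let optimizer_catalog: Arc<dyn OptimizerCatalog> = catalog.as_optimizer_catalog();
    /// ```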
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}