Skip to main content

mz_adapter/catalog/
state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! In-memory metadata storage for the coordinator.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36    TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40    UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55    UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62    VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{
67    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
68    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
69    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
70    TypeReference,
71};
72use mz_sql::catalog::{CatalogConfig, EnvironmentId};
73use mz_sql::names::{
74    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81    Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91    ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use mz_transform::notice::OptimizerNotice;
94use serde::Serialize;
95use timely::progress::Antichain;
96use tokio::sync::mpsc;
97use tracing::{debug, warn};
98
99// DO NOT add any more imports from `crate` outside of `crate::catalog`.
100use crate::AdapterError;
101use crate::catalog::{Catalog, ConnCatalog};
102use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
103use crate::optimize::{self, Optimize, OptimizerCatalog};
104use crate::session::Session;
105
/// The in-memory representation of the Catalog. This struct is not directly used to persist
/// metadata to persistent storage. For persistent metadata see
/// [`mz_catalog::durable::DurableCatalogState`].
///
/// [`Serialize`] is implemented to create human readable dumps of the in-memory state, not for
/// storing the contents of this struct on disk.
///
/// Note on serialization: each `#[serde(skip)]` below applies only to the single field that
/// immediately follows it. Other fields in the same comment group are still serialized.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // State derived from the durable catalog. These fields should only be mutated in `open.rs` or
    // `apply.rs`. Some of these fields are not 100% derived from the durable catalog. Those
    // include:
    //  - Temporary items.
    //  - Certain objects are partially derived from read-only state.
    //
    // Most object kinds are indexed twice: a `*_by_name` map from name to id, and a `*_by_id`
    // map from id to the object itself.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    // Serialized through `skip_temp_items`, which drops entries that have a `conn_id` (i.e.
    // temporary items), so dumps only contain non-temporary items.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    // Maps each `GlobalId` to the `CatalogItemId` of the entry that owns it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    // Only `system_configuration` is excluded from dumps; the privilege, comment, source
    // reference, storage metadata, and nonce fields below it are serialized.
    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    pub(super) system_privileges: Arc<PrivilegeMap>,
    pub(super) comments: Arc<CommentsMap>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: Arc<StorageMetadata>,
    pub(super) mock_authentication_nonce: Option<String>,

    // Mutable state not derived from the durable catalog. Populated
    // during dataflow bootstrapping (`bootstrap_dataflow_plans`), which
    // doesn't run in Testdrive's read-only consistency check, so this
    // must be `#[serde(skip)]`.
    #[serde(skip)]
    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,

    // Populated by active connections creating temporary objects. The
    // read-only catalog opened by Testdrive's consistency check has no
    // active connections, so this must be `#[serde(skip)]`.
    // Keyed by the connection that owns the temporary schema.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    // Read-only state not derived from the durable catalog.
    // `config` and `availability_zones` are skipped in dumps; `cluster_replica_sizes` is
    // serialized.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Read-only state not derived from the durable catalog.
    // Only `egress_addresses` is skipped in dumps; the AWS and HTTP host fields are serialized.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    // Read-only state not derived from the durable catalog.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
182
/// Keeps track of what expressions are cached or not during startup.
/// It's also used during catalog transactions to avoid re-optimizing CREATE VIEW / CREATE MAT VIEW
/// statements when going back and forth between durable catalog operations and in-memory catalog
/// operations.
///
/// While the cache is [`LocalExpressionCache::Closed`], lookups return nothing, inserts are
/// ignored, and no uncached expressions are reported.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is being used.
    Open {
        /// The local expressions that were cached in the expression cache. Entries are taken
        /// out by `remove_cached_expression` and can be put back by `insert_cached_expression`.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// The local expressions that were NOT cached in the expression cache. Filled by
        /// `insert_uncached_expression` and handed out by `into_uncached_exprs`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not being used.
    Closed,
}
199
200impl LocalExpressionCache {
201    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
202        Self::Open {
203            cached_exprs,
204            uncached_exprs: BTreeMap::new(),
205        }
206    }
207
208    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
209        match self {
210            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
211            LocalExpressionCache::Closed => None,
212        }
213    }
214
215    /// Insert an expression that was cached, back into the cache. This is generally needed when
216    /// parsing/planning an expression fails, but we don't want to lose the cached expression.
217    pub(super) fn insert_cached_expression(
218        &mut self,
219        id: GlobalId,
220        local_expressions: LocalExpressions,
221    ) {
222        match self {
223            LocalExpressionCache::Open { cached_exprs, .. } => {
224                cached_exprs.insert(id, local_expressions);
225            }
226            LocalExpressionCache::Closed => {}
227        }
228    }
229
230    /// Inform the cache that `id` was not found in the cache and that we should add it as
231    /// `local_mir` and `optimizer_features`.
232    pub(super) fn insert_uncached_expression(
233        &mut self,
234        id: GlobalId,
235        local_mir: OptimizedMirRelationExpr,
236        optimizer_features: OptimizerFeatures,
237    ) {
238        match self {
239            LocalExpressionCache::Open { uncached_exprs, .. } => {
240                let local_expr = LocalExpressions {
241                    local_mir,
242                    optimizer_features,
243                };
244                // If we are trying to cache the same item a second time, with a different
245                // expression, then we must be migrating the object or doing something else weird.
246                // Caching the unmigrated expression may cause us to incorrectly use the unmigrated
247                // version after a restart. Caching the migrated version may cause us to incorrectly
248                // think that the object has already been migrated. To simplify things, we cache
249                // neither.
250                let prev = uncached_exprs.remove(&id);
251                match prev {
252                    Some(prev) if prev == local_expr => {
253                        uncached_exprs.insert(id, local_expr);
254                    }
255                    None => {
256                        uncached_exprs.insert(id, local_expr);
257                    }
258                    Some(_) => {}
259                }
260            }
261            LocalExpressionCache::Closed => {}
262        }
263    }
264
265    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
266        match self {
267            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
268            LocalExpressionCache::Closed => BTreeMap::new(),
269        }
270    }
271}
272
273fn skip_temp_items<S>(
274    entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
275    serializer: S,
276) -> Result<S::Ok, S::Error>
277where
278    S: serde::Serializer,
279{
280    mz_ore::serde::map_key_to_string(
281        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
282        serializer,
283    )
284}
285
286impl CatalogState {
287    /// Returns an empty [`CatalogState`] that can be used in tests.
288    // TODO: Ideally we'd mark this as `#[cfg(test)]`, but that doesn't work with the way
289    // tests are structured in this repository.
290    pub fn empty_test() -> Self {
291        CatalogState {
292            database_by_name: Default::default(),
293            database_by_id: Default::default(),
294            entry_by_id: Default::default(),
295            entry_by_global_id: Default::default(),
296            notices_by_dep_id: Default::default(),
297            ambient_schemas_by_name: Default::default(),
298            ambient_schemas_by_id: Default::default(),
299            temporary_schemas: Default::default(),
300            clusters_by_id: Default::default(),
301            clusters_by_name: Default::default(),
302            network_policies_by_name: Default::default(),
303            roles_by_name: Default::default(),
304            roles_by_id: Default::default(),
305            network_policies_by_id: Default::default(),
306            role_auth_by_id: Default::default(),
307            config: CatalogConfig {
308                start_time: Default::default(),
309                start_instant: Instant::now(),
310                nonce: Default::default(),
311                environment_id: EnvironmentId::for_tests(),
312                session_id: Default::default(),
313                build_info: &DUMMY_BUILD_INFO,
314                now: NOW_ZERO.clone(),
315                connection_context: ConnectionContext::for_tests(Arc::new(
316                    InMemorySecretsController::new(),
317                )),
318                helm_chart_version: None,
319            },
320            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
321            availability_zones: Default::default(),
322            system_configuration: Arc::new(SystemVars::default()),
323            egress_addresses: Default::default(),
324            aws_principal_context: Default::default(),
325            aws_privatelink_availability_zones: Default::default(),
326            http_host_name: Default::default(),
327            default_privileges: Arc::new(DefaultPrivileges::default()),
328            system_privileges: Arc::new(PrivilegeMap::default()),
329            comments: Arc::new(CommentsMap::default()),
330            source_references: Default::default(),
331            storage_metadata: Arc::new(StorageMetadata::default()),
332            license_key: ValidatedLicenseKey::for_tests(),
333            mock_authentication_nonce: Default::default(),
334        }
335    }
336
337    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
338        let search_path = self.resolve_search_path(session);
339        let database = self
340            .database_by_name
341            .get(session.vars().database())
342            .map(|id| id.clone());
343        let state = match session.transaction().catalog_state() {
344            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
345            None => Cow::Borrowed(self),
346        };
347        ConnCatalog {
348            state,
349            unresolvable_ids: BTreeSet::new(),
350            conn_id: session.conn_id().clone(),
351            cluster: session.vars().cluster().into(),
352            database,
353            search_path,
354            role_id: session.current_role_id().clone(),
355            prepared_statements: Some(session.prepared_statements()),
356            portals: Some(session.portals()),
357            notices_tx: session.retain_notice_transmitter(),
358            restrict_to_user_objects: session.vars().restrict_to_user_objects(),
359        }
360    }
361
362    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
363        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
364        let cluster = self.system_configuration.default_cluster();
365
366        ConnCatalog {
367            state: Cow::Borrowed(self),
368            unresolvable_ids: BTreeSet::new(),
369            conn_id: SYSTEM_CONN_ID.clone(),
370            cluster,
371            database: self
372                .resolve_database(DEFAULT_DATABASE_NAME)
373                .ok()
374                .map(|db| db.id()),
375            // Leaving the system's search path empty allows us to catch issues
376            // where catalog object names have not been normalized correctly.
377            search_path: Vec::new(),
378            role_id,
379            prepared_statements: None,
380            portals: None,
381            notices_tx,
382            restrict_to_user_objects: false,
383        }
384    }
385
386    pub fn for_system_session(&self) -> ConnCatalog<'_> {
387        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
388    }
389
390    /// Returns an iterator over the deduplicated identifiers of all
391    /// objects this catalog entry transitively depends on (where
392    /// "depends on" is meant in the sense of [`CatalogItem::uses`], rather than
393    /// [`CatalogItem::references`]).
394    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
395        struct I<'a> {
396            queue: VecDeque<CatalogItemId>,
397            seen: BTreeSet<CatalogItemId>,
398            this: &'a CatalogState,
399        }
400        impl<'a> Iterator for I<'a> {
401            type Item = CatalogItemId;
402            fn next(&mut self) -> Option<Self::Item> {
403                if let Some(next) = self.queue.pop_front() {
404                    for child in self.this.get_entry(&next).item().uses() {
405                        if !self.seen.contains(&child) {
406                            self.queue.push_back(child);
407                            self.seen.insert(child);
408                        }
409                    }
410                    Some(next)
411                } else {
412                    None
413                }
414            }
415        }
416
417        I {
418            queue: [id].into_iter().collect(),
419            seen: [id].into_iter().collect(),
420            this: self,
421        }
422    }
423
424    /// Computes the IDs of any log sources this catalog entry transitively
425    /// depends on.
426    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
427        let mut out = Vec::new();
428        self.introspection_dependencies_inner(id, &mut out);
429        out
430    }
431
432    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
433        match self.get_entry(&id).item() {
434            CatalogItem::Log(_) => out.push(id),
435            item @ (CatalogItem::View(_)
436            | CatalogItem::MaterializedView(_)
437            | CatalogItem::Connection(_)) => {
438                // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
439                for item_id in item.references().items() {
440                    self.introspection_dependencies_inner(*item_id, out);
441                }
442            }
443            CatalogItem::Sink(sink) => {
444                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
445                self.introspection_dependencies_inner(from_item_id, out)
446            }
447            CatalogItem::Index(idx) => {
448                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
449                self.introspection_dependencies_inner(on_item_id, out)
450            }
451            CatalogItem::Table(_)
452            | CatalogItem::Source(_)
453            | CatalogItem::Type(_)
454            | CatalogItem::Func(_)
455            | CatalogItem::Secret(_) => (),
456        }
457    }
458
459    /// Returns all the IDs of all objects that depend on `ids`, including `ids` themselves.
460    ///
461    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
462    /// earlier in the list than the roots. This is particularly useful for the order to drop
463    /// objects.
464    pub(super) fn object_dependents(
465        &self,
466        object_ids: &Vec<ObjectId>,
467        conn_id: &ConnectionId,
468        seen: &mut BTreeSet<ObjectId>,
469    ) -> Vec<ObjectId> {
470        let mut dependents = Vec::new();
471        for object_id in object_ids {
472            match object_id {
473                ObjectId::Cluster(id) => {
474                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
475                }
476                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
477                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
478                ),
479                ObjectId::Database(id) => {
480                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
481                }
482                ObjectId::Schema((database_spec, schema_spec)) => {
483                    dependents.extend_from_slice(&self.schema_dependents(
484                        database_spec.clone(),
485                        schema_spec.clone(),
486                        conn_id,
487                        seen,
488                    ));
489                }
490                ObjectId::NetworkPolicy(id) => {
491                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
492                }
493                id @ ObjectId::Role(_) => {
494                    let unseen = seen.insert(id.clone());
495                    if unseen {
496                        dependents.push(id.clone());
497                    }
498                }
499                ObjectId::Item(id) => {
500                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
501                }
502            }
503        }
504        dependents
505    }
506
507    /// Returns all the IDs of all objects that depend on `cluster_id`, including `cluster_id`
508    /// itself.
509    ///
510    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
511    /// earlier in the list than the roots. This is particularly useful for the order to drop
512    /// objects.
513    fn cluster_dependents(
514        &self,
515        cluster_id: ClusterId,
516        seen: &mut BTreeSet<ObjectId>,
517    ) -> Vec<ObjectId> {
518        let mut dependents = Vec::new();
519        let object_id = ObjectId::Cluster(cluster_id);
520        if !seen.contains(&object_id) {
521            seen.insert(object_id.clone());
522            let cluster = self.get_cluster(cluster_id);
523            for item_id in cluster.bound_objects() {
524                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
525            }
526            for replica_id in cluster.replica_ids().values() {
527                dependents.extend_from_slice(&self.cluster_replica_dependents(
528                    cluster_id,
529                    *replica_id,
530                    seen,
531                ));
532            }
533            dependents.push(object_id);
534        }
535        dependents
536    }
537
538    /// Returns all the IDs of all objects that depend on `replica_id`, including `replica_id`
539    /// itself.
540    ///
541    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
542    /// earlier in the list than the roots. This is particularly useful for the order to drop
543    /// objects.
544    pub(super) fn cluster_replica_dependents(
545        &self,
546        cluster_id: ClusterId,
547        replica_id: ReplicaId,
548        seen: &mut BTreeSet<ObjectId>,
549    ) -> Vec<ObjectId> {
550        let mut dependents = Vec::new();
551        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
552        if !seen.contains(&object_id) {
553            seen.insert(object_id.clone());
554            // Materialized views that target this replica are implicitly
555            // dropped with it, so cascade to their dependents to avoid leaving
556            // dangling references.
557            let cluster = self.get_cluster(cluster_id);
558            for item_id in cluster.bound_objects() {
559                if let CatalogItem::MaterializedView(mv) = self.get_entry(item_id).item()
560                    && mv.target_replica == Some(replica_id)
561                {
562                    dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
563                }
564            }
565            dependents.push(object_id);
566        }
567        dependents
568    }
569
570    /// Returns all the IDs of all objects that depend on `database_id`, including `database_id`
571    /// itself.
572    ///
573    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
574    /// earlier in the list than the roots. This is particularly useful for the order to drop
575    /// objects.
576    fn database_dependents(
577        &self,
578        database_id: DatabaseId,
579        conn_id: &ConnectionId,
580        seen: &mut BTreeSet<ObjectId>,
581    ) -> Vec<ObjectId> {
582        let mut dependents = Vec::new();
583        let object_id = ObjectId::Database(database_id);
584        if !seen.contains(&object_id) {
585            seen.insert(object_id.clone());
586            let database = self.get_database(&database_id);
587            for schema_id in database.schema_ids().values() {
588                dependents.extend_from_slice(&self.schema_dependents(
589                    ResolvedDatabaseSpecifier::Id(database_id),
590                    SchemaSpecifier::Id(*schema_id),
591                    conn_id,
592                    seen,
593                ));
594            }
595            dependents.push(object_id);
596        }
597        dependents
598    }
599
600    /// Returns all the IDs of all objects that depend on `schema_id`, including `schema_id`
601    /// itself.
602    ///
603    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
604    /// earlier in the list than the roots. This is particularly useful for the order to drop
605    /// objects.
606    fn schema_dependents(
607        &self,
608        database_spec: ResolvedDatabaseSpecifier,
609        schema_spec: SchemaSpecifier,
610        conn_id: &ConnectionId,
611        seen: &mut BTreeSet<ObjectId>,
612    ) -> Vec<ObjectId> {
613        let mut dependents = Vec::new();
614        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
615        if !seen.contains(&object_id) {
616            seen.insert(object_id.clone());
617            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
618            for item_id in schema.item_ids() {
619                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
620            }
621            dependents.push(object_id)
622        }
623        dependents
624    }
625
626    /// Returns all the IDs of all objects that depend on `item_id`, including `item_id`
627    /// itself.
628    ///
629    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
630    /// earlier in the list than the roots. This is particularly useful for the order to drop
631    /// objects.
632    pub(super) fn item_dependents(
633        &self,
634        item_id: CatalogItemId,
635        seen: &mut BTreeSet<ObjectId>,
636    ) -> Vec<ObjectId> {
637        let mut dependents = Vec::new();
638        let object_id = ObjectId::Item(item_id);
639        if !seen.contains(&object_id) {
640            seen.insert(object_id.clone());
641            let entry = self.get_entry(&item_id);
642            for dependent_id in entry.used_by() {
643                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
644            }
645            dependents.push(object_id);
646            // We treat the progress collection as if it depends on the source
647            // for dropping. We have additional code in planning to create a
648            // kind of special-case "CASCADE" for this dependency.
649            if let Some(progress_id) = entry.progress_id() {
650                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
651            }
652        }
653        dependents
654    }
655
656    /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id`
657    /// itself.
658    ///
659    /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear
660    /// earlier in the list than the roots. This is particularly useful for the order to drop
661    /// objects.
662    pub(super) fn network_policy_dependents(
663        &self,
664        network_policy_id: NetworkPolicyId,
665        _seen: &mut BTreeSet<ObjectId>,
666    ) -> Vec<ObjectId> {
667        let object_id = ObjectId::NetworkPolicy(network_policy_id);
668        // Currently network policies have no dependents
669        // when we add the ability for users or sources/sinks to have policies
670        // this method will need to be updated.
671        vec![object_id]
672    }
673
674    /// Indicates whether the indicated item is considered stable or not.
675    ///
676    /// Only stable items can be used as dependencies of other catalog items.
677    fn is_stable(&self, id: CatalogItemId) -> bool {
678        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
679        !self.is_unstable_schema_specifier(spec)
680    }
681
682    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
683        if self.system_config().unsafe_enable_unstable_dependencies() {
684            return Ok(());
685        }
686
687        let unstable_dependencies: Vec<_> = item
688            .references()
689            .items()
690            .filter(|id| !self.is_stable(**id))
691            .map(|id| self.get_entry(id).name().item.clone())
692            .collect();
693
694        // It's okay to create a temporary object with unstable
695        // dependencies, since we will never need to reboot a catalog
696        // that contains it.
697        if unstable_dependencies.is_empty() || item.is_temporary() {
698            Ok(())
699        } else {
700            let object_type = item.typ().to_string();
701            Err(Error {
702                kind: ErrorKind::UnstableDependency {
703                    object_type,
704                    unstable_dependencies,
705                },
706            })
707        }
708    }
709
710    pub fn resolve_full_name(
711        &self,
712        name: &QualifiedItemName,
713        conn_id: Option<&ConnectionId>,
714    ) -> FullItemName {
715        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
716
717        let database = match &name.qualifiers.database_spec {
718            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
719            ResolvedDatabaseSpecifier::Id(id) => {
720                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
721            }
722        };
723        // For temporary schemas, we know the name is always MZ_TEMP_SCHEMA,
724        // and the schema may not exist yet if no temporary items have been created.
725        let schema = match &name.qualifiers.schema_spec {
726            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
727            SchemaSpecifier::Id(_) => self
728                .get_schema(
729                    &name.qualifiers.database_spec,
730                    &name.qualifiers.schema_spec,
731                    conn_id,
732                )
733                .name()
734                .schema
735                .clone(),
736        };
737        FullItemName {
738            database,
739            schema,
740            item: name.item.clone(),
741        }
742    }
743
744    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
745        let database = match &name.database {
746            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
747            ResolvedDatabaseSpecifier::Id(id) => {
748                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
749            }
750        };
751        FullSchemaName {
752            database,
753            schema: name.schema.clone(),
754        }
755    }
756
757    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
758        self.entry_by_id
759            .get(id)
760            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
761    }
762
763    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
764        let item_id = self
765            .entry_by_global_id
766            .get(id)
767            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
768
769        let entry = self.get_entry(item_id).clone();
770        let version = match entry.item() {
771            CatalogItem::Table(table) => {
772                let (version, _) = table
773                    .collections
774                    .iter()
775                    .find(|(_verison, gid)| *gid == id)
776                    .expect("version to exist");
777                RelationVersionSelector::Specific(*version)
778            }
779            _ => RelationVersionSelector::Latest,
780        };
781        CatalogCollectionEntry { entry, version }
782    }
783
784    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
785        self.entry_by_id.iter()
786    }
787
788    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
789        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
790        self.temporary_schemas
791            .get(conn)
792            .into_iter()
793            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
794    }
795
796    /// Returns true if a temporary schema exists for the given connection.
797    ///
798    /// Temporary schemas are created lazily when the first temporary object is created
799    /// for a connection, so this may return false for connections that haven't created
800    /// any temporary objects.
801    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
802        self.temporary_schemas.contains_key(conn)
803    }
804
805    /// Gets a type named `name` from exactly one of the system schemas.
806    ///
807    /// # Panics
808    /// - If `name` is not an entry in any system schema
809    /// - If more than one system schema has an entry named `name`.
810    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
811        let mut res = None;
812        for schema_id in self.system_schema_ids() {
813            let schema = &self.ambient_schemas_by_id[&schema_id];
814            if let Some(global_id) = schema.types.get(name) {
815                match res {
816                    None => res = Some(self.get_entry(global_id)),
817                    Some(_) => panic!(
818                        "only call get_system_type on objects uniquely identifiable in one system schema"
819                    ),
820                }
821            }
822        }
823
824        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
825    }
826
827    pub fn get_item_by_name(
828        &self,
829        name: &QualifiedItemName,
830        conn_id: &ConnectionId,
831    ) -> Option<&CatalogEntry> {
832        self.get_schema(
833            &name.qualifiers.database_spec,
834            &name.qualifiers.schema_spec,
835            conn_id,
836        )
837        .items
838        .get(&name.item)
839        .and_then(|id| self.try_get_entry(id))
840    }
841
842    pub fn get_type_by_name(
843        &self,
844        name: &QualifiedItemName,
845        conn_id: &ConnectionId,
846    ) -> Option<&CatalogEntry> {
847        self.get_schema(
848            &name.qualifiers.database_spec,
849            &name.qualifiers.schema_spec,
850            conn_id,
851        )
852        .types
853        .get(&name.item)
854        .and_then(|id| self.try_get_entry(id))
855    }
856
857    pub(super) fn find_available_name(
858        &self,
859        mut name: QualifiedItemName,
860        conn_id: &ConnectionId,
861    ) -> QualifiedItemName {
862        let mut i = 0;
863        let orig_item_name = name.item.clone();
864        while self.get_item_by_name(&name, conn_id).is_some() {
865            i += 1;
866            name.item = format!("{}{}", orig_item_name, i);
867        }
868        name
869    }
870
871    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
872        self.entry_by_id.get(id)
873    }
874
875    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
876        let item_id = self.entry_by_global_id.get(id)?;
877        self.try_get_entry(item_id)
878    }
879
880    /// Returns the [`RelationDesc`] for a [`GlobalId`], if the provided [`GlobalId`] refers to an
881    /// object that returns rows.
882    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
883        let entry = self.try_get_entry_by_global_id(id)?;
884        let desc = match entry.item() {
885            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
886            // TODO(alter_table): Support schema evolution on sources.
887            other => other.relation_desc(RelationVersionSelector::Latest)?,
888        };
889        Some(desc)
890    }
891
892    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
893        self.try_get_cluster(cluster_id)
894            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
895    }
896
897    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
898        self.clusters_by_id.get(&cluster_id)
899    }
900
901    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
902        self.roles_by_id.get(id)
903    }
904
905    pub fn get_role(&self, id: &RoleId) -> &Role {
906        self.roles_by_id.get(id).expect("catalog out of sync")
907    }
908
909    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
910        self.roles_by_id.keys()
911    }
912
913    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
914        self.roles_by_name
915            .get(role_name)
916            .map(|id| &self.roles_by_id[id])
917    }
918
919    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
920        self.role_auth_by_id
921            .get(id)
922            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
923    }
924
925    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
926        self.role_auth_by_id.get(id)
927    }
928
929    pub(super) fn try_get_network_policy_by_name(
930        &self,
931        policy_name: &str,
932    ) -> Option<&NetworkPolicy> {
933        self.network_policies_by_name
934            .get(policy_name)
935            .map(|id| &self.network_policies_by_id[id])
936    }
937
938    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
939        let mut membership = BTreeSet::new();
940        let mut queue = VecDeque::from(vec![id]);
941        while let Some(cur_id) = queue.pop_front() {
942            if !membership.contains(cur_id) {
943                membership.insert(cur_id.clone());
944                let role = self.get_role(cur_id);
945                soft_assert_no_log!(
946                    !role.membership().keys().contains(id),
947                    "circular membership exists in the catalog"
948                );
949                queue.extend(role.membership().keys());
950            }
951        }
952        membership.insert(RoleId::Public);
953        membership
954    }
955
956    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
957        self.network_policies_by_id
958            .get(id)
959            .expect("catalog out of sync")
960    }
961
962    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
963        self.network_policies_by_id.keys()
964    }
965
966    /// Returns the URL for POST-ing data to a webhook source, if `id` corresponds to a webhook
967    /// source.
968    ///
969    /// Note: Identifiers for the source, e.g. item name, are URL encoded.
970    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
971        let entry = self.try_get_entry(id)?;
972        // Note: Webhook sources can never be created in the temporary schema, hence passing None.
973        let name = self.resolve_full_name(entry.name(), None);
974        let host_name = self
975            .http_host_name
976            .as_ref()
977            .map(|x| x.as_str())
978            .unwrap_or_else(|| "HOST");
979
980        let RawDatabaseSpecifier::Name(database) = name.database else {
981            return None;
982        };
983
984        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
985        url.path_segments_mut()
986            .ok()?
987            .push(&database)
988            .push(&name.schema)
989            .push(&name.item);
990
991        Some(url)
992    }
993
994    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
995    ///
996    /// This function will temporarily enable all "enable_for_item_parsing" feature flags. See
997    /// [`CatalogState::with_enable_for_item_parsing`] for more details.
998    ///
999    /// NOTE: While this method takes a `&mut self`, all mutations are temporary and restored to
1000    /// their original state before the method returns.
1001    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1002        // DO NOT add any additional mutations to this method. It would be fairly surprising to the
1003        // caller if this method changed the state of the catalog.
1004        &mut self,
1005        create_sql: &str,
1006        force_if_exists_skip: bool,
1007    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1008        self.with_enable_for_item_parsing(|state| {
1009            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
1010            let pcx = Some(&pcx);
1011            let session_catalog = state.for_system_session();
1012
1013            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1014            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
1015            let (plan, _sql_impl_ids) =
1016                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
1017
1018            Ok((plan, resolved_ids))
1019        })
1020    }
1021
1022    /// Parses the given SQL string into a pair of [`Plan`] and a [`ResolvedIds`].
1023    #[mz_ore::instrument]
1024    pub(crate) fn parse_plan(
1025        create_sql: &str,
1026        pcx: Option<&PlanContext>,
1027        catalog: &ConnCatalog,
1028    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1029        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1030        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1031        let (plan, _sql_impl_ids) =
1032            mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1033
1034        Ok((plan, resolved_ids))
1035    }
1036
1037    /// Parses the given SQL string into a pair of [`CatalogItem`].
1038    pub(crate) fn deserialize_item(
1039        &self,
1040        global_id: GlobalId,
1041        create_sql: &str,
1042        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1043        local_expression_cache: &mut LocalExpressionCache,
1044        previous_item: Option<CatalogItem>,
1045    ) -> Result<CatalogItem, AdapterError> {
1046        self.parse_item(
1047            global_id,
1048            create_sql,
1049            extra_versions,
1050            None,
1051            false,
1052            None,
1053            local_expression_cache,
1054            previous_item,
1055        )
1056    }
1057
1058    /// Parses the given SQL string into a `CatalogItem`.
1059    #[mz_ore::instrument]
1060    pub(crate) fn parse_item(
1061        &self,
1062        global_id: GlobalId,
1063        create_sql: &str,
1064        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1065        pcx: Option<&PlanContext>,
1066        is_retained_metrics_object: bool,
1067        custom_logical_compaction_window: Option<CompactionWindow>,
1068        local_expression_cache: &mut LocalExpressionCache,
1069        previous_item: Option<CatalogItem>,
1070    ) -> Result<CatalogItem, AdapterError> {
1071        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1072        match self.parse_item_inner(
1073            global_id,
1074            create_sql,
1075            extra_versions,
1076            pcx,
1077            is_retained_metrics_object,
1078            custom_logical_compaction_window,
1079            cached_expr,
1080            previous_item,
1081        ) {
1082            Ok((item, uncached_expr)) => {
1083                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1084                    local_expression_cache.insert_uncached_expression(
1085                        global_id,
1086                        uncached_expr,
1087                        optimizer_features,
1088                    );
1089                }
1090                Ok(item)
1091            }
1092            Err((err, cached_expr)) => {
1093                if let Some(local_expr) = cached_expr {
1094                    local_expression_cache.insert_cached_expression(global_id, local_expr);
1095                }
1096                Err(err)
1097            }
1098        }
1099    }
1100
1101    /// Parses the given SQL string into a `CatalogItem`, using `cached_expr` if it's Some.
1102    ///
1103    /// On success returns the `CatalogItem` and an optimized expression iff the expression was
1104    /// not cached.
1105    ///
1106    /// On failure returns an error and `cached_expr` so it can be used later.
1107    #[mz_ore::instrument]
1108    pub(crate) fn parse_item_inner(
1109        &self,
1110        global_id: GlobalId,
1111        create_sql: &str,
1112        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1113        pcx: Option<&PlanContext>,
1114        is_retained_metrics_object: bool,
1115        custom_logical_compaction_window: Option<CompactionWindow>,
1116        cached_expr: Option<LocalExpressions>,
1117        previous_item: Option<CatalogItem>,
1118    ) -> Result<
1119        (
1120            CatalogItem,
1121            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1122        ),
1123        (AdapterError, Option<LocalExpressions>),
1124    > {
1125        let session_catalog = self.for_system_session();
1126
1127        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1128            Ok((plan, resolved_ids)) => (plan, resolved_ids),
1129            Err(err) => return Err((err, cached_expr)),
1130        };
1131
1132        let mut uncached_expr = None;
1133
1134        // Carry over the plans (`optimized_plan`, `physical_plan`,
1135        // `dataflow_metainfo`) from the previous incarnation of this item when
1136        // re-parsing an existing item (e.g. after a RENAME). These fields live
1137        // on the `CatalogItem` since #35834, but are not reconstructable from
1138        // `create_sql` alone — they are populated by the sequencer `_finish`
1139        // paths at create time, and by the expression-cache / bootstrap
1140        // rendering path on boot. If we don't preserve them here, a RENAME
1141        // silently drops the plans and dataflow metainfo for the affected
1142        // MV/Index/CT.
1143        let previous_plans = previous_item.as_ref().map(|item| {
1144            (
1145                item.optimized_plan().cloned(),
1146                item.physical_plan().cloned(),
1147                item.dataflow_metainfo().cloned(),
1148            )
1149        });
1150
1151        let mut item = match plan {
1152            Plan::CreateTable(CreateTablePlan { table, .. }) => {
1153                let collections = extra_versions
1154                    .iter()
1155                    .map(|(version, gid)| (*version, *gid))
1156                    .chain([(RelationVersion::root(), global_id)].into_iter())
1157                    .collect();
1158
1159                CatalogItem::Table(Table {
1160                    create_sql: Some(table.create_sql),
1161                    desc: table.desc,
1162                    collections,
1163                    conn_id: None,
1164                    resolved_ids,
1165                    custom_logical_compaction_window: custom_logical_compaction_window
1166                        .or(table.compaction_window),
1167                    is_retained_metrics_object,
1168                    data_source: match table.data_source {
1169                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1170                            TableDataSource::TableWrites { defaults }
1171                        }
1172                        mz_sql::plan::TableDataSource::DataSource {
1173                            desc: data_source_desc,
1174                            timeline,
1175                        } => match data_source_desc {
1176                            mz_sql::plan::DataSourceDesc::IngestionExport {
1177                                ingestion_id,
1178                                external_reference,
1179                                details,
1180                                data_config,
1181                            } => TableDataSource::DataSource {
1182                                desc: DataSourceDesc::IngestionExport {
1183                                    ingestion_id,
1184                                    external_reference,
1185                                    details,
1186                                    data_config,
1187                                },
1188                                timeline,
1189                            },
1190                            mz_sql::plan::DataSourceDesc::Webhook {
1191                                validate_using,
1192                                body_format,
1193                                headers,
1194                                cluster_id,
1195                            } => TableDataSource::DataSource {
1196                                desc: DataSourceDesc::Webhook {
1197                                    validate_using,
1198                                    body_format,
1199                                    headers,
1200                                    cluster_id: cluster_id
1201                                        .expect("Webhook Tables must have a cluster_id set"),
1202                                },
1203                                timeline,
1204                            },
1205                            _ => {
1206                                return Err((
1207                                    AdapterError::Unstructured(anyhow::anyhow!(
1208                                        "unsupported data source for table"
1209                                    )),
1210                                    cached_expr,
1211                                ));
1212                            }
1213                        },
1214                    },
1215                })
1216            }
1217            Plan::CreateSource(CreateSourcePlan {
1218                source,
1219                timeline,
1220                in_cluster,
1221                ..
1222            }) => CatalogItem::Source(Source {
1223                create_sql: Some(source.create_sql),
1224                data_source: match source.data_source {
1225                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1226                        desc,
1227                        cluster_id: match in_cluster {
1228                            Some(id) => id,
1229                            None => {
1230                                return Err((
1231                                    AdapterError::Unstructured(anyhow::anyhow!(
1232                                        "ingestion-based sources must have cluster specified"
1233                                    )),
1234                                    cached_expr,
1235                                ));
1236                            }
1237                        },
1238                    },
1239                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1240                        desc,
1241                        progress_subsource,
1242                        data_config,
1243                        details,
1244                    } => DataSourceDesc::OldSyntaxIngestion {
1245                        desc,
1246                        progress_subsource,
1247                        data_config,
1248                        details,
1249                        cluster_id: match in_cluster {
1250                            Some(id) => id,
1251                            None => {
1252                                return Err((
1253                                    AdapterError::Unstructured(anyhow::anyhow!(
1254                                        "ingestion-based sources must have cluster specified"
1255                                    )),
1256                                    cached_expr,
1257                                ));
1258                            }
1259                        },
1260                    },
1261                    mz_sql::plan::DataSourceDesc::IngestionExport {
1262                        ingestion_id,
1263                        external_reference,
1264                        details,
1265                        data_config,
1266                    } => DataSourceDesc::IngestionExport {
1267                        ingestion_id,
1268                        external_reference,
1269                        details,
1270                        data_config,
1271                    },
1272                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1273                    mz_sql::plan::DataSourceDesc::Webhook {
1274                        validate_using,
1275                        body_format,
1276                        headers,
1277                        cluster_id,
1278                    } => {
1279                        mz_ore::soft_assert_or_log!(
1280                            cluster_id.is_none(),
1281                            "cluster_id set at Source level for Webhooks"
1282                        );
1283                        DataSourceDesc::Webhook {
1284                            validate_using,
1285                            body_format,
1286                            headers,
1287                            cluster_id: in_cluster
1288                                .expect("webhook sources must use an existing cluster"),
1289                        }
1290                    }
1291                },
1292                desc: source.desc,
1293                global_id,
1294                timeline,
1295                resolved_ids,
1296                custom_logical_compaction_window: source
1297                    .compaction_window
1298                    .or(custom_logical_compaction_window),
1299                is_retained_metrics_object,
1300            }),
1301            Plan::CreateView(CreateViewPlan { view, .. }) => {
1302                // Collect optimizer parameters.
1303                let optimizer_config =
1304                    optimize::OptimizerConfig::from(session_catalog.system_vars());
1305                let previous_exprs = previous_item.map(|item| match item {
1306                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
1307                    _ => None,
1308                });
1309
1310                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1311                    (Some(local_expr), _)
1312                        if local_expr.optimizer_features == optimizer_config.features =>
1313                    {
1314                        debug!("local expression cache hit for {global_id:?}");
1315                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1316                    }
1317                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1318                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1319                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1320                    }
1321                    (cached_expr, _) => {
1322                        let optimizer_features = optimizer_config.features.clone();
1323                        // Build an optimizer for this VIEW.
1324                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1325
1326                        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
1327                        let raw_expr = view.expr;
1328                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1329                            Ok(optimzed_expr) => optimzed_expr,
1330                            Err(err) => return Err((err.into(), cached_expr)),
1331                        };
1332
1333                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1334
1335                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1336                    }
1337                };
1338
1339                // Resolve all item dependencies from the HIR expression.
1340                let dependencies: BTreeSet<_> = raw_expr
1341                    .depends_on()
1342                    .into_iter()
1343                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1344                    .collect();
1345
1346                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1347                CatalogItem::View(View {
1348                    create_sql: view.create_sql,
1349                    global_id,
1350                    raw_expr,
1351                    desc: RelationDesc::new(typ, view.column_names),
1352                    locally_optimized_expr: optimized_expr,
1353                    conn_id: None,
1354                    resolved_ids,
1355                    dependencies: DependencyIds(dependencies),
1356                })
1357            }
1358            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1359                materialized_view, ..
1360            }) => {
1361                let collections = extra_versions
1362                    .iter()
1363                    .map(|(version, gid)| (*version, *gid))
1364                    .chain([(RelationVersion::root(), global_id)].into_iter())
1365                    .collect();
1366
1367                // Collect optimizer parameters.
1368                let system_vars = session_catalog.system_vars();
1369                let overrides = self
1370                    .get_cluster(materialized_view.cluster_id)
1371                    .config
1372                    .features();
1373                let optimizer_config =
1374                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
1375                let previous_exprs = previous_item.map(|item| match item {
1376                    CatalogItem::MaterializedView(materialized_view) => (
1377                        materialized_view.raw_expr,
1378                        materialized_view.locally_optimized_expr,
1379                    ),
1380                    item => unreachable!("expected materialized view, found: {item:#?}"),
1381                });
1382
1383                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1384                    (Some(local_expr), _)
1385                        if local_expr.optimizer_features == optimizer_config.features =>
1386                    {
1387                        debug!("local expression cache hit for {global_id:?}");
1388                        (
1389                            Arc::new(materialized_view.expr),
1390                            Arc::new(local_expr.local_mir),
1391                        )
1392                    }
1393                    // If the new expr is equivalent to the old expr, then we don't need to re-optimize.
1394                    (_, Some((raw_expr, optimized_expr)))
1395                        if *raw_expr == materialized_view.expr =>
1396                    {
1397                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1398                    }
1399                    (cached_expr, _) => {
1400                        let optimizer_features = optimizer_config.features.clone();
1401                        // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
1402                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1403
1404                        let raw_expr = materialized_view.expr;
1405                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1406                            Ok(optimized_expr) => optimized_expr,
1407                            Err(err) => return Err((err.into(), cached_expr)),
1408                        };
1409
1410                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1411
1412                        (Arc::new(raw_expr), Arc::new(optimized_expr))
1413                    }
1414                };
1415                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
1416
1417                for &i in &materialized_view.non_null_assertions {
1418                    typ.column_types[i].nullable = false;
1419                }
1420                let desc = RelationDesc::new(typ, materialized_view.column_names);
1421                let desc = VersionedRelationDesc::new(desc);
1422
1423                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1424
1425                // Resolve all item dependencies from the HIR expression.
1426                let dependencies = raw_expr
1427                    .depends_on()
1428                    .into_iter()
1429                    .map(|gid| self.get_entry_by_global_id(&gid).id())
1430                    .collect();
1431
1432                CatalogItem::MaterializedView(MaterializedView {
1433                    create_sql: materialized_view.create_sql,
1434                    collections,
1435                    raw_expr,
1436                    locally_optimized_expr: optimized_expr,
1437                    desc,
1438                    resolved_ids,
1439                    dependencies,
1440                    replacement_target: materialized_view.replacement_target,
1441                    cluster_id: materialized_view.cluster_id,
1442                    target_replica: materialized_view.target_replica,
1443                    non_null_assertions: materialized_view.non_null_assertions,
1444                    custom_logical_compaction_window: materialized_view.compaction_window,
1445                    refresh_schedule: materialized_view.refresh_schedule,
1446                    initial_as_of,
1447                    optimized_plan: None,
1448                    physical_plan: None,
1449                    dataflow_metainfo: None,
1450                })
1451            }
1452            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1453                create_sql: index.create_sql,
1454                global_id,
1455                on: index.on,
1456                keys: index.keys.into(),
1457                conn_id: None,
1458                resolved_ids,
1459                cluster_id: index.cluster_id,
1460                custom_logical_compaction_window: custom_logical_compaction_window
1461                    .or(index.compaction_window),
1462                is_retained_metrics_object,
1463                optimized_plan: None,
1464                physical_plan: None,
1465                dataflow_metainfo: None,
1466            }),
1467            Plan::CreateSink(CreateSinkPlan {
1468                sink,
1469                with_snapshot,
1470                in_cluster,
1471                ..
1472            }) => CatalogItem::Sink(Sink {
1473                create_sql: sink.create_sql,
1474                global_id,
1475                from: sink.from,
1476                connection: sink.connection,
1477                envelope: sink.envelope,
1478                version: sink.version,
1479                with_snapshot,
1480                resolved_ids,
1481                cluster_id: in_cluster,
1482                commit_interval: sink.commit_interval,
1483            }),
1484            Plan::CreateType(CreateTypePlan { typ, .. }) => {
1485                // Even if we don't need the `RelationDesc` here, error out
1486                // early and eagerly, as a kind of soft assertion that we _can_
1487                // build the `RelationDesc` when needed.
1488                if let Err(err) = typ.inner.desc(&session_catalog) {
1489                    return Err((err.into(), cached_expr));
1490                }
1491                CatalogItem::Type(Type {
1492                    create_sql: Some(typ.create_sql),
1493                    global_id,
1494                    details: CatalogTypeDetails {
1495                        array_id: None,
1496                        typ: typ.inner,
1497                        pg_metadata: None,
1498                    },
1499                    resolved_ids,
1500                })
1501            }
1502            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1503                create_sql: secret.create_sql,
1504                global_id,
1505            }),
1506            Plan::CreateConnection(CreateConnectionPlan {
1507                connection:
1508                    mz_sql::plan::Connection {
1509                        create_sql,
1510                        details,
1511                    },
1512                ..
1513            }) => CatalogItem::Connection(Connection {
1514                create_sql,
1515                global_id,
1516                details,
1517                resolved_ids,
1518            }),
1519            _ => {
1520                return Err((
1521                    Error::new(ErrorKind::Corruption {
1522                        detail: "catalog entry generated inappropriate plan".to_string(),
1523                    })
1524                    .into(),
1525                    cached_expr,
1526                ));
1527            }
1528        };
1529
1530        // Carry over the plans (`optimized_plan`, `physical_plan`,
1531        // `dataflow_metainfo`) from the previous incarnation of this item, if
1532        // any. See the comment on `previous_plans` above.
1533        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
1534            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
1535            {
1536                *optimized_plan = prev_optimized;
1537                *physical_plan = prev_physical;
1538                *dataflow_metainfo = prev_metainfo;
1539            }
1540        }
1541
1542        Ok((item, uncached_expr))
1543    }
1544
1545    /// Execute function `f` on `self`, with all "enable_for_item_parsing" feature flags enabled.
1546    /// Calling this method will not permanently modify any system configuration variables.
1547    ///
1548    /// WARNING:
1549    /// Any modifications made to the system configuration variables in `f`, will be lost.
1550    pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1551        // Enable catalog features that might be required during planning existing
1552        // catalog items. Existing catalog items might have been created while
1553        // a specific feature flag was turned on, so we need to ensure that this
1554        // is also the case during catalog rehydration in order to avoid panics.
1555        //
1556        // WARNING / CONTRACT:
1557        // 1. Features used in this method that related to parsing / planning
1558        //    should be `enable_for_item_parsing` set to `true`.
1559        // 2. After this step, feature flag configuration must not be
1560        //    overridden.
1561        let restore = Arc::clone(&self.system_configuration);
1562        Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1563        let res = f(self);
1564        self.system_configuration = restore;
1565        res
1566    }
1567
1568    /// Returns all indexes on the given object and cluster known in the catalog.
1569    pub fn get_indexes_on(
1570        &self,
1571        id: GlobalId,
1572        cluster: ClusterId,
1573    ) -> impl Iterator<Item = (GlobalId, &Index)> {
1574        let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1575
1576        self.try_get_entry_by_global_id(&id)
1577            .into_iter()
1578            .map(move |e| {
1579                e.used_by()
1580                    .iter()
1581                    .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1582                        CatalogItem::Index(index) if index_matches(index) => {
1583                            Some((index.global_id(), index))
1584                        }
1585                        _ => None,
1586                    })
1587            })
1588            .flatten()
1589    }
1590
1591    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1592        &self.database_by_id[database_id]
1593    }
1594
1595    /// Gets a reference to the specified replica of the specified cluster.
1596    ///
1597    /// Returns `None` if either the cluster or the replica does not
1598    /// exist.
1599    pub(super) fn try_get_cluster_replica(
1600        &self,
1601        id: ClusterId,
1602        replica_id: ReplicaId,
1603    ) -> Option<&ClusterReplica> {
1604        self.try_get_cluster(id)
1605            .and_then(|cluster| cluster.replica(replica_id))
1606    }
1607
1608    /// Gets a reference to the specified replica of the specified cluster.
1609    ///
1610    /// Panics if either the cluster or the replica does not exist.
1611    pub(crate) fn get_cluster_replica(
1612        &self,
1613        cluster_id: ClusterId,
1614        replica_id: ReplicaId,
1615    ) -> &ClusterReplica {
1616        self.try_get_cluster_replica(cluster_id, replica_id)
1617            .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1618    }
1619
1620    pub(super) fn resolve_replica_in_cluster(
1621        &self,
1622        cluster_id: &ClusterId,
1623        replica_name: &str,
1624    ) -> Result<&ClusterReplica, SqlCatalogError> {
1625        let cluster = self.get_cluster(*cluster_id);
1626        let replica_id = cluster
1627            .replica_id_by_name_
1628            .get(replica_name)
1629            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1630        Ok(&cluster.replicas_by_id_[replica_id])
1631    }
1632
1633    /// Get system configuration `name`.
1634    pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1635        Ok(self.system_configuration.get(name)?)
1636    }
1637
1638    /// Parse system configuration `name` with `value` int.
1639    ///
1640    /// Returns the parsed value as a string.
1641    pub(super) fn parse_system_configuration(
1642        &self,
1643        name: &str,
1644        value: VarInput,
1645    ) -> Result<String, Error> {
1646        let value = self.system_configuration.parse(name, value)?;
1647        Ok(value.format())
1648    }
1649
1650    /// Gets the schema map for the database matching `database_spec`.
1651    pub(super) fn resolve_schema_in_database(
1652        &self,
1653        database_spec: &ResolvedDatabaseSpecifier,
1654        schema_name: &str,
1655        conn_id: &ConnectionId,
1656    ) -> Result<&Schema, SqlCatalogError> {
1657        let schema = match database_spec {
1658            ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1659                self.temporary_schemas.get(conn_id)
1660            }
1661            ResolvedDatabaseSpecifier::Ambient => self
1662                .ambient_schemas_by_name
1663                .get(schema_name)
1664                .and_then(|id| self.ambient_schemas_by_id.get(id)),
1665            ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1666                db.schemas_by_name
1667                    .get(schema_name)
1668                    .and_then(|id| db.schemas_by_id.get(id))
1669            }),
1670        };
1671        schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1672    }
1673
1674    /// Try to get a schema, returning `None` if it doesn't exist.
1675    ///
1676    /// For temporary schemas, returns `None` if the schema hasn't been created yet
1677    /// (temporary schemas are created lazily when the first temporary object is created).
1678    pub fn try_get_schema(
1679        &self,
1680        database_spec: &ResolvedDatabaseSpecifier,
1681        schema_spec: &SchemaSpecifier,
1682        conn_id: &ConnectionId,
1683    ) -> Option<&Schema> {
1684        // Keep in sync with `get_schema` and `get_schemas_mut`
1685        match (database_spec, schema_spec) {
1686            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1687                self.temporary_schemas.get(conn_id)
1688            }
1689            (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1690                self.ambient_schemas_by_id.get(id)
1691            }
1692            (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1693                .database_by_id
1694                .get(database_id)
1695                .and_then(|db| db.schemas_by_id.get(schema_id)),
1696            (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1697                unreachable!("temporary schemas are in the ambient database")
1698            }
1699        }
1700    }
1701
1702    pub fn get_schema(
1703        &self,
1704        database_spec: &ResolvedDatabaseSpecifier,
1705        schema_spec: &SchemaSpecifier,
1706        conn_id: &ConnectionId,
1707    ) -> &Schema {
1708        // Keep in sync with `try_get_schema` and `get_schemas_mut`
1709        self.try_get_schema(database_spec, schema_spec, conn_id)
1710            .expect("schema must exist")
1711    }
1712
1713    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1714        self.database_by_id
1715            .values()
1716            .filter_map(|database| database.schemas_by_id.get(schema_id))
1717            .chain(self.ambient_schemas_by_id.values())
1718            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1719            .into_first()
1720    }
1721
1722    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1723        self.temporary_schemas
1724            .values()
1725            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1726            .into_first()
1727    }
1728
1729    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1730        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1731    }
1732
1733    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1734        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1735    }
1736
1737    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1738        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1739    }
1740
1741    pub fn get_information_schema_id(&self) -> SchemaId {
1742        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1743    }
1744
1745    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1746        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1747    }
1748
1749    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1750        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1751    }
1752
1753    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1754        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1755    }
1756
1757    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1758        SYSTEM_SCHEMAS
1759            .iter()
1760            .map(|name| self.ambient_schemas_by_name[*name])
1761    }
1762
1763    pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1764        self.system_schema_ids().contains(&id)
1765    }
1766
1767    pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1768        match spec {
1769            SchemaSpecifier::Temporary => false,
1770            SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1771        }
1772    }
1773
1774    pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1775        UNSTABLE_SCHEMAS
1776            .iter()
1777            .map(|name| self.ambient_schemas_by_name[*name])
1778    }
1779
1780    pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1781        self.unstable_schema_ids().contains(&id)
1782    }
1783
1784    pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1785        match spec {
1786            SchemaSpecifier::Temporary => false,
1787            SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1788        }
1789    }
1790
1791    /// Creates a new schema in the `Catalog` for temporary items
1792    /// indicated by the TEMPORARY or TEMP keywords.
1793    pub fn create_temporary_schema(
1794        &mut self,
1795        conn_id: &ConnectionId,
1796        owner_id: RoleId,
1797    ) -> Result<(), Error> {
1798        // Temporary schema OIDs are never used, and it's therefore wasteful to go to the durable
1799        // catalog to allocate a new OID for every temporary schema. Instead, we give them all the
1800        // same invalid OID. This matches the semantics of temporary schema `GlobalId`s which are
1801        // all -1.
1802        let oid = INVALID_OID;
1803        self.temporary_schemas.insert(
1804            conn_id.clone(),
1805            Schema {
1806                name: QualifiedSchemaName {
1807                    database: ResolvedDatabaseSpecifier::Ambient,
1808                    schema: MZ_TEMP_SCHEMA.into(),
1809                },
1810                id: SchemaSpecifier::Temporary,
1811                oid,
1812                items: BTreeMap::new(),
1813                functions: BTreeMap::new(),
1814                types: BTreeMap::new(),
1815                owner_id,
1816                privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1817                    mz_sql::catalog::ObjectType::Schema,
1818                    owner_id,
1819                )]),
1820            },
1821        );
1822        Ok(())
1823    }
1824
1825    /// Return all OIDs that are allocated to temporary objects.
1826    pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1827        std::iter::empty()
1828            .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1829                if schema.id.is_temporary() {
1830                    Some(schema.oid)
1831                } else {
1832                    None
1833                }
1834            }))
1835            .chain(self.entry_by_id.values().filter_map(|entry| {
1836                if entry.item().is_temporary() {
1837                    Some(entry.oid)
1838                } else {
1839                    None
1840                }
1841            }))
1842    }
1843
1844    /// Optimized lookup for a builtin table.
1845    ///
1846    /// Panics if the builtin table doesn't exist in the catalog.
1847    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1848        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1849    }
1850
1851    /// Optimized lookup for a builtin log.
1852    ///
1853    /// Panics if the builtin log doesn't exist in the catalog.
1854    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1855        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1856        let log = match self.get_entry(&item_id).item() {
1857            CatalogItem::Log(log) => log,
1858            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1859        };
1860        (item_id, log.global_id)
1861    }
1862
1863    /// Optimized lookup for a builtin storage collection.
1864    ///
1865    /// Panics if the builtin storage collection doesn't exist in the catalog.
1866    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1867        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1868    }
1869
1870    /// Optimized lookup for a builtin object.
1871    ///
1872    /// Panics if the builtin object doesn't exist in the catalog.
1873    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1874        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1875        let schema = &self.ambient_schemas_by_id[schema_id];
1876        match builtin.catalog_item_type() {
1877            CatalogItemType::Type => schema.types[builtin.name()],
1878            CatalogItemType::Func => schema.functions[builtin.name()],
1879            CatalogItemType::Table
1880            | CatalogItemType::Source
1881            | CatalogItemType::Sink
1882            | CatalogItemType::View
1883            | CatalogItemType::MaterializedView
1884            | CatalogItemType::Index
1885            | CatalogItemType::Secret
1886            | CatalogItemType::Connection => schema.items[builtin.name()],
1887        }
1888    }
1889
1890    /// Resolve a [`BuiltinType<NameReference>`] to a [`BuiltinType<IdReference>`].
1891    pub fn resolve_builtin_type_references(
1892        &self,
1893        builtin: &BuiltinType<NameReference>,
1894    ) -> BuiltinType<IdReference> {
1895        let typ: CatalogType<IdReference> = match &builtin.details.typ {
1896            CatalogType::AclItem => CatalogType::AclItem,
1897            CatalogType::Array { element_reference } => CatalogType::Array {
1898                element_reference: self.get_system_type(element_reference).id,
1899            },
1900            CatalogType::List {
1901                element_reference,
1902                element_modifiers,
1903            } => CatalogType::List {
1904                element_reference: self.get_system_type(element_reference).id,
1905                element_modifiers: element_modifiers.clone(),
1906            },
1907            CatalogType::Map {
1908                key_reference,
1909                value_reference,
1910                key_modifiers,
1911                value_modifiers,
1912            } => CatalogType::Map {
1913                key_reference: self.get_system_type(key_reference).id,
1914                value_reference: self.get_system_type(value_reference).id,
1915                key_modifiers: key_modifiers.clone(),
1916                value_modifiers: value_modifiers.clone(),
1917            },
1918            CatalogType::Range { element_reference } => CatalogType::Range {
1919                element_reference: self.get_system_type(element_reference).id,
1920            },
1921            CatalogType::Record { fields } => CatalogType::Record {
1922                fields: fields
1923                    .into_iter()
1924                    .map(|f| CatalogRecordField {
1925                        name: f.name.clone(),
1926                        type_reference: self.get_system_type(f.type_reference).id,
1927                        type_modifiers: f.type_modifiers.clone(),
1928                    })
1929                    .collect(),
1930            },
1931            CatalogType::Bool => CatalogType::Bool,
1932            CatalogType::Bytes => CatalogType::Bytes,
1933            CatalogType::Char => CatalogType::Char,
1934            CatalogType::Date => CatalogType::Date,
1935            CatalogType::Float32 => CatalogType::Float32,
1936            CatalogType::Float64 => CatalogType::Float64,
1937            CatalogType::Int16 => CatalogType::Int16,
1938            CatalogType::Int32 => CatalogType::Int32,
1939            CatalogType::Int64 => CatalogType::Int64,
1940            CatalogType::UInt16 => CatalogType::UInt16,
1941            CatalogType::UInt32 => CatalogType::UInt32,
1942            CatalogType::UInt64 => CatalogType::UInt64,
1943            CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1944            CatalogType::Interval => CatalogType::Interval,
1945            CatalogType::Jsonb => CatalogType::Jsonb,
1946            CatalogType::Numeric => CatalogType::Numeric,
1947            CatalogType::Oid => CatalogType::Oid,
1948            CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1949            CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1950            CatalogType::Pseudo => CatalogType::Pseudo,
1951            CatalogType::RegClass => CatalogType::RegClass,
1952            CatalogType::RegProc => CatalogType::RegProc,
1953            CatalogType::RegType => CatalogType::RegType,
1954            CatalogType::String => CatalogType::String,
1955            CatalogType::Time => CatalogType::Time,
1956            CatalogType::Timestamp => CatalogType::Timestamp,
1957            CatalogType::TimestampTz => CatalogType::TimestampTz,
1958            CatalogType::Uuid => CatalogType::Uuid,
1959            CatalogType::VarChar => CatalogType::VarChar,
1960            CatalogType::Int2Vector => CatalogType::Int2Vector,
1961            CatalogType::MzAclItem => CatalogType::MzAclItem,
1962        };
1963
1964        BuiltinType {
1965            name: builtin.name,
1966            schema: builtin.schema,
1967            oid: builtin.oid,
1968            details: CatalogTypeDetails {
1969                array_id: builtin.details.array_id,
1970                typ,
1971                pg_metadata: builtin.details.pg_metadata.clone(),
1972            },
1973        }
1974    }
1975
1976    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1977        &self.config
1978    }
1979
1980    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1981        match self.database_by_name.get(database_name) {
1982            Some(id) => Ok(&self.database_by_id[id]),
1983            None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1984        }
1985    }
1986
1987    pub fn resolve_schema(
1988        &self,
1989        current_database: Option<&DatabaseId>,
1990        database_name: Option<&str>,
1991        schema_name: &str,
1992        conn_id: &ConnectionId,
1993    ) -> Result<&Schema, SqlCatalogError> {
1994        let database_spec = match database_name {
1995            // If a database is explicitly specified, validate it. Note that we
1996            // intentionally do not validate `current_database` to permit
1997            // querying `mz_catalog` with an invalid session database, e.g., so
1998            // that you can run `SHOW DATABASES` to *find* a valid database.
1999            Some(database) => Some(ResolvedDatabaseSpecifier::Id(
2000                self.resolve_database(database)?.id().clone(),
2001            )),
2002            None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2003        };
2004
2005        // First try to find the schema in the named database.
2006        if let Some(database_spec) = database_spec {
2007            if let Ok(schema) =
2008                self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2009            {
2010                return Ok(schema);
2011            }
2012        }
2013
2014        // Then fall back to the ambient database.
2015        if let Ok(schema) = self.resolve_schema_in_database(
2016            &ResolvedDatabaseSpecifier::Ambient,
2017            schema_name,
2018            conn_id,
2019        ) {
2020            return Ok(schema);
2021        }
2022
2023        Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2024    }
2025
2026    /// Optimized lookup for a system schema.
2027    ///
2028    /// Panics if the system schema doesn't exist in the catalog.
2029    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
2030        self.ambient_schemas_by_name[name]
2031    }
2032
2033    pub fn resolve_search_path(
2034        &self,
2035        session: &dyn SessionMetadata,
2036    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2037        let database = self
2038            .database_by_name
2039            .get(session.database())
2040            .map(|id| id.clone());
2041
2042        session
2043            .search_path()
2044            .iter()
2045            .map(|schema| {
2046                self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2047            })
2048            .filter_map(|schema| schema.ok())
2049            .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2050            .collect()
2051    }
2052
2053    pub fn effective_search_path(
2054        &self,
2055        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2056        include_temp_schema: bool,
2057    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2058        let mut v = Vec::with_capacity(search_path.len() + 3);
2059        // Temp schema is only included for relations and data types, not for functions and operators
2060        let temp_schema = (
2061            ResolvedDatabaseSpecifier::Ambient,
2062            SchemaSpecifier::Temporary,
2063        );
2064        if include_temp_schema && !search_path.contains(&temp_schema) {
2065            v.push(temp_schema);
2066        }
2067        let default_schemas = [
2068            (
2069                ResolvedDatabaseSpecifier::Ambient,
2070                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2071            ),
2072            (
2073                ResolvedDatabaseSpecifier::Ambient,
2074                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2075            ),
2076        ];
2077        for schema in default_schemas.into_iter() {
2078            if !search_path.contains(&schema) {
2079                v.push(schema);
2080            }
2081        }
2082        v.extend_from_slice(search_path);
2083        v
2084    }
2085
2086    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2087        let id = self
2088            .clusters_by_name
2089            .get(name)
2090            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2091        Ok(&self.clusters_by_id[id])
2092    }
2093
2094    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2095        let id = self
2096            .clusters_by_name
2097            .get(cluster.name)
2098            .expect("failed to lookup BuiltinCluster by name");
2099        self.clusters_by_id
2100            .get(id)
2101            .expect("failed to lookup BuiltinCluster by ID")
2102    }
2103
2104    pub fn resolve_cluster_replica(
2105        &self,
2106        cluster_replica_name: &QualifiedReplica,
2107    ) -> Result<&ClusterReplica, SqlCatalogError> {
2108        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2109        let replica_name = cluster_replica_name.replica.as_str();
2110        let replica_id = cluster
2111            .replica_id(replica_name)
2112            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2113        Ok(cluster.replica(replica_id).expect("Must exist"))
2114    }
2115
2116    /// Resolves [`PartialItemName`] into a [`CatalogEntry`].
2117    ///
2118    /// If `name` does not specify a database, the `current_database` is used.
2119    /// If `name` does not specify a schema, then the schemas in `search_path`
2120    /// are searched in order.
2121    #[allow(clippy::useless_let_if_seq)]
2122    pub fn resolve(
2123        &self,
2124        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2125        current_database: Option<&DatabaseId>,
2126        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2127        name: &PartialItemName,
2128        conn_id: &ConnectionId,
2129        err_gen: fn(String) -> SqlCatalogError,
2130    ) -> Result<&CatalogEntry, SqlCatalogError> {
2131        // If a schema name was specified, just try to find the item in that
2132        // schema. If no schema was specified, try to find the item in the connection's
2133        // temporary schema. If the item is not found, try to find the item in every
2134        // schema in the search path.
2135        let schemas = match &name.schema {
2136            Some(schema_name) => {
2137                match self.resolve_schema(
2138                    current_database,
2139                    name.database.as_deref(),
2140                    schema_name,
2141                    conn_id,
2142                ) {
2143                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2144                    Err(e) => return Err(e),
2145                }
2146            }
2147            None => match self
2148                .try_get_schema(
2149                    &ResolvedDatabaseSpecifier::Ambient,
2150                    &SchemaSpecifier::Temporary,
2151                    conn_id,
2152                )
2153                .and_then(|schema| schema.items.get(&name.item))
2154            {
2155                Some(id) => return Ok(self.get_entry(id)),
2156                None => search_path.to_vec(),
2157            },
2158        };
2159
2160        for (database_spec, schema_spec) in &schemas {
2161            // Use try_get_schema because the temp schema might not exist yet
2162            // (it's created lazily when the first temp object is created).
2163            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
2164                continue;
2165            };
2166
2167            if let Some(id) = get_schema_entries(schema).get(&name.item) {
2168                return Ok(&self.entry_by_id[id]);
2169            }
2170        }
2171
2172        // Some relations that have previously lived in the `mz_internal` schema have been moved to
2173        // `mz_catalog_unstable` or `mz_introspection`. To simplify the transition for users, we
2174        // automatically let uses of the old schema resolve to the new ones as well.
2175        // TODO(database-issues#8173) remove this after sufficient time has passed
2176        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2177        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2178            for schema_id in [
2179                self.get_mz_catalog_unstable_schema_id(),
2180                self.get_mz_introspection_schema_id(),
2181            ] {
2182                let schema = self.get_schema(
2183                    &ResolvedDatabaseSpecifier::Ambient,
2184                    &SchemaSpecifier::Id(schema_id),
2185                    conn_id,
2186                );
2187
2188                if let Some(id) = get_schema_entries(schema).get(&name.item) {
2189                    debug!(
2190                        github_27831 = true,
2191                        "encountered use of outdated schema `mz_internal` for relation: {name}",
2192                    );
2193                    return Ok(&self.entry_by_id[id]);
2194                }
2195            }
2196        }
2197
2198        Err(err_gen(name.to_string()))
2199    }
2200
2201    /// Resolves `name` to a non-function [`CatalogEntry`].
2202    pub fn resolve_entry(
2203        &self,
2204        current_database: Option<&DatabaseId>,
2205        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2206        name: &PartialItemName,
2207        conn_id: &ConnectionId,
2208    ) -> Result<&CatalogEntry, SqlCatalogError> {
2209        self.resolve(
2210            |schema| &schema.items,
2211            current_database,
2212            search_path,
2213            name,
2214            conn_id,
2215            SqlCatalogError::UnknownItem,
2216        )
2217    }
2218
2219    /// Resolves `name` to a function [`CatalogEntry`].
2220    pub fn resolve_function(
2221        &self,
2222        current_database: Option<&DatabaseId>,
2223        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2224        name: &PartialItemName,
2225        conn_id: &ConnectionId,
2226    ) -> Result<&CatalogEntry, SqlCatalogError> {
2227        self.resolve(
2228            |schema| &schema.functions,
2229            current_database,
2230            search_path,
2231            name,
2232            conn_id,
2233            |name| SqlCatalogError::UnknownFunction {
2234                name,
2235                alternative: None,
2236            },
2237        )
2238    }
2239
2240    /// Resolves `name` to a type [`CatalogEntry`].
2241    pub fn resolve_type(
2242        &self,
2243        current_database: Option<&DatabaseId>,
2244        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2245        name: &PartialItemName,
2246        conn_id: &ConnectionId,
2247    ) -> Result<&CatalogEntry, SqlCatalogError> {
2248        static NON_PG_CATALOG_TYPES: LazyLock<
2249            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2250        > = LazyLock::new(|| {
2251            BUILTINS::types()
2252                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2253                .map(|typ| (typ.name, typ))
2254                .collect()
2255        });
2256
2257        let entry = self.resolve(
2258            |schema| &schema.types,
2259            current_database,
2260            search_path,
2261            name,
2262            conn_id,
2263            |name| SqlCatalogError::UnknownType { name },
2264        )?;
2265
2266        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2267            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2268                warn!(
2269                    "user specified an incorrect schema of {} for the type {}, which should be in \
2270                    the {} schema. This works now due to a bug but will be fixed in a later release.",
2271                    PG_CATALOG_SCHEMA.quoted(),
2272                    typ.name.quoted(),
2273                    typ.schema.quoted(),
2274                )
2275            }
2276        }
2277
2278        Ok(entry)
2279    }
2280
2281    /// For an [`ObjectId`] gets the corresponding [`CommentObjectId`].
2282    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2283        match object_id {
2284            ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2285            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2286            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2287            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2288            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2289            ObjectId::ClusterReplica(cluster_replica_id) => {
2290                CommentObjectId::ClusterReplica(cluster_replica_id)
2291            }
2292            ObjectId::NetworkPolicy(network_policy_id) => {
2293                CommentObjectId::NetworkPolicy(network_policy_id)
2294            }
2295        }
2296    }
2297
2298    /// Return current system configuration.
2299    pub fn system_config(&self) -> &SystemVars {
2300        &self.system_configuration
2301    }
2302
2303    /// Return a mutable reference to the current system configuration.
2304    pub fn system_config_mut(&mut self) -> &mut SystemVars {
2305        Arc::make_mut(&mut self.system_configuration)
2306    }
2307
2308    /// Serializes the catalog's in-memory state.
2309    ///
2310    /// There are no guarantees about the format of the serialized state, except
2311    /// that the serialized state for two identical catalogs will compare
2312    /// identically.
2313    ///
2314    /// Some consumers would like the ability to overwrite the `unfinalized_shards` catalog field,
2315    /// which they can accomplish by passing in a value of `Some` for the `unfinalized_shards`
2316    /// argument.
2317    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2318        // Dump the base catalog.
2319        let mut dump = serde_json::to_value(&self).map_err(|e| {
2320            Error::new(ErrorKind::Unstructured(format!(
2321                // Don't panic here because we don't have compile-time failures for maps with
2322                // non-string keys.
2323                "internal error: could not dump catalog: {}",
2324                e
2325            )))
2326        })?;
2327
2328        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2329        // Stitch in system parameter defaults.
2330        dump_obj.insert(
2331            "system_parameter_defaults".into(),
2332            serde_json::json!(self.system_config().defaults()),
2333        );
2334        // Potentially overwrite unfinalized shards.
2335        if let Some(unfinalized_shards) = unfinalized_shards {
2336            dump_obj
2337                .get_mut("storage_metadata")
2338                .expect("known to exist")
2339                .as_object_mut()
2340                .expect("storage_metadata is an object")
2341                .insert(
2342                    "unfinalized_shards".into(),
2343                    serde_json::json!(unfinalized_shards),
2344                );
2345        }
2346        // Remove GlobalIds for temporary objects from the mapping.
2347        //
2348        // Post-test consistency checks with the durable catalog don't know about temporary items
2349        // since they're kept entirely in memory.
2350        let temporary_gids: Vec<_> = self
2351            .entry_by_global_id
2352            .iter()
2353            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2354            .map(|(gid, _item_id)| *gid)
2355            .collect();
2356        if !temporary_gids.is_empty() {
2357            let gids = dump_obj
2358                .get_mut("entry_by_global_id")
2359                .expect("known_to_exist")
2360                .as_object_mut()
2361                .expect("entry_by_global_id is an object");
2362            for gid in temporary_gids {
2363                gids.remove(&gid.to_string());
2364            }
2365        }
2366        // We exclude role_auth_by_id because it contains password information
2367        // which should not be included in the dump.
2368        dump_obj.remove("role_auth_by_id");
2369
2370        // Emit as pretty-printed JSON.
2371        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2372    }
2373
2374    pub fn availability_zones(&self) -> &[String] {
2375        &self.availability_zones
2376    }
2377
2378    pub fn concretize_replica_location(
2379        &self,
2380        location: mz_catalog::durable::ReplicaLocation,
2381        allowed_sizes: &Vec<String>,
2382        allowed_availability_zones: Option<&[String]>,
2383    ) -> Result<ReplicaLocation, Error> {
2384        let location = match location {
2385            mz_catalog::durable::ReplicaLocation::Unmanaged {
2386                storagectl_addrs,
2387                computectl_addrs,
2388            } => {
2389                if allowed_availability_zones.is_some() {
2390                    return Err(Error {
2391                        kind: ErrorKind::Internal(
2392                            "tried concretize unmanaged replica with specific availability_zones"
2393                                .to_string(),
2394                        ),
2395                    });
2396                }
2397                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2398                    storagectl_addrs,
2399                    computectl_addrs,
2400                })
2401            }
2402            mz_catalog::durable::ReplicaLocation::Managed {
2403                size,
2404                availability_zone,
2405                billed_as,
2406                internal,
2407                pending,
2408            } => {
2409                if allowed_availability_zones.is_some() && availability_zone.is_some() {
2410                    let message = "tried concretize managed replica with specific availability zones and availability zone";
2411                    return Err(Error {
2412                        kind: ErrorKind::Internal(message.to_string()),
2413                    });
2414                }
2415                self.ensure_valid_replica_size(allowed_sizes, &size)?;
2416                let cluster_replica_sizes = &self.cluster_replica_sizes;
2417
2418                ReplicaLocation::Managed(ManagedReplicaLocation {
2419                    allocation: cluster_replica_sizes
2420                        .0
2421                        .get(&size)
2422                        .expect("catalog out of sync")
2423                        .clone(),
2424                    availability_zones: match (availability_zone, allowed_availability_zones) {
2425                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2426                        (None, Some([])) => ManagedReplicaAvailabilityZones::FromCluster(None),
2427                        (None, Some(azs)) => {
2428                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2429                        }
2430                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2431                    },
2432                    size,
2433                    billed_as,
2434                    internal,
2435                    pending,
2436                })
2437            }
2438        };
2439        Ok(location)
2440    }
2441
2442    /// Return whether the given replica size requests a disk.
2443    ///
2444    /// Note that here we treat replica sizes that enable swap as _not_ requesting disk. For swap
2445    /// replicas, the provided disk limit is informational and mostly ignored. Whether an instance
2446    /// has access to swap depends on the configuration of the node it gets scheduled on, and is
2447    /// not something we can know at this point.
2448    ///
2449    /// # Panics
2450    ///
2451    /// Panics if the given size doesn't exist in `cluster_replica_sizes`.
2452    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2453        let alloc = &self.cluster_replica_sizes.0[size];
2454        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2455    }
2456
2457    pub(crate) fn ensure_valid_replica_size(
2458        &self,
2459        allowed_sizes: &[String],
2460        size: &String,
2461    ) -> Result<(), Error> {
2462        let cluster_replica_sizes = &self.cluster_replica_sizes;
2463
2464        if !cluster_replica_sizes.0.contains_key(size)
2465            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2466            || cluster_replica_sizes.0[size].disabled
2467        {
2468            let mut entries = cluster_replica_sizes
2469                .enabled_allocations()
2470                .collect::<Vec<_>>();
2471
2472            if !allowed_sizes.is_empty() {
2473                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2474                entries.retain(|(name, _)| allowed_sizes.contains(name));
2475            }
2476
2477            entries.sort_by_key(
2478                |(
2479                    _name,
2480                    ReplicaAllocation {
2481                        scale, cpu_limit, ..
2482                    },
2483                )| (scale, cpu_limit),
2484            );
2485
2486            Err(Error {
2487                kind: ErrorKind::InvalidClusterReplicaSize {
2488                    size: size.to_owned(),
2489                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2490                },
2491            })
2492        } else {
2493            Ok(())
2494        }
2495    }
2496
2497    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2498        if role_id.is_builtin() {
2499            let role = self.get_role(role_id);
2500            Err(Error::new(ErrorKind::ReservedRoleName(
2501                role.name().to_string(),
2502            )))
2503        } else {
2504            Ok(())
2505        }
2506    }
2507
2508    pub fn ensure_not_reserved_network_policy(
2509        &self,
2510        network_policy_id: &NetworkPolicyId,
2511    ) -> Result<(), Error> {
2512        if network_policy_id.is_builtin() {
2513            let policy = self.get_network_policy(network_policy_id);
2514            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2515                policy.name.clone(),
2516            )))
2517        } else {
2518            Ok(())
2519        }
2520    }
2521
2522    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2523        let is_grantable = !role_id.is_public() && !role_id.is_system();
2524        if is_grantable {
2525            Ok(())
2526        } else {
2527            let role = self.get_role(role_id);
2528            Err(Error::new(ErrorKind::UngrantableRoleName(
2529                role.name().to_string(),
2530            )))
2531        }
2532    }
2533
2534    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2535        if role_id.is_system() {
2536            let role = self.get_role(role_id);
2537            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2538                role.name().to_string(),
2539            )))
2540        } else {
2541            Ok(())
2542        }
2543    }
2544
2545    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2546        if role_id.is_predefined() {
2547            let role = self.get_role(role_id);
2548            Err(Error::new(ErrorKind::ReservedSystemRoleName(
2549                role.name().to_string(),
2550            )))
2551        } else {
2552            Ok(())
2553        }
2554    }
2555
2556    // TODO(mjibson): Is there a way to make this a closure to avoid explicitly
2557    // passing tx, and session?
2558    pub(crate) fn add_to_audit_log(
2559        system_configuration: &SystemVars,
2560        oracle_write_ts: mz_repr::Timestamp,
2561        session: Option<&ConnMeta>,
2562        tx: &mut mz_catalog::durable::Transaction,
2563        audit_events: &mut Vec<VersionedEvent>,
2564        event_type: EventType,
2565        object_type: ObjectType,
2566        details: EventDetails,
2567    ) -> Result<(), Error> {
2568        let user = session.map(|session| session.user().name.to_string());
2569
2570        // unsafe_mock_audit_event_timestamp can only be set to Some when running in unsafe mode.
2571
2572        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2573            Some(ts) => ts.into(),
2574            _ => oracle_write_ts.into(),
2575        };
2576        let id = tx.allocate_audit_log_id()?;
2577        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2578        audit_events.push(event.clone());
2579        tx.insert_audit_log_event(event);
2580        Ok(())
2581    }
2582
2583    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2584        match id {
2585            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2586            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2587                self.get_cluster_replica(*cluster_id, *replica_id)
2588                    .owner_id(),
2589            ),
2590            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2591            ObjectId::Schema((database_spec, schema_spec)) => Some(
2592                self.get_schema(database_spec, schema_spec, conn_id)
2593                    .owner_id(),
2594            ),
2595            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2596            ObjectId::Role(_) => None,
2597            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2598        }
2599    }
2600
2601    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2602        match object_id {
2603            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2604            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2605            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2606            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2607            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2608            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2609            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2610        }
2611    }
2612
2613    pub(super) fn get_system_object_type(
2614        &self,
2615        id: &SystemObjectId,
2616    ) -> mz_sql::catalog::SystemObjectType {
2617        match id {
2618            SystemObjectId::Object(object_id) => {
2619                SystemObjectType::Object(self.get_object_type(object_id))
2620            }
2621            SystemObjectId::System => SystemObjectType::System,
2622        }
2623    }
2624
    /// Returns a read-only view of the current [`StorageMetadata`] held by this
    /// catalog state.
    ///
    /// To write to this struct, you must use a catalog transaction.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2631
2632    /// For the Sources ids in `ids`, return their compaction windows.
2633    pub fn source_compaction_windows(
2634        &self,
2635        ids: impl IntoIterator<Item = CatalogItemId>,
2636    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2637        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2638        let mut seen = BTreeSet::new();
2639        for item_id in ids {
2640            if !seen.insert(item_id) {
2641                continue;
2642            }
2643            let entry = self.get_entry(&item_id);
2644            match entry.item() {
2645                CatalogItem::Source(source) => {
2646                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2647                    cws.entry(source_cw).or_default().insert(item_id);
2648                }
2649                CatalogItem::Table(table) => {
2650                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2651                    match &table.data_source {
2652                        TableDataSource::DataSource {
2653                            desc:
2654                                DataSourceDesc::IngestionExport { .. }
2655                                // Also match webhook tables (source-to-table migration).
2656                                | DataSourceDesc::Webhook { .. },
2657                            timeline: _,
2658                        } => {
2659                            cws.entry(table_cw).or_default().insert(item_id);
2660                        }
2661                        // Regular tables handle compaction directly in
2662                        // catalog_implications, not through this function.
2663                        TableDataSource::TableWrites { .. } => {}
2664                        TableDataSource::DataSource {
2665                            desc:
2666                                DataSourceDesc::Ingestion { .. }
2667                                | DataSourceDesc::OldSyntaxIngestion { .. }
2668                                | DataSourceDesc::Introspection(_)
2669                                | DataSourceDesc::Progress
2670                                | DataSourceDesc::Catalog,
2671                            ..
2672                        } => {
2673                            unreachable!(
2674                                "unexpected DataSourceDesc for table {item_id}: {:?}",
2675                                table.data_source
2676                            )
2677                        }
2678                    }
2679                }
2680                _ => {
2681                    // Views could depend on sources, so ignore them if added by used_by above.
2682                    continue;
2683                }
2684            }
2685        }
2686        cws
2687    }
2688
2689    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2690        match id {
2691            CommentObjectId::Table(id)
2692            | CommentObjectId::View(id)
2693            | CommentObjectId::MaterializedView(id)
2694            | CommentObjectId::Source(id)
2695            | CommentObjectId::Sink(id)
2696            | CommentObjectId::Index(id)
2697            | CommentObjectId::Func(id)
2698            | CommentObjectId::Connection(id)
2699            | CommentObjectId::Type(id)
2700            | CommentObjectId::Secret(id) => Some(*id),
2701            CommentObjectId::Role(_)
2702            | CommentObjectId::Database(_)
2703            | CommentObjectId::Schema(_)
2704            | CommentObjectId::Cluster(_)
2705            | CommentObjectId::ClusterReplica(_)
2706            | CommentObjectId::NetworkPolicy(_) => None,
2707        }
2708    }
2709
2710    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2711        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2712    }
2713
2714    pub fn comment_id_to_audit_log_name(
2715        &self,
2716        id: CommentObjectId,
2717        conn_id: &ConnectionId,
2718    ) -> String {
2719        match id {
2720            CommentObjectId::Table(id)
2721            | CommentObjectId::View(id)
2722            | CommentObjectId::MaterializedView(id)
2723            | CommentObjectId::Source(id)
2724            | CommentObjectId::Sink(id)
2725            | CommentObjectId::Index(id)
2726            | CommentObjectId::Func(id)
2727            | CommentObjectId::Connection(id)
2728            | CommentObjectId::Type(id)
2729            | CommentObjectId::Secret(id) => {
2730                let item = self.get_entry(&id);
2731                let name = self.resolve_full_name(item.name(), Some(conn_id));
2732                name.to_string()
2733            }
2734            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2735            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2736            CommentObjectId::Schema((spec, schema_id)) => {
2737                let schema = self.get_schema(&spec, &schema_id, conn_id);
2738                self.resolve_full_schema_name(&schema.name).to_string()
2739            }
2740            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2741            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2742                let cluster = self.get_cluster(cluster_id);
2743                let replica = self.get_cluster_replica(cluster_id, replica_id);
2744                QualifiedReplica {
2745                    cluster: Ident::new_unchecked(cluster.name.clone()),
2746                    replica: Ident::new_unchecked(replica.name.clone()),
2747                }
2748                .to_string()
2749            }
2750            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2751        }
2752    }
2753
2754    pub fn mock_authentication_nonce(&self) -> String {
2755        self.mock_authentication_nonce.clone().unwrap_or_default()
2756    }
2757}
2758
2759impl ConnectionResolver for CatalogState {
2760    fn resolve_connection(
2761        &self,
2762        id: CatalogItemId,
2763    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2764        use mz_storage_types::connections::Connection::*;
2765        match self
2766            .get_entry(&id)
2767            .connection()
2768            .expect("catalog out of sync")
2769            .details
2770            .to_connection()
2771        {
2772            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2773            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2774            Csr(conn) => Csr(conn.into_inline_connection(self)),
2775            Ssh(conn) => Ssh(conn),
2776            Aws(conn) => Aws(conn),
2777            AwsPrivatelink(conn) => AwsPrivatelink(conn),
2778            MySql(conn) => MySql(conn.into_inline_connection(self)),
2779            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2780            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2781        }
2782    }
2783}
2784
2785impl OptimizerCatalog for CatalogState {
2786    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2787        CatalogState::get_entry_by_global_id(self, id)
2788    }
2789    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2790        CatalogState::get_entry(self, id)
2791    }
2792    fn resolve_full_name(
2793        &self,
2794        name: &QualifiedItemName,
2795        conn_id: Option<&ConnectionId>,
2796    ) -> FullItemName {
2797        CatalogState::resolve_full_name(self, name, conn_id)
2798    }
2799    fn get_indexes_on(
2800        &self,
2801        id: GlobalId,
2802        cluster: ClusterId,
2803    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2804        Box::new(CatalogState::get_indexes_on(self, id, cluster))
2805    }
2806}
2807
2808impl OptimizerCatalog for Catalog {
2809    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2810        self.state.get_entry_by_global_id(id)
2811    }
2812
2813    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2814        self.state.get_entry(id)
2815    }
2816
2817    fn resolve_full_name(
2818        &self,
2819        name: &QualifiedItemName,
2820        conn_id: Option<&ConnectionId>,
2821    ) -> FullItemName {
2822        self.state.resolve_full_name(name, conn_id)
2823    }
2824
2825    fn get_indexes_on(
2826        &self,
2827        id: GlobalId,
2828        cluster: ClusterId,
2829    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2830        Box::new(self.state.get_indexes_on(id, cluster))
2831    }
2832}
2833
2834impl Catalog {
2835    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2836        self
2837    }
2838}