Skip to main content

mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_persist_client::PersistClient;
55use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
56use mz_repr::explain::ExprHumanizer;
57use mz_repr::namespaces::MZ_TEMP_SCHEMA;
58use mz_repr::network_policy_id::NetworkPolicyId;
59use mz_repr::optimize::OptimizerFeatures;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
66    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
67    SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_transform::dataflow::DataflowMetainfo;
83use mz_transform::notice::OptimizerNotice;
84use tokio::sync::MutexGuard;
85use tokio::sync::mpsc::UnboundedSender;
86use uuid::Uuid;
87
88// DO NOT add any more imports from `crate` outside of `crate::catalog`.
89pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
90pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
91pub use crate::catalog::state::CatalogState;
92pub use crate::catalog::transact::{
93    DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
94};
95use crate::command::CatalogDump;
96use crate::coord::TargetCluster;
97#[cfg(test)]
98use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
99use crate::session::{Portal, PreparedStatement, Session};
100use crate::util::ResultExt;
101use crate::{AdapterError, AdapterNotice, ExecuteResponse};
102
103mod builtin_table_updates;
104pub(crate) mod consistency;
105mod migrate;
106
107mod apply;
108mod open;
109mod state;
110mod timeline;
111mod transact;
112
113/// A `Catalog` keeps track of the SQL objects known to the planner.
114///
115/// For each object, it keeps track of both forward and reverse dependencies:
116/// i.e., which objects are depended upon by the object, and which objects
117/// depend upon the object. It enforces the SQL rules around dropping: an object
118/// cannot be dropped until all of the objects that depend upon it are dropped.
119/// It also enforces uniqueness of names.
120///
121/// SQL mandates a hierarchy of exactly three layers. A catalog contains
122/// databases, databases contain schemas, and schemas contain catalog items,
123/// like sources, sinks, view, and indexes.
124///
125/// To the outside world, databases, schemas, and items are all identified by
126/// name. Items can be referred to by their [`FullItemName`], which fully and
127/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
128/// database name and/or the schema name. Partial names can be converted into
129/// full names via a complicated resolution process documented by the
130/// [`CatalogState::resolve`] method.
131///
132/// The catalog also maintains special "ambient schemas": virtual schemas,
133/// implicitly present in all databases, that house various system views.
134/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
135#[derive(Debug)]
136pub struct Catalog {
137    state: CatalogState,
138    expr_cache_handle: Option<ExpressionCacheHandle>,
139    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
140    transient_revision: u64,
141}
142
143// Implement our own Clone because derive can't unless S is Clone, which it's
144// not (hence the Arc).
145impl Clone for Catalog {
146    fn clone(&self) -> Self {
147        Self {
148            state: self.state.clone(),
149            expr_cache_handle: self.expr_cache_handle.clone(),
150            storage: Arc::clone(&self.storage),
151            transient_revision: self.transient_revision,
152        }
153    }
154}
155
156impl Catalog {
157    /// Set the optimized plan for the item identified by `id`.
158    ///
159    /// # Panics
160    /// If the item is not an `Index`, `MaterializedView`, or
161    /// `ContinualTask`.
162    #[mz_ore::instrument(level = "trace")]
163    pub fn set_optimized_plan(
164        &mut self,
165        id: GlobalId,
166        plan: DataflowDescription<OptimizedMirRelationExpr>,
167    ) {
168        self.state.set_optimized_plan(id, plan);
169    }
170
171    /// Set the physical plan for the item identified by `id`.
172    ///
173    /// # Panics
174    /// If the item is not an `Index`, `MaterializedView`, or
175    /// `ContinualTask`.
176    #[mz_ore::instrument(level = "trace")]
177    pub fn set_physical_plan(
178        &mut self,
179        id: GlobalId,
180        plan: DataflowDescription<mz_compute_types::plan::Plan>,
181    ) {
182        self.state.set_physical_plan(id, plan);
183    }
184
185    /// Try to get the optimized plan for the item identified by `id`.
186    #[mz_ore::instrument(level = "trace")]
187    pub fn try_get_optimized_plan(
188        &self,
189        id: &GlobalId,
190    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
191        let entry = self.state.try_get_entry_by_global_id(id)?;
192        entry.item().optimized_plan().map(AsRef::as_ref)
193    }
194
195    /// Try to get the physical plan for the item identified by `id`.
196    #[mz_ore::instrument(level = "trace")]
197    pub fn try_get_physical_plan(
198        &self,
199        id: &GlobalId,
200    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
201        let entry = self.state.try_get_entry_by_global_id(id)?;
202        entry.item().physical_plan().map(AsRef::as_ref)
203    }
204
205    /// Set the `DataflowMetainfo` for the item identified by `id`.
206    ///
207    /// # Panics
208    /// If the item is not an `Index`, `MaterializedView`, or
209    /// `ContinualTask`.
210    #[mz_ore::instrument(level = "trace")]
211    pub fn set_dataflow_metainfo(
212        &mut self,
213        id: GlobalId,
214        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
215    ) {
216        self.state.set_dataflow_metainfo(id, metainfo);
217    }
218
219    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
220    #[mz_ore::instrument(level = "trace")]
221    pub fn try_get_dataflow_metainfo(
222        &self,
223        id: &GlobalId,
224    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
225        let entry = self.state.try_get_entry_by_global_id(id)?;
226        entry.item().dataflow_metainfo()
227    }
228}
229
230#[derive(Debug)]
231pub struct ConnCatalog<'a> {
232    state: Cow<'a, CatalogState>,
233    /// Because we don't have any way of removing items from the catalog
234    /// temporarily, we allow the ConnCatalog to pretend that a set of items
235    /// don't exist during resolution.
236    ///
237    /// This feature is necessary to allow re-planning of statements, which is
238    /// either incredibly useful or required when altering item definitions.
239    ///
240    /// Note that uses of this field should be used by short-lived
241    /// catalogs.
242    unresolvable_ids: BTreeSet<CatalogItemId>,
243    conn_id: ConnectionId,
244    cluster: String,
245    database: Option<DatabaseId>,
246    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
247    role_id: RoleId,
248    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
249    portals: Option<&'a BTreeMap<String, Portal>>,
250    notices_tx: UnboundedSender<AdapterNotice>,
251}
252
253impl ConnCatalog<'_> {
254    pub fn conn_id(&self) -> &ConnectionId {
255        &self.conn_id
256    }
257
258    pub fn state(&self) -> &CatalogState {
259        &*self.state
260    }
261
262    /// Prevent planning from resolving item with the provided ID. Instead,
263    /// return an error as if the item did not exist.
264    ///
265    /// This feature is meant exclusively to permit re-planning statements
266    /// during update operations and should not be used otherwise given its
267    /// extremely "powerful" semantics.
268    ///
269    /// # Panics
270    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
271    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
272        assert_eq!(
273            self.role_id, MZ_SYSTEM_ROLE_ID,
274            "only the system role can mark IDs unresolvable",
275        );
276        self.unresolvable_ids.insert(id);
277    }
278
279    /// Returns the schemas:
280    /// - mz_catalog
281    /// - pg_catalog
282    /// - temp (if requested)
283    /// - all schemas from the session's search_path var that exist
284    pub fn effective_search_path(
285        &self,
286        include_temp_schema: bool,
287    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
288        self.state
289            .effective_search_path(&self.search_path, include_temp_schema)
290    }
291}
292
293impl ConnectionResolver for ConnCatalog<'_> {
294    fn resolve_connection(
295        &self,
296        id: CatalogItemId,
297    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
298        self.state().resolve_connection(id)
299    }
300}
301
302impl Catalog {
303    /// Returns the catalog's transient revision, which starts at 1 and is
304    /// incremented on every change. This is not persisted to disk, and will
305    /// restart on every load.
306    pub fn transient_revision(&self) -> u64 {
307        self.transient_revision
308    }
309
310    /// Creates a debug catalog from the current
311    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
312    /// like in tests.
313    ///
314    /// WARNING! This function can arbitrarily fail because it does not make any
315    /// effort to adjust the catalog's contents' structure or semantics to the
316    /// currently running version, i.e. it does not apply any migrations.
317    ///
318    /// This function must not be called in production contexts. Use
319    /// [`Catalog::open`] with appropriately set configuration parameters
320    /// instead.
321    pub async fn with_debug<F, Fut, T>(f: F) -> T
322    where
323        F: FnOnce(Catalog) -> Fut,
324        Fut: Future<Output = T>,
325    {
326        let persist_client = PersistClient::new_for_tests().await;
327        let organization_id = Uuid::new_v4();
328        let bootstrap_args = test_bootstrap_args();
329        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
330            .await
331            .expect("can open debug catalog");
332        f(catalog).await
333    }
334
335    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
336    /// in progress.
337    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
338    where
339        F: FnOnce(Catalog) -> Fut,
340        Fut: Future<Output = T>,
341    {
342        let persist_client = PersistClient::new_for_tests().await;
343        let organization_id = Uuid::new_v4();
344        let bootstrap_args = test_bootstrap_args();
345        let mut catalog =
346            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
347                .await
348                .expect("can open debug catalog");
349
350        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
351        let now = SYSTEM_TIME.clone();
352        let openable_storage = TestCatalogStateBuilder::new(persist_client)
353            .with_organization_id(organization_id)
354            .with_default_deploy_generation()
355            .build()
356            .await
357            .expect("can create durable catalog");
358        let mut storage = openable_storage
359            .open(now().into(), &bootstrap_args)
360            .await
361            .expect("can open durable catalog")
362            .0;
363        // Drain updates.
364        let _ = storage
365            .sync_to_current_updates()
366            .await
367            .expect("can sync to current updates");
368        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
369
370        f(catalog).await
371    }
372
373    /// Opens a debug catalog.
374    ///
375    /// See [`Catalog::with_debug`].
376    pub async fn open_debug_catalog(
377        persist_client: PersistClient,
378        organization_id: Uuid,
379        bootstrap_args: &BootstrapArgs,
380    ) -> Result<Catalog, anyhow::Error> {
381        let now = SYSTEM_TIME.clone();
382        let environment_id = None;
383        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
384            .with_organization_id(organization_id)
385            .with_default_deploy_generation()
386            .build()
387            .await?;
388        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
389        let system_parameter_defaults = BTreeMap::default();
390        Self::open_debug_catalog_inner(
391            persist_client,
392            storage,
393            now,
394            environment_id,
395            &DUMMY_BUILD_INFO,
396            system_parameter_defaults,
397            bootstrap_args,
398            None,
399        )
400        .await
401    }
402
403    /// Opens a read only debug persist backed catalog defined by `persist_client` and
404    /// `organization_id`.
405    ///
406    /// See [`Catalog::with_debug`].
407    pub async fn open_debug_read_only_catalog(
408        persist_client: PersistClient,
409        organization_id: Uuid,
410        bootstrap_args: &BootstrapArgs,
411    ) -> Result<Catalog, anyhow::Error> {
412        let now = SYSTEM_TIME.clone();
413        let environment_id = None;
414        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
415            .with_organization_id(organization_id)
416            .build()
417            .await?;
418        let storage = openable_storage
419            .open_read_only(&test_bootstrap_args())
420            .await?;
421        let system_parameter_defaults = BTreeMap::default();
422        Self::open_debug_catalog_inner(
423            persist_client,
424            storage,
425            now,
426            environment_id,
427            &DUMMY_BUILD_INFO,
428            system_parameter_defaults,
429            bootstrap_args,
430            None,
431        )
432        .await
433    }
434
435    /// Opens a read only debug persist backed catalog defined by `persist_client` and
436    /// `organization_id`.
437    ///
438    /// See [`Catalog::with_debug`].
439    pub async fn open_debug_read_only_persist_catalog_config(
440        persist_client: PersistClient,
441        now: NowFn,
442        environment_id: EnvironmentId,
443        system_parameter_defaults: BTreeMap<String, String>,
444        build_info: &'static BuildInfo,
445        bootstrap_args: &BootstrapArgs,
446        enable_expression_cache_override: Option<bool>,
447    ) -> Result<Catalog, anyhow::Error> {
448        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
449            .with_organization_id(environment_id.organization_id())
450            .with_version(
451                build_info
452                    .version
453                    .parse()
454                    .expect("build version is parseable"),
455            )
456            .build()
457            .await?;
458        let storage = openable_storage.open_read_only(bootstrap_args).await?;
459        Self::open_debug_catalog_inner(
460            persist_client,
461            storage,
462            now,
463            Some(environment_id),
464            build_info,
465            system_parameter_defaults,
466            bootstrap_args,
467            enable_expression_cache_override,
468        )
469        .await
470    }
471
472    async fn open_debug_catalog_inner(
473        persist_client: PersistClient,
474        storage: Box<dyn DurableCatalogState>,
475        now: NowFn,
476        environment_id: Option<EnvironmentId>,
477        build_info: &'static BuildInfo,
478        system_parameter_defaults: BTreeMap<String, String>,
479        bootstrap_args: &BootstrapArgs,
480        enable_expression_cache_override: Option<bool>,
481    ) -> Result<Catalog, anyhow::Error> {
482        let metrics_registry = &MetricsRegistry::new();
483        let secrets_reader = Arc::new(InMemorySecretsController::new());
484        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
485        // debugging/testing.
486        let previous_ts = now().into();
487        let replica_size = &bootstrap_args.default_cluster_replica_size;
488        let read_only = false;
489
490        let OpenCatalogResult {
491            catalog,
492            migrated_storage_collections_0dt: _,
493            new_builtin_collections: _,
494            builtin_table_updates: _,
495            cached_global_exprs: _,
496            uncached_local_exprs: _,
497        } = Catalog::open(Config {
498            storage,
499            metrics_registry,
500            state: StateConfig {
501                unsafe_mode: true,
502                all_features: false,
503                build_info,
504                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
505                read_only,
506                now,
507                boot_ts: previous_ts,
508                skip_migrations: true,
509                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
510                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
511                    size: replica_size.clone(),
512                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
513                },
514                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
515                    size: replica_size.clone(),
516                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
517                },
518                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
519                    size: replica_size.clone(),
520                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
521                },
522                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
523                    size: replica_size.clone(),
524                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
525                },
526                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
527                    size: replica_size.clone(),
528                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
529                },
530                system_parameter_defaults,
531                remote_system_parameters: None,
532                availability_zones: vec![],
533                egress_addresses: vec![],
534                aws_principal_context: None,
535                aws_privatelink_availability_zones: None,
536                http_host_name: None,
537                connection_context: ConnectionContext::for_tests(secrets_reader),
538                builtin_item_migration_config: BuiltinItemMigrationConfig {
539                    persist_client: persist_client.clone(),
540                    read_only,
541                    force_migration: None,
542                },
543                persist_client,
544                enable_expression_cache_override,
545                helm_chart_version: None,
546                external_login_password_mz_system: None,
547                license_key: ValidatedLicenseKey::for_tests(),
548            },
549        })
550        .await?;
551        Ok(catalog)
552    }
553
554    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
555        self.state.for_session(session)
556    }
557
558    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
559        self.state.for_sessionless_user(role_id)
560    }
561
562    pub fn for_system_session(&self) -> ConnCatalog<'_> {
563        self.state.for_system_session()
564    }
565
566    async fn storage<'a>(
567        &'a self,
568    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
569        self.storage.lock().await
570    }
571
572    pub async fn current_upper(&self) -> mz_repr::Timestamp {
573        self.storage().await.current_upper().await
574    }
575
576    pub async fn allocate_user_id(
577        &self,
578        commit_ts: mz_repr::Timestamp,
579    ) -> Result<(CatalogItemId, GlobalId), Error> {
580        self.storage()
581            .await
582            .allocate_user_id(commit_ts)
583            .await
584            .maybe_terminate("allocating user ids")
585            .err_into()
586    }
587
588    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
589    pub async fn allocate_user_ids(
590        &self,
591        amount: u64,
592        commit_ts: mz_repr::Timestamp,
593    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
594        self.storage()
595            .await
596            .allocate_user_ids(amount, commit_ts)
597            .await
598            .maybe_terminate("allocating user ids")
599            .err_into()
600    }
601
602    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
603        let commit_ts = self.storage().await.current_upper().await;
604        self.allocate_user_id(commit_ts).await
605    }
606
607    /// Get the next user item ID without allocating it.
608    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
609        self.storage()
610            .await
611            .get_next_user_item_id()
612            .await
613            .err_into()
614    }
615
616    #[cfg(test)]
617    pub async fn allocate_system_id(
618        &self,
619        commit_ts: mz_repr::Timestamp,
620    ) -> Result<(CatalogItemId, GlobalId), Error> {
621        use mz_ore::collections::CollectionExt;
622
623        let mut storage = self.storage().await;
624        let mut txn = storage.transaction().await?;
625        let id = txn
626            .allocate_system_item_ids(1)
627            .maybe_terminate("allocating system ids")?
628            .into_element();
629        // Drain transaction.
630        let _ = txn.get_and_commit_op_updates();
631        txn.commit(commit_ts).await?;
632        Ok(id)
633    }
634
635    /// Get the next system item ID without allocating it.
636    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
637        self.storage()
638            .await
639            .get_next_system_item_id()
640            .await
641            .err_into()
642    }
643
644    pub async fn allocate_user_cluster_id(
645        &self,
646        commit_ts: mz_repr::Timestamp,
647    ) -> Result<ClusterId, Error> {
648        self.storage()
649            .await
650            .allocate_user_cluster_id(commit_ts)
651            .await
652            .maybe_terminate("allocating user cluster ids")
653            .err_into()
654    }
655
656    /// Get the next system replica id without allocating it.
657    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
658        self.storage()
659            .await
660            .get_next_system_replica_id()
661            .await
662            .err_into()
663    }
664
665    /// Get the next user replica id without allocating it.
666    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
667        self.storage()
668            .await
669            .get_next_user_replica_id()
670            .await
671            .err_into()
672    }
673
674    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
675        self.state.resolve_database(database_name)
676    }
677
678    pub fn resolve_schema(
679        &self,
680        current_database: Option<&DatabaseId>,
681        database_name: Option<&str>,
682        schema_name: &str,
683        conn_id: &ConnectionId,
684    ) -> Result<&Schema, SqlCatalogError> {
685        self.state
686            .resolve_schema(current_database, database_name, schema_name, conn_id)
687    }
688
689    pub fn resolve_schema_in_database(
690        &self,
691        database_spec: &ResolvedDatabaseSpecifier,
692        schema_name: &str,
693        conn_id: &ConnectionId,
694    ) -> Result<&Schema, SqlCatalogError> {
695        self.state
696            .resolve_schema_in_database(database_spec, schema_name, conn_id)
697    }
698
699    pub fn resolve_replica_in_cluster(
700        &self,
701        cluster_id: &ClusterId,
702        replica_name: &str,
703    ) -> Result<&ClusterReplica, SqlCatalogError> {
704        self.state
705            .resolve_replica_in_cluster(cluster_id, replica_name)
706    }
707
708    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
709        self.state.resolve_system_schema(name)
710    }
711
712    pub fn resolve_search_path(
713        &self,
714        session: &Session,
715    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
716        self.state.resolve_search_path(session)
717    }
718
719    /// Resolves `name` to a non-function [`CatalogEntry`].
720    pub fn resolve_entry(
721        &self,
722        current_database: Option<&DatabaseId>,
723        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
724        name: &PartialItemName,
725        conn_id: &ConnectionId,
726    ) -> Result<&CatalogEntry, SqlCatalogError> {
727        self.state
728            .resolve_entry(current_database, search_path, name, conn_id)
729    }
730
731    /// Resolves a `BuiltinTable`.
732    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
733        self.state.resolve_builtin_table(builtin)
734    }
735
736    /// Resolves a `BuiltinLog`.
737    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
738        self.state.resolve_builtin_log(builtin).0
739    }
740
741    /// Resolves a `BuiltinSource`.
742    pub fn resolve_builtin_storage_collection(
743        &self,
744        builtin: &'static BuiltinSource,
745    ) -> CatalogItemId {
746        self.state.resolve_builtin_source(builtin)
747    }
748
749    /// Resolves `name` to a function [`CatalogEntry`].
750    pub fn resolve_function(
751        &self,
752        current_database: Option<&DatabaseId>,
753        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
754        name: &PartialItemName,
755        conn_id: &ConnectionId,
756    ) -> Result<&CatalogEntry, SqlCatalogError> {
757        self.state
758            .resolve_function(current_database, search_path, name, conn_id)
759    }
760
761    /// Resolves `name` to a type [`CatalogEntry`].
762    pub fn resolve_type(
763        &self,
764        current_database: Option<&DatabaseId>,
765        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
766        name: &PartialItemName,
767        conn_id: &ConnectionId,
768    ) -> Result<&CatalogEntry, SqlCatalogError> {
769        self.state
770            .resolve_type(current_database, search_path, name, conn_id)
771    }
772
773    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
774        self.state.resolve_cluster(name)
775    }
776
777    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
778    ///
779    /// # Panics
780    /// * If the [`BuiltinCluster`] doesn't exist.
781    ///
782    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
783        self.state.resolve_builtin_cluster(cluster)
784    }
785
786    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
787        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
788    }
789
790    /// Resolves a [`Cluster`] for a TargetCluster.
791    pub fn resolve_target_cluster(
792        &self,
793        target_cluster: TargetCluster,
794        session: &Session,
795    ) -> Result<&Cluster, AdapterError> {
796        match target_cluster {
797            TargetCluster::CatalogServer => {
798                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
799            }
800            TargetCluster::Active => self.active_cluster(session),
801            TargetCluster::Transaction(cluster_id) => self
802                .try_get_cluster(cluster_id)
803                .ok_or(AdapterError::ConcurrentClusterDrop),
804        }
805    }
806
807    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
808        // TODO(benesch): this check here is not sufficiently protective. It'd
809        // be very easy for a code path to accidentally avoid this check by
810        // calling `resolve_cluster(session.vars().cluster())`.
811        if session.user().name != SYSTEM_USER.name
812            && session.user().name != SUPPORT_USER.name
813            && session.vars().cluster() == SYSTEM_USER.name
814        {
815            coord_bail!(
816                "system cluster '{}' cannot execute user queries",
817                SYSTEM_USER.name
818            );
819        }
820        let cluster = self.resolve_cluster(session.vars().cluster())?;
821        Ok(cluster)
822    }
823
824    pub fn state(&self) -> &CatalogState {
825        &self.state
826    }
827
828    pub fn resolve_full_name(
829        &self,
830        name: &QualifiedItemName,
831        conn_id: Option<&ConnectionId>,
832    ) -> FullItemName {
833        self.state.resolve_full_name(name, conn_id)
834    }
835
836    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
837        self.state.try_get_entry(id)
838    }
839
840    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
841        self.state.try_get_entry_by_global_id(id)
842    }
843
844    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
845        self.state.get_entry(id)
846    }
847
848    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
849        self.state.get_entry_by_global_id(id)
850    }
851
852    pub fn get_global_ids<'a>(
853        &'a self,
854        id: &CatalogItemId,
855    ) -> impl Iterator<Item = GlobalId> + use<'a> {
856        self.get_entry(id).global_ids()
857    }
858
859    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
860        self.get_entry_by_global_id(id).id()
861    }
862
863    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
864        let item = self.try_get_entry_by_global_id(id)?;
865        Some(item.id())
866    }
867
868    pub fn get_schema(
869        &self,
870        database_spec: &ResolvedDatabaseSpecifier,
871        schema_spec: &SchemaSpecifier,
872        conn_id: &ConnectionId,
873    ) -> &Schema {
874        self.state.get_schema(database_spec, schema_spec, conn_id)
875    }
876
877    pub fn try_get_schema(
878        &self,
879        database_spec: &ResolvedDatabaseSpecifier,
880        schema_spec: &SchemaSpecifier,
881        conn_id: &ConnectionId,
882    ) -> Option<&Schema> {
883        self.state
884            .try_get_schema(database_spec, schema_spec, conn_id)
885    }
886
887    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
888        self.state.get_mz_catalog_schema_id()
889    }
890
891    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
892        self.state.get_pg_catalog_schema_id()
893    }
894
895    pub fn get_information_schema_id(&self) -> SchemaId {
896        self.state.get_information_schema_id()
897    }
898
899    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
900        self.state.get_mz_internal_schema_id()
901    }
902
903    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
904        self.state.get_mz_introspection_schema_id()
905    }
906
907    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
908        self.state.get_mz_unsafe_schema_id()
909    }
910
911    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
912        self.state.system_schema_ids()
913    }
914
915    pub fn get_database(&self, id: &DatabaseId) -> &Database {
916        self.state.get_database(id)
917    }
918
919    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
920        self.state.try_get_role(id)
921    }
922
923    pub fn get_role(&self, id: &RoleId) -> &Role {
924        self.state.get_role(id)
925    }
926
927    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
928        self.state.try_get_role_by_name(role_name)
929    }
930
931    pub fn try_get_role_by_name_case_insensitive(&self, role_name: &str) -> Option<&Role> {
932        self.state.try_get_role_by_name_case_insensitive(role_name)
933    }
934
935    pub fn roles_by_lowercase_name(&self) -> BTreeMap<String, &Role> {
936        self.state.roles_by_lowercase_name()
937    }
938
939    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
940        self.state.try_get_role_auth_by_id(id)
941    }
942
943    /// Creates a new schema in the `Catalog` for temporary items
944    /// indicated by the TEMPORARY or TEMP keywords.
945    pub fn create_temporary_schema(
946        &mut self,
947        conn_id: &ConnectionId,
948        owner_id: RoleId,
949    ) -> Result<(), Error> {
950        self.state.create_temporary_schema(conn_id, owner_id)
951    }
952
953    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
954        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
955        self.state
956            .temporary_schemas
957            .get(conn_id)
958            .map(|schema| schema.items.contains_key(item_name))
959            .unwrap_or(false)
960    }
961
962    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
963    /// Returns Ok if conn_id's temp schema does not exist.
964    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
965        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
966            return Ok(());
967        };
968        if !schema.items.is_empty() {
969            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
970        }
971        Ok(())
972    }
973
974    pub(crate) fn object_dependents(
975        &self,
976        object_ids: &Vec<ObjectId>,
977        conn_id: &ConnectionId,
978    ) -> Vec<ObjectId> {
979        let mut seen = BTreeSet::new();
980        self.state.object_dependents(object_ids, conn_id, &mut seen)
981    }
982
983    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
984        FullNameV1 {
985            database: name.database.to_string(),
986            schema: name.schema.clone(),
987            item: name.item.clone(),
988        }
989    }
990
991    pub fn find_available_cluster_name(&self, name: &str) -> String {
992        let mut i = 0;
993        let mut candidate = name.to_string();
994        while self.state.clusters_by_name.contains_key(&candidate) {
995            i += 1;
996            candidate = format!("{}{}", name, i);
997        }
998        candidate
999    }
1000
1001    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1002        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1003            self.cluster_replica_sizes()
1004                .enabled_allocations()
1005                .map(|a| a.0.to_owned())
1006                .collect::<Vec<_>>()
1007        } else {
1008            self.system_config().allowed_cluster_replica_sizes()
1009        }
1010    }
1011
1012    pub fn concretize_replica_location(
1013        &self,
1014        location: mz_catalog::durable::ReplicaLocation,
1015        allowed_sizes: &Vec<String>,
1016        allowed_availability_zones: Option<&[String]>,
1017    ) -> Result<ReplicaLocation, Error> {
1018        self.state
1019            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1020    }
1021
1022    pub(crate) fn ensure_valid_replica_size(
1023        &self,
1024        allowed_sizes: &[String],
1025        size: &String,
1026    ) -> Result<(), Error> {
1027        self.state.ensure_valid_replica_size(allowed_sizes, size)
1028    }
1029
1030    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1031        &self.state.cluster_replica_sizes
1032    }
1033
1034    /// Returns the privileges of an object by its ID.
1035    pub fn get_privileges(
1036        &self,
1037        id: &SystemObjectId,
1038        conn_id: &ConnectionId,
1039    ) -> Option<&PrivilegeMap> {
1040        match id {
1041            SystemObjectId::Object(id) => match id {
1042                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1043                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1044                ObjectId::Schema((database_spec, schema_spec)) => Some(
1045                    self.get_schema(database_spec, schema_spec, conn_id)
1046                        .privileges(),
1047                ),
1048                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1049                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1050                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1051            },
1052            SystemObjectId::System => Some(&self.state.system_privileges),
1053        }
1054    }
1055
1056    #[mz_ore::instrument(level = "debug")]
1057    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1058        Ok(self.storage().await.advance_upper(new_upper).await?)
1059    }
1060
1061    /// Return the ids of all log sources the given object depends on.
1062    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1063        self.state.introspection_dependencies(id)
1064    }
1065
1066    /// Serializes the catalog's in-memory state.
1067    ///
1068    /// There are no guarantees about the format of the serialized state, except
1069    /// that the serialized state for two identical catalogs will compare
1070    /// identically.
1071    pub fn dump(&self) -> Result<CatalogDump, Error> {
1072        Ok(CatalogDump::new(self.state.dump(None)?))
1073    }
1074
1075    /// Checks the [`Catalog`]s internal consistency.
1076    ///
1077    /// Returns a JSON object describing the inconsistencies, if there are any.
1078    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1079        self.state.check_consistency().map_err(|inconsistencies| {
1080            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1081                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1082            })
1083        })
1084    }
1085
1086    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1087        self.state.config()
1088    }
1089
1090    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1091        self.state.entry_by_id.values()
1092    }
1093
1094    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1095        self.entries()
1096            .filter(|entry| entry.is_connection() && entry.id().is_user())
1097    }
1098
1099    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1100        self.entries()
1101            .filter(|entry| entry.is_table() && entry.id().is_user())
1102    }
1103
1104    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1105        self.entries()
1106            .filter(|entry| entry.is_source() && entry.id().is_user())
1107    }
1108
1109    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1110        self.entries()
1111            .filter(|entry| entry.is_sink() && entry.id().is_user())
1112    }
1113
1114    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1115        self.entries()
1116            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1117    }
1118
1119    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1120        self.entries()
1121            .filter(|entry| entry.is_secret() && entry.id().is_user())
1122    }
1123
1124    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1125        self.state.get_network_policy(&network_policy_id)
1126    }
1127
1128    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1129        self.state.try_get_network_policy_by_name(name)
1130    }
1131
1132    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1133        self.state.clusters_by_id.values()
1134    }
1135
1136    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1137        self.state.get_cluster(cluster_id)
1138    }
1139
1140    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1141        self.state.try_get_cluster(cluster_id)
1142    }
1143
1144    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1145        self.clusters().filter(|cluster| cluster.id.is_user())
1146    }
1147
1148    pub fn get_cluster_replica(
1149        &self,
1150        cluster_id: ClusterId,
1151        replica_id: ReplicaId,
1152    ) -> &ClusterReplica {
1153        self.state.get_cluster_replica(cluster_id, replica_id)
1154    }
1155
1156    pub fn try_get_cluster_replica(
1157        &self,
1158        cluster_id: ClusterId,
1159        replica_id: ReplicaId,
1160    ) -> Option<&ClusterReplica> {
1161        self.state.try_get_cluster_replica(cluster_id, replica_id)
1162    }
1163
1164    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1165        self.user_clusters()
1166            .flat_map(|cluster| cluster.user_replicas())
1167    }
1168
1169    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1170        self.state.database_by_id.values()
1171    }
1172
1173    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1174        self.state
1175            .roles_by_id
1176            .values()
1177            .filter(|role| role.is_user())
1178    }
1179
1180    pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1181        self.state
1182            .network_policies_by_id
1183            .iter()
1184            .filter(|(id, _)| id.is_user())
1185            .map(|(_, policy)| policy)
1186    }
1187
1188    pub fn system_privileges(&self) -> &PrivilegeMap {
1189        &self.state.system_privileges
1190    }
1191
1192    pub fn default_privileges(
1193        &self,
1194    ) -> impl Iterator<
1195        Item = (
1196            &DefaultPrivilegeObject,
1197            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1198        ),
1199    > {
1200        self.state.default_privileges.iter()
1201    }
1202
1203    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1204        self.state
1205            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1206    }
1207
1208    pub fn pack_storage_usage_update(
1209        &self,
1210        event: VersionedStorageUsage,
1211        diff: Diff,
1212    ) -> BuiltinTableUpdate {
1213        self.state
1214            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1215    }
1216
1217    pub fn system_config(&self) -> &SystemVars {
1218        self.state.system_config()
1219    }
1220
1221    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1222        self.state.system_config_mut()
1223    }
1224
1225    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1226        self.state.ensure_not_reserved_role(role_id)
1227    }
1228
1229    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1230        self.state.ensure_grantable_role(role_id)
1231    }
1232
1233    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1234        self.state.ensure_not_system_role(role_id)
1235    }
1236
1237    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1238        self.state.ensure_not_predefined_role(role_id)
1239    }
1240
1241    pub fn ensure_not_reserved_network_policy(
1242        &self,
1243        network_policy_id: &NetworkPolicyId,
1244    ) -> Result<(), Error> {
1245        self.state
1246            .ensure_not_reserved_network_policy(network_policy_id)
1247    }
1248
1249    pub fn ensure_not_reserved_object(
1250        &self,
1251        object_id: &ObjectId,
1252        conn_id: &ConnectionId,
1253    ) -> Result<(), Error> {
1254        match object_id {
1255            ObjectId::Cluster(cluster_id) => {
1256                if cluster_id.is_system() {
1257                    let cluster = self.get_cluster(*cluster_id);
1258                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1259                        cluster.name().to_string(),
1260                    )))
1261                } else {
1262                    Ok(())
1263                }
1264            }
1265            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1266                if replica_id.is_system() {
1267                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1268                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1269                        replica.name().to_string(),
1270                    )))
1271                } else {
1272                    Ok(())
1273                }
1274            }
1275            ObjectId::Database(database_id) => {
1276                if database_id.is_system() {
1277                    let database = self.get_database(database_id);
1278                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1279                        database.name().to_string(),
1280                    )))
1281                } else {
1282                    Ok(())
1283                }
1284            }
1285            ObjectId::Schema((database_spec, schema_spec)) => {
1286                if schema_spec.is_system() {
1287                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1288                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1289                        schema.name().schema.clone(),
1290                    )))
1291                } else {
1292                    Ok(())
1293                }
1294            }
1295            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1296            ObjectId::Item(item_id) => {
1297                if item_id.is_system() {
1298                    let item = self.get_entry(item_id);
1299                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1300                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1301                } else {
1302                    Ok(())
1303                }
1304            }
1305            ObjectId::NetworkPolicy(network_policy_id) => {
1306                self.ensure_not_reserved_network_policy(network_policy_id)
1307            }
1308        }
1309    }
1310
1311    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1312    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1313        &mut self,
1314        create_sql: &str,
1315        force_if_exists_skip: bool,
1316    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1317        self.state
1318            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1319    }
1320
1321    /// Cache global and, optionally, local expressions for the given
1322    /// `GlobalId`.
1323    ///
1324    /// Takes the plans and metainfo directly as parameters (rather than
1325    /// fishing them out of catalog state), so this can be called **before**
1326    /// the catalog transaction that creates the item. Returns the future
1327    /// returned by [`Catalog::update_expression_cache`]; callers should
1328    /// `.await` it before the catalog transaction commits, so the durable
1329    /// expression cache is observed to contain the entries by the time any
1330    /// other process (or a subsequent bootstrap on this process) reads them.
1331    pub(crate) fn cache_expressions(
1332        &self,
1333        id: GlobalId,
1334        local_mir: Option<OptimizedMirRelationExpr>,
1335        mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1336        mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1337        dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1338        optimizer_features: OptimizerFeatures,
1339    ) -> BoxFuture<'static, ()> {
1340        // Make sure we're not caching the result of timestamp selection, as
1341        // it will almost certainly be wrong if we re-install the dataflow at
1342        // a later time.
1343        global_mir.as_of = None;
1344        global_mir.until = Default::default();
1345        physical_plan.as_of = None;
1346        physical_plan.until = Default::default();
1347
1348        let mut local_exprs = Vec::new();
1349        if let Some(local_mir) = local_mir {
1350            local_exprs.push((
1351                id,
1352                LocalExpressions {
1353                    local_mir,
1354                    optimizer_features: optimizer_features.clone(),
1355                },
1356            ));
1357        }
1358        let global_exprs = vec![(
1359            id,
1360            GlobalExpressions {
1361                global_mir,
1362                physical_plan,
1363                dataflow_metainfos,
1364                optimizer_features,
1365            },
1366        )];
1367        self.update_expression_cache(local_exprs, global_exprs, Default::default())
1368    }
1369
1370    pub(crate) fn update_expression_cache<'a, 'b>(
1371        &'a self,
1372        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1373        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1374        invalidate_ids: BTreeSet<GlobalId>,
1375    ) -> BoxFuture<'b, ()> {
1376        if let Some(expr_cache) = &self.expr_cache_handle {
1377            expr_cache
1378                .update(
1379                    new_local_expressions,
1380                    new_global_expressions,
1381                    invalidate_ids,
1382                )
1383                .boxed()
1384        } else {
1385            async {}.boxed()
1386        }
1387    }
1388
1389    /// Listen for and apply all unconsumed updates to the durable catalog state.
1390    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1391    // `#[cfg(test)]` annotation.
1392    #[cfg(test)]
1393    async fn sync_to_current_updates(
1394        &mut self,
1395    ) -> Result<
1396        (
1397            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1398            Vec<ParsedStateUpdate>,
1399        ),
1400        CatalogError,
1401    > {
1402        let updates = self.storage().await.sync_to_current_updates().await?;
1403        let (builtin_table_updates, catalog_updates) = self
1404            .state
1405            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1406            .await;
1407        Ok((builtin_table_updates, catalog_updates))
1408    }
1409}
1410
1411pub fn is_reserved_name(name: &str) -> bool {
1412    BUILTIN_PREFIXES
1413        .iter()
1414        .any(|prefix| name.starts_with(prefix))
1415}
1416
1417pub fn is_reserved_role_name(name: &str) -> bool {
1418    is_reserved_name(name) || is_public_role(name)
1419}
1420
/// Returns `true` if `name` compares equal to `PUBLIC_ROLE_NAME`.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1424
1425pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1426    object_type_to_audit_object_type(sql_type.into())
1427}
1428
1429pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1430    match id {
1431        CommentObjectId::Table(_) => ObjectType::Table,
1432        CommentObjectId::View(_) => ObjectType::View,
1433        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1434        CommentObjectId::Source(_) => ObjectType::Source,
1435        CommentObjectId::Sink(_) => ObjectType::Sink,
1436        CommentObjectId::Index(_) => ObjectType::Index,
1437        CommentObjectId::Func(_) => ObjectType::Func,
1438        CommentObjectId::Connection(_) => ObjectType::Connection,
1439        CommentObjectId::Type(_) => ObjectType::Type,
1440        CommentObjectId::Secret(_) => ObjectType::Secret,
1441        CommentObjectId::Role(_) => ObjectType::Role,
1442        CommentObjectId::Database(_) => ObjectType::Database,
1443        CommentObjectId::Schema(_) => ObjectType::Schema,
1444        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1445        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1446        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1447    }
1448}
1449
1450pub(crate) fn object_type_to_audit_object_type(
1451    object_type: mz_sql::catalog::ObjectType,
1452) -> ObjectType {
1453    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1454}
1455
1456pub(crate) fn system_object_type_to_audit_object_type(
1457    system_type: &SystemObjectType,
1458) -> ObjectType {
1459    match system_type {
1460        SystemObjectType::Object(object_type) => match object_type {
1461            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1462            mz_sql::catalog::ObjectType::View => ObjectType::View,
1463            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1464            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1465            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1466            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1467            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1468            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1469            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1470            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1471            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1472            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1473            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1474            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1475            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1476            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1477        },
1478        SystemObjectType::System => ObjectType::System,
1479    }
1480}
1481
/// The two kinds of privilege update: granting or revoking.
///
/// Converts into the matching `ExecuteResponse` and audit log `EventType`
/// through the `From` implementations in this file.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// A privilege is being granted.
    Grant,
    /// A privilege is being revoked.
    Revoke,
}
1487
1488impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1489    fn from(variant: UpdatePrivilegeVariant) -> Self {
1490        match variant {
1491            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1492            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1493        }
1494    }
1495}
1496
1497impl From<UpdatePrivilegeVariant> for EventType {
1498    fn from(variant: UpdatePrivilegeVariant) -> Self {
1499        match variant {
1500            UpdatePrivilegeVariant::Grant => EventType::Grant,
1501            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1502        }
1503    }
1504}
1505
1506impl ConnCatalog<'_> {
1507    fn resolve_item_name(
1508        &self,
1509        name: &PartialItemName,
1510    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1511        self.resolve_item(name).map(|entry| entry.name())
1512    }
1513
1514    fn resolve_function_name(
1515        &self,
1516        name: &PartialItemName,
1517    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1518        self.resolve_function(name).map(|entry| entry.name())
1519    }
1520
1521    fn resolve_type_name(
1522        &self,
1523        name: &PartialItemName,
1524    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1525        self.resolve_type(name).map(|entry| entry.name())
1526    }
1527}
1528
1529impl ExprHumanizer for ConnCatalog<'_> {
1530    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1531        let entry = self.state.try_get_entry_by_global_id(&id)?;
1532        Some(self.resolve_full_name(entry.name()).to_string())
1533    }
1534
1535    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1536        let entry = self.state.try_get_entry_by_global_id(&id)?;
1537        Some(entry.name().item.clone())
1538    }
1539
1540    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1541        let entry = self.state.try_get_entry_by_global_id(&id)?;
1542        Some(self.resolve_full_name(entry.name()).into_parts())
1543    }
1544
1545    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1546        use SqlScalarType::*;
1547
1548        match typ {
1549            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1550            List {
1551                custom_id: Some(item_id),
1552                ..
1553            }
1554            | Map {
1555                custom_id: Some(item_id),
1556                ..
1557            } => {
1558                let item = self.get_item(item_id);
1559                self.minimal_qualification(item.name()).to_string()
1560            }
1561            List { element_type, .. } => {
1562                format!(
1563                    "{} list",
1564                    self.humanize_sql_scalar_type(element_type, postgres_compat)
1565                )
1566            }
1567            Map { value_type, .. } => format!(
1568                "map[{}=>{}]",
1569                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1570                self.humanize_sql_scalar_type(value_type, postgres_compat)
1571            ),
1572            Record {
1573                custom_id: Some(item_id),
1574                ..
1575            } => {
1576                let item = self.get_item(item_id);
1577                self.minimal_qualification(item.name()).to_string()
1578            }
1579            Record { fields, .. } => format!(
1580                "record({})",
1581                fields
1582                    .iter()
1583                    .map(|f| format!(
1584                        "{}: {}",
1585                        f.0,
1586                        self.humanize_sql_column_type(&f.1, postgres_compat)
1587                    ))
1588                    .join(",")
1589            ),
1590            PgLegacyChar => "\"char\"".into(),
1591            Char { length } if !postgres_compat => match length {
1592                None => "char".into(),
1593                Some(length) => format!("char({})", length.into_u32()),
1594            },
1595            VarChar { max_length } if !postgres_compat => match max_length {
1596                None => "varchar".into(),
1597                Some(length) => format!("varchar({})", length.into_u32()),
1598            },
1599            UInt16 => "uint2".into(),
1600            UInt32 => "uint4".into(),
1601            UInt64 => "uint8".into(),
1602            ty => {
1603                let pgrepr_type = mz_pgrepr::Type::from(ty);
1604                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1605
1606                let res = if self
1607                    .effective_search_path(true)
1608                    .iter()
1609                    .any(|(_, schema)| schema == &pg_catalog_schema)
1610                {
1611                    pgrepr_type.name().to_string()
1612                } else {
1613                    // If PG_CATALOG_SCHEMA is not in search path, you need
1614                    // qualified object name to refer to type.
1615                    let name = QualifiedItemName {
1616                        qualifiers: ItemQualifiers {
1617                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1618                            schema_spec: pg_catalog_schema,
1619                        },
1620                        item: pgrepr_type.name().to_string(),
1621                    };
1622                    self.resolve_full_name(&name).to_string()
1623                };
1624                res
1625            }
1626        }
1627    }
1628
1629    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1630        let entry = self.state.try_get_entry_by_global_id(&id)?;
1631
1632        match entry.index() {
1633            Some(index) => {
1634                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1635                let mut on_names = on_desc
1636                    .iter_names()
1637                    .map(|col_name| col_name.to_string())
1638                    .collect::<Vec<_>>();
1639
1640                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1641
1642                // Init ix_names with unknown column names. Unknown columns are
1643                // represented as an empty String and rendered as `#c` by the
1644                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1645                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1646                let mut ix_names = vec![String::new(); ix_arity];
1647
1648                // Apply the permutation by swapping on_names with ix_names.
1649                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1650                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1651                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1652                    std::mem::swap(on_name, ix_name);
1653                }
1654
1655                Some(ix_names) // Return the updated ix_names vector.
1656            }
1657            None => {
1658                let desc = self.state.try_get_desc_by_global_id(&id)?;
1659                let column_names = desc
1660                    .iter_names()
1661                    .map(|col_name| col_name.to_string())
1662                    .collect();
1663
1664                Some(column_names)
1665            }
1666        }
1667    }
1668
1669    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1670        let desc = self.state.try_get_desc_by_global_id(&id)?;
1671        Some(desc.get_name(column).to_string())
1672    }
1673
1674    fn id_exists(&self, id: GlobalId) -> bool {
1675        self.state.entry_by_global_id.contains_key(&id)
1676    }
1677}
1678
1679impl SessionCatalog for ConnCatalog<'_> {
1680    fn active_role_id(&self) -> &RoleId {
1681        &self.role_id
1682    }
1683
1684    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1685        self.prepared_statements
1686            .as_ref()
1687            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1688            .flatten()
1689    }
1690
1691    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1692        self.portals
1693            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1694    }
1695
1696    fn active_database(&self) -> Option<&DatabaseId> {
1697        self.database.as_ref()
1698    }
1699
1700    fn active_cluster(&self) -> &str {
1701        &self.cluster
1702    }
1703
1704    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1705        &self.search_path
1706    }
1707
1708    fn resolve_database(
1709        &self,
1710        database_name: &str,
1711    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1712        Ok(self.state.resolve_database(database_name)?)
1713    }
1714
1715    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1716        self.state
1717            .database_by_id
1718            .get(id)
1719            .expect("database doesn't exist")
1720    }
1721
1722    // `as` is ok to use to cast to a trait object.
1723    #[allow(clippy::as_conversions)]
1724    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1725        self.state
1726            .database_by_id
1727            .values()
1728            .map(|database| database as &dyn CatalogDatabase)
1729            .collect()
1730    }
1731
1732    fn resolve_schema(
1733        &self,
1734        database_name: Option<&str>,
1735        schema_name: &str,
1736    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1737        Ok(self.state.resolve_schema(
1738            self.database.as_ref(),
1739            database_name,
1740            schema_name,
1741            &self.conn_id,
1742        )?)
1743    }
1744
1745    fn resolve_schema_in_database(
1746        &self,
1747        database_spec: &ResolvedDatabaseSpecifier,
1748        schema_name: &str,
1749    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1750        Ok(self
1751            .state
1752            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1753    }
1754
1755    fn get_schema(
1756        &self,
1757        database_spec: &ResolvedDatabaseSpecifier,
1758        schema_spec: &SchemaSpecifier,
1759    ) -> &dyn CatalogSchema {
1760        self.state
1761            .get_schema(database_spec, schema_spec, &self.conn_id)
1762    }
1763
1764    // `as` is ok to use to cast to a trait object.
1765    #[allow(clippy::as_conversions)]
1766    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1767        self.get_databases()
1768            .into_iter()
1769            .flat_map(|database| database.schemas().into_iter())
1770            .chain(
1771                self.state
1772                    .ambient_schemas_by_id
1773                    .values()
1774                    .chain(self.state.temporary_schemas.values())
1775                    .map(|schema| schema as &dyn CatalogSchema),
1776            )
1777            .collect()
1778    }
1779
1780    fn get_mz_internal_schema_id(&self) -> SchemaId {
1781        self.state().get_mz_internal_schema_id()
1782    }
1783
1784    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1785        self.state().get_mz_unsafe_schema_id()
1786    }
1787
1788    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1789        self.state.is_system_schema_specifier(schema)
1790    }
1791
1792    fn resolve_role(
1793        &self,
1794        role_name: &str,
1795    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1796        match self.state.try_get_role_by_name(role_name) {
1797            Some(role) => Ok(role),
1798            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1799        }
1800    }
1801
1802    fn resolve_network_policy(
1803        &self,
1804        policy_name: &str,
1805    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1806        match self.state.try_get_network_policy_by_name(policy_name) {
1807            Some(policy) => Ok(policy),
1808            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1809        }
1810    }
1811
1812    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1813        Some(self.state.roles_by_id.get(id)?)
1814    }
1815
1816    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1817        self.state.get_role(id)
1818    }
1819
1820    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1821        // `as` is ok to use to cast to a trait object.
1822        #[allow(clippy::as_conversions)]
1823        self.state
1824            .roles_by_id
1825            .values()
1826            .map(|role| role as &dyn CatalogRole)
1827            .collect()
1828    }
1829
    /// Returns the constant `MZ_SYSTEM_ROLE_ID`.
    fn mz_system_role_id(&self) -> RoleId {
        MZ_SYSTEM_ROLE_ID
    }
1833
1834    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1835        self.state.collect_role_membership(id)
1836    }
1837
1838    fn get_network_policy(
1839        &self,
1840        id: &NetworkPolicyId,
1841    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1842        self.state.get_network_policy(id)
1843    }
1844
1845    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1846        // `as` is ok to use to cast to a trait object.
1847        #[allow(clippy::as_conversions)]
1848        self.state
1849            .network_policies_by_id
1850            .values()
1851            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1852            .collect()
1853    }
1854
1855    fn resolve_cluster(
1856        &self,
1857        cluster_name: Option<&str>,
1858    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1859        Ok(self
1860            .state
1861            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1862    }
1863
1864    fn resolve_cluster_replica(
1865        &self,
1866        cluster_replica_name: &QualifiedReplica,
1867    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1868        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1869    }
1870
1871    fn resolve_item(
1872        &self,
1873        name: &PartialItemName,
1874    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1875        let r = self.state.resolve_entry(
1876            self.database.as_ref(),
1877            &self.effective_search_path(true),
1878            name,
1879            &self.conn_id,
1880        )?;
1881        if self.unresolvable_ids.contains(&r.id()) {
1882            Err(SqlCatalogError::UnknownItem(name.to_string()))
1883        } else {
1884            Ok(r)
1885        }
1886    }
1887
1888    fn resolve_function(
1889        &self,
1890        name: &PartialItemName,
1891    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1892        let r = self.state.resolve_function(
1893            self.database.as_ref(),
1894            &self.effective_search_path(false),
1895            name,
1896            &self.conn_id,
1897        )?;
1898
1899        if self.unresolvable_ids.contains(&r.id()) {
1900            Err(SqlCatalogError::UnknownFunction {
1901                name: name.to_string(),
1902                alternative: None,
1903            })
1904        } else {
1905            Ok(r)
1906        }
1907    }
1908
1909    fn resolve_type(
1910        &self,
1911        name: &PartialItemName,
1912    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1913        let r = self.state.resolve_type(
1914            self.database.as_ref(),
1915            &self.effective_search_path(false),
1916            name,
1917            &self.conn_id,
1918        )?;
1919
1920        if self.unresolvable_ids.contains(&r.id()) {
1921            Err(SqlCatalogError::UnknownType {
1922                name: name.to_string(),
1923            })
1924        } else {
1925            Ok(r)
1926        }
1927    }
1928
1929    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1930        self.state.get_system_type(name)
1931    }
1932
1933    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1934        Some(self.state.try_get_entry(id)?)
1935    }
1936
1937    fn try_get_item_by_global_id(
1938        &self,
1939        id: &GlobalId,
1940    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1941        let entry = self.state.try_get_entry_by_global_id(id)?;
1942        let entry = match &entry.item {
1943            CatalogItem::Table(table) => {
1944                let (version, _gid) = table
1945                    .collections
1946                    .iter()
1947                    .find(|(_version, gid)| *gid == id)
1948                    .expect("catalog out of sync, mismatched GlobalId");
1949                entry.at_version(RelationVersionSelector::Specific(*version))
1950            }
1951            _ => entry.at_version(RelationVersionSelector::Latest),
1952        };
1953        Some(entry)
1954    }
1955
1956    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1957        self.state.get_entry(id)
1958    }
1959
1960    fn get_item_by_global_id(
1961        &self,
1962        id: &GlobalId,
1963    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1964        let entry = self.state.get_entry_by_global_id(id);
1965        let entry = match &entry.item {
1966            CatalogItem::Table(table) => {
1967                let (version, _gid) = table
1968                    .collections
1969                    .iter()
1970                    .find(|(_version, gid)| *gid == id)
1971                    .expect("catalog out of sync, mismatched GlobalId");
1972                entry.at_version(RelationVersionSelector::Specific(*version))
1973            }
1974            _ => entry.at_version(RelationVersionSelector::Latest),
1975        };
1976        entry
1977    }
1978
1979    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
1980        self.get_schemas()
1981            .into_iter()
1982            .flat_map(|schema| schema.item_ids())
1983            .map(|id| self.get_item(&id))
1984            .collect()
1985    }
1986
1987    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1988        self.state
1989            .get_item_by_name(name, &self.conn_id)
1990            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1991    }
1992
1993    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1994        self.state
1995            .get_type_by_name(name, &self.conn_id)
1996            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1997    }
1998
1999    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2000        &self.state.clusters_by_id[&id]
2001    }
2002
2003    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2004        self.state
2005            .clusters_by_id
2006            .values()
2007            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2008            .collect()
2009    }
2010
2011    fn get_cluster_replica(
2012        &self,
2013        cluster_id: ClusterId,
2014        replica_id: ReplicaId,
2015    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2016        let cluster = self.get_cluster(cluster_id);
2017        cluster.replica(replica_id)
2018    }
2019
2020    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2021        self.get_clusters()
2022            .into_iter()
2023            .flat_map(|cluster| cluster.replicas().into_iter())
2024            .collect()
2025    }
2026
2027    fn get_system_privileges(&self) -> &PrivilegeMap {
2028        &self.state.system_privileges
2029    }
2030
2031    fn get_default_privileges(
2032        &self,
2033    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2034        self.state
2035            .default_privileges
2036            .iter()
2037            .map(|(object, acl_items)| (object, acl_items.collect()))
2038            .collect()
2039    }
2040
2041    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2042        self.state.find_available_name(name, &self.conn_id)
2043    }
2044
2045    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2046        self.state.resolve_full_name(name, Some(&self.conn_id))
2047    }
2048
2049    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2050        self.state.resolve_full_schema_name(name)
2051    }
2052
2053    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2054        self.state.get_entry_by_global_id(global_id).id()
2055    }
2056
2057    fn resolve_global_id(
2058        &self,
2059        item_id: &CatalogItemId,
2060        version: RelationVersionSelector,
2061    ) -> GlobalId {
2062        self.state
2063            .get_entry(item_id)
2064            .at_version(version)
2065            .global_id()
2066    }
2067
2068    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2069        self.state.config()
2070    }
2071
2072    fn now(&self) -> EpochMillis {
2073        (self.state.config().now)()
2074    }
2075
2076    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2077        self.state.aws_privatelink_availability_zones.clone()
2078    }
2079
2080    fn system_vars(&self) -> &SystemVars {
2081        &self.state.system_configuration
2082    }
2083
2084    fn system_vars_mut(&mut self) -> &mut SystemVars {
2085        Arc::make_mut(&mut self.state.to_mut().system_configuration)
2086    }
2087
2088    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2089        self.state().get_owner_id(id, self.conn_id())
2090    }
2091
2092    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2093        match id {
2094            SystemObjectId::System => Some(&self.state.system_privileges),
2095            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2096                Some(self.get_cluster(*id).privileges())
2097            }
2098            SystemObjectId::Object(ObjectId::Database(id)) => {
2099                Some(self.get_database(id).privileges())
2100            }
2101            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2102                // For temporary schemas that haven't been created yet (lazy creation),
2103                // we return None - the RBAC check will need to handle this case.
2104                self.state
2105                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2106                    .map(|schema| schema.privileges())
2107            }
2108            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2109            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2110                Some(self.get_network_policy(id).privileges())
2111            }
2112            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2113            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2114        }
2115    }
2116
2117    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2118        let mut seen = BTreeSet::new();
2119        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2120    }
2121
2122    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2123        let mut seen = BTreeSet::new();
2124        self.state.item_dependents(id, &mut seen)
2125    }
2126
    /// Returns the result of `rbac::all_object_privileges` for `object_type`.
    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
        rbac::all_object_privileges(object_type)
    }
2130
2131    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2132        self.state.get_object_type(object_id)
2133    }
2134
2135    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2136        self.state.get_system_object_type(id)
2137    }
2138
2139    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2140    /// the object.
2141    ///
2142    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2143    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2144    /// only for error msgs and other humanizations.
2145    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2146        if qualified_name.qualifiers.schema_spec.is_temporary() {
2147            // All bets are off. Just give up and return the qualified name as is.
2148            // TODO: Figure out what's going on with temporary objects.
2149
2150            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2151            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2152            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2153            // which we don't have a simple repro.
2154            return qualified_name.item.clone().into();
2155        }
2156
2157        let database_id = match &qualified_name.qualifiers.database_spec {
2158            ResolvedDatabaseSpecifier::Ambient => None,
2159            ResolvedDatabaseSpecifier::Id(id)
2160                if self.database.is_some() && self.database == Some(*id) =>
2161            {
2162                None
2163            }
2164            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2165        };
2166
2167        let schema_spec = if database_id.is_none()
2168            && self.resolve_item_name(&PartialItemName {
2169                database: None,
2170                schema: None,
2171                item: qualified_name.item.clone(),
2172            }) == Ok(qualified_name)
2173            || self.resolve_function_name(&PartialItemName {
2174                database: None,
2175                schema: None,
2176                item: qualified_name.item.clone(),
2177            }) == Ok(qualified_name)
2178            || self.resolve_type_name(&PartialItemName {
2179                database: None,
2180                schema: None,
2181                item: qualified_name.item.clone(),
2182            }) == Ok(qualified_name)
2183        {
2184            None
2185        } else {
2186            // If `search_path` does not contain `full_name.schema`, the
2187            // `PartialName` must contain it.
2188            Some(qualified_name.qualifiers.schema_spec.clone())
2189        };
2190
2191        let res = PartialItemName {
2192            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2193            schema: schema_spec.map(|spec| {
2194                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2195                    .name()
2196                    .schema
2197                    .clone()
2198            }),
2199            item: qualified_name.item.clone(),
2200        };
2201        assert!(
2202            self.resolve_item_name(&res) == Ok(qualified_name)
2203                || self.resolve_function_name(&res) == Ok(qualified_name)
2204                || self.resolve_type_name(&res) == Ok(qualified_name)
2205        );
2206        res
2207    }
2208
2209    fn add_notice(&self, notice: PlanNotice) {
2210        let _ = self.notices_tx.send(notice.into());
2211    }
2212
2213    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2214        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2215        self.state.comments.get_object_comments(comment_id)
2216    }
2217
2218    fn is_cluster_size_cc(&self, size: &str) -> bool {
2219        self.state
2220            .cluster_replica_sizes
2221            .0
2222            .get(size)
2223            .map_or(false, |a| a.is_cc)
2224    }
2225}
2226
2227#[cfg(test)]
2228mod tests {
2229    use std::collections::{BTreeMap, BTreeSet};
2230    use std::sync::Arc;
2231    use std::{env, iter};
2232
2233    use itertools::Itertools;
2234    use mz_catalog::memory::objects::CatalogItem;
2235    use tokio_postgres::NoTls;
2236    use tokio_postgres::types::Type;
2237    use uuid::Uuid;
2238
2239    use mz_catalog::SYSTEM_CONN_ID;
2240    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2241    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2242    use mz_controller_types::{ClusterId, ReplicaId};
2243    use mz_expr::MirScalarExpr;
2244    use mz_ore::now::to_datetime;
2245    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2246    use mz_persist_client::PersistClient;
2247    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2248    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2249    use mz_repr::role_id::RoleId;
2250    use mz_repr::{
2251        CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2252        SqlScalarType, Timestamp,
2253    };
2254    use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2255    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2256    use mz_sql::names::{
2257        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2258        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2259    };
2260    use mz_sql::plan::{
2261        CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2262        QueryLifetime, Scope, StatementContext,
2263    };
2264    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2265    use mz_sql::session::vars::{SystemVars, VarInput};
2266
2267    use crate::catalog::state::LocalExpressionCache;
2268    use crate::catalog::{Catalog, Op};
2269    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2270    use crate::session::Session;
2271
2272    /// System sessions have an empty `search_path` so it's necessary to
2273    /// schema-qualify all referenced items.
2274    ///
2275    /// Dummy (and ostensibly client) sessions contain system schemas in their
2276    /// search paths, so do not require schema qualification on system objects such
2277    /// as types.
2278    #[mz_ore::test(tokio::test)]
2279    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2280    async fn test_minimal_qualification() {
2281        Catalog::with_debug(|catalog| async move {
2282            struct TestCase {
2283                input: QualifiedItemName,
2284                system_output: PartialItemName,
2285                normal_output: PartialItemName,
2286            }
2287
2288            let test_cases = vec![
2289                TestCase {
2290                    input: QualifiedItemName {
2291                        qualifiers: ItemQualifiers {
2292                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2293                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2294                        },
2295                        item: "numeric".to_string(),
2296                    },
2297                    system_output: PartialItemName {
2298                        database: None,
2299                        schema: None,
2300                        item: "numeric".to_string(),
2301                    },
2302                    normal_output: PartialItemName {
2303                        database: None,
2304                        schema: None,
2305                        item: "numeric".to_string(),
2306                    },
2307                },
2308                TestCase {
2309                    input: QualifiedItemName {
2310                        qualifiers: ItemQualifiers {
2311                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2312                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2313                        },
2314                        item: "mz_array_types".to_string(),
2315                    },
2316                    system_output: PartialItemName {
2317                        database: None,
2318                        schema: None,
2319                        item: "mz_array_types".to_string(),
2320                    },
2321                    normal_output: PartialItemName {
2322                        database: None,
2323                        schema: None,
2324                        item: "mz_array_types".to_string(),
2325                    },
2326                },
2327            ];
2328
2329            for tc in test_cases {
2330                assert_eq!(
2331                    catalog
2332                        .for_system_session()
2333                        .minimal_qualification(&tc.input),
2334                    tc.system_output
2335                );
2336                assert_eq!(
2337                    catalog
2338                        .for_session(&Session::dummy())
2339                        .minimal_qualification(&tc.input),
2340                    tc.normal_output
2341                );
2342            }
2343            catalog.expire().await;
2344        })
2345        .await
2346    }
2347
2348    #[mz_ore::test(tokio::test)]
2349    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2350    async fn test_catalog_revision() {
2351        let persist_client = PersistClient::new_for_tests().await;
2352        let organization_id = Uuid::new_v4();
2353        let bootstrap_args = test_bootstrap_args();
2354        {
2355            let mut catalog = Catalog::open_debug_catalog(
2356                persist_client.clone(),
2357                organization_id.clone(),
2358                &bootstrap_args,
2359            )
2360            .await
2361            .expect("unable to open debug catalog");
2362            assert_eq!(catalog.transient_revision(), 1);
2363            let commit_ts = catalog.current_upper().await;
2364            catalog
2365                .transact(
2366                    None,
2367                    commit_ts,
2368                    None,
2369                    vec![Op::CreateDatabase {
2370                        name: "test".to_string(),
2371                        owner_id: MZ_SYSTEM_ROLE_ID,
2372                    }],
2373                )
2374                .await
2375                .expect("failed to transact");
2376            assert_eq!(catalog.transient_revision(), 2);
2377            catalog.expire().await;
2378        }
2379        {
2380            let catalog =
2381                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2382                    .await
2383                    .expect("unable to open debug catalog");
2384            // Re-opening the same catalog resets the transient_revision to 1.
2385            assert_eq!(catalog.transient_revision(), 1);
2386            catalog.expire().await;
2387        }
2388    }
2389
2390    #[mz_ore::test(tokio::test)]
2391    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2392    async fn test_effective_search_path() {
2393        Catalog::with_debug(|catalog| async move {
2394            let mz_catalog_schema = (
2395                ResolvedDatabaseSpecifier::Ambient,
2396                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2397            );
2398            let pg_catalog_schema = (
2399                ResolvedDatabaseSpecifier::Ambient,
2400                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2401            );
2402            let mz_temp_schema = (
2403                ResolvedDatabaseSpecifier::Ambient,
2404                SchemaSpecifier::Temporary,
2405            );
2406
2407            // Behavior with the default search_schema (public)
2408            let session = Session::dummy();
2409            let conn_catalog = catalog.for_session(&session);
2410            assert_ne!(
2411                conn_catalog.effective_search_path(false),
2412                conn_catalog.search_path
2413            );
2414            assert_ne!(
2415                conn_catalog.effective_search_path(true),
2416                conn_catalog.search_path
2417            );
2418            assert_eq!(
2419                conn_catalog.effective_search_path(false),
2420                vec![
2421                    mz_catalog_schema.clone(),
2422                    pg_catalog_schema.clone(),
2423                    conn_catalog.search_path[0].clone()
2424                ]
2425            );
2426            assert_eq!(
2427                conn_catalog.effective_search_path(true),
2428                vec![
2429                    mz_temp_schema.clone(),
2430                    mz_catalog_schema.clone(),
2431                    pg_catalog_schema.clone(),
2432                    conn_catalog.search_path[0].clone()
2433                ]
2434            );
2435
2436            // missing schemas are added when missing
2437            let mut session = Session::dummy();
2438            session
2439                .vars_mut()
2440                .set(
2441                    &SystemVars::new(),
2442                    "search_path",
2443                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2444                    false,
2445                )
2446                .expect("failed to set search_path");
2447            let conn_catalog = catalog.for_session(&session);
2448            assert_ne!(
2449                conn_catalog.effective_search_path(false),
2450                conn_catalog.search_path
2451            );
2452            assert_ne!(
2453                conn_catalog.effective_search_path(true),
2454                conn_catalog.search_path
2455            );
2456            assert_eq!(
2457                conn_catalog.effective_search_path(false),
2458                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2459            );
2460            assert_eq!(
2461                conn_catalog.effective_search_path(true),
2462                vec![
2463                    mz_temp_schema.clone(),
2464                    mz_catalog_schema.clone(),
2465                    pg_catalog_schema.clone()
2466                ]
2467            );
2468
2469            let mut session = Session::dummy();
2470            session
2471                .vars_mut()
2472                .set(
2473                    &SystemVars::new(),
2474                    "search_path",
2475                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2476                    false,
2477                )
2478                .expect("failed to set search_path");
2479            let conn_catalog = catalog.for_session(&session);
2480            assert_ne!(
2481                conn_catalog.effective_search_path(false),
2482                conn_catalog.search_path
2483            );
2484            assert_ne!(
2485                conn_catalog.effective_search_path(true),
2486                conn_catalog.search_path
2487            );
2488            assert_eq!(
2489                conn_catalog.effective_search_path(false),
2490                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2491            );
2492            assert_eq!(
2493                conn_catalog.effective_search_path(true),
2494                vec![
2495                    mz_temp_schema.clone(),
2496                    pg_catalog_schema.clone(),
2497                    mz_catalog_schema.clone()
2498                ]
2499            );
2500
2501            let mut session = Session::dummy();
2502            session
2503                .vars_mut()
2504                .set(
2505                    &SystemVars::new(),
2506                    "search_path",
2507                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2508                    false,
2509                )
2510                .expect("failed to set search_path");
2511            let conn_catalog = catalog.for_session(&session);
2512            assert_ne!(
2513                conn_catalog.effective_search_path(false),
2514                conn_catalog.search_path
2515            );
2516            assert_ne!(
2517                conn_catalog.effective_search_path(true),
2518                conn_catalog.search_path
2519            );
2520            assert_eq!(
2521                conn_catalog.effective_search_path(false),
2522                vec![
2523                    mz_catalog_schema.clone(),
2524                    pg_catalog_schema.clone(),
2525                    mz_temp_schema.clone()
2526                ]
2527            );
2528            assert_eq!(
2529                conn_catalog.effective_search_path(true),
2530                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2531            );
2532            catalog.expire().await;
2533        })
2534        .await
2535    }
2536
2537    #[mz_ore::test(tokio::test)]
2538    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2539    async fn test_normalized_create() {
2540        use mz_ore::collections::CollectionExt;
2541        Catalog::with_debug(|catalog| async move {
2542            let conn_catalog = catalog.for_system_session();
2543            let scx = &mut StatementContext::new(None, &conn_catalog);
2544
2545            let parsed = mz_sql_parser::parser::parse_statements(
2546                "create view public.foo as select 1 as bar",
2547            )
2548            .expect("")
2549            .into_element()
2550            .ast;
2551
2552            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2553
2554            // Ensure that all identifiers are quoted.
2555            assert_eq!(
2556                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2557                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2558            );
2559            catalog.expire().await;
2560        })
2561        .await;
2562    }
2563
2564    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2565    #[mz_ore::test(tokio::test)]
2566    #[cfg_attr(miri, ignore)] // slow
2567    async fn test_large_catalog_item() {
2568        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2569        let column = ", 1";
2570        let view_def_size = view_def.bytes().count();
2571        let column_size = column.bytes().count();
2572        let column_count =
2573            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2574        let columns = iter::repeat(column).take(column_count).join("");
2575        let create_sql = format!("{view_def}{columns})");
2576        let create_sql_check = create_sql.clone();
2577        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2578        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2579            &create_sql
2580        ));
2581
2582        let persist_client = PersistClient::new_for_tests().await;
2583        let organization_id = Uuid::new_v4();
2584        let id = CatalogItemId::User(1);
2585        let gid = GlobalId::User(1);
2586        let bootstrap_args = test_bootstrap_args();
2587        {
2588            let mut catalog = Catalog::open_debug_catalog(
2589                persist_client.clone(),
2590                organization_id.clone(),
2591                &bootstrap_args,
2592            )
2593            .await
2594            .expect("unable to open debug catalog");
2595            let item = catalog
2596                .state()
2597                .deserialize_item(
2598                    gid,
2599                    &create_sql,
2600                    &BTreeMap::new(),
2601                    &mut LocalExpressionCache::Closed,
2602                    None,
2603                )
2604                .expect("unable to parse view");
2605            let commit_ts = catalog.current_upper().await;
2606            catalog
2607                .transact(
2608                    None,
2609                    commit_ts,
2610                    None,
2611                    vec![Op::CreateItem {
2612                        item,
2613                        name: QualifiedItemName {
2614                            qualifiers: ItemQualifiers {
2615                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2616                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2617                            },
2618                            item: "v".to_string(),
2619                        },
2620                        id,
2621                        owner_id: MZ_SYSTEM_ROLE_ID,
2622                    }],
2623                )
2624                .await
2625                .expect("failed to transact");
2626            catalog.expire().await;
2627        }
2628        {
2629            let catalog =
2630                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2631                    .await
2632                    .expect("unable to open debug catalog");
2633            let view = catalog.get_entry(&id);
2634            assert_eq!("v", view.name.item);
2635            match &view.item {
2636                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2637                item => panic!("expected view, got {}", item.typ()),
2638            }
2639            catalog.expire().await;
2640        }
2641    }
2642
2643    #[mz_ore::test(tokio::test)]
2644    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2645    async fn test_object_type() {
2646        Catalog::with_debug(|catalog| async move {
2647            let conn_catalog = catalog.for_system_session();
2648
2649            assert_eq!(
2650                mz_sql::catalog::ObjectType::ClusterReplica,
2651                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2652                    ClusterId::user(1).expect("1 is a valid ID"),
2653                    ReplicaId::User(1)
2654                )))
2655            );
2656            assert_eq!(
2657                mz_sql::catalog::ObjectType::Role,
2658                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2659            );
2660            catalog.expire().await;
2661        })
2662        .await;
2663    }
2664
2665    #[mz_ore::test(tokio::test)]
2666    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2667    async fn test_get_privileges() {
2668        Catalog::with_debug(|catalog| async move {
2669            let conn_catalog = catalog.for_system_session();
2670
2671            assert_eq!(
2672                None,
2673                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2674                    ClusterId::user(1).expect("1 is a valid ID"),
2675                    ReplicaId::User(1),
2676                ))))
2677            );
2678            assert_eq!(
2679                None,
2680                conn_catalog
2681                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2682            );
2683            catalog.expire().await;
2684        })
2685        .await;
2686    }
2687
2688    #[mz_ore::test(tokio::test)]
2689    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2690    async fn verify_builtin_descs() {
2691        Catalog::with_debug(|catalog| async move {
2692            let conn_catalog = catalog.for_system_session();
2693
2694            for builtin in BUILTINS::iter() {
2695                let (schema, name, expected_desc) = match builtin {
2696                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2697                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2698                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2699                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2700                    Builtin::Log(_)
2701                    | Builtin::Type(_)
2702                    | Builtin::Func(_)
2703                    | Builtin::Index(_)
2704                    | Builtin::Connection(_) => continue,
2705                };
2706                let item = conn_catalog
2707                    .resolve_item(&PartialItemName {
2708                        database: None,
2709                        schema: Some(schema.to_string()),
2710                        item: name.to_string(),
2711                    })
2712                    .expect("unable to resolve item")
2713                    .at_version(RelationVersionSelector::Latest);
2714
2715                let actual_desc = item.relation_desc().expect("invalid item type");
2716                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2717                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2718                {
2719                    assert_eq!(
2720                        actual_name, expected_name,
2721                        "item {schema}.{name} column {index} name did not match its expected name"
2722                    );
2723                    assert_eq!(
2724                        actual_typ, expected_typ,
2725                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2726                    );
2727                }
2728                assert_eq!(
2729                    &*actual_desc, expected_desc,
2730                    "item {schema}.{name} did not match its expected RelationDesc"
2731                );
2732            }
2733            catalog.expire().await;
2734        })
2735        .await
2736    }
2737
2738    // Connect to a running Postgres server and verify that our builtin
2739    // types and functions match it, in addition to some other things.
2740    #[mz_ore::test(tokio::test)]
2741    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2742    async fn test_compare_builtins_postgres() {
2743        async fn inner(catalog: Catalog) {
2744            // Verify that all builtin functions:
2745            // - have a unique OID
2746            // - if they have a postgres counterpart (same oid) then they have matching name
2747            let (client, connection) = tokio_postgres::connect(
2748                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2749                NoTls,
2750            )
2751            .await
2752            .expect("failed to connect to Postgres");
2753
2754            task::spawn(|| "compare_builtin_postgres", async move {
2755                if let Err(e) = connection.await {
2756                    panic!("connection error: {}", e);
2757                }
2758            });
2759
2760            struct PgProc {
2761                name: String,
2762                arg_oids: Vec<u32>,
2763                ret_oid: Option<u32>,
2764                ret_set: bool,
2765            }
2766
2767            struct PgType {
2768                name: String,
2769                ty: String,
2770                elem: u32,
2771                array: u32,
2772                input: u32,
2773                receive: u32,
2774            }
2775
2776            struct PgOper {
2777                oprresult: u32,
2778                name: String,
2779            }
2780
2781            let pg_proc: BTreeMap<_, _> = client
2782                .query(
2783                    "SELECT
2784                    p.oid,
2785                    proname,
2786                    proargtypes,
2787                    prorettype,
2788                    proretset
2789                FROM pg_proc p
2790                JOIN pg_namespace n ON p.pronamespace = n.oid",
2791                    &[],
2792                )
2793                .await
2794                .expect("pg query failed")
2795                .into_iter()
2796                .map(|row| {
2797                    let oid: u32 = row.get("oid");
2798                    let pg_proc = PgProc {
2799                        name: row.get("proname"),
2800                        arg_oids: row.get("proargtypes"),
2801                        ret_oid: row.get("prorettype"),
2802                        ret_set: row.get("proretset"),
2803                    };
2804                    (oid, pg_proc)
2805                })
2806                .collect();
2807
2808            let pg_type: BTreeMap<_, _> = client
2809                .query(
2810                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2811                    &[],
2812                )
2813                .await
2814                .expect("pg query failed")
2815                .into_iter()
2816                .map(|row| {
2817                    let oid: u32 = row.get("oid");
2818                    let pg_type = PgType {
2819                        name: row.get("typname"),
2820                        ty: row.get("typtype"),
2821                        elem: row.get("typelem"),
2822                        array: row.get("typarray"),
2823                        input: row.get("typinput"),
2824                        receive: row.get("typreceive"),
2825                    };
2826                    (oid, pg_type)
2827                })
2828                .collect();
2829
2830            let pg_oper: BTreeMap<_, _> = client
2831                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2832                .await
2833                .expect("pg query failed")
2834                .into_iter()
2835                .map(|row| {
2836                    let oid: u32 = row.get("oid");
2837                    let pg_oper = PgOper {
2838                        name: row.get("oprname"),
2839                        oprresult: row.get("oprresult"),
2840                    };
2841                    (oid, pg_oper)
2842                })
2843                .collect();
2844
2845            let conn_catalog = catalog.for_system_session();
2846            let resolve_type_oid = |item: &str| {
2847                conn_catalog
2848                    .resolve_type(&PartialItemName {
2849                        database: None,
2850                        // All functions we check exist in PG, so the types must, as
2851                        // well
2852                        schema: Some(PG_CATALOG_SCHEMA.into()),
2853                        item: item.to_string(),
2854                    })
2855                    .expect("unable to resolve type")
2856                    .oid()
2857            };
2858
2859            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2860                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2861                .collect();
2862
2863            let mut all_oids = BTreeSet::new();
2864
2865            // A function to determine if two oids are equivalent enough for these tests. We don't
2866            // support some types, so map exceptions here.
2867            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2868                [
2869                    // We don't support NAME.
2870                    (Type::NAME, Type::TEXT),
2871                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2872                    // We don't support time with time zone.
2873                    (Type::TIME, Type::TIMETZ),
2874                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2875                ]
2876                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2877            );
2878            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2879                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2880            ]);
2881            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2882                if ignore_return_types.contains(&fn_oid) {
2883                    return true;
2884                }
2885                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2886                    return true;
2887                }
2888                a == b
2889            };
2890
2891            for builtin in BUILTINS::iter() {
2892                match builtin {
2893                    Builtin::Type(ty) => {
2894                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2895
2896                        if ty.oid >= FIRST_MATERIALIZE_OID {
2897                            // High OIDs are reserved in Materialize and don't have
2898                            // PostgreSQL counterparts.
2899                            continue;
2900                        }
2901
2902                        // For types that have a PostgreSQL counterpart, verify that
2903                        // the name and oid match.
2904                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2905                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2906                        });
2907                        assert_eq!(
2908                            ty.name, pg_ty.name,
2909                            "oid {} has name {} in postgres; expected {}",
2910                            ty.oid, pg_ty.name, ty.name,
2911                        );
2912
2913                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2914                            None => (0, 0),
2915                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2916                        };
2917                        assert_eq!(
2918                            typinput_oid, pg_ty.input,
2919                            "type {} has typinput OID {:?} in mz but {:?} in pg",
2920                            ty.name, typinput_oid, pg_ty.input,
2921                        );
2922                        assert_eq!(
2923                            typreceive_oid, pg_ty.receive,
2924                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
2925                            ty.name, typreceive_oid, pg_ty.receive,
2926                        );
2927                        if typinput_oid != 0 {
2928                            assert!(
2929                                func_oids.contains(&typinput_oid),
2930                                "type {} has typinput OID {} that does not exist in pg_proc",
2931                                ty.name,
2932                                typinput_oid,
2933                            );
2934                        }
2935                        if typreceive_oid != 0 {
2936                            assert!(
2937                                func_oids.contains(&typreceive_oid),
2938                                "type {} has typreceive OID {} that does not exist in pg_proc",
2939                                ty.name,
2940                                typreceive_oid,
2941                            );
2942                        }
2943
2944                        // Ensure the type matches.
2945                        match &ty.details.typ {
2946                            CatalogType::Array { element_reference } => {
2947                                let elem_ty = BUILTINS::iter()
2948                                    .filter_map(|builtin| match builtin {
2949                                        Builtin::Type(ty @ BuiltinType { name, .. })
2950                                            if element_reference == name =>
2951                                        {
2952                                            Some(ty)
2953                                        }
2954                                        _ => None,
2955                                    })
2956                                    .next();
2957                                let elem_ty = match elem_ty {
2958                                    Some(ty) => ty,
2959                                    None => {
2960                                        panic!("{} is unexpectedly not a type", element_reference)
2961                                    }
2962                                };
2963                                assert_eq!(
2964                                    pg_ty.elem, elem_ty.oid,
2965                                    "type {} has mismatched element OIDs",
2966                                    ty.name
2967                                )
2968                            }
2969                            CatalogType::Pseudo => {
2970                                assert_eq!(
2971                                    pg_ty.ty, "p",
2972                                    "type {} is not a pseudo type as expected",
2973                                    ty.name
2974                                )
2975                            }
2976                            CatalogType::Range { .. } => {
2977                                assert_eq!(
2978                                    pg_ty.ty, "r",
2979                                    "type {} is not a range type as expected",
2980                                    ty.name
2981                                );
2982                            }
2983                            _ => {
2984                                assert_eq!(
2985                                    pg_ty.ty, "b",
2986                                    "type {} is not a base type as expected",
2987                                    ty.name
2988                                )
2989                            }
2990                        }
2991
2992                        // Ensure the array type reference is correct.
2993                        let schema = catalog
2994                            .resolve_schema_in_database(
2995                                &ResolvedDatabaseSpecifier::Ambient,
2996                                ty.schema,
2997                                &SYSTEM_CONN_ID,
2998                            )
2999                            .expect("unable to resolve schema");
3000                        let allocated_type = catalog
3001                            .resolve_type(
3002                                None,
3003                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3004                                &PartialItemName {
3005                                    database: None,
3006                                    schema: Some(schema.name().schema.clone()),
3007                                    item: ty.name.to_string(),
3008                                },
3009                                &SYSTEM_CONN_ID,
3010                            )
3011                            .expect("unable to resolve type");
3012                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3013                            ty
3014                        } else {
3015                            panic!("unexpectedly not a type")
3016                        };
3017                        match ty.details.array_id {
3018                            Some(array_id) => {
3019                                let array_ty = catalog.get_entry(&array_id);
3020                                assert_eq!(
3021                                    pg_ty.array, array_ty.oid,
3022                                    "type {} has mismatched array OIDs",
3023                                    allocated_type.name.item,
3024                                );
3025                            }
3026                            None => assert_eq!(
3027                                pg_ty.array, 0,
3028                                "type {} does not have an array type in mz but does in pg",
3029                                allocated_type.name.item,
3030                            ),
3031                        }
3032                    }
3033                    Builtin::Func(func) => {
3034                        for imp in func.inner.func_impls() {
3035                            assert!(
3036                                all_oids.insert(imp.oid),
3037                                "{} reused oid {}",
3038                                func.name,
3039                                imp.oid
3040                            );
3041
3042                            assert!(
3043                                imp.oid < FIRST_USER_OID,
3044                                "built-in function {} erroneously has OID in user space ({})",
3045                                func.name,
3046                                imp.oid,
3047                            );
3048
3049                            // For functions that have a postgres counterpart, verify that the name and
3050                            // oid match.
3051                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3052                                continue;
3053                            } else {
3054                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3055                                    panic!(
3056                                        "pg_proc missing function {}: oid {}",
3057                                        func.name, imp.oid
3058                                    )
3059                                })
3060                            };
3061                            assert_eq!(
3062                                func.name, pg_fn.name,
3063                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3064                                imp.oid, func.name, pg_fn.name
3065                            );
3066
3067                            // Complain, but don't fail, if argument oids don't match.
3068                            // TODO: make these match.
3069                            let imp_arg_oids = imp
3070                                .arg_typs
3071                                .iter()
3072                                .map(|item| resolve_type_oid(item))
3073                                .collect::<Vec<_>>();
3074
3075                            if imp_arg_oids != pg_fn.arg_oids {
3076                                println!(
3077                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3078                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3079                                );
3080                            }
3081
3082                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3083
3084                            assert!(
3085                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3086                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3087                                imp.oid,
3088                                func.name,
3089                                imp_return_oid,
3090                                pg_fn.ret_oid
3091                            );
3092
3093                            assert_eq!(
3094                                imp.return_is_set, pg_fn.ret_set,
3095                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3096                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3097                            );
3098                        }
3099                    }
3100                    _ => (),
3101                }
3102            }
3103
3104            for (op, func) in OP_IMPLS.iter() {
3105                for imp in func.func_impls() {
3106                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3107
3108                    // For operators that have a postgres counterpart, verify that the name and oid match.
3109                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3110                        continue;
3111                    } else {
3112                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3113                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3114                        })
3115                    };
3116
3117                    assert_eq!(*op, pg_op.name);
3118
3119                    let imp_return_oid =
3120                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3121                    if imp_return_oid != pg_op.oprresult {
3122                        panic!(
3123                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3124                            imp.oid, op, imp_return_oid, pg_op.oprresult
3125                        );
3126                    }
3127                }
3128            }
3129            catalog.expire().await;
3130        }
3131
3132        Catalog::with_debug(inner).await
3133    }
3134
3135    // Execute all builtin functions with all combinations of arguments from interesting datums.
3136    #[mz_ore::test(tokio::test)]
3137    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3138    async fn test_smoketest_all_builtins() {
3139        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3140            let catalog = Arc::new(catalog);
3141            let conn_catalog = catalog.for_system_session();
3142
3143            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3144            let mut handles = Vec::new();
3145
3146            // Extracted during planning; always panics when executed.
3147            let ignore_names = BTreeSet::from([
3148                "avg",
3149                "avg_internal_v1",
3150                "bool_and",
3151                "bool_or",
3152                "has_table_privilege", // > 3 s each
3153                "has_type_privilege",  // > 3 s each
3154                "mod",
3155                "mz_panic",
3156                "mz_sleep",
3157                "pow",
3158                "stddev_pop",
3159                "stddev_samp",
3160                "stddev",
3161                "var_pop",
3162                "var_samp",
3163                "variance",
3164            ]);
3165
3166            let fns = BUILTINS::funcs()
3167                .map(|func| (&func.name, func.inner))
3168                .chain(OP_IMPLS.iter());
3169
3170            for (name, func) in fns {
3171                if ignore_names.contains(name) {
3172                    continue;
3173                }
3174                let Func::Scalar(impls) = func else {
3175                    continue;
3176                };
3177
3178                'outer: for imp in impls {
3179                    let details = imp.details();
3180                    let mut styps = Vec::new();
3181                    for item in details.arg_typs.iter() {
3182                        let oid = resolve_type_oid(item);
3183                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3184                            continue 'outer;
3185                        };
3186                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3187                    }
3188                    let datums = styps
3189                        .iter()
3190                        .map(|styp| {
3191                            let mut datums = vec![Datum::Null];
3192                            datums.extend(styp.interesting_datums());
3193                            datums
3194                        })
3195                        .collect::<Vec<_>>();
3196                    // Skip nullary fns.
3197                    if datums.is_empty() {
3198                        continue;
3199                    }
3200
3201                    let return_oid = details
3202                        .return_typ
3203                        .map(resolve_type_oid)
3204                        .expect("must exist");
3205                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3206                        .ok()
3207                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3208
3209                    let mut idxs = vec![0; datums.len()];
3210                    while idxs[0] < datums[0].len() {
3211                        let mut args = Vec::with_capacity(idxs.len());
3212                        for i in 0..(datums.len()) {
3213                            args.push(datums[i][idxs[i]]);
3214                        }
3215
3216                        let op = &imp.op;
3217                        let scalars = args
3218                            .iter()
3219                            .enumerate()
3220                            .map(|(i, datum)| {
3221                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3222                                    datum.clone(),
3223                                    styps[i].clone(),
3224                                ))
3225                            })
3226                            .collect();
3227
3228                        let call_name = format!(
3229                            "{name}({}) (oid: {})",
3230                            args.iter()
3231                                .map(|d| d.to_string())
3232                                .collect::<Vec<_>>()
3233                                .join(", "),
3234                            imp.oid
3235                        );
3236                        let catalog = Arc::clone(&catalog);
3237                        let call_name_fn = call_name.clone();
3238                        let return_styp = return_styp.clone();
3239                        let handle = task::spawn_blocking(
3240                            || call_name,
3241                            move || {
3242                                smoketest_fn(
3243                                    name,
3244                                    call_name_fn,
3245                                    op,
3246                                    imp,
3247                                    args,
3248                                    catalog,
3249                                    scalars,
3250                                    return_styp,
3251                                )
3252                            },
3253                        );
3254                        handles.push(handle);
3255
3256                        // Advance to the next datum combination.
3257                        for i in (0..datums.len()).rev() {
3258                            idxs[i] += 1;
3259                            if idxs[i] >= datums[i].len() {
3260                                if i == 0 {
3261                                    break;
3262                                }
3263                                idxs[i] = 0;
3264                                continue;
3265                            } else {
3266                                break;
3267                            }
3268                        }
3269                    }
3270                }
3271            }
3272            handles
3273        }
3274
3275        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3276        for handle in handles {
3277            handle.await;
3278        }
3279    }
3280
3281    fn smoketest_fn(
3282        name: &&str,
3283        call_name: String,
3284        op: &Operation<HirScalarExpr>,
3285        imp: &FuncImpl<HirScalarExpr>,
3286        args: Vec<Datum<'_>>,
3287        catalog: Arc<Catalog>,
3288        scalars: Vec<CoercibleScalarExpr>,
3289        return_styp: Option<SqlScalarType>,
3290    ) {
3291        let conn_catalog = catalog.for_system_session();
3292        let pcx = PlanContext::zero();
3293        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3294        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3295        let ecx = ExprContext {
3296            qcx: &qcx,
3297            name: "smoketest",
3298            scope: &Scope::empty(),
3299            relation_type: &SqlRelationType::empty(),
3300            allow_aggregates: false,
3301            allow_subqueries: false,
3302            allow_parameters: false,
3303            allow_windows: false,
3304        };
3305        let arena = RowArena::new();
3306        let mut session = Session::dummy();
3307        session
3308            .start_transaction(to_datetime(0), None, None)
3309            .expect("must succeed");
3310        let prep_style = ExprPrepOneShot {
3311            logical_time: EvalTime::Time(Timestamp::MIN),
3312            session: &session,
3313            catalog_state: &catalog.state,
3314        };
3315
3316        // Execute the function as much as possible, ensuring no panics occur, but
3317        // otherwise ignoring eval errors. We also do various other checks.
3318        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3319        if let Ok(hir) = res {
3320            let uneliminated_result_row = {
3321                if let HirScalarExpr::CallUnary { func, .. } = &hir
3322                    && func.is_eliminable_cast()
3323                {
3324                    let mut uneliminated_mir = hir
3325                        .clone()
3326                        .lower_uncorrelated(HirToMirConfig {
3327                            enable_cast_elimination: false,
3328                            ..catalog.system_config().into()
3329                        })
3330                        .expect("lowering eliminable cast should always succeed");
3331                    prep_style
3332                        .prep_scalar_expr(&mut uneliminated_mir)
3333                        .expect("must succeed");
3334
3335                    // Pack the row, to avoid lifetime issues with the MIR we lowered here
3336                    uneliminated_mir
3337                        .eval(&[], &arena)
3338                        .ok()
3339                        .map(|datum| Row::pack([datum]))
3340                } else {
3341                    None
3342                }
3343            };
3344
3345            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3346                // Populate unmaterialized functions.
3347                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3348
3349                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3350                    if let Some(return_styp) = return_styp {
3351                        let mir_typ = mir.typ(&[]);
3352                        // MIR type inference should be consistent with the type
3353                        // we get from the catalog.
3354                        soft_assert_eq_or_log!(
3355                            mir_typ.scalar_type,
3356                            (&return_styp).into(),
3357                            "MIR type did not match the catalog type (cast elimination/repr type error)"
3358                        );
3359                        // The following will check not just that the scalar type
3360                        // is ok, but also catches if the function returned a null
3361                        // but the MIR type inference said "non-nullable".
3362                        if !eval_result_datum.is_instance_of(&mir_typ) {
3363                            panic!(
3364                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3365                            );
3366                        }
3367                        // Check the consistency of `is_eliminable_cast`---we should get the same datum either way.
3368                        if let Some(row) = uneliminated_result_row {
3369                            let uneliminated_result_datum = row.unpack_first();
3370                            assert_eq!(
3371                                uneliminated_result_datum, eval_result_datum,
3372                                "datums should not change if cast is eliminable"
3373                            );
3374                        }
3375                        // Check the consistency of `introduces_nulls` and
3376                        // `propagates_nulls` with `MirScalarExpr::typ`.
3377                        if let Some((introduces_nulls, propagates_nulls)) =
3378                            call_introduces_propagates_nulls(&mir)
3379                        {
3380                            if introduces_nulls {
3381                                // If the function introduces_nulls, then the return
3382                                // type should always be nullable, regardless of
3383                                // the nullability of the input types.
3384                                assert!(
3385                                    mir_typ.nullable,
3386                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3387                                    name, args, mir, mir_typ.nullable
3388                                );
3389                            } else {
3390                                let any_input_null = args.iter().any(|arg| arg.is_null());
3391                                if !any_input_null {
3392                                    assert!(
3393                                        !mir_typ.nullable,
3394                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3395                                        name, args, mir, mir_typ.nullable
3396                                    );
3397                                } else if propagates_nulls {
3398                                    // propagates_nulls means the optimizer short-circuits
3399                                    // all-null inputs, so the output must be nullable.
3400                                    assert!(
3401                                        mir_typ.nullable,
3402                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3403                                        name, args, mir, mir_typ.nullable
3404                                    );
3405                                }
3406                                // When propagates_nulls is false, the output may still
3407                                // be nullable if a non-nullable parameter received a null
3408                                // input (per-position null rejection). The is_instance_of
3409                                // check above ensures type consistency.
3410                            }
3411                        }
3412                        // Check that `MirScalarExpr::reduce` yields the same result
3413                        // as the real evaluation.
3414                        let mut reduced = mir.clone();
3415                        reduced.reduce(&[]);
3416                        match reduced {
3417                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3418                                match reduce_result {
3419                                    Ok(reduce_result_row) => {
3420                                        let reduce_result_datum = reduce_result_row.unpack_first();
3421                                        assert_eq!(
3422                                            reduce_result_datum,
3423                                            eval_result_datum,
3424                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3425                                            name,
3426                                            args,
3427                                            mir,
3428                                            eval_result_datum,
3429                                            mir_typ.scalar_type,
3430                                            reduce_result_datum,
3431                                            ctyp.scalar_type
3432                                        );
3433                                        // Let's check that the types also match.
3434                                        // (We are not checking nullability here,
3435                                        // because it's ok when we know a more
3436                                        // precise nullability after actually
3437                                        // evaluating a function than before.)
3438                                        assert_eq!(
3439                                            ctyp.scalar_type,
3440                                            mir_typ.scalar_type,
3441                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3442                                            name,
3443                                            args,
3444                                            mir,
3445                                            eval_result_datum,
3446                                            mir_typ.scalar_type,
3447                                            reduce_result_datum,
3448                                            ctyp.scalar_type
3449                                        );
3450                                    }
3451                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3452                                }
3453                            }
3454                            _ => unreachable!(
3455                                "all args are literals, so should have reduced to a literal"
3456                            ),
3457                        }
3458                    }
3459                }
3460            }
3461        }
3462    }
3463
3464    /// If the given MirScalarExpr
3465    ///  - is a function call, and
3466    ///  - all arguments are literals
3467    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3468    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3469        match mir_func_call {
3470            MirScalarExpr::CallUnary { func, expr } => {
3471                if expr.is_literal() {
3472                    Some((func.introduces_nulls(), func.propagates_nulls()))
3473                } else {
3474                    None
3475                }
3476            }
3477            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3478                if expr1.is_literal() && expr2.is_literal() {
3479                    Some((func.introduces_nulls(), func.propagates_nulls()))
3480                } else {
3481                    None
3482                }
3483            }
3484            MirScalarExpr::CallVariadic { func, exprs } => {
3485                if exprs.iter().all(|arg| arg.is_literal()) {
3486                    Some((func.introduces_nulls(), func.propagates_nulls()))
3487                } else {
3488                    None
3489                }
3490            }
3491            _ => None,
3492        }
3493    }
3494
3495    // Make sure pg views don't use types that only exist in Materialize.
3496    #[mz_ore::test(tokio::test)]
3497    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3498    async fn test_pg_views_forbidden_types() {
3499        Catalog::with_debug(|catalog| async move {
3500            let conn_catalog = catalog.for_system_session();
3501
3502            for view in BUILTINS::views().filter(|view| {
3503                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3504            }) {
3505                let item = conn_catalog
3506                    .resolve_item(&PartialItemName {
3507                        database: None,
3508                        schema: Some(view.schema.to_string()),
3509                        item: view.name.to_string(),
3510                    })
3511                    .expect("unable to resolve view")
3512                    // TODO(alter_table)
3513                    .at_version(RelationVersionSelector::Latest);
3514                let full_name = conn_catalog.resolve_full_name(item.name());
3515                let desc = item.relation_desc().expect("invalid item type");
3516                for col_type in desc.iter_types() {
3517                    match &col_type.scalar_type {
3518                        typ @ SqlScalarType::UInt16
3519                        | typ @ SqlScalarType::UInt32
3520                        | typ @ SqlScalarType::UInt64
3521                        | typ @ SqlScalarType::MzTimestamp
3522                        | typ @ SqlScalarType::List { .. }
3523                        | typ @ SqlScalarType::Map { .. }
3524                        | typ @ SqlScalarType::MzAclItem => {
3525                            panic!("{typ:?} type found in {full_name}");
3526                        }
3527                        SqlScalarType::AclItem
3528                        | SqlScalarType::Bool
3529                        | SqlScalarType::Int16
3530                        | SqlScalarType::Int32
3531                        | SqlScalarType::Int64
3532                        | SqlScalarType::Float32
3533                        | SqlScalarType::Float64
3534                        | SqlScalarType::Numeric { .. }
3535                        | SqlScalarType::Date
3536                        | SqlScalarType::Time
3537                        | SqlScalarType::Timestamp { .. }
3538                        | SqlScalarType::TimestampTz { .. }
3539                        | SqlScalarType::Interval
3540                        | SqlScalarType::PgLegacyChar
3541                        | SqlScalarType::Bytes
3542                        | SqlScalarType::String
3543                        | SqlScalarType::Char { .. }
3544                        | SqlScalarType::VarChar { .. }
3545                        | SqlScalarType::Jsonb
3546                        | SqlScalarType::Uuid
3547                        | SqlScalarType::Array(_)
3548                        | SqlScalarType::Record { .. }
3549                        | SqlScalarType::Oid
3550                        | SqlScalarType::RegProc
3551                        | SqlScalarType::RegType
3552                        | SqlScalarType::RegClass
3553                        | SqlScalarType::Int2Vector
3554                        | SqlScalarType::Range { .. }
3555                        | SqlScalarType::PgLegacyName => {}
3556                    }
3557                }
3558            }
3559            catalog.expire().await;
3560        })
3561        .await
3562    }
3563
3564    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3565    // introspection relations.
3566    #[mz_ore::test(tokio::test)]
3567    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3568    async fn test_mz_introspection_builtins() {
3569        Catalog::with_debug(|catalog| async move {
3570            let conn_catalog = catalog.for_system_session();
3571
3572            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3573            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3574
3575            for entry in catalog.entries() {
3576                let schema_spec = entry.name().qualifiers.schema_spec;
3577                let introspection_deps = catalog.introspection_dependencies(entry.id);
3578                if introspection_deps.is_empty() {
3579                    assert!(
3580                        schema_spec != introspection_schema_spec,
3581                        "entry does not depend on introspection sources but is in \
3582                         `mz_introspection`: {}",
3583                        conn_catalog.resolve_full_name(entry.name()),
3584                    );
3585                } else {
3586                    assert!(
3587                        schema_spec == introspection_schema_spec,
3588                        "entry depends on introspection sources but is not in \
3589                         `mz_introspection`: {}",
3590                        conn_catalog.resolve_full_name(entry.name()),
3591                    );
3592                }
3593            }
3594        })
3595        .await
3596    }
3597
3598    #[mz_ore::test(tokio::test)]
3599    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3600    async fn test_multi_subscriber_catalog() {
3601        let persist_client = PersistClient::new_for_tests().await;
3602        let bootstrap_args = test_bootstrap_args();
3603        let organization_id = Uuid::new_v4();
3604        let db_name = "DB";
3605
3606        let mut writer_catalog = Catalog::open_debug_catalog(
3607            persist_client.clone(),
3608            organization_id.clone(),
3609            &bootstrap_args,
3610        )
3611        .await
3612        .expect("open_debug_catalog");
3613        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3614            persist_client.clone(),
3615            organization_id.clone(),
3616            &bootstrap_args,
3617        )
3618        .await
3619        .expect("open_debug_read_only_catalog");
3620        assert_err!(writer_catalog.resolve_database(db_name));
3621        assert_err!(read_only_catalog.resolve_database(db_name));
3622
3623        let commit_ts = writer_catalog.current_upper().await;
3624        writer_catalog
3625            .transact(
3626                None,
3627                commit_ts,
3628                None,
3629                vec![Op::CreateDatabase {
3630                    name: db_name.to_string(),
3631                    owner_id: MZ_SYSTEM_ROLE_ID,
3632                }],
3633            )
3634            .await
3635            .expect("failed to transact");
3636
3637        let write_db = writer_catalog
3638            .resolve_database(db_name)
3639            .expect("resolve_database");
3640        read_only_catalog
3641            .sync_to_current_updates()
3642            .await
3643            .expect("sync_to_current_updates");
3644        let read_db = read_only_catalog
3645            .resolve_database(db_name)
3646            .expect("resolve_database");
3647
3648        assert_eq!(write_db, read_db);
3649
3650        let writer_catalog_fencer =
3651            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3652                .await
3653                .expect("open_debug_catalog for fencer");
3654        let fencer_db = writer_catalog_fencer
3655            .resolve_database(db_name)
3656            .expect("resolve_database for fencer");
3657        assert_eq!(fencer_db, read_db);
3658
3659        let write_fence_err = writer_catalog
3660            .sync_to_current_updates()
3661            .await
3662            .expect_err("sync_to_current_updates for fencer");
3663        assert!(matches!(
3664            write_fence_err,
3665            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3666        ));
3667        let read_fence_err = read_only_catalog
3668            .sync_to_current_updates()
3669            .await
3670            .expect_err("sync_to_current_updates after fencer");
3671        assert!(matches!(
3672            read_fence_err,
3673            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3674        ));
3675
3676        writer_catalog.expire().await;
3677        read_only_catalog.expire().await;
3678        writer_catalog_fencer.expire().await;
3679    }
3680}