Skip to main content

mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, STORAGE_USAGE_ID_ALLOC_KEY, TestCatalogStateBuilder,
39    test_bootstrap_args,
40};
41use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
42use mz_catalog::memory::error::{Error, ErrorKind};
43use mz_catalog::memory::objects::{
44    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
45    NetworkPolicy, Role, RoleAuth, Schema,
46};
47use mz_compute_types::dataflows::DataflowDescription;
48use mz_controller::clusters::ReplicaLocation;
49use mz_controller_types::{ClusterId, ReplicaId};
50use mz_expr::OptimizedMirRelationExpr;
51use mz_license_keys::ValidatedLicenseKey;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68    SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89// DO NOT add any more imports from `crate` outside of `crate::catalog`.
90pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94    DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114/// A `Catalog` keeps track of the SQL objects known to the planner.
115///
116/// For each object, it keeps track of both forward and reverse dependencies:
117/// i.e., which objects are depended upon by the object, and which objects
118/// depend upon the object. It enforces the SQL rules around dropping: an object
119/// cannot be dropped until all of the objects that depend upon it are dropped.
120/// It also enforces uniqueness of names.
121///
122/// SQL mandates a hierarchy of exactly three layers. A catalog contains
123/// databases, databases contain schemas, and schemas contain catalog items,
124/// like sources, sinks, view, and indexes.
125///
126/// To the outside world, databases, schemas, and items are all identified by
127/// name. Items can be referred to by their [`FullItemName`], which fully and
128/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
129/// database name and/or the schema name. Partial names can be converted into
130/// full names via a complicated resolution process documented by the
131/// [`CatalogState::resolve`] method.
132///
133/// The catalog also maintains special "ambient schemas": virtual schemas,
134/// implicitly present in all databases, that house various system views.
135/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
136#[derive(Debug)]
137pub struct Catalog {
138    state: CatalogState,
139    expr_cache_handle: Option<ExpressionCacheHandle>,
140    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141    transient_revision: u64,
142}
143
144// Implement our own Clone because derive can't unless S is Clone, which it's
145// not (hence the Arc).
146impl Clone for Catalog {
147    fn clone(&self) -> Self {
148        Self {
149            state: self.state.clone(),
150            expr_cache_handle: self.expr_cache_handle.clone(),
151            storage: Arc::clone(&self.storage),
152            transient_revision: self.transient_revision,
153        }
154    }
155}
156
157impl Catalog {
158    /// Set the optimized plan for the item identified by `id`.
159    ///
160    /// # Panics
161    /// If the item is not an `Index`, `MaterializedView`, or
162    /// `ContinualTask`.
163    #[mz_ore::instrument(level = "trace")]
164    pub fn set_optimized_plan(
165        &mut self,
166        id: GlobalId,
167        plan: DataflowDescription<OptimizedMirRelationExpr>,
168    ) {
169        self.state.set_optimized_plan(id, plan);
170    }
171
172    /// Set the physical plan for the item identified by `id`.
173    ///
174    /// # Panics
175    /// If the item is not an `Index`, `MaterializedView`, or
176    /// `ContinualTask`.
177    #[mz_ore::instrument(level = "trace")]
178    pub fn set_physical_plan(
179        &mut self,
180        id: GlobalId,
181        plan: DataflowDescription<mz_compute_types::plan::Plan>,
182    ) {
183        self.state.set_physical_plan(id, plan);
184    }
185
186    /// Try to get the optimized plan for the item identified by `id`.
187    #[mz_ore::instrument(level = "trace")]
188    pub fn try_get_optimized_plan(
189        &self,
190        id: &GlobalId,
191    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192        let entry = self.state.try_get_entry_by_global_id(id)?;
193        entry.item().optimized_plan().map(AsRef::as_ref)
194    }
195
196    /// Try to get the physical plan for the item identified by `id`.
197    #[mz_ore::instrument(level = "trace")]
198    pub fn try_get_physical_plan(
199        &self,
200        id: &GlobalId,
201    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202        let entry = self.state.try_get_entry_by_global_id(id)?;
203        entry.item().physical_plan().map(AsRef::as_ref)
204    }
205
206    /// Set the `DataflowMetainfo` for the item identified by `id`.
207    ///
208    /// # Panics
209    /// If the item is not an `Index`, `MaterializedView`, or
210    /// `ContinualTask`.
211    #[mz_ore::instrument(level = "trace")]
212    pub fn set_dataflow_metainfo(
213        &mut self,
214        id: GlobalId,
215        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216    ) {
217        self.state.set_dataflow_metainfo(id, metainfo);
218    }
219
220    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
221    #[mz_ore::instrument(level = "trace")]
222    pub fn try_get_dataflow_metainfo(
223        &self,
224        id: &GlobalId,
225    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226        let entry = self.state.try_get_entry_by_global_id(id)?;
227        entry.item().dataflow_metainfo()
228    }
229}
230
231#[derive(Debug)]
232pub struct ConnCatalog<'a> {
233    state: Cow<'a, CatalogState>,
234    /// Because we don't have any way of removing items from the catalog
235    /// temporarily, we allow the ConnCatalog to pretend that a set of items
236    /// don't exist during resolution.
237    ///
238    /// This feature is necessary to allow re-planning of statements, which is
239    /// either incredibly useful or required when altering item definitions.
240    ///
241    /// Note that uses of this field should be used by short-lived
242    /// catalogs.
243    unresolvable_ids: BTreeSet<CatalogItemId>,
244    conn_id: ConnectionId,
245    cluster: String,
246    database: Option<DatabaseId>,
247    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
248    role_id: RoleId,
249    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
250    portals: Option<&'a BTreeMap<String, Portal>>,
251    notices_tx: UnboundedSender<AdapterNotice>,
252    restrict_to_user_objects: bool,
253}
254
255impl ConnCatalog<'_> {
256    pub fn conn_id(&self) -> &ConnectionId {
257        &self.conn_id
258    }
259
260    pub fn state(&self) -> &CatalogState {
261        &*self.state
262    }
263
264    /// Prevent planning from resolving item with the provided ID. Instead,
265    /// return an error as if the item did not exist.
266    ///
267    /// This feature is meant exclusively to permit re-planning statements
268    /// during update operations and should not be used otherwise given its
269    /// extremely "powerful" semantics.
270    ///
271    /// # Panics
272    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
273    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
274        assert_eq!(
275            self.role_id, MZ_SYSTEM_ROLE_ID,
276            "only the system role can mark IDs unresolvable",
277        );
278        self.unresolvable_ids.insert(id);
279    }
280
281    /// Returns the schemas:
282    /// - mz_catalog
283    /// - pg_catalog
284    /// - temp (if requested)
285    /// - all schemas from the session's search_path var that exist
286    pub fn effective_search_path(
287        &self,
288        include_temp_schema: bool,
289    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
290        self.state
291            .effective_search_path(&self.search_path, include_temp_schema)
292    }
293}
294
295impl ConnectionResolver for ConnCatalog<'_> {
296    fn resolve_connection(
297        &self,
298        id: CatalogItemId,
299    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
300        self.state().resolve_connection(id)
301    }
302}
303
304impl Catalog {
305    /// Returns the catalog's transient revision, which starts at 1 and is
306    /// incremented on every change. This is not persisted to disk, and will
307    /// restart on every load.
308    pub fn transient_revision(&self) -> u64 {
309        self.transient_revision
310    }
311
312    /// Creates a debug catalog from the current
313    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
314    /// like in tests.
315    ///
316    /// WARNING! This function can arbitrarily fail because it does not make any
317    /// effort to adjust the catalog's contents' structure or semantics to the
318    /// currently running version, i.e. it does not apply any migrations.
319    ///
320    /// This function must not be called in production contexts. Use
321    /// [`Catalog::open`] with appropriately set configuration parameters
322    /// instead.
323    pub async fn with_debug<F, Fut, T>(f: F) -> T
324    where
325        F: FnOnce(Catalog) -> Fut,
326        Fut: Future<Output = T>,
327    {
328        let persist_client = PersistClient::new_for_tests().await;
329        let organization_id = Uuid::new_v4();
330        let bootstrap_args = test_bootstrap_args();
331        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
332            .await
333            .expect("can open debug catalog");
334        f(catalog).await
335    }
336
337    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
338    /// in progress.
339    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
340    where
341        F: FnOnce(Catalog) -> Fut,
342        Fut: Future<Output = T>,
343    {
344        let persist_client = PersistClient::new_for_tests().await;
345        let organization_id = Uuid::new_v4();
346        let bootstrap_args = test_bootstrap_args();
347        let mut catalog =
348            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
349                .await
350                .expect("can open debug catalog");
351
352        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
353        let now = SYSTEM_TIME.clone();
354        let openable_storage = TestCatalogStateBuilder::new(persist_client)
355            .with_organization_id(organization_id)
356            .with_default_deploy_generation()
357            .build()
358            .await
359            .expect("can create durable catalog");
360        let mut storage = openable_storage
361            .open(now().into(), &bootstrap_args)
362            .await
363            .expect("can open durable catalog")
364            .0;
365        // Drain updates.
366        let _ = storage
367            .sync_to_current_updates()
368            .await
369            .expect("can sync to current updates");
370        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
371
372        f(catalog).await
373    }
374
375    /// Opens a debug catalog.
376    ///
377    /// See [`Catalog::with_debug`].
378    pub async fn open_debug_catalog(
379        persist_client: PersistClient,
380        organization_id: Uuid,
381        bootstrap_args: &BootstrapArgs,
382    ) -> Result<Catalog, anyhow::Error> {
383        let now = SYSTEM_TIME.clone();
384        let environment_id = None;
385        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
386            .with_organization_id(organization_id)
387            .with_default_deploy_generation()
388            .build()
389            .await?;
390        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
391        let system_parameter_defaults = BTreeMap::default();
392        Self::open_debug_catalog_inner(
393            persist_client,
394            storage,
395            now,
396            environment_id,
397            &DUMMY_BUILD_INFO,
398            system_parameter_defaults,
399            bootstrap_args,
400            None,
401        )
402        .await
403    }
404
405    /// Opens a read only debug persist backed catalog defined by `persist_client` and
406    /// `organization_id`.
407    ///
408    /// See [`Catalog::with_debug`].
409    pub async fn open_debug_read_only_catalog(
410        persist_client: PersistClient,
411        organization_id: Uuid,
412        bootstrap_args: &BootstrapArgs,
413    ) -> Result<Catalog, anyhow::Error> {
414        let now = SYSTEM_TIME.clone();
415        let environment_id = None;
416        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
417            .with_organization_id(organization_id)
418            .build()
419            .await?;
420        let storage = openable_storage
421            .open_read_only(&test_bootstrap_args())
422            .await?;
423        let system_parameter_defaults = BTreeMap::default();
424        Self::open_debug_catalog_inner(
425            persist_client,
426            storage,
427            now,
428            environment_id,
429            &DUMMY_BUILD_INFO,
430            system_parameter_defaults,
431            bootstrap_args,
432            None,
433        )
434        .await
435    }
436
437    /// Opens a read only debug persist backed catalog defined by `persist_client` and
438    /// `organization_id`.
439    ///
440    /// See [`Catalog::with_debug`].
441    pub async fn open_debug_read_only_persist_catalog_config(
442        persist_client: PersistClient,
443        now: NowFn,
444        environment_id: EnvironmentId,
445        system_parameter_defaults: BTreeMap<String, String>,
446        build_info: &'static BuildInfo,
447        bootstrap_args: &BootstrapArgs,
448        enable_expression_cache_override: Option<bool>,
449    ) -> Result<Catalog, anyhow::Error> {
450        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
451            .with_organization_id(environment_id.organization_id())
452            .with_version(
453                build_info
454                    .version
455                    .parse()
456                    .expect("build version is parseable"),
457            )
458            .build()
459            .await?;
460        let storage = openable_storage.open_read_only(bootstrap_args).await?;
461        Self::open_debug_catalog_inner(
462            persist_client,
463            storage,
464            now,
465            Some(environment_id),
466            build_info,
467            system_parameter_defaults,
468            bootstrap_args,
469            enable_expression_cache_override,
470        )
471        .await
472    }
473
474    async fn open_debug_catalog_inner(
475        persist_client: PersistClient,
476        storage: Box<dyn DurableCatalogState>,
477        now: NowFn,
478        environment_id: Option<EnvironmentId>,
479        build_info: &'static BuildInfo,
480        system_parameter_defaults: BTreeMap<String, String>,
481        bootstrap_args: &BootstrapArgs,
482        enable_expression_cache_override: Option<bool>,
483    ) -> Result<Catalog, anyhow::Error> {
484        let metrics_registry = &MetricsRegistry::new();
485        let secrets_reader = Arc::new(InMemorySecretsController::new());
486        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
487        // debugging/testing.
488        let previous_ts = now().into();
489        let replica_size = &bootstrap_args.default_cluster_replica_size;
490        let read_only = false;
491
492        let OpenCatalogResult {
493            catalog,
494            migrated_storage_collections_0dt: _,
495            new_builtin_collections: _,
496            builtin_table_updates: _,
497            cached_global_exprs: _,
498            uncached_local_exprs: _,
499        } = Catalog::open(Config {
500            storage,
501            metrics_registry,
502            state: StateConfig {
503                unsafe_mode: true,
504                all_features: false,
505                build_info,
506                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
507                read_only,
508                now,
509                boot_ts: previous_ts,
510                skip_migrations: true,
511                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
512                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
513                    size: replica_size.clone(),
514                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
515                },
516                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
517                    size: replica_size.clone(),
518                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
519                },
520                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
521                    size: replica_size.clone(),
522                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
523                },
524                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
525                    size: replica_size.clone(),
526                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
527                },
528                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
529                    size: replica_size.clone(),
530                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
531                },
532                system_parameter_defaults,
533                remote_system_parameters: None,
534                availability_zones: vec![],
535                egress_addresses: vec![],
536                aws_principal_context: None,
537                aws_privatelink_availability_zones: None,
538                http_host_name: None,
539                connection_context: ConnectionContext::for_tests(secrets_reader),
540                builtin_item_migration_config: BuiltinItemMigrationConfig {
541                    persist_client: persist_client.clone(),
542                    read_only,
543                    force_migration: None,
544                },
545                persist_client,
546                enable_expression_cache_override,
547                helm_chart_version: None,
548                external_login_password_mz_system: None,
549                license_key: ValidatedLicenseKey::for_tests(),
550            },
551        })
552        .await?;
553        Ok(catalog)
554    }
555
556    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
557        self.state.for_session(session)
558    }
559
560    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
561        self.state.for_sessionless_user(role_id)
562    }
563
564    pub fn for_system_session(&self) -> ConnCatalog<'_> {
565        self.state.for_system_session()
566    }
567
568    async fn storage<'a>(
569        &'a self,
570    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
571        self.storage.lock().await
572    }
573
574    pub async fn current_upper(&self) -> mz_repr::Timestamp {
575        self.storage().await.current_upper().await
576    }
577
578    pub async fn allocate_user_id(
579        &self,
580        commit_ts: mz_repr::Timestamp,
581    ) -> Result<(CatalogItemId, GlobalId), Error> {
582        self.storage()
583            .await
584            .allocate_user_id(commit_ts)
585            .await
586            .maybe_terminate("allocating user ids")
587            .err_into()
588    }
589
590    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
591    pub async fn allocate_user_ids(
592        &self,
593        amount: u64,
594        commit_ts: mz_repr::Timestamp,
595    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
596        self.storage()
597            .await
598            .allocate_user_ids(amount, commit_ts)
599            .await
600            .maybe_terminate("allocating user ids")
601            .err_into()
602    }
603
604    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
605        let commit_ts = self.storage().await.current_upper().await;
606        self.allocate_user_id(commit_ts).await
607    }
608
609    /// Allocates a single durable id for a storage usage collection batch.
610    ///
611    /// Bumps the durable `STORAGE_USAGE_ID_ALLOC_KEY` allocator by one and
612    /// returns the previous value. The bump is committed at `commit_ts`.
613    /// One id is shared by every row produced by a collection cycle (see
614    /// `Coordinator::storage_usage_update`), so the durable cost is one
615    /// allocator round-trip per cycle, not per shard.
616    pub async fn allocate_storage_usage_id(
617        &self,
618        commit_ts: mz_repr::Timestamp,
619    ) -> Result<u64, Error> {
620        use mz_ore::collections::CollectionExt;
621
622        self.storage()
623            .await
624            .allocate_id(STORAGE_USAGE_ID_ALLOC_KEY, 1, commit_ts)
625            .await
626            .maybe_terminate("allocating storage usage id")
627            .map(|ids| ids.into_element())
628            .err_into()
629    }
630
631    /// Get the next user item ID without allocating it.
632    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
633        self.storage()
634            .await
635            .get_next_user_item_id()
636            .await
637            .err_into()
638    }
639
640    #[cfg(test)]
641    pub async fn allocate_system_id(
642        &self,
643        commit_ts: mz_repr::Timestamp,
644    ) -> Result<(CatalogItemId, GlobalId), Error> {
645        use mz_ore::collections::CollectionExt;
646
647        let mut storage = self.storage().await;
648        let mut txn = storage.transaction().await?;
649        let id = txn
650            .allocate_system_item_ids(1)
651            .maybe_terminate("allocating system ids")?
652            .into_element();
653        // Drain transaction.
654        let _ = txn.get_and_commit_op_updates();
655        txn.commit(commit_ts).await?;
656        Ok(id)
657    }
658
659    /// Get the next system item ID without allocating it.
660    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
661        self.storage()
662            .await
663            .get_next_system_item_id()
664            .await
665            .err_into()
666    }
667
668    pub async fn allocate_user_cluster_id(
669        &self,
670        commit_ts: mz_repr::Timestamp,
671    ) -> Result<ClusterId, Error> {
672        self.storage()
673            .await
674            .allocate_user_cluster_id(commit_ts)
675            .await
676            .maybe_terminate("allocating user cluster ids")
677            .err_into()
678    }
679
680    /// Get the next system replica id without allocating it.
681    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
682        self.storage()
683            .await
684            .get_next_system_replica_id()
685            .await
686            .err_into()
687    }
688
689    /// Get the next user replica id without allocating it.
690    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
691        self.storage()
692            .await
693            .get_next_user_replica_id()
694            .await
695            .err_into()
696    }
697
698    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
699        self.state.resolve_database(database_name)
700    }
701
702    pub fn resolve_schema(
703        &self,
704        current_database: Option<&DatabaseId>,
705        database_name: Option<&str>,
706        schema_name: &str,
707        conn_id: &ConnectionId,
708    ) -> Result<&Schema, SqlCatalogError> {
709        self.state
710            .resolve_schema(current_database, database_name, schema_name, conn_id)
711    }
712
713    pub fn resolve_schema_in_database(
714        &self,
715        database_spec: &ResolvedDatabaseSpecifier,
716        schema_name: &str,
717        conn_id: &ConnectionId,
718    ) -> Result<&Schema, SqlCatalogError> {
719        self.state
720            .resolve_schema_in_database(database_spec, schema_name, conn_id)
721    }
722
723    pub fn resolve_replica_in_cluster(
724        &self,
725        cluster_id: &ClusterId,
726        replica_name: &str,
727    ) -> Result<&ClusterReplica, SqlCatalogError> {
728        self.state
729            .resolve_replica_in_cluster(cluster_id, replica_name)
730    }
731
732    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
733        self.state.resolve_system_schema(name)
734    }
735
736    pub fn resolve_search_path(
737        &self,
738        session: &Session,
739    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
740        self.state.resolve_search_path(session)
741    }
742
743    /// Resolves `name` to a non-function [`CatalogEntry`].
744    pub fn resolve_entry(
745        &self,
746        current_database: Option<&DatabaseId>,
747        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
748        name: &PartialItemName,
749        conn_id: &ConnectionId,
750    ) -> Result<&CatalogEntry, SqlCatalogError> {
751        self.state
752            .resolve_entry(current_database, search_path, name, conn_id)
753    }
754
755    /// Resolves a `BuiltinTable`.
756    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
757        self.state.resolve_builtin_table(builtin)
758    }
759
760    /// Resolves a `BuiltinLog`.
761    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
762        self.state.resolve_builtin_log(builtin).0
763    }
764
765    /// Resolves a `BuiltinSource`.
766    pub fn resolve_builtin_storage_collection(
767        &self,
768        builtin: &'static BuiltinSource,
769    ) -> CatalogItemId {
770        self.state.resolve_builtin_source(builtin)
771    }
772
773    /// Resolves `name` to a function [`CatalogEntry`].
774    pub fn resolve_function(
775        &self,
776        current_database: Option<&DatabaseId>,
777        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
778        name: &PartialItemName,
779        conn_id: &ConnectionId,
780    ) -> Result<&CatalogEntry, SqlCatalogError> {
781        self.state
782            .resolve_function(current_database, search_path, name, conn_id)
783    }
784
785    /// Resolves `name` to a type [`CatalogEntry`].
786    pub fn resolve_type(
787        &self,
788        current_database: Option<&DatabaseId>,
789        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
790        name: &PartialItemName,
791        conn_id: &ConnectionId,
792    ) -> Result<&CatalogEntry, SqlCatalogError> {
793        self.state
794            .resolve_type(current_database, search_path, name, conn_id)
795    }
796
797    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
798        self.state.resolve_cluster(name)
799    }
800
801    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
802    ///
803    /// # Panics
804    /// * If the [`BuiltinCluster`] doesn't exist.
805    ///
806    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
807        self.state.resolve_builtin_cluster(cluster)
808    }
809
810    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
811        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
812    }
813
814    /// Resolves a [`Cluster`] for a TargetCluster.
815    pub fn resolve_target_cluster(
816        &self,
817        target_cluster: TargetCluster,
818        session: &Session,
819    ) -> Result<&Cluster, AdapterError> {
820        match target_cluster {
821            TargetCluster::CatalogServer => {
822                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
823            }
824            TargetCluster::Active => self.active_cluster(session),
825            TargetCluster::Transaction(cluster_id) => self
826                .try_get_cluster(cluster_id)
827                .ok_or(AdapterError::ConcurrentClusterDrop),
828        }
829    }
830
831    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
832        // TODO(benesch): this check here is not sufficiently protective. It'd
833        // be very easy for a code path to accidentally avoid this check by
834        // calling `resolve_cluster(session.vars().cluster())`.
835        if session.user().name != SYSTEM_USER.name
836            && session.user().name != SUPPORT_USER.name
837            && session.vars().cluster() == SYSTEM_USER.name
838        {
839            coord_bail!(
840                "system cluster '{}' cannot execute user queries",
841                SYSTEM_USER.name
842            );
843        }
844        let cluster = self.resolve_cluster(session.vars().cluster())?;
845        Ok(cluster)
846    }
847
848    pub fn state(&self) -> &CatalogState {
849        &self.state
850    }
851
852    pub fn resolve_full_name(
853        &self,
854        name: &QualifiedItemName,
855        conn_id: Option<&ConnectionId>,
856    ) -> FullItemName {
857        self.state.resolve_full_name(name, conn_id)
858    }
859
860    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
861        self.state.try_get_entry(id)
862    }
863
864    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
865        self.state.try_get_entry_by_global_id(id)
866    }
867
868    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
869        self.state.get_entry(id)
870    }
871
872    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
873        self.state.get_entry_by_global_id(id)
874    }
875
876    pub fn get_global_ids<'a>(
877        &'a self,
878        id: &CatalogItemId,
879    ) -> impl Iterator<Item = GlobalId> + use<'a> {
880        self.get_entry(id).global_ids()
881    }
882
883    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
884        self.get_entry_by_global_id(id).id()
885    }
886
887    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
888        let item = self.try_get_entry_by_global_id(id)?;
889        Some(item.id())
890    }
891
892    pub fn get_schema(
893        &self,
894        database_spec: &ResolvedDatabaseSpecifier,
895        schema_spec: &SchemaSpecifier,
896        conn_id: &ConnectionId,
897    ) -> &Schema {
898        self.state.get_schema(database_spec, schema_spec, conn_id)
899    }
900
901    pub fn try_get_schema(
902        &self,
903        database_spec: &ResolvedDatabaseSpecifier,
904        schema_spec: &SchemaSpecifier,
905        conn_id: &ConnectionId,
906    ) -> Option<&Schema> {
907        self.state
908            .try_get_schema(database_spec, schema_spec, conn_id)
909    }
910
911    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
912        self.state.get_mz_catalog_schema_id()
913    }
914
915    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
916        self.state.get_pg_catalog_schema_id()
917    }
918
919    pub fn get_information_schema_id(&self) -> SchemaId {
920        self.state.get_information_schema_id()
921    }
922
923    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
924        self.state.get_mz_internal_schema_id()
925    }
926
927    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
928        self.state.get_mz_introspection_schema_id()
929    }
930
931    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
932        self.state.get_mz_unsafe_schema_id()
933    }
934
935    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
936        self.state.system_schema_ids()
937    }
938
939    pub fn get_database(&self, id: &DatabaseId) -> &Database {
940        self.state.get_database(id)
941    }
942
943    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
944        self.state.try_get_role(id)
945    }
946
947    pub fn get_role(&self, id: &RoleId) -> &Role {
948        self.state.get_role(id)
949    }
950
951    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
952        self.state.try_get_role_by_name(role_name)
953    }
954
955    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
956        self.state.try_get_role_auth_by_id(id)
957    }
958
959    /// Creates a new schema in the `Catalog` for temporary items
960    /// indicated by the TEMPORARY or TEMP keywords.
961    pub fn create_temporary_schema(
962        &mut self,
963        conn_id: &ConnectionId,
964        owner_id: RoleId,
965    ) -> Result<(), Error> {
966        self.state.create_temporary_schema(conn_id, owner_id)
967    }
968
969    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
970        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
971        self.state
972            .temporary_schemas
973            .get(conn_id)
974            .map(|schema| schema.items.contains_key(item_name))
975            .unwrap_or(false)
976    }
977
978    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
979    /// Returns Ok if conn_id's temp schema does not exist.
980    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
981        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
982            return Ok(());
983        };
984        if !schema.items.is_empty() {
985            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
986        }
987        Ok(())
988    }
989
990    pub(crate) fn object_dependents(
991        &self,
992        object_ids: &Vec<ObjectId>,
993        conn_id: &ConnectionId,
994    ) -> Vec<ObjectId> {
995        let mut seen = BTreeSet::new();
996        self.state.object_dependents(object_ids, conn_id, &mut seen)
997    }
998
999    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1000        FullNameV1 {
1001            database: name.database.to_string(),
1002            schema: name.schema.clone(),
1003            item: name.item.clone(),
1004        }
1005    }
1006
1007    pub fn find_available_cluster_name(&self, name: &str) -> String {
1008        let mut i = 0;
1009        let mut candidate = name.to_string();
1010        while self.state.clusters_by_name.contains_key(&candidate) {
1011            i += 1;
1012            candidate = format!("{}{}", name, i);
1013        }
1014        candidate
1015    }
1016
1017    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1018        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1019            self.cluster_replica_sizes()
1020                .enabled_allocations()
1021                .map(|a| a.0.to_owned())
1022                .collect::<Vec<_>>()
1023        } else {
1024            self.system_config().allowed_cluster_replica_sizes()
1025        }
1026    }
1027
1028    pub fn concretize_replica_location(
1029        &self,
1030        location: mz_catalog::durable::ReplicaLocation,
1031        allowed_sizes: &Vec<String>,
1032        allowed_availability_zones: Option<&[String]>,
1033        allow_disabled: bool,
1034    ) -> Result<ReplicaLocation, Error> {
1035        self.state.concretize_replica_location(
1036            location,
1037            allowed_sizes,
1038            allowed_availability_zones,
1039            allow_disabled,
1040        )
1041    }
1042
1043    pub(crate) fn ensure_valid_replica_size(
1044        &self,
1045        allowed_sizes: &[String],
1046        size: &String,
1047        allow_disabled: bool,
1048    ) -> Result<(), Error> {
1049        self.state
1050            .ensure_valid_replica_size(allowed_sizes, size, allow_disabled)
1051    }
1052
1053    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1054        &self.state.cluster_replica_sizes
1055    }
1056
1057    /// Returns the privileges of an object by its ID.
1058    pub fn get_privileges(
1059        &self,
1060        id: &SystemObjectId,
1061        conn_id: &ConnectionId,
1062    ) -> Option<&PrivilegeMap> {
1063        match id {
1064            SystemObjectId::Object(id) => match id {
1065                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1066                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1067                ObjectId::Schema((database_spec, schema_spec)) => Some(
1068                    self.get_schema(database_spec, schema_spec, conn_id)
1069                        .privileges(),
1070                ),
1071                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1072                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1073                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1074            },
1075            SystemObjectId::System => Some(&self.state.system_privileges),
1076        }
1077    }
1078
1079    #[mz_ore::instrument(level = "debug")]
1080    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1081        Ok(self.storage().await.advance_upper(new_upper).await?)
1082    }
1083
1084    /// Return the ids of all log sources the given object depends on.
1085    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1086        self.state.introspection_dependencies(id)
1087    }
1088
1089    /// Serializes the catalog's in-memory state.
1090    ///
1091    /// There are no guarantees about the format of the serialized state, except
1092    /// that the serialized state for two identical catalogs will compare
1093    /// identically.
1094    pub fn dump(&self) -> Result<CatalogDump, Error> {
1095        Ok(CatalogDump::new(self.state.dump(None)?))
1096    }
1097
1098    /// Checks the [`Catalog`]s internal consistency.
1099    ///
1100    /// Returns a JSON object describing the inconsistencies, if there are any.
1101    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1102        self.state.check_consistency().map_err(|inconsistencies| {
1103            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1104                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1105            })
1106        })
1107    }
1108
1109    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1110        self.state.config()
1111    }
1112
1113    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1114        self.state.entry_by_id.values()
1115    }
1116
1117    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1118        self.entries()
1119            .filter(|entry| entry.is_connection() && entry.id().is_user())
1120    }
1121
1122    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1123        self.entries()
1124            .filter(|entry| entry.is_table() && entry.id().is_user())
1125    }
1126
1127    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1128        self.entries()
1129            .filter(|entry| entry.is_source() && entry.id().is_user())
1130    }
1131
1132    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1133        self.entries()
1134            .filter(|entry| entry.is_sink() && entry.id().is_user())
1135    }
1136
1137    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1138        self.entries()
1139            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1140    }
1141
1142    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1143        self.entries()
1144            .filter(|entry| entry.is_secret() && entry.id().is_user())
1145    }
1146
1147    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1148        self.state.get_network_policy(&network_policy_id)
1149    }
1150
1151    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1152        self.state.try_get_network_policy_by_name(name)
1153    }
1154
1155    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1156        self.state.clusters_by_id.values()
1157    }
1158
1159    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1160        self.state.get_cluster(cluster_id)
1161    }
1162
1163    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1164        self.state.try_get_cluster(cluster_id)
1165    }
1166
1167    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1168        self.clusters().filter(|cluster| cluster.id.is_user())
1169    }
1170
1171    pub fn get_cluster_replica(
1172        &self,
1173        cluster_id: ClusterId,
1174        replica_id: ReplicaId,
1175    ) -> &ClusterReplica {
1176        self.state.get_cluster_replica(cluster_id, replica_id)
1177    }
1178
1179    pub fn try_get_cluster_replica(
1180        &self,
1181        cluster_id: ClusterId,
1182        replica_id: ReplicaId,
1183    ) -> Option<&ClusterReplica> {
1184        self.state.try_get_cluster_replica(cluster_id, replica_id)
1185    }
1186
1187    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1188        self.user_clusters()
1189            .flat_map(|cluster| cluster.user_replicas())
1190    }
1191
1192    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1193        self.state.database_by_id.values()
1194    }
1195
1196    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1197        self.state
1198            .roles_by_id
1199            .values()
1200            .filter(|role| role.is_user())
1201    }
1202
1203    pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1204        self.state
1205            .network_policies_by_id
1206            .iter()
1207            .filter(|(id, _)| id.is_user())
1208            .map(|(_, policy)| policy)
1209    }
1210
1211    pub fn system_privileges(&self) -> &PrivilegeMap {
1212        &self.state.system_privileges
1213    }
1214
1215    pub fn default_privileges(
1216        &self,
1217    ) -> impl Iterator<
1218        Item = (
1219            &DefaultPrivilegeObject,
1220            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1221        ),
1222    > {
1223        self.state.default_privileges.iter()
1224    }
1225
1226    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1227        self.state
1228            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1229    }
1230
1231    pub fn pack_storage_usage_update(
1232        &self,
1233        event: VersionedStorageUsage,
1234        diff: Diff,
1235    ) -> BuiltinTableUpdate {
1236        self.state
1237            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1238    }
1239
1240    pub fn system_config(&self) -> &SystemVars {
1241        self.state.system_config()
1242    }
1243
1244    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1245        self.state.system_config_mut()
1246    }
1247
1248    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1249        self.state.ensure_not_reserved_role(role_id)
1250    }
1251
1252    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1253        self.state.ensure_grantable_role(role_id)
1254    }
1255
1256    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1257        self.state.ensure_not_system_role(role_id)
1258    }
1259
1260    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1261        self.state.ensure_not_predefined_role(role_id)
1262    }
1263
1264    pub fn ensure_not_reserved_network_policy(
1265        &self,
1266        network_policy_id: &NetworkPolicyId,
1267    ) -> Result<(), Error> {
1268        self.state
1269            .ensure_not_reserved_network_policy(network_policy_id)
1270    }
1271
1272    pub fn ensure_not_reserved_object(
1273        &self,
1274        object_id: &ObjectId,
1275        conn_id: &ConnectionId,
1276    ) -> Result<(), Error> {
1277        match object_id {
1278            ObjectId::Cluster(cluster_id) => {
1279                if cluster_id.is_system() {
1280                    let cluster = self.get_cluster(*cluster_id);
1281                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1282                        cluster.name().to_string(),
1283                    )))
1284                } else {
1285                    Ok(())
1286                }
1287            }
1288            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1289                if replica_id.is_system() {
1290                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1291                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1292                        replica.name().to_string(),
1293                    )))
1294                } else {
1295                    Ok(())
1296                }
1297            }
1298            ObjectId::Database(database_id) => {
1299                if database_id.is_system() {
1300                    let database = self.get_database(database_id);
1301                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1302                        database.name().to_string(),
1303                    )))
1304                } else {
1305                    Ok(())
1306                }
1307            }
1308            ObjectId::Schema((database_spec, schema_spec)) => {
1309                if schema_spec.is_system() {
1310                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1311                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1312                        schema.name().schema.clone(),
1313                    )))
1314                } else {
1315                    Ok(())
1316                }
1317            }
1318            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1319            ObjectId::Item(item_id) => {
1320                if item_id.is_system() {
1321                    let item = self.get_entry(item_id);
1322                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1323                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1324                } else {
1325                    Ok(())
1326                }
1327            }
1328            ObjectId::NetworkPolicy(network_policy_id) => {
1329                self.ensure_not_reserved_network_policy(network_policy_id)
1330            }
1331        }
1332    }
1333
1334    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1335    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1336        &mut self,
1337        create_sql: &str,
1338        force_if_exists_skip: bool,
1339    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1340        self.state
1341            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1342    }
1343
1344    /// Cache global and, optionally, local expressions for the given
1345    /// `GlobalId`.
1346    ///
1347    /// Takes the plans and metainfo directly as parameters (rather than
1348    /// fishing them out of catalog state), so this can be called **before**
1349    /// the catalog transaction that creates the item. Returns the future
1350    /// returned by [`Catalog::update_expression_cache`]; callers should
1351    /// `.await` it before the catalog transaction commits, so the durable
1352    /// expression cache is observed to contain the entries by the time any
1353    /// other process (or a subsequent bootstrap on this process) reads them.
1354    pub(crate) fn cache_expressions(
1355        &self,
1356        id: GlobalId,
1357        local_mir: Option<OptimizedMirRelationExpr>,
1358        mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1359        mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1360        dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1361        optimizer_features: OptimizerFeatures,
1362    ) -> BoxFuture<'static, ()> {
1363        // Make sure we're not caching the result of timestamp selection, as
1364        // it will almost certainly be wrong if we re-install the dataflow at
1365        // a later time.
1366        global_mir.as_of = None;
1367        global_mir.until = Default::default();
1368        physical_plan.as_of = None;
1369        physical_plan.until = Default::default();
1370
1371        let mut local_exprs = Vec::new();
1372        if let Some(local_mir) = local_mir {
1373            local_exprs.push((
1374                id,
1375                LocalExpressions {
1376                    local_mir,
1377                    optimizer_features: optimizer_features.clone(),
1378                },
1379            ));
1380        }
1381        let global_exprs = vec![(
1382            id,
1383            GlobalExpressions {
1384                global_mir,
1385                physical_plan,
1386                dataflow_metainfos,
1387                optimizer_features,
1388            },
1389        )];
1390        self.update_expression_cache(local_exprs, global_exprs, Default::default())
1391    }
1392
1393    pub(crate) fn update_expression_cache<'a, 'b>(
1394        &'a self,
1395        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1396        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1397        invalidate_ids: BTreeSet<GlobalId>,
1398    ) -> BoxFuture<'b, ()> {
1399        if let Some(expr_cache) = &self.expr_cache_handle {
1400            expr_cache
1401                .update(
1402                    new_local_expressions,
1403                    new_global_expressions,
1404                    invalidate_ids,
1405                )
1406                .boxed()
1407        } else {
1408            async {}.boxed()
1409        }
1410    }
1411
1412    /// Listen for and apply all unconsumed updates to the durable catalog state.
1413    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1414    // `#[cfg(test)]` annotation.
1415    #[cfg(test)]
1416    async fn sync_to_current_updates(
1417        &mut self,
1418    ) -> Result<
1419        (
1420            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1421            Vec<ParsedStateUpdate>,
1422        ),
1423        CatalogError,
1424    > {
1425        let updates = self.storage().await.sync_to_current_updates().await?;
1426        let (builtin_table_updates, catalog_updates) = self
1427            .state
1428            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1429            .await;
1430        Ok((builtin_table_updates, catalog_updates))
1431    }
1432}
1433
1434pub fn is_reserved_name(name: &str) -> bool {
1435    BUILTIN_PREFIXES
1436        .iter()
1437        .any(|prefix| name.starts_with(prefix))
1438}
1439
1440pub fn is_reserved_role_name(name: &str) -> bool {
1441    is_reserved_name(name) || is_public_role(name)
1442}
1443
1444pub fn is_public_role(name: &str) -> bool {
1445    name == &*PUBLIC_ROLE_NAME
1446}
1447
1448pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1449    object_type_to_audit_object_type(sql_type.into())
1450}
1451
1452pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1453    match id {
1454        CommentObjectId::Table(_) => ObjectType::Table,
1455        CommentObjectId::View(_) => ObjectType::View,
1456        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1457        CommentObjectId::Source(_) => ObjectType::Source,
1458        CommentObjectId::Sink(_) => ObjectType::Sink,
1459        CommentObjectId::Index(_) => ObjectType::Index,
1460        CommentObjectId::Func(_) => ObjectType::Func,
1461        CommentObjectId::Connection(_) => ObjectType::Connection,
1462        CommentObjectId::Type(_) => ObjectType::Type,
1463        CommentObjectId::Secret(_) => ObjectType::Secret,
1464        CommentObjectId::Role(_) => ObjectType::Role,
1465        CommentObjectId::Database(_) => ObjectType::Database,
1466        CommentObjectId::Schema(_) => ObjectType::Schema,
1467        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1468        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1469        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1470    }
1471}
1472
1473pub(crate) fn object_type_to_audit_object_type(
1474    object_type: mz_sql::catalog::ObjectType,
1475) -> ObjectType {
1476    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1477}
1478
1479pub(crate) fn system_object_type_to_audit_object_type(
1480    system_type: &SystemObjectType,
1481) -> ObjectType {
1482    match system_type {
1483        SystemObjectType::Object(object_type) => match object_type {
1484            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1485            mz_sql::catalog::ObjectType::View => ObjectType::View,
1486            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1487            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1488            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1489            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1490            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1491            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1492            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1493            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1494            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1495            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1496            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1497            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1498            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1499            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1500        },
1501        SystemObjectType::System => ObjectType::System,
1502    }
1503}
1504
1505#[derive(Debug, Copy, Clone)]
1506pub enum UpdatePrivilegeVariant {
1507    Grant,
1508    Revoke,
1509}
1510
1511impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1512    fn from(variant: UpdatePrivilegeVariant) -> Self {
1513        match variant {
1514            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1515            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1516        }
1517    }
1518}
1519
1520impl From<UpdatePrivilegeVariant> for EventType {
1521    fn from(variant: UpdatePrivilegeVariant) -> Self {
1522        match variant {
1523            UpdatePrivilegeVariant::Grant => EventType::Grant,
1524            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1525        }
1526    }
1527}
1528
1529impl ConnCatalog<'_> {
1530    fn resolve_item_name(
1531        &self,
1532        name: &PartialItemName,
1533    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1534        self.resolve_item(name).map(|entry| entry.name())
1535    }
1536
1537    fn resolve_function_name(
1538        &self,
1539        name: &PartialItemName,
1540    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1541        self.resolve_function(name).map(|entry| entry.name())
1542    }
1543
1544    fn resolve_type_name(
1545        &self,
1546        name: &PartialItemName,
1547    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1548        self.resolve_type(name).map(|entry| entry.name())
1549    }
1550}
1551
1552impl ExprHumanizer for ConnCatalog<'_> {
1553    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1554        let entry = self.state.try_get_entry_by_global_id(&id)?;
1555        Some(self.resolve_full_name(entry.name()).to_string())
1556    }
1557
1558    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1559        let entry = self.state.try_get_entry_by_global_id(&id)?;
1560        Some(entry.name().item.clone())
1561    }
1562
1563    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1564        let entry = self.state.try_get_entry_by_global_id(&id)?;
1565        Some(self.resolve_full_name(entry.name()).into_parts())
1566    }
1567
1568    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1569        use SqlScalarType::*;
1570
1571        match typ {
1572            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1573            List {
1574                custom_id: Some(item_id),
1575                ..
1576            }
1577            | Map {
1578                custom_id: Some(item_id),
1579                ..
1580            } => {
1581                let item = self.get_item(item_id);
1582                self.minimal_qualification(item.name()).to_string()
1583            }
1584            List { element_type, .. } => {
1585                format!(
1586                    "{} list",
1587                    self.humanize_sql_scalar_type(element_type, postgres_compat)
1588                )
1589            }
1590            Map { value_type, .. } => format!(
1591                "map[{}=>{}]",
1592                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1593                self.humanize_sql_scalar_type(value_type, postgres_compat)
1594            ),
1595            Record {
1596                custom_id: Some(item_id),
1597                ..
1598            } => {
1599                let item = self.get_item(item_id);
1600                self.minimal_qualification(item.name()).to_string()
1601            }
1602            Record { fields, .. } => format!(
1603                "record({})",
1604                fields
1605                    .iter()
1606                    .map(|f| format!(
1607                        "{}: {}",
1608                        f.0,
1609                        self.humanize_sql_column_type(&f.1, postgres_compat)
1610                    ))
1611                    .join(",")
1612            ),
1613            PgLegacyChar => "\"char\"".into(),
1614            Char { length } if !postgres_compat => match length {
1615                None => "char".into(),
1616                Some(length) => format!("char({})", length.into_u32()),
1617            },
1618            VarChar { max_length } if !postgres_compat => match max_length {
1619                None => "varchar".into(),
1620                Some(length) => format!("varchar({})", length.into_u32()),
1621            },
1622            UInt16 => "uint2".into(),
1623            UInt32 => "uint4".into(),
1624            UInt64 => "uint8".into(),
1625            ty => {
1626                let pgrepr_type = mz_pgrepr::Type::from(ty);
1627                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1628
1629                let res = if self
1630                    .effective_search_path(true)
1631                    .iter()
1632                    .any(|(_, schema)| schema == &pg_catalog_schema)
1633                {
1634                    pgrepr_type.name().to_string()
1635                } else {
1636                    // If PG_CATALOG_SCHEMA is not in search path, you need
1637                    // qualified object name to refer to type.
1638                    let name = QualifiedItemName {
1639                        qualifiers: ItemQualifiers {
1640                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1641                            schema_spec: pg_catalog_schema,
1642                        },
1643                        item: pgrepr_type.name().to_string(),
1644                    };
1645                    self.resolve_full_name(&name).to_string()
1646                };
1647                res
1648            }
1649        }
1650    }
1651
1652    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1653        let entry = self.state.try_get_entry_by_global_id(&id)?;
1654
1655        match entry.index() {
1656            Some(index) => {
1657                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1658                let mut on_names = on_desc
1659                    .iter_names()
1660                    .map(|col_name| col_name.to_string())
1661                    .collect::<Vec<_>>();
1662
1663                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1664
1665                // Init ix_names with unknown column names. Unknown columns are
1666                // represented as an empty String and rendered as `#c` by the
1667                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1668                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1669                let mut ix_names = vec![String::new(); ix_arity];
1670
1671                // Apply the permutation by swapping on_names with ix_names.
1672                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1673                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1674                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1675                    std::mem::swap(on_name, ix_name);
1676                }
1677
1678                Some(ix_names) // Return the updated ix_names vector.
1679            }
1680            None => {
1681                let desc = self.state.try_get_desc_by_global_id(&id)?;
1682                let column_names = desc
1683                    .iter_names()
1684                    .map(|col_name| col_name.to_string())
1685                    .collect();
1686
1687                Some(column_names)
1688            }
1689        }
1690    }
1691
1692    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1693        let desc = self.state.try_get_desc_by_global_id(&id)?;
1694        Some(desc.get_name(column).to_string())
1695    }
1696
1697    fn id_exists(&self, id: GlobalId) -> bool {
1698        self.state.entry_by_global_id.contains_key(&id)
1699    }
1700}
1701
1702impl SessionCatalog for ConnCatalog<'_> {
1703    fn active_role_id(&self) -> &RoleId {
1704        &self.role_id
1705    }
1706
1707    fn restrict_to_user_objects(&self) -> bool {
1708        self.restrict_to_user_objects
1709    }
1710
1711    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1712        self.prepared_statements
1713            .as_ref()
1714            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1715            .flatten()
1716    }
1717
1718    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1719        self.portals
1720            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1721    }
1722
1723    fn active_database(&self) -> Option<&DatabaseId> {
1724        self.database.as_ref()
1725    }
1726
1727    fn active_cluster(&self) -> &str {
1728        &self.cluster
1729    }
1730
1731    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1732        &self.search_path
1733    }
1734
1735    fn resolve_database(
1736        &self,
1737        database_name: &str,
1738    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1739        Ok(self.state.resolve_database(database_name)?)
1740    }
1741
1742    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1743        self.state
1744            .database_by_id
1745            .get(id)
1746            .expect("database doesn't exist")
1747    }
1748
1749    // `as` is ok to use to cast to a trait object.
1750    #[allow(clippy::as_conversions)]
1751    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1752        self.state
1753            .database_by_id
1754            .values()
1755            .map(|database| database as &dyn CatalogDatabase)
1756            .collect()
1757    }
1758
1759    fn resolve_schema(
1760        &self,
1761        database_name: Option<&str>,
1762        schema_name: &str,
1763    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1764        Ok(self.state.resolve_schema(
1765            self.database.as_ref(),
1766            database_name,
1767            schema_name,
1768            &self.conn_id,
1769        )?)
1770    }
1771
1772    fn resolve_schema_in_database(
1773        &self,
1774        database_spec: &ResolvedDatabaseSpecifier,
1775        schema_name: &str,
1776    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1777        Ok(self
1778            .state
1779            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1780    }
1781
1782    fn get_schema(
1783        &self,
1784        database_spec: &ResolvedDatabaseSpecifier,
1785        schema_spec: &SchemaSpecifier,
1786    ) -> &dyn CatalogSchema {
1787        self.state
1788            .get_schema(database_spec, schema_spec, &self.conn_id)
1789    }
1790
1791    // `as` is ok to use to cast to a trait object.
1792    #[allow(clippy::as_conversions)]
1793    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1794        self.get_databases()
1795            .into_iter()
1796            .flat_map(|database| database.schemas().into_iter())
1797            .chain(
1798                self.state
1799                    .ambient_schemas_by_id
1800                    .values()
1801                    .chain(self.state.temporary_schemas.values())
1802                    .map(|schema| schema as &dyn CatalogSchema),
1803            )
1804            .collect()
1805    }
1806
1807    fn get_mz_internal_schema_id(&self) -> SchemaId {
1808        self.state().get_mz_internal_schema_id()
1809    }
1810
1811    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1812        self.state().get_mz_unsafe_schema_id()
1813    }
1814
1815    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1816        self.state.is_system_schema_specifier(schema)
1817    }
1818
1819    fn resolve_role(
1820        &self,
1821        role_name: &str,
1822    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1823        match self.state.try_get_role_by_name(role_name) {
1824            Some(role) => Ok(role),
1825            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1826        }
1827    }
1828
1829    fn resolve_network_policy(
1830        &self,
1831        policy_name: &str,
1832    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1833        match self.state.try_get_network_policy_by_name(policy_name) {
1834            Some(policy) => Ok(policy),
1835            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1836        }
1837    }
1838
1839    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1840        Some(self.state.roles_by_id.get(id)?)
1841    }
1842
1843    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1844        self.state.get_role(id)
1845    }
1846
1847    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1848        // `as` is ok to use to cast to a trait object.
1849        #[allow(clippy::as_conversions)]
1850        self.state
1851            .roles_by_id
1852            .values()
1853            .map(|role| role as &dyn CatalogRole)
1854            .collect()
1855    }
1856
1857    fn mz_system_role_id(&self) -> RoleId {
1858        MZ_SYSTEM_ROLE_ID
1859    }
1860
1861    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1862        self.state.collect_role_membership(id)
1863    }
1864
1865    fn get_network_policy(
1866        &self,
1867        id: &NetworkPolicyId,
1868    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1869        self.state.get_network_policy(id)
1870    }
1871
1872    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1873        // `as` is ok to use to cast to a trait object.
1874        #[allow(clippy::as_conversions)]
1875        self.state
1876            .network_policies_by_id
1877            .values()
1878            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1879            .collect()
1880    }
1881
1882    fn resolve_cluster(
1883        &self,
1884        cluster_name: Option<&str>,
1885    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1886        Ok(self
1887            .state
1888            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1889    }
1890
1891    fn resolve_cluster_replica(
1892        &self,
1893        cluster_replica_name: &QualifiedReplica,
1894    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1895        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1896    }
1897
1898    fn resolve_item(
1899        &self,
1900        name: &PartialItemName,
1901    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1902        let r = self.state.resolve_entry(
1903            self.database.as_ref(),
1904            &self.effective_search_path(true),
1905            name,
1906            &self.conn_id,
1907        )?;
1908        if self.unresolvable_ids.contains(&r.id()) {
1909            Err(SqlCatalogError::UnknownItem(name.to_string()))
1910        } else {
1911            Ok(r)
1912        }
1913    }
1914
1915    fn resolve_function(
1916        &self,
1917        name: &PartialItemName,
1918    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1919        let r = self.state.resolve_function(
1920            self.database.as_ref(),
1921            &self.effective_search_path(false),
1922            name,
1923            &self.conn_id,
1924        )?;
1925
1926        if self.unresolvable_ids.contains(&r.id()) {
1927            Err(SqlCatalogError::UnknownFunction {
1928                name: name.to_string(),
1929                alternative: None,
1930            })
1931        } else {
1932            Ok(r)
1933        }
1934    }
1935
1936    fn resolve_type(
1937        &self,
1938        name: &PartialItemName,
1939    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1940        let r = self.state.resolve_type(
1941            self.database.as_ref(),
1942            &self.effective_search_path(false),
1943            name,
1944            &self.conn_id,
1945        )?;
1946
1947        if self.unresolvable_ids.contains(&r.id()) {
1948            Err(SqlCatalogError::UnknownType {
1949                name: name.to_string(),
1950            })
1951        } else {
1952            Ok(r)
1953        }
1954    }
1955
1956    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1957        self.state.get_system_type(name)
1958    }
1959
1960    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1961        Some(self.state.try_get_entry(id)?)
1962    }
1963
1964    fn try_get_item_by_global_id(
1965        &self,
1966        id: &GlobalId,
1967    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1968        let entry = self.state.try_get_entry_by_global_id(id)?;
1969        let entry = match &entry.item {
1970            CatalogItem::Table(table) => {
1971                let (version, _gid) = table
1972                    .collections
1973                    .iter()
1974                    .find(|(_version, gid)| *gid == id)
1975                    .expect("catalog out of sync, mismatched GlobalId");
1976                entry.at_version(RelationVersionSelector::Specific(*version))
1977            }
1978            _ => entry.at_version(RelationVersionSelector::Latest),
1979        };
1980        Some(entry)
1981    }
1982
1983    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1984        self.state.get_entry(id)
1985    }
1986
1987    fn get_item_by_global_id(
1988        &self,
1989        id: &GlobalId,
1990    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1991        let entry = self.state.get_entry_by_global_id(id);
1992        let entry = match &entry.item {
1993            CatalogItem::Table(table) => {
1994                let (version, _gid) = table
1995                    .collections
1996                    .iter()
1997                    .find(|(_version, gid)| *gid == id)
1998                    .expect("catalog out of sync, mismatched GlobalId");
1999                entry.at_version(RelationVersionSelector::Specific(*version))
2000            }
2001            _ => entry.at_version(RelationVersionSelector::Latest),
2002        };
2003        entry
2004    }
2005
2006    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2007        self.get_schemas()
2008            .into_iter()
2009            .flat_map(|schema| schema.item_ids())
2010            .map(|id| self.get_item(&id))
2011            .collect()
2012    }
2013
2014    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2015        self.state
2016            .get_item_by_name(name, &self.conn_id)
2017            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2018    }
2019
2020    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2021        self.state
2022            .get_type_by_name(name, &self.conn_id)
2023            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2024    }
2025
2026    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2027        &self.state.clusters_by_id[&id]
2028    }
2029
2030    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2031        self.state
2032            .clusters_by_id
2033            .values()
2034            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2035            .collect()
2036    }
2037
2038    fn get_cluster_replica(
2039        &self,
2040        cluster_id: ClusterId,
2041        replica_id: ReplicaId,
2042    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2043        let cluster = self.get_cluster(cluster_id);
2044        cluster.replica(replica_id)
2045    }
2046
2047    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2048        self.get_clusters()
2049            .into_iter()
2050            .flat_map(|cluster| cluster.replicas().into_iter())
2051            .collect()
2052    }
2053
2054    fn get_system_privileges(&self) -> &PrivilegeMap {
2055        &self.state.system_privileges
2056    }
2057
2058    fn get_default_privileges(
2059        &self,
2060    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2061        self.state
2062            .default_privileges
2063            .iter()
2064            .map(|(object, acl_items)| (object, acl_items.collect()))
2065            .collect()
2066    }
2067
2068    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2069        self.state.find_available_name(name, &self.conn_id)
2070    }
2071
2072    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2073        self.state.resolve_full_name(name, Some(&self.conn_id))
2074    }
2075
2076    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2077        self.state.resolve_full_schema_name(name)
2078    }
2079
2080    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2081        self.state.get_entry_by_global_id(global_id).id()
2082    }
2083
2084    fn resolve_global_id(
2085        &self,
2086        item_id: &CatalogItemId,
2087        version: RelationVersionSelector,
2088    ) -> GlobalId {
2089        self.state
2090            .get_entry(item_id)
2091            .at_version(version)
2092            .global_id()
2093    }
2094
2095    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2096        self.state.config()
2097    }
2098
2099    fn now(&self) -> EpochMillis {
2100        (self.state.config().now)()
2101    }
2102
2103    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2104        self.state.aws_privatelink_availability_zones.clone()
2105    }
2106
2107    fn system_vars(&self) -> &SystemVars {
2108        &self.state.system_configuration
2109    }
2110
2111    fn system_vars_mut(&mut self) -> &mut SystemVars {
2112        Arc::make_mut(&mut self.state.to_mut().system_configuration)
2113    }
2114
2115    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2116        self.state().get_owner_id(id, self.conn_id())
2117    }
2118
2119    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2120        match id {
2121            SystemObjectId::System => Some(&self.state.system_privileges),
2122            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2123                Some(self.get_cluster(*id).privileges())
2124            }
2125            SystemObjectId::Object(ObjectId::Database(id)) => {
2126                Some(self.get_database(id).privileges())
2127            }
2128            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2129                // For temporary schemas that haven't been created yet (lazy creation),
2130                // we return None - the RBAC check will need to handle this case.
2131                self.state
2132                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2133                    .map(|schema| schema.privileges())
2134            }
2135            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2136            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2137                Some(self.get_network_policy(id).privileges())
2138            }
2139            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2140            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2141        }
2142    }
2143
2144    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2145        let mut seen = BTreeSet::new();
2146        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2147    }
2148
2149    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2150        let mut seen = BTreeSet::new();
2151        self.state.item_dependents(id, &mut seen)
2152    }
2153
2154    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2155        rbac::all_object_privileges(object_type)
2156    }
2157
2158    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2159        self.state.get_object_type(object_id)
2160    }
2161
2162    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2163        self.state.get_system_object_type(id)
2164    }
2165
2166    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2167    /// the object.
2168    ///
2169    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2170    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2171    /// only for error msgs and other humanizations.
2172    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2173        if qualified_name.qualifiers.schema_spec.is_temporary() {
2174            // All bets are off. Just give up and return the qualified name as is.
2175            // TODO: Figure out what's going on with temporary objects.
2176
2177            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2178            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2179            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2180            // which we don't have a simple repro.
2181            return qualified_name.item.clone().into();
2182        }
2183
2184        let database_id = match &qualified_name.qualifiers.database_spec {
2185            ResolvedDatabaseSpecifier::Ambient => None,
2186            ResolvedDatabaseSpecifier::Id(id)
2187                if self.database.is_some() && self.database == Some(*id) =>
2188            {
2189                None
2190            }
2191            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2192        };
2193
2194        let schema_spec = if database_id.is_none()
2195            && self.resolve_item_name(&PartialItemName {
2196                database: None,
2197                schema: None,
2198                item: qualified_name.item.clone(),
2199            }) == Ok(qualified_name)
2200            || self.resolve_function_name(&PartialItemName {
2201                database: None,
2202                schema: None,
2203                item: qualified_name.item.clone(),
2204            }) == Ok(qualified_name)
2205            || self.resolve_type_name(&PartialItemName {
2206                database: None,
2207                schema: None,
2208                item: qualified_name.item.clone(),
2209            }) == Ok(qualified_name)
2210        {
2211            None
2212        } else {
2213            // If `search_path` does not contain `full_name.schema`, the
2214            // `PartialName` must contain it.
2215            Some(qualified_name.qualifiers.schema_spec.clone())
2216        };
2217
2218        let res = PartialItemName {
2219            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2220            schema: schema_spec.map(|spec| {
2221                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2222                    .name()
2223                    .schema
2224                    .clone()
2225            }),
2226            item: qualified_name.item.clone(),
2227        };
2228        assert!(
2229            self.resolve_item_name(&res) == Ok(qualified_name)
2230                || self.resolve_function_name(&res) == Ok(qualified_name)
2231                || self.resolve_type_name(&res) == Ok(qualified_name)
2232        );
2233        res
2234    }
2235
2236    fn add_notice(&self, notice: PlanNotice) {
2237        let _ = self.notices_tx.send(notice.into());
2238    }
2239
2240    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2241        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2242        self.state.comments.get_object_comments(comment_id)
2243    }
2244
2245    fn is_cluster_size_cc(&self, size: &str) -> bool {
2246        self.state
2247            .cluster_replica_sizes
2248            .0
2249            .get(size)
2250            .map_or(false, |a| a.is_cc)
2251    }
2252}
2253
2254#[cfg(test)]
2255mod tests {
2256    use std::collections::{BTreeMap, BTreeSet};
2257    use std::sync::Arc;
2258    use std::{env, iter};
2259
2260    use itertools::Itertools;
2261    use mz_catalog::memory::objects::CatalogItem;
2262    use mz_postgres_util::{query, sql};
2263    use tokio_postgres::NoTls;
2264    use tokio_postgres::types::Type;
2265    use uuid::Uuid;
2266
2267    use mz_catalog::SYSTEM_CONN_ID;
2268    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2269    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2270    use mz_controller_types::{ClusterId, ReplicaId};
2271    use mz_expr::{Eval, MirScalarExpr};
2272    use mz_ore::now::to_datetime;
2273    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2274    use mz_persist_client::PersistClient;
2275    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2276    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2277    use mz_repr::role_id::RoleId;
2278    use mz_repr::{
2279        CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2280        SqlScalarType, Timestamp,
2281    };
2282    use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2283    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2284    use mz_sql::names::{
2285        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2286        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2287    };
2288    use mz_sql::plan::{
2289        CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2290        QueryLifetime, Scope, StatementContext,
2291    };
2292    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2293    use mz_sql::session::vars::{SystemVars, VarInput};
2294
2295    use crate::catalog::state::LocalExpressionCache;
2296    use crate::catalog::{Catalog, Op};
2297    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2298    use crate::session::Session;
2299
2300    /// System sessions have an empty `search_path` so it's necessary to
2301    /// schema-qualify all referenced items.
2302    ///
2303    /// Dummy (and ostensibly client) sessions contain system schemas in their
2304    /// search paths, so do not require schema qualification on system objects such
2305    /// as types.
2306    #[mz_ore::test(tokio::test)]
2307    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2308    async fn test_minimal_qualification() {
2309        Catalog::with_debug(|catalog| async move {
2310            struct TestCase {
2311                input: QualifiedItemName,
2312                system_output: PartialItemName,
2313                normal_output: PartialItemName,
2314            }
2315
2316            let test_cases = vec![
2317                TestCase {
2318                    input: QualifiedItemName {
2319                        qualifiers: ItemQualifiers {
2320                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2321                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2322                        },
2323                        item: "numeric".to_string(),
2324                    },
2325                    system_output: PartialItemName {
2326                        database: None,
2327                        schema: None,
2328                        item: "numeric".to_string(),
2329                    },
2330                    normal_output: PartialItemName {
2331                        database: None,
2332                        schema: None,
2333                        item: "numeric".to_string(),
2334                    },
2335                },
2336                TestCase {
2337                    input: QualifiedItemName {
2338                        qualifiers: ItemQualifiers {
2339                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2340                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2341                        },
2342                        item: "mz_array_types".to_string(),
2343                    },
2344                    system_output: PartialItemName {
2345                        database: None,
2346                        schema: None,
2347                        item: "mz_array_types".to_string(),
2348                    },
2349                    normal_output: PartialItemName {
2350                        database: None,
2351                        schema: None,
2352                        item: "mz_array_types".to_string(),
2353                    },
2354                },
2355            ];
2356
2357            for tc in test_cases {
2358                assert_eq!(
2359                    catalog
2360                        .for_system_session()
2361                        .minimal_qualification(&tc.input),
2362                    tc.system_output
2363                );
2364                assert_eq!(
2365                    catalog
2366                        .for_session(&Session::dummy())
2367                        .minimal_qualification(&tc.input),
2368                    tc.normal_output
2369                );
2370            }
2371            catalog.expire().await;
2372        })
2373        .await
2374    }
2375
2376    #[mz_ore::test(tokio::test)]
2377    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2378    async fn test_catalog_revision() {
2379        let persist_client = PersistClient::new_for_tests().await;
2380        let organization_id = Uuid::new_v4();
2381        let bootstrap_args = test_bootstrap_args();
2382        {
2383            let mut catalog = Catalog::open_debug_catalog(
2384                persist_client.clone(),
2385                organization_id.clone(),
2386                &bootstrap_args,
2387            )
2388            .await
2389            .expect("unable to open debug catalog");
2390            assert_eq!(catalog.transient_revision(), 1);
2391            let commit_ts = catalog.current_upper().await;
2392            catalog
2393                .transact(
2394                    None,
2395                    commit_ts,
2396                    None,
2397                    vec![Op::CreateDatabase {
2398                        name: "test".to_string(),
2399                        owner_id: MZ_SYSTEM_ROLE_ID,
2400                    }],
2401                )
2402                .await
2403                .expect("failed to transact");
2404            assert_eq!(catalog.transient_revision(), 2);
2405            catalog.expire().await;
2406        }
2407        {
2408            let catalog =
2409                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2410                    .await
2411                    .expect("unable to open debug catalog");
2412            // Re-opening the same catalog resets the transient_revision to 1.
2413            assert_eq!(catalog.transient_revision(), 1);
2414            catalog.expire().await;
2415        }
2416    }
2417
2418    #[mz_ore::test(tokio::test)]
2419    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2420    async fn test_effective_search_path() {
2421        Catalog::with_debug(|catalog| async move {
2422            let mz_catalog_schema = (
2423                ResolvedDatabaseSpecifier::Ambient,
2424                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2425            );
2426            let pg_catalog_schema = (
2427                ResolvedDatabaseSpecifier::Ambient,
2428                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2429            );
2430            let mz_temp_schema = (
2431                ResolvedDatabaseSpecifier::Ambient,
2432                SchemaSpecifier::Temporary,
2433            );
2434
2435            // Behavior with the default search_schema (public)
2436            let session = Session::dummy();
2437            let conn_catalog = catalog.for_session(&session);
2438            assert_ne!(
2439                conn_catalog.effective_search_path(false),
2440                conn_catalog.search_path
2441            );
2442            assert_ne!(
2443                conn_catalog.effective_search_path(true),
2444                conn_catalog.search_path
2445            );
2446            assert_eq!(
2447                conn_catalog.effective_search_path(false),
2448                vec![
2449                    mz_catalog_schema.clone(),
2450                    pg_catalog_schema.clone(),
2451                    conn_catalog.search_path[0].clone()
2452                ]
2453            );
2454            assert_eq!(
2455                conn_catalog.effective_search_path(true),
2456                vec![
2457                    mz_temp_schema.clone(),
2458                    mz_catalog_schema.clone(),
2459                    pg_catalog_schema.clone(),
2460                    conn_catalog.search_path[0].clone()
2461                ]
2462            );
2463
2464            // missing schemas are added when missing
2465            let mut session = Session::dummy();
2466            session
2467                .vars_mut()
2468                .set(
2469                    &SystemVars::new(),
2470                    "search_path",
2471                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2472                    false,
2473                )
2474                .expect("failed to set search_path");
2475            let conn_catalog = catalog.for_session(&session);
2476            assert_ne!(
2477                conn_catalog.effective_search_path(false),
2478                conn_catalog.search_path
2479            );
2480            assert_ne!(
2481                conn_catalog.effective_search_path(true),
2482                conn_catalog.search_path
2483            );
2484            assert_eq!(
2485                conn_catalog.effective_search_path(false),
2486                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2487            );
2488            assert_eq!(
2489                conn_catalog.effective_search_path(true),
2490                vec![
2491                    mz_temp_schema.clone(),
2492                    mz_catalog_schema.clone(),
2493                    pg_catalog_schema.clone()
2494                ]
2495            );
2496
2497            let mut session = Session::dummy();
2498            session
2499                .vars_mut()
2500                .set(
2501                    &SystemVars::new(),
2502                    "search_path",
2503                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2504                    false,
2505                )
2506                .expect("failed to set search_path");
2507            let conn_catalog = catalog.for_session(&session);
2508            assert_ne!(
2509                conn_catalog.effective_search_path(false),
2510                conn_catalog.search_path
2511            );
2512            assert_ne!(
2513                conn_catalog.effective_search_path(true),
2514                conn_catalog.search_path
2515            );
2516            assert_eq!(
2517                conn_catalog.effective_search_path(false),
2518                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2519            );
2520            assert_eq!(
2521                conn_catalog.effective_search_path(true),
2522                vec![
2523                    mz_temp_schema.clone(),
2524                    pg_catalog_schema.clone(),
2525                    mz_catalog_schema.clone()
2526                ]
2527            );
2528
2529            let mut session = Session::dummy();
2530            session
2531                .vars_mut()
2532                .set(
2533                    &SystemVars::new(),
2534                    "search_path",
2535                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2536                    false,
2537                )
2538                .expect("failed to set search_path");
2539            let conn_catalog = catalog.for_session(&session);
2540            assert_ne!(
2541                conn_catalog.effective_search_path(false),
2542                conn_catalog.search_path
2543            );
2544            assert_ne!(
2545                conn_catalog.effective_search_path(true),
2546                conn_catalog.search_path
2547            );
2548            assert_eq!(
2549                conn_catalog.effective_search_path(false),
2550                vec![
2551                    mz_catalog_schema.clone(),
2552                    pg_catalog_schema.clone(),
2553                    mz_temp_schema.clone()
2554                ]
2555            );
2556            assert_eq!(
2557                conn_catalog.effective_search_path(true),
2558                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2559            );
2560            catalog.expire().await;
2561        })
2562        .await
2563    }
2564
2565    #[mz_ore::test(tokio::test)]
2566    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2567    async fn test_normalized_create() {
2568        use mz_ore::collections::CollectionExt;
2569        Catalog::with_debug(|catalog| async move {
2570            let conn_catalog = catalog.for_system_session();
2571            let scx = &mut StatementContext::new(None, &conn_catalog);
2572
2573            let parsed = mz_sql_parser::parser::parse_statements(
2574                "create view public.foo as select 1 as bar",
2575            )
2576            .expect("")
2577            .into_element()
2578            .ast;
2579
2580            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2581
2582            // Ensure that all identifiers are quoted.
2583            assert_eq!(
2584                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2585                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2586            );
2587            catalog.expire().await;
2588        })
2589        .await;
2590    }
2591
2592    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2593    #[mz_ore::test(tokio::test)]
2594    #[cfg_attr(miri, ignore)] // slow
2595    async fn test_large_catalog_item() {
2596        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2597        let column = ", 1";
2598        let view_def_size = view_def.bytes().count();
2599        let column_size = column.bytes().count();
2600        let column_count =
2601            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2602        let columns = iter::repeat(column).take(column_count).join("");
2603        let create_sql = format!("{view_def}{columns})");
2604        let create_sql_check = create_sql.clone();
2605        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2606        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2607            &create_sql
2608        ));
2609
2610        let persist_client = PersistClient::new_for_tests().await;
2611        let organization_id = Uuid::new_v4();
2612        let id = CatalogItemId::User(1);
2613        let gid = GlobalId::User(1);
2614        let bootstrap_args = test_bootstrap_args();
2615        {
2616            let mut catalog = Catalog::open_debug_catalog(
2617                persist_client.clone(),
2618                organization_id.clone(),
2619                &bootstrap_args,
2620            )
2621            .await
2622            .expect("unable to open debug catalog");
2623            let item = catalog
2624                .state()
2625                .deserialize_item(
2626                    gid,
2627                    &create_sql,
2628                    &BTreeMap::new(),
2629                    &mut LocalExpressionCache::Closed,
2630                    None,
2631                )
2632                .expect("unable to parse view");
2633            let commit_ts = catalog.current_upper().await;
2634            catalog
2635                .transact(
2636                    None,
2637                    commit_ts,
2638                    None,
2639                    vec![Op::CreateItem {
2640                        item,
2641                        name: QualifiedItemName {
2642                            qualifiers: ItemQualifiers {
2643                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2644                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2645                            },
2646                            item: "v".to_string(),
2647                        },
2648                        id,
2649                        owner_id: MZ_SYSTEM_ROLE_ID,
2650                    }],
2651                )
2652                .await
2653                .expect("failed to transact");
2654            catalog.expire().await;
2655        }
2656        {
2657            let catalog =
2658                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2659                    .await
2660                    .expect("unable to open debug catalog");
2661            let view = catalog.get_entry(&id);
2662            assert_eq!("v", view.name.item);
2663            match &view.item {
2664                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2665                item => panic!("expected view, got {}", item.typ()),
2666            }
2667            catalog.expire().await;
2668        }
2669    }
2670
2671    #[mz_ore::test(tokio::test)]
2672    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2673    async fn test_object_type() {
2674        Catalog::with_debug(|catalog| async move {
2675            let conn_catalog = catalog.for_system_session();
2676
2677            assert_eq!(
2678                mz_sql::catalog::ObjectType::ClusterReplica,
2679                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2680                    ClusterId::user(1).expect("1 is a valid ID"),
2681                    ReplicaId::User(1)
2682                )))
2683            );
2684            assert_eq!(
2685                mz_sql::catalog::ObjectType::Role,
2686                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2687            );
2688            catalog.expire().await;
2689        })
2690        .await;
2691    }
2692
2693    #[mz_ore::test(tokio::test)]
2694    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2695    async fn test_get_privileges() {
2696        Catalog::with_debug(|catalog| async move {
2697            let conn_catalog = catalog.for_system_session();
2698
2699            assert_eq!(
2700                None,
2701                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2702                    ClusterId::user(1).expect("1 is a valid ID"),
2703                    ReplicaId::User(1),
2704                ))))
2705            );
2706            assert_eq!(
2707                None,
2708                conn_catalog
2709                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2710            );
2711            catalog.expire().await;
2712        })
2713        .await;
2714    }
2715
2716    #[mz_ore::test(tokio::test)]
2717    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2718    async fn verify_builtin_descs() {
2719        Catalog::with_debug(|catalog| async move {
2720            let conn_catalog = catalog.for_system_session();
2721
2722            for builtin in BUILTINS::iter() {
2723                let (schema, name, expected_desc) = match builtin {
2724                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2725                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2726                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2727                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2728                    Builtin::Log(_)
2729                    | Builtin::Type(_)
2730                    | Builtin::Func(_)
2731                    | Builtin::Index(_)
2732                    | Builtin::Connection(_) => continue,
2733                };
2734                let item = conn_catalog
2735                    .resolve_item(&PartialItemName {
2736                        database: None,
2737                        schema: Some(schema.to_string()),
2738                        item: name.to_string(),
2739                    })
2740                    .expect("unable to resolve item")
2741                    .at_version(RelationVersionSelector::Latest);
2742
2743                let actual_desc = item.relation_desc().expect("invalid item type");
2744                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2745                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2746                {
2747                    assert_eq!(
2748                        actual_name, expected_name,
2749                        "item {schema}.{name} column {index} name did not match its expected name"
2750                    );
2751                    assert_eq!(
2752                        actual_typ, expected_typ,
2753                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2754                    );
2755                }
2756                assert_eq!(
2757                    &*actual_desc, expected_desc,
2758                    "item {schema}.{name} did not match its expected RelationDesc"
2759                );
2760            }
2761            catalog.expire().await;
2762        })
2763        .await
2764    }
2765
2766    // Connect to a running Postgres server and verify that our builtin
2767    // types and functions match it, in addition to some other things.
2768    #[mz_ore::test(tokio::test)]
2769    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2770    async fn test_compare_builtins_postgres() {
2771        async fn inner(catalog: Catalog) {
2772            // Verify that all builtin functions:
2773            // - have a unique OID
2774            // - if they have a postgres counterpart (same oid) then they have matching name
2775            let (client, connection) = tokio_postgres::connect(
2776                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2777                NoTls,
2778            )
2779            .await
2780            .expect("failed to connect to Postgres");
2781
2782            task::spawn(|| "compare_builtin_postgres", async move {
2783                if let Err(e) = connection.await {
2784                    panic!("connection error: {}", e);
2785                }
2786            });
2787
2788            struct PgProc {
2789                name: String,
2790                arg_oids: Vec<u32>,
2791                ret_oid: Option<u32>,
2792                ret_set: bool,
2793            }
2794
2795            struct PgType {
2796                name: String,
2797                ty: String,
2798                elem: u32,
2799                array: u32,
2800                input: u32,
2801                receive: u32,
2802            }
2803
2804            struct PgOper {
2805                oprresult: u32,
2806                name: String,
2807            }
2808
2809            let pg_proc: BTreeMap<_, _> = query(
2810                &client,
2811                sql!(
2812                    "SELECT
2813                    p.oid,
2814                    proname,
2815                    proargtypes,
2816                    prorettype,
2817                    proretset
2818                FROM pg_proc p
2819                JOIN pg_namespace n ON p.pronamespace = n.oid"
2820                ),
2821                &[],
2822            )
2823            .await
2824            .expect("pg query failed")
2825            .into_iter()
2826            .map(|row| {
2827                let oid: u32 = row.get("oid");
2828                let pg_proc = PgProc {
2829                    name: row.get("proname"),
2830                    arg_oids: row.get("proargtypes"),
2831                    ret_oid: row.get("prorettype"),
2832                    ret_set: row.get("proretset"),
2833                };
2834                (oid, pg_proc)
2835            })
2836            .collect();
2837
2838            let pg_type: BTreeMap<_, _> = query(
2839                &client,
2840                sql!(
2841                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type"
2842                ),
2843                &[],
2844            )
2845            .await
2846                .expect("pg query failed")
2847                .into_iter()
2848                .map(|row| {
2849                    let oid: u32 = row.get("oid");
2850                    let pg_type = PgType {
2851                        name: row.get("typname"),
2852                        ty: row.get("typtype"),
2853                        elem: row.get("typelem"),
2854                        array: row.get("typarray"),
2855                        input: row.get("typinput"),
2856                        receive: row.get("typreceive"),
2857                    };
2858                    (oid, pg_type)
2859                })
2860                .collect();
2861
2862            let pg_oper: BTreeMap<_, _> = query(
2863                &client,
2864                sql!("SELECT oid, oprname, oprresult FROM pg_operator"),
2865                &[],
2866            )
2867            .await
2868            .expect("pg query failed")
2869            .into_iter()
2870            .map(|row| {
2871                let oid: u32 = row.get("oid");
2872                let pg_oper = PgOper {
2873                    name: row.get("oprname"),
2874                    oprresult: row.get("oprresult"),
2875                };
2876                (oid, pg_oper)
2877            })
2878            .collect();
2879
2880            let conn_catalog = catalog.for_system_session();
2881            let resolve_type_oid = |item: &str| {
2882                conn_catalog
2883                    .resolve_type(&PartialItemName {
2884                        database: None,
2885                        // All functions we check exist in PG, so the types must, as
2886                        // well
2887                        schema: Some(PG_CATALOG_SCHEMA.into()),
2888                        item: item.to_string(),
2889                    })
2890                    .expect("unable to resolve type")
2891                    .oid()
2892            };
2893
2894            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2895                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2896                .collect();
2897
2898            let mut all_oids = BTreeSet::new();
2899
2900            // A function to determine if two oids are equivalent enough for these tests. We don't
2901            // support some types, so map exceptions here.
2902            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2903                [
2904                    // We don't support NAME.
2905                    (Type::NAME, Type::TEXT),
2906                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2907                    // We don't support time with time zone.
2908                    (Type::TIME, Type::TIMETZ),
2909                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2910                ]
2911                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2912            );
2913            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2914                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2915            ]);
2916            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2917                if ignore_return_types.contains(&fn_oid) {
2918                    return true;
2919                }
2920                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2921                    return true;
2922                }
2923                a == b
2924            };
2925
2926            for builtin in BUILTINS::iter() {
2927                match builtin {
2928                    Builtin::Type(ty) => {
2929                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2930
2931                        if ty.oid >= FIRST_MATERIALIZE_OID {
2932                            // High OIDs are reserved in Materialize and don't have
2933                            // PostgreSQL counterparts.
2934                            continue;
2935                        }
2936
2937                        // For types that have a PostgreSQL counterpart, verify that
2938                        // the name and oid match.
2939                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2940                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2941                        });
2942                        assert_eq!(
2943                            ty.name, pg_ty.name,
2944                            "oid {} has name {} in postgres; expected {}",
2945                            ty.oid, pg_ty.name, ty.name,
2946                        );
2947
2948                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2949                            None => (0, 0),
2950                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2951                        };
2952                        assert_eq!(
2953                            typinput_oid, pg_ty.input,
2954                            "type {} has typinput OID {:?} in mz but {:?} in pg",
2955                            ty.name, typinput_oid, pg_ty.input,
2956                        );
2957                        assert_eq!(
2958                            typreceive_oid, pg_ty.receive,
2959                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
2960                            ty.name, typreceive_oid, pg_ty.receive,
2961                        );
2962                        if typinput_oid != 0 {
2963                            assert!(
2964                                func_oids.contains(&typinput_oid),
2965                                "type {} has typinput OID {} that does not exist in pg_proc",
2966                                ty.name,
2967                                typinput_oid,
2968                            );
2969                        }
2970                        if typreceive_oid != 0 {
2971                            assert!(
2972                                func_oids.contains(&typreceive_oid),
2973                                "type {} has typreceive OID {} that does not exist in pg_proc",
2974                                ty.name,
2975                                typreceive_oid,
2976                            );
2977                        }
2978
2979                        // Ensure the type matches.
2980                        match &ty.details.typ {
2981                            CatalogType::Array { element_reference } => {
2982                                let elem_ty = BUILTINS::iter()
2983                                    .filter_map(|builtin| match builtin {
2984                                        Builtin::Type(ty @ BuiltinType { name, .. })
2985                                            if element_reference == name =>
2986                                        {
2987                                            Some(ty)
2988                                        }
2989                                        _ => None,
2990                                    })
2991                                    .next();
2992                                let elem_ty = match elem_ty {
2993                                    Some(ty) => ty,
2994                                    None => {
2995                                        panic!("{} is unexpectedly not a type", element_reference)
2996                                    }
2997                                };
2998                                assert_eq!(
2999                                    pg_ty.elem, elem_ty.oid,
3000                                    "type {} has mismatched element OIDs",
3001                                    ty.name
3002                                )
3003                            }
3004                            CatalogType::Pseudo => {
3005                                assert_eq!(
3006                                    pg_ty.ty, "p",
3007                                    "type {} is not a pseudo type as expected",
3008                                    ty.name
3009                                )
3010                            }
3011                            CatalogType::Range { .. } => {
3012                                assert_eq!(
3013                                    pg_ty.ty, "r",
3014                                    "type {} is not a range type as expected",
3015                                    ty.name
3016                                );
3017                            }
3018                            _ => {
3019                                assert_eq!(
3020                                    pg_ty.ty, "b",
3021                                    "type {} is not a base type as expected",
3022                                    ty.name
3023                                )
3024                            }
3025                        }
3026
3027                        // Ensure the array type reference is correct.
3028                        let schema = catalog
3029                            .resolve_schema_in_database(
3030                                &ResolvedDatabaseSpecifier::Ambient,
3031                                ty.schema,
3032                                &SYSTEM_CONN_ID,
3033                            )
3034                            .expect("unable to resolve schema");
3035                        let allocated_type = catalog
3036                            .resolve_type(
3037                                None,
3038                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3039                                &PartialItemName {
3040                                    database: None,
3041                                    schema: Some(schema.name().schema.clone()),
3042                                    item: ty.name.to_string(),
3043                                },
3044                                &SYSTEM_CONN_ID,
3045                            )
3046                            .expect("unable to resolve type");
3047                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3048                            ty
3049                        } else {
3050                            panic!("unexpectedly not a type")
3051                        };
3052                        match ty.details.array_id {
3053                            Some(array_id) => {
3054                                let array_ty = catalog.get_entry(&array_id);
3055                                assert_eq!(
3056                                    pg_ty.array, array_ty.oid,
3057                                    "type {} has mismatched array OIDs",
3058                                    allocated_type.name.item,
3059                                );
3060                            }
3061                            None => assert_eq!(
3062                                pg_ty.array, 0,
3063                                "type {} does not have an array type in mz but does in pg",
3064                                allocated_type.name.item,
3065                            ),
3066                        }
3067                    }
3068                    Builtin::Func(func) => {
3069                        for imp in func.inner.func_impls() {
3070                            assert!(
3071                                all_oids.insert(imp.oid),
3072                                "{} reused oid {}",
3073                                func.name,
3074                                imp.oid
3075                            );
3076
3077                            assert!(
3078                                imp.oid < FIRST_USER_OID,
3079                                "built-in function {} erroneously has OID in user space ({})",
3080                                func.name,
3081                                imp.oid,
3082                            );
3083
3084                            // For functions that have a postgres counterpart, verify that the name and
3085                            // oid match.
3086                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3087                                continue;
3088                            } else {
3089                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3090                                    panic!(
3091                                        "pg_proc missing function {}: oid {}",
3092                                        func.name, imp.oid
3093                                    )
3094                                })
3095                            };
3096                            assert_eq!(
3097                                func.name, pg_fn.name,
3098                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3099                                imp.oid, func.name, pg_fn.name
3100                            );
3101
3102                            // Complain, but don't fail, if argument oids don't match.
3103                            // TODO: make these match.
3104                            let imp_arg_oids = imp
3105                                .arg_typs
3106                                .iter()
3107                                .map(|item| resolve_type_oid(item))
3108                                .collect::<Vec<_>>();
3109
3110                            if imp_arg_oids != pg_fn.arg_oids {
3111                                println!(
3112                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3113                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3114                                );
3115                            }
3116
3117                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3118
3119                            assert!(
3120                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3121                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3122                                imp.oid,
3123                                func.name,
3124                                imp_return_oid,
3125                                pg_fn.ret_oid
3126                            );
3127
3128                            assert_eq!(
3129                                imp.return_is_set, pg_fn.ret_set,
3130                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3131                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3132                            );
3133                        }
3134                    }
3135                    _ => (),
3136                }
3137            }
3138
3139            for (op, func) in OP_IMPLS.iter() {
3140                for imp in func.func_impls() {
3141                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3142
3143                    // For operators that have a postgres counterpart, verify that the name and oid match.
3144                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3145                        continue;
3146                    } else {
3147                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3148                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3149                        })
3150                    };
3151
3152                    assert_eq!(*op, pg_op.name);
3153
3154                    let imp_return_oid =
3155                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3156                    if imp_return_oid != pg_op.oprresult {
3157                        panic!(
3158                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3159                            imp.oid, op, imp_return_oid, pg_op.oprresult
3160                        );
3161                    }
3162                }
3163            }
3164            catalog.expire().await;
3165        }
3166
3167        Catalog::with_debug(inner).await
3168    }
3169
3170    // Execute all builtin functions with all combinations of arguments from interesting datums.
3171    #[mz_ore::test(tokio::test)]
3172    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3173    async fn test_smoketest_all_builtins() {
3174        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3175            let catalog = Arc::new(catalog);
3176            let conn_catalog = catalog.for_system_session();
3177
3178            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3179            let mut handles = Vec::new();
3180
3181            // Extracted during planning; always panics when executed.
3182            let ignore_names = BTreeSet::from([
3183                "avg",
3184                "avg_internal_v1",
3185                "bool_and",
3186                "bool_or",
3187                "has_table_privilege", // > 3 s each
3188                "has_type_privilege",  // > 3 s each
3189                "mod",
3190                "mz_panic",
3191                "mz_sleep",
3192                "pow",
3193                "stddev_pop",
3194                "stddev_samp",
3195                "stddev",
3196                "var_pop",
3197                "var_samp",
3198                "variance",
3199            ]);
3200
3201            let fns = BUILTINS::funcs()
3202                .map(|func| (&func.name, func.inner))
3203                .chain(OP_IMPLS.iter());
3204
3205            for (name, func) in fns {
3206                if ignore_names.contains(name) {
3207                    continue;
3208                }
3209                let Func::Scalar(impls) = func else {
3210                    continue;
3211                };
3212
3213                'outer: for imp in impls {
3214                    let details = imp.details();
3215                    let mut styps = Vec::new();
3216                    for item in details.arg_typs.iter() {
3217                        let oid = resolve_type_oid(item);
3218                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3219                            continue 'outer;
3220                        };
3221                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3222                    }
3223                    let datums = styps
3224                        .iter()
3225                        .map(|styp| {
3226                            let mut datums = vec![Datum::Null];
3227                            datums.extend(styp.interesting_datums());
3228                            datums
3229                        })
3230                        .collect::<Vec<_>>();
3231                    // Skip nullary fns.
3232                    if datums.is_empty() {
3233                        continue;
3234                    }
3235
3236                    let return_oid = details
3237                        .return_typ
3238                        .map(resolve_type_oid)
3239                        .expect("must exist");
3240                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3241                        .ok()
3242                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3243
3244                    let mut idxs = vec![0; datums.len()];
3245                    while idxs[0] < datums[0].len() {
3246                        let mut args = Vec::with_capacity(idxs.len());
3247                        for i in 0..(datums.len()) {
3248                            args.push(datums[i][idxs[i]]);
3249                        }
3250
3251                        let op = &imp.op;
3252                        let scalars = args
3253                            .iter()
3254                            .enumerate()
3255                            .map(|(i, datum)| {
3256                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3257                                    datum.clone(),
3258                                    styps[i].clone(),
3259                                ))
3260                            })
3261                            .collect();
3262
3263                        let call_name = format!(
3264                            "{name}({}) (oid: {})",
3265                            args.iter()
3266                                .map(|d| d.to_string())
3267                                .collect::<Vec<_>>()
3268                                .join(", "),
3269                            imp.oid
3270                        );
3271                        let catalog = Arc::clone(&catalog);
3272                        let call_name_fn = call_name.clone();
3273                        let return_styp = return_styp.clone();
3274                        let handle = task::spawn_blocking(
3275                            || call_name,
3276                            move || {
3277                                smoketest_fn(
3278                                    name,
3279                                    call_name_fn,
3280                                    op,
3281                                    imp,
3282                                    args,
3283                                    catalog,
3284                                    scalars,
3285                                    return_styp,
3286                                )
3287                            },
3288                        );
3289                        handles.push(handle);
3290
3291                        // Advance to the next datum combination.
3292                        for i in (0..datums.len()).rev() {
3293                            idxs[i] += 1;
3294                            if idxs[i] >= datums[i].len() {
3295                                if i == 0 {
3296                                    break;
3297                                }
3298                                idxs[i] = 0;
3299                                continue;
3300                            } else {
3301                                break;
3302                            }
3303                        }
3304                    }
3305                }
3306            }
3307            handles
3308        }
3309
3310        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3311        for handle in handles {
3312            handle.await;
3313        }
3314    }
3315
3316    fn smoketest_fn(
3317        name: &&str,
3318        call_name: String,
3319        op: &Operation<HirScalarExpr>,
3320        imp: &FuncImpl<HirScalarExpr>,
3321        args: Vec<Datum<'_>>,
3322        catalog: Arc<Catalog>,
3323        scalars: Vec<CoercibleScalarExpr>,
3324        return_styp: Option<SqlScalarType>,
3325    ) {
3326        let conn_catalog = catalog.for_system_session();
3327        let pcx = PlanContext::zero();
3328        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3329        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3330        let ecx = ExprContext {
3331            qcx: &qcx,
3332            name: "smoketest",
3333            scope: &Scope::empty(),
3334            relation_type: &SqlRelationType::empty(),
3335            allow_aggregates: false,
3336            allow_subqueries: false,
3337            allow_parameters: false,
3338            allow_windows: false,
3339        };
3340        let arena = RowArena::new();
3341        let mut session = Session::dummy();
3342        session
3343            .start_transaction(to_datetime(0), None, None)
3344            .expect("must succeed");
3345        let prep_style = ExprPrepOneShot {
3346            logical_time: EvalTime::Time(Timestamp::MIN),
3347            session: &session,
3348            catalog_state: &catalog.state,
3349        };
3350
3351        // Execute the function as much as possible, ensuring no panics occur, but
3352        // otherwise ignoring eval errors. We also do various other checks.
3353        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3354        if let Ok(hir) = res {
3355            let uneliminated_result_row = {
3356                if let HirScalarExpr::CallUnary { func, .. } = &hir
3357                    && func.is_eliminable_cast()
3358                {
3359                    let mut uneliminated_mir = hir
3360                        .clone()
3361                        .lower_uncorrelated(HirToMirConfig {
3362                            enable_cast_elimination: false,
3363                            ..catalog.system_config().into()
3364                        })
3365                        .expect("lowering eliminable cast should always succeed");
3366                    prep_style
3367                        .prep_scalar_expr(&mut uneliminated_mir)
3368                        .expect("must succeed");
3369
3370                    // Pack the row, to avoid lifetime issues with the MIR we lowered here
3371                    uneliminated_mir
3372                        .eval(&[], &arena)
3373                        .ok()
3374                        .map(|datum| Row::pack([datum]))
3375                } else {
3376                    None
3377                }
3378            };
3379
3380            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3381                // Populate unmaterialized functions.
3382                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3383
3384                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3385                    if let Some(return_styp) = return_styp {
3386                        let mir_typ = mir.typ(&[]);
3387                        // MIR type inference should be consistent with the type
3388                        // we get from the catalog.
3389                        soft_assert_eq_or_log!(
3390                            mir_typ.scalar_type,
3391                            (&return_styp).into(),
3392                            "MIR type did not match the catalog type (cast elimination/repr type error)"
3393                        );
3394                        // The following will check not just that the scalar type
3395                        // is ok, but also catches if the function returned a null
3396                        // but the MIR type inference said "non-nullable".
3397                        if !eval_result_datum.is_instance_of(&mir_typ) {
3398                            panic!(
3399                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3400                            );
3401                        }
3402                        // Check the consistency of `is_eliminable_cast`---we should get the same datum either way.
3403                        if let Some(row) = uneliminated_result_row {
3404                            let uneliminated_result_datum = row.unpack_first();
3405                            assert_eq!(
3406                                uneliminated_result_datum, eval_result_datum,
3407                                "datums should not change if cast is eliminable"
3408                            );
3409                        }
3410                        // Check the consistency of `introduces_nulls` and
3411                        // `propagates_nulls` with `MirScalarExpr::typ`.
3412                        if let Some((introduces_nulls, propagates_nulls)) =
3413                            call_introduces_propagates_nulls(&mir)
3414                        {
3415                            if introduces_nulls {
3416                                // If the function introduces_nulls, then the return
3417                                // type should always be nullable, regardless of
3418                                // the nullability of the input types.
3419                                assert!(
3420                                    mir_typ.nullable,
3421                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3422                                    name, args, mir, mir_typ.nullable
3423                                );
3424                            } else {
3425                                let any_input_null = args.iter().any(|arg| arg.is_null());
3426                                if !any_input_null {
3427                                    assert!(
3428                                        !mir_typ.nullable,
3429                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3430                                        name, args, mir, mir_typ.nullable
3431                                    );
3432                                } else if propagates_nulls {
3433                                    // propagates_nulls means the optimizer short-circuits
3434                                    // all-null inputs, so the output must be nullable.
3435                                    assert!(
3436                                        mir_typ.nullable,
3437                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3438                                        name, args, mir, mir_typ.nullable
3439                                    );
3440                                }
3441                                // When propagates_nulls is false, the output may still
3442                                // be nullable if a non-nullable parameter received a null
3443                                // input (per-position null rejection). The is_instance_of
3444                                // check above ensures type consistency.
3445                            }
3446                        }
3447                        // Check that `MirScalarExpr::reduce` yields the same result
3448                        // as the real evaluation.
3449                        let mut reduced = mir.clone();
3450                        reduced.reduce(&[]);
3451                        match reduced {
3452                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3453                                match reduce_result {
3454                                    Ok(reduce_result_row) => {
3455                                        let reduce_result_datum = reduce_result_row.unpack_first();
3456                                        assert_eq!(
3457                                            reduce_result_datum,
3458                                            eval_result_datum,
3459                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3460                                            name,
3461                                            args,
3462                                            mir,
3463                                            eval_result_datum,
3464                                            mir_typ.scalar_type,
3465                                            reduce_result_datum,
3466                                            ctyp.scalar_type
3467                                        );
3468                                        // Let's check that the types also match.
3469                                        // (We are not checking nullability here,
3470                                        // because it's ok when we know a more
3471                                        // precise nullability after actually
3472                                        // evaluating a function than before.)
3473                                        assert_eq!(
3474                                            ctyp.scalar_type,
3475                                            mir_typ.scalar_type,
3476                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3477                                            name,
3478                                            args,
3479                                            mir,
3480                                            eval_result_datum,
3481                                            mir_typ.scalar_type,
3482                                            reduce_result_datum,
3483                                            ctyp.scalar_type
3484                                        );
3485                                    }
3486                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3487                                }
3488                            }
3489                            _ => unreachable!(
3490                                "all args are literals, so should have reduced to a literal"
3491                            ),
3492                        }
3493                    }
3494                }
3495            }
3496        }
3497    }
3498
3499    /// If the given MirScalarExpr
3500    ///  - is a function call, and
3501    ///  - all arguments are literals
3502    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3503    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3504        match mir_func_call {
3505            MirScalarExpr::CallUnary { func, expr } => {
3506                if expr.is_literal() {
3507                    Some((func.introduces_nulls(), func.propagates_nulls()))
3508                } else {
3509                    None
3510                }
3511            }
3512            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3513                if expr1.is_literal() && expr2.is_literal() {
3514                    Some((func.introduces_nulls(), func.propagates_nulls()))
3515                } else {
3516                    None
3517                }
3518            }
3519            MirScalarExpr::CallVariadic { func, exprs } => {
3520                if exprs.iter().all(|arg| arg.is_literal()) {
3521                    Some((func.introduces_nulls(), func.propagates_nulls()))
3522                } else {
3523                    None
3524                }
3525            }
3526            _ => None,
3527        }
3528    }
3529
3530    // Make sure pg views don't use types that only exist in Materialize.
3531    #[mz_ore::test(tokio::test)]
3532    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3533    async fn test_pg_views_forbidden_types() {
3534        Catalog::with_debug(|catalog| async move {
3535            let conn_catalog = catalog.for_system_session();
3536
3537            for view in BUILTINS::views().filter(|view| {
3538                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3539            }) {
3540                let item = conn_catalog
3541                    .resolve_item(&PartialItemName {
3542                        database: None,
3543                        schema: Some(view.schema.to_string()),
3544                        item: view.name.to_string(),
3545                    })
3546                    .expect("unable to resolve view")
3547                    // TODO(alter_table)
3548                    .at_version(RelationVersionSelector::Latest);
3549                let full_name = conn_catalog.resolve_full_name(item.name());
3550                let desc = item.relation_desc().expect("invalid item type");
3551                for col_type in desc.iter_types() {
3552                    match &col_type.scalar_type {
3553                        typ @ SqlScalarType::UInt16
3554                        | typ @ SqlScalarType::UInt32
3555                        | typ @ SqlScalarType::UInt64
3556                        | typ @ SqlScalarType::MzTimestamp
3557                        | typ @ SqlScalarType::List { .. }
3558                        | typ @ SqlScalarType::Map { .. }
3559                        | typ @ SqlScalarType::MzAclItem => {
3560                            panic!("{typ:?} type found in {full_name}");
3561                        }
3562                        SqlScalarType::AclItem
3563                        | SqlScalarType::Bool
3564                        | SqlScalarType::Int16
3565                        | SqlScalarType::Int32
3566                        | SqlScalarType::Int64
3567                        | SqlScalarType::Float32
3568                        | SqlScalarType::Float64
3569                        | SqlScalarType::Numeric { .. }
3570                        | SqlScalarType::Date
3571                        | SqlScalarType::Time
3572                        | SqlScalarType::Timestamp { .. }
3573                        | SqlScalarType::TimestampTz { .. }
3574                        | SqlScalarType::Interval
3575                        | SqlScalarType::PgLegacyChar
3576                        | SqlScalarType::Bytes
3577                        | SqlScalarType::String
3578                        | SqlScalarType::Char { .. }
3579                        | SqlScalarType::VarChar { .. }
3580                        | SqlScalarType::Jsonb
3581                        | SqlScalarType::Uuid
3582                        | SqlScalarType::Array(_)
3583                        | SqlScalarType::Record { .. }
3584                        | SqlScalarType::Oid
3585                        | SqlScalarType::RegProc
3586                        | SqlScalarType::RegType
3587                        | SqlScalarType::RegClass
3588                        | SqlScalarType::Int2Vector
3589                        | SqlScalarType::Range { .. }
3590                        | SqlScalarType::PgLegacyName => {}
3591                    }
3592                }
3593            }
3594            catalog.expire().await;
3595        })
3596        .await
3597    }
3598
3599    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3600    // introspection relations.
3601    #[mz_ore::test(tokio::test)]
3602    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3603    async fn test_mz_introspection_builtins() {
3604        Catalog::with_debug(|catalog| async move {
3605            let conn_catalog = catalog.for_system_session();
3606
3607            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3608            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3609
3610            for entry in catalog.entries() {
3611                let schema_spec = entry.name().qualifiers.schema_spec;
3612                let introspection_deps = catalog.introspection_dependencies(entry.id);
3613                if introspection_deps.is_empty() {
3614                    assert!(
3615                        schema_spec != introspection_schema_spec,
3616                        "entry does not depend on introspection sources but is in \
3617                         `mz_introspection`: {}",
3618                        conn_catalog.resolve_full_name(entry.name()),
3619                    );
3620                } else {
3621                    assert!(
3622                        schema_spec == introspection_schema_spec,
3623                        "entry depends on introspection sources but is not in \
3624                         `mz_introspection`: {}",
3625                        conn_catalog.resolve_full_name(entry.name()),
3626                    );
3627                }
3628            }
3629        })
3630        .await
3631    }
3632
3633    #[mz_ore::test(tokio::test)]
3634    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3635    async fn test_multi_subscriber_catalog() {
3636        let persist_client = PersistClient::new_for_tests().await;
3637        let bootstrap_args = test_bootstrap_args();
3638        let organization_id = Uuid::new_v4();
3639        let db_name = "DB";
3640
3641        let mut writer_catalog = Catalog::open_debug_catalog(
3642            persist_client.clone(),
3643            organization_id.clone(),
3644            &bootstrap_args,
3645        )
3646        .await
3647        .expect("open_debug_catalog");
3648        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3649            persist_client.clone(),
3650            organization_id.clone(),
3651            &bootstrap_args,
3652        )
3653        .await
3654        .expect("open_debug_read_only_catalog");
3655        assert_err!(writer_catalog.resolve_database(db_name));
3656        assert_err!(read_only_catalog.resolve_database(db_name));
3657
3658        let commit_ts = writer_catalog.current_upper().await;
3659        writer_catalog
3660            .transact(
3661                None,
3662                commit_ts,
3663                None,
3664                vec![Op::CreateDatabase {
3665                    name: db_name.to_string(),
3666                    owner_id: MZ_SYSTEM_ROLE_ID,
3667                }],
3668            )
3669            .await
3670            .expect("failed to transact");
3671
3672        let write_db = writer_catalog
3673            .resolve_database(db_name)
3674            .expect("resolve_database");
3675        read_only_catalog
3676            .sync_to_current_updates()
3677            .await
3678            .expect("sync_to_current_updates");
3679        let read_db = read_only_catalog
3680            .resolve_database(db_name)
3681            .expect("resolve_database");
3682
3683        assert_eq!(write_db, read_db);
3684
3685        let writer_catalog_fencer =
3686            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3687                .await
3688                .expect("open_debug_catalog for fencer");
3689        let fencer_db = writer_catalog_fencer
3690            .resolve_database(db_name)
3691            .expect("resolve_database for fencer");
3692        assert_eq!(fencer_db, read_db);
3693
3694        let write_fence_err = writer_catalog
3695            .sync_to_current_updates()
3696            .await
3697            .expect_err("sync_to_current_updates for fencer");
3698        assert!(matches!(
3699            write_fence_err,
3700            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3701        ));
3702        let read_fence_err = read_only_catalog
3703            .sync_to_current_updates()
3704            .await
3705            .expect_err("sync_to_current_updates after fencer");
3706        assert!(matches!(
3707            read_fence_err,
3708            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3709        ));
3710
3711        writer_catalog.expire().await;
3712        read_only_catalog.expire().await;
3713        writer_catalog_fencer.expire().await;
3714    }
3715}