// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// TODO(jkosh44) Move to mz_catalog crate.

//! Persistent metadata storage for the coordinator.
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::convert;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::{Future, FutureExt};
use itertools::Itertools;
use mz_adapter_types::bootstrap_builtin_cluster_config::{
    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
};
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
use mz_catalog::builtin::{
    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
    MZ_CATALOG_SERVER_CLUSTER,
};
use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
#[cfg(test)]
use mz_catalog::durable::CatalogError;
use mz_catalog::durable::{
    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
};
use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
    NetworkPolicy, Role, RoleAuth, Schema,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::OptimizedMirRelationExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
use mz_ore::result::ResultExt as _;
use mz_ore::soft_panic_or_log;
use mz_persist_client::PersistClient;
use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
use mz_repr::namespaces::MZ_TEMP_SCHEMA;
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
use mz_secrets::InMemorySecretsController;
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
    SessionCatalog, SystemObjectType,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::OptimizerNotice;
use tokio::sync::MutexGuard;
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
pub use crate::catalog::state::CatalogState;
pub use crate::catalog::transact::{
    DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
};
use crate::command::CatalogDump;
use crate::coord::TargetCluster;
#[cfg(test)]
use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
use crate::session::{Portal, PreparedStatement, Session};
use crate::util::ResultExt;
use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114/// A `Catalog` keeps track of the SQL objects known to the planner.
115///
116/// For each object, it keeps track of both forward and reverse dependencies:
117/// i.e., which objects are depended upon by the object, and which objects
118/// depend upon the object. It enforces the SQL rules around dropping: an object
119/// cannot be dropped until all of the objects that depend upon it are dropped.
120/// It also enforces uniqueness of names.
121///
122/// SQL mandates a hierarchy of exactly three layers. A catalog contains
123/// databases, databases contain schemas, and schemas contain catalog items,
124/// like sources, sinks, view, and indexes.
125///
126/// To the outside world, databases, schemas, and items are all identified by
127/// name. Items can be referred to by their [`FullItemName`], which fully and
128/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
129/// database name and/or the schema name. Partial names can be converted into
130/// full names via a complicated resolution process documented by the
131/// [`CatalogState::resolve`] method.
132///
133/// The catalog also maintains special "ambient schemas": virtual schemas,
134/// implicitly present in all databases, that house various system views.
135/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
136#[derive(Debug)]
137pub struct Catalog {
138    state: CatalogState,
139    expr_cache_handle: Option<ExpressionCacheHandle>,
140    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141    transient_revision: u64,
142}
143
144// Implement our own Clone because derive can't unless S is Clone, which it's
145// not (hence the Arc).
146impl Clone for Catalog {
147    fn clone(&self) -> Self {
148        Self {
149            state: self.state.clone(),
150            expr_cache_handle: self.expr_cache_handle.clone(),
151            storage: Arc::clone(&self.storage),
152            transient_revision: self.transient_revision,
153        }
154    }
155}
156
157impl Catalog {
158    /// Set the optimized plan for the item identified by `id`.
159    ///
160    /// # Panics
161    /// If the item is not an `Index`, `MaterializedView`, or
162    /// `ContinualTask`.
163    #[mz_ore::instrument(level = "trace")]
164    pub fn set_optimized_plan(
165        &mut self,
166        id: GlobalId,
167        plan: DataflowDescription<OptimizedMirRelationExpr>,
168    ) {
169        self.state.set_optimized_plan(id, plan);
170    }
171
172    /// Set the physical plan for the item identified by `id`.
173    ///
174    /// # Panics
175    /// If the item is not an `Index`, `MaterializedView`, or
176    /// `ContinualTask`.
177    #[mz_ore::instrument(level = "trace")]
178    pub fn set_physical_plan(
179        &mut self,
180        id: GlobalId,
181        plan: DataflowDescription<mz_compute_types::plan::Plan>,
182    ) {
183        self.state.set_physical_plan(id, plan);
184    }
185
186    /// Try to get the optimized plan for the item identified by `id`.
187    #[mz_ore::instrument(level = "trace")]
188    pub fn try_get_optimized_plan(
189        &self,
190        id: &GlobalId,
191    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192        let entry = self.state.try_get_entry_by_global_id(id)?;
193        entry.item().optimized_plan().map(AsRef::as_ref)
194    }
195
196    /// Try to get the physical plan for the item identified by `id`.
197    #[mz_ore::instrument(level = "trace")]
198    pub fn try_get_physical_plan(
199        &self,
200        id: &GlobalId,
201    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202        let entry = self.state.try_get_entry_by_global_id(id)?;
203        entry.item().physical_plan().map(AsRef::as_ref)
204    }
205
206    /// Set the `DataflowMetainfo` for the item identified by `id`.
207    ///
208    /// # Panics
209    /// If the item is not an `Index`, `MaterializedView`, or
210    /// `ContinualTask`.
211    #[mz_ore::instrument(level = "trace")]
212    pub fn set_dataflow_metainfo(
213        &mut self,
214        id: GlobalId,
215        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216    ) {
217        self.state.set_dataflow_metainfo(id, metainfo);
218    }
219
220    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
221    #[mz_ore::instrument(level = "trace")]
222    pub fn try_get_dataflow_metainfo(
223        &self,
224        id: &GlobalId,
225    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226        let entry = self.state.try_get_entry_by_global_id(id)?;
227        entry.item().dataflow_metainfo()
228    }
229}
230
231#[derive(Debug)]
232pub struct ConnCatalog<'a> {
233    state: Cow<'a, CatalogState>,
234    /// Because we don't have any way of removing items from the catalog
235    /// temporarily, we allow the ConnCatalog to pretend that a set of items
236    /// don't exist during resolution.
237    ///
238    /// This feature is necessary to allow re-planning of statements, which is
239    /// either incredibly useful or required when altering item definitions.
240    ///
241    /// Note that uses of this field should be used by short-lived
242    /// catalogs.
243    unresolvable_ids: BTreeSet<CatalogItemId>,
244    conn_id: ConnectionId,
245    cluster: String,
246    database: Option<DatabaseId>,
247    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
248    role_id: RoleId,
249    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
250    portals: Option<&'a BTreeMap<String, Portal>>,
251    notices_tx: UnboundedSender<AdapterNotice>,
252}
253
254impl ConnCatalog<'_> {
255    pub fn conn_id(&self) -> &ConnectionId {
256        &self.conn_id
257    }
258
259    pub fn state(&self) -> &CatalogState {
260        &*self.state
261    }
262
263    /// Prevent planning from resolving item with the provided ID. Instead,
264    /// return an error as if the item did not exist.
265    ///
266    /// This feature is meant exclusively to permit re-planning statements
267    /// during update operations and should not be used otherwise given its
268    /// extremely "powerful" semantics.
269    ///
270    /// # Panics
271    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
272    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
273        assert_eq!(
274            self.role_id, MZ_SYSTEM_ROLE_ID,
275            "only the system role can mark IDs unresolvable",
276        );
277        self.unresolvable_ids.insert(id);
278    }
279
280    /// Returns the schemas:
281    /// - mz_catalog
282    /// - pg_catalog
283    /// - temp (if requested)
284    /// - all schemas from the session's search_path var that exist
285    pub fn effective_search_path(
286        &self,
287        include_temp_schema: bool,
288    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
289        self.state
290            .effective_search_path(&self.search_path, include_temp_schema)
291    }
292}
293
294impl ConnectionResolver for ConnCatalog<'_> {
295    fn resolve_connection(
296        &self,
297        id: CatalogItemId,
298    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
299        self.state().resolve_connection(id)
300    }
301}
302
impl Catalog {
    /// Returns the catalog's transient revision, which starts at 1 and is
    /// incremented on every change. This is not persisted to disk, and will
    /// restart on every load.
    pub fn transient_revision(&self) -> u64 {
        self.transient_revision
    }
310
311    /// Creates a debug catalog from the current
312    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
313    /// like in tests.
314    ///
315    /// WARNING! This function can arbitrarily fail because it does not make any
316    /// effort to adjust the catalog's contents' structure or semantics to the
317    /// currently running version, i.e. it does not apply any migrations.
318    ///
319    /// This function must not be called in production contexts. Use
320    /// [`Catalog::open`] with appropriately set configuration parameters
321    /// instead.
322    pub async fn with_debug<F, Fut, T>(f: F) -> T
323    where
324        F: FnOnce(Catalog) -> Fut,
325        Fut: Future<Output = T>,
326    {
327        let persist_client = PersistClient::new_for_tests().await;
328        let organization_id = Uuid::new_v4();
329        let bootstrap_args = test_bootstrap_args();
330        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
331            .await
332            .expect("can open debug catalog");
333        f(catalog).await
334    }
335
336    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
337    /// in progress.
338    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
339    where
340        F: FnOnce(Catalog) -> Fut,
341        Fut: Future<Output = T>,
342    {
343        let persist_client = PersistClient::new_for_tests().await;
344        let organization_id = Uuid::new_v4();
345        let bootstrap_args = test_bootstrap_args();
346        let mut catalog =
347            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
348                .await
349                .expect("can open debug catalog");
350
351        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
352        let now = SYSTEM_TIME.clone();
353        let openable_storage = TestCatalogStateBuilder::new(persist_client)
354            .with_organization_id(organization_id)
355            .with_default_deploy_generation()
356            .build()
357            .await
358            .expect("can create durable catalog");
359        let mut storage = openable_storage
360            .open(now().into(), &bootstrap_args)
361            .await
362            .expect("can open durable catalog")
363            .0;
364        // Drain updates.
365        let _ = storage
366            .sync_to_current_updates()
367            .await
368            .expect("can sync to current updates");
369        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
370
371        f(catalog).await
372    }
373
374    /// Opens a debug catalog.
375    ///
376    /// See [`Catalog::with_debug`].
377    pub async fn open_debug_catalog(
378        persist_client: PersistClient,
379        organization_id: Uuid,
380        bootstrap_args: &BootstrapArgs,
381    ) -> Result<Catalog, anyhow::Error> {
382        let now = SYSTEM_TIME.clone();
383        let environment_id = None;
384        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
385            .with_organization_id(organization_id)
386            .with_default_deploy_generation()
387            .build()
388            .await?;
389        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
390        let system_parameter_defaults = BTreeMap::default();
391        Self::open_debug_catalog_inner(
392            persist_client,
393            storage,
394            now,
395            environment_id,
396            &DUMMY_BUILD_INFO,
397            system_parameter_defaults,
398            bootstrap_args,
399            None,
400        )
401        .await
402    }
403
404    /// Opens a read only debug persist backed catalog defined by `persist_client` and
405    /// `organization_id`.
406    ///
407    /// See [`Catalog::with_debug`].
408    pub async fn open_debug_read_only_catalog(
409        persist_client: PersistClient,
410        organization_id: Uuid,
411        bootstrap_args: &BootstrapArgs,
412    ) -> Result<Catalog, anyhow::Error> {
413        let now = SYSTEM_TIME.clone();
414        let environment_id = None;
415        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
416            .with_organization_id(organization_id)
417            .build()
418            .await?;
419        let storage = openable_storage
420            .open_read_only(&test_bootstrap_args())
421            .await?;
422        let system_parameter_defaults = BTreeMap::default();
423        Self::open_debug_catalog_inner(
424            persist_client,
425            storage,
426            now,
427            environment_id,
428            &DUMMY_BUILD_INFO,
429            system_parameter_defaults,
430            bootstrap_args,
431            None,
432        )
433        .await
434    }
435
436    /// Opens a read only debug persist backed catalog defined by `persist_client` and
437    /// `organization_id`.
438    ///
439    /// See [`Catalog::with_debug`].
440    pub async fn open_debug_read_only_persist_catalog_config(
441        persist_client: PersistClient,
442        now: NowFn,
443        environment_id: EnvironmentId,
444        system_parameter_defaults: BTreeMap<String, String>,
445        build_info: &'static BuildInfo,
446        bootstrap_args: &BootstrapArgs,
447        enable_expression_cache_override: Option<bool>,
448    ) -> Result<Catalog, anyhow::Error> {
449        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
450            .with_organization_id(environment_id.organization_id())
451            .with_version(
452                build_info
453                    .version
454                    .parse()
455                    .expect("build version is parseable"),
456            )
457            .build()
458            .await?;
459        let storage = openable_storage.open_read_only(bootstrap_args).await?;
460        Self::open_debug_catalog_inner(
461            persist_client,
462            storage,
463            now,
464            Some(environment_id),
465            build_info,
466            system_parameter_defaults,
467            bootstrap_args,
468            enable_expression_cache_override,
469        )
470        .await
471    }
472
473    async fn open_debug_catalog_inner(
474        persist_client: PersistClient,
475        storage: Box<dyn DurableCatalogState>,
476        now: NowFn,
477        environment_id: Option<EnvironmentId>,
478        build_info: &'static BuildInfo,
479        system_parameter_defaults: BTreeMap<String, String>,
480        bootstrap_args: &BootstrapArgs,
481        enable_expression_cache_override: Option<bool>,
482    ) -> Result<Catalog, anyhow::Error> {
483        let metrics_registry = &MetricsRegistry::new();
484        let secrets_reader = Arc::new(InMemorySecretsController::new());
485        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
486        // debugging/testing.
487        let previous_ts = now().into();
488        let replica_size = &bootstrap_args.default_cluster_replica_size;
489        let read_only = false;
490
491        let OpenCatalogResult {
492            catalog,
493            migrated_storage_collections_0dt: _,
494            new_builtin_collections: _,
495            builtin_table_updates: _,
496            cached_global_exprs: _,
497            uncached_local_exprs: _,
498        } = Catalog::open(Config {
499            storage,
500            metrics_registry,
501            state: StateConfig {
502                unsafe_mode: true,
503                all_features: false,
504                build_info,
505                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
506                read_only,
507                now,
508                boot_ts: previous_ts,
509                skip_migrations: true,
510                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
511                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
512                    size: replica_size.clone(),
513                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
514                },
515                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
516                    size: replica_size.clone(),
517                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
518                },
519                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
520                    size: replica_size.clone(),
521                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
522                },
523                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
524                    size: replica_size.clone(),
525                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
526                },
527                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
528                    size: replica_size.clone(),
529                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
530                },
531                system_parameter_defaults,
532                remote_system_parameters: None,
533                availability_zones: vec![],
534                egress_addresses: vec![],
535                aws_principal_context: None,
536                aws_privatelink_availability_zones: None,
537                http_host_name: None,
538                connection_context: ConnectionContext::for_tests(secrets_reader),
539                builtin_item_migration_config: BuiltinItemMigrationConfig {
540                    persist_client: persist_client.clone(),
541                    read_only,
542                    force_migration: None,
543                },
544                persist_client,
545                enable_expression_cache_override,
546                helm_chart_version: None,
547                external_login_password_mz_system: None,
548                license_key: ValidatedLicenseKey::for_tests(),
549            },
550        })
551        .await?;
552        Ok(catalog)
553    }
554
555    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
556        self.state.for_session(session)
557    }
558
559    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
560        self.state.for_sessionless_user(role_id)
561    }
562
563    pub fn for_system_session(&self) -> ConnCatalog<'_> {
564        self.state.for_system_session()
565    }
566
567    async fn storage<'a>(
568        &'a self,
569    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
570        self.storage.lock().await
571    }
572
573    pub async fn current_upper(&self) -> mz_repr::Timestamp {
574        self.storage().await.current_upper().await
575    }
576
577    pub async fn allocate_user_id(
578        &self,
579        commit_ts: mz_repr::Timestamp,
580    ) -> Result<(CatalogItemId, GlobalId), Error> {
581        self.storage()
582            .await
583            .allocate_user_id(commit_ts)
584            .await
585            .maybe_terminate("allocating user ids")
586            .err_into()
587    }
588
589    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
590    pub async fn allocate_user_ids(
591        &self,
592        amount: u64,
593        commit_ts: mz_repr::Timestamp,
594    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
595        self.storage()
596            .await
597            .allocate_user_ids(amount, commit_ts)
598            .await
599            .maybe_terminate("allocating user ids")
600            .err_into()
601    }
602
603    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
604        let commit_ts = self.storage().await.current_upper().await;
605        self.allocate_user_id(commit_ts).await
606    }
607
608    /// Get the next user item ID without allocating it.
609    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
610        self.storage()
611            .await
612            .get_next_user_item_id()
613            .await
614            .err_into()
615    }
616
617    #[cfg(test)]
618    pub async fn allocate_system_id(
619        &self,
620        commit_ts: mz_repr::Timestamp,
621    ) -> Result<(CatalogItemId, GlobalId), Error> {
622        use mz_ore::collections::CollectionExt;
623
624        let mut storage = self.storage().await;
625        let mut txn = storage.transaction().await?;
626        let id = txn
627            .allocate_system_item_ids(1)
628            .maybe_terminate("allocating system ids")?
629            .into_element();
630        // Drain transaction.
631        let _ = txn.get_and_commit_op_updates();
632        txn.commit(commit_ts).await?;
633        Ok(id)
634    }
635
636    /// Get the next system item ID without allocating it.
637    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
638        self.storage()
639            .await
640            .get_next_system_item_id()
641            .await
642            .err_into()
643    }
644
645    pub async fn allocate_user_cluster_id(
646        &self,
647        commit_ts: mz_repr::Timestamp,
648    ) -> Result<ClusterId, Error> {
649        self.storage()
650            .await
651            .allocate_user_cluster_id(commit_ts)
652            .await
653            .maybe_terminate("allocating user cluster ids")
654            .err_into()
655    }
656
657    /// Get the next system replica id without allocating it.
658    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
659        self.storage()
660            .await
661            .get_next_system_replica_id()
662            .await
663            .err_into()
664    }
665
666    /// Get the next user replica id without allocating it.
667    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
668        self.storage()
669            .await
670            .get_next_user_replica_id()
671            .await
672            .err_into()
673    }
674
675    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
676        self.state.resolve_database(database_name)
677    }
678
679    pub fn resolve_schema(
680        &self,
681        current_database: Option<&DatabaseId>,
682        database_name: Option<&str>,
683        schema_name: &str,
684        conn_id: &ConnectionId,
685    ) -> Result<&Schema, SqlCatalogError> {
686        self.state
687            .resolve_schema(current_database, database_name, schema_name, conn_id)
688    }
689
690    pub fn resolve_schema_in_database(
691        &self,
692        database_spec: &ResolvedDatabaseSpecifier,
693        schema_name: &str,
694        conn_id: &ConnectionId,
695    ) -> Result<&Schema, SqlCatalogError> {
696        self.state
697            .resolve_schema_in_database(database_spec, schema_name, conn_id)
698    }
699
700    pub fn resolve_replica_in_cluster(
701        &self,
702        cluster_id: &ClusterId,
703        replica_name: &str,
704    ) -> Result<&ClusterReplica, SqlCatalogError> {
705        self.state
706            .resolve_replica_in_cluster(cluster_id, replica_name)
707    }
708
709    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
710        self.state.resolve_system_schema(name)
711    }
712
713    pub fn resolve_search_path(
714        &self,
715        session: &Session,
716    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
717        self.state.resolve_search_path(session)
718    }
719
720    /// Resolves `name` to a non-function [`CatalogEntry`].
721    pub fn resolve_entry(
722        &self,
723        current_database: Option<&DatabaseId>,
724        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
725        name: &PartialItemName,
726        conn_id: &ConnectionId,
727    ) -> Result<&CatalogEntry, SqlCatalogError> {
728        self.state
729            .resolve_entry(current_database, search_path, name, conn_id)
730    }
731
732    /// Resolves a `BuiltinTable`.
733    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
734        self.state.resolve_builtin_table(builtin)
735    }
736
737    /// Resolves a `BuiltinLog`.
738    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
739        self.state.resolve_builtin_log(builtin).0
740    }
741
742    /// Resolves a `BuiltinSource`.
743    pub fn resolve_builtin_storage_collection(
744        &self,
745        builtin: &'static BuiltinSource,
746    ) -> CatalogItemId {
747        self.state.resolve_builtin_source(builtin)
748    }
749
750    /// Resolves `name` to a function [`CatalogEntry`].
751    pub fn resolve_function(
752        &self,
753        current_database: Option<&DatabaseId>,
754        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
755        name: &PartialItemName,
756        conn_id: &ConnectionId,
757    ) -> Result<&CatalogEntry, SqlCatalogError> {
758        self.state
759            .resolve_function(current_database, search_path, name, conn_id)
760    }
761
762    /// Resolves `name` to a type [`CatalogEntry`].
763    pub fn resolve_type(
764        &self,
765        current_database: Option<&DatabaseId>,
766        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
767        name: &PartialItemName,
768        conn_id: &ConnectionId,
769    ) -> Result<&CatalogEntry, SqlCatalogError> {
770        self.state
771            .resolve_type(current_database, search_path, name, conn_id)
772    }
773
774    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
775        self.state.resolve_cluster(name)
776    }
777
778    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
779    ///
780    /// # Panics
781    /// * If the [`BuiltinCluster`] doesn't exist.
782    ///
783    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
784        self.state.resolve_builtin_cluster(cluster)
785    }
786
787    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
788        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
789    }
790
791    /// Resolves a [`Cluster`] for a TargetCluster.
792    pub fn resolve_target_cluster(
793        &self,
794        target_cluster: TargetCluster,
795        session: &Session,
796    ) -> Result<&Cluster, AdapterError> {
797        match target_cluster {
798            TargetCluster::CatalogServer => {
799                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
800            }
801            TargetCluster::Active => self.active_cluster(session),
802            TargetCluster::Transaction(cluster_id) => self
803                .try_get_cluster(cluster_id)
804                .ok_or(AdapterError::ConcurrentClusterDrop),
805        }
806    }
807
    /// Returns the [`Cluster`] named by the session's `cluster` variable,
    /// refusing to let ordinary users run queries on the system cluster.
    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
        // TODO(benesch): this check here is not sufficiently protective. It'd
        // be very easy for a code path to accidentally avoid this check by
        // calling `resolve_cluster(session.vars().cluster())`.
        if session.user().name != SYSTEM_USER.name
            && session.user().name != SUPPORT_USER.name
            && session.vars().cluster() == SYSTEM_USER.name
        {
            coord_bail!(
                "system cluster '{}' cannot execute user queries",
                SYSTEM_USER.name
            );
        }
        let cluster = self.resolve_cluster(session.vars().cluster())?;
        Ok(cluster)
    }
824
    /// Returns a reference to the catalog's in-memory state.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }
828
    /// Resolves `name` into a [`FullItemName`]; `conn_id` is used by the
    /// in-memory state when the name involves connection-scoped schemas.
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }
836
    /// Returns the [`CatalogEntry`] for `id`, if it exists.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }

    /// Returns the [`CatalogEntry`] for the given [`GlobalId`], if it exists.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }

    /// Returns the [`CatalogEntry`] for `id`. Unlike
    /// [`Catalog::try_get_entry`], the entry is expected to exist.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    /// Returns the [`CatalogCollectionEntry`] for the given [`GlobalId`].
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    /// Returns all [`GlobalId`]s associated with the item identified by `id`.
    pub fn get_global_ids<'a>(
        &'a self,
        id: &CatalogItemId,
    ) -> impl Iterator<Item = GlobalId> + use<'a> {
        self.get_entry(id).global_ids()
    }

    /// Returns the [`CatalogItemId`] of the item that owns the given
    /// [`GlobalId`].
    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
        self.get_entry_by_global_id(id).id()
    }

    /// Like [`Catalog::resolve_item_id`], but returns `None` when the
    /// [`GlobalId`] is unknown.
    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
        let item = self.try_get_entry_by_global_id(id)?;
        Some(item.id())
    }
868
    /// Returns the [`Schema`] identified by the given database/schema
    /// specifiers; `conn_id` scopes connection-local (temporary) schemas.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }

    /// Like [`Catalog::get_schema`], but returns `None` if the schema does
    /// not exist.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }
887
    /// Returns the ID of the `mz_catalog` system schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }

    /// Returns the ID of the `pg_catalog` system schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }

    /// Returns the ID of the `information_schema` system schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }

    /// Returns the ID of the `mz_internal` system schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }

    /// Returns the ID of the `mz_introspection` system schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }

    /// Returns the ID of the `mz_unsafe` system schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }

    /// Returns the IDs of all system schemas.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }
915
    /// Returns the [`Database`] with ID `id`.
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }

    /// Returns the [`Role`] with ID `id`, if it exists.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }

    /// Returns the [`Role`] with ID `id`.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }

    /// Returns the [`Role`] named `role_name`, if it exists.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }

    /// Returns the [`RoleAuth`] for the role with ID `id`, if it exists.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }
935
    /// Creates a new schema in the `Catalog` for temporary items
    /// indicated by the TEMPORARY or TEMP keywords.
    ///
    /// The schema is scoped to `conn_id` and owned by `owner_id`.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
945
946    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
947        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
948        self.state
949            .temporary_schemas
950            .get(conn_id)
951            .map(|schema| schema.items.contains_key(item_name))
952            .unwrap_or(false)
953    }
954
955    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
956    /// Returns Ok if conn_id's temp schema does not exist.
957    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
958        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
959            return Ok(());
960        };
961        if !schema.items.is_empty() {
962            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
963        }
964        Ok(())
965    }
966
    /// Returns the dependents of the given `object_ids`; `conn_id` scopes
    /// connection-local (temporary) items.
    pub(crate) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
    ) -> Vec<ObjectId> {
        // `seen` lets the state-side traversal avoid revisiting objects.
        let mut seen = BTreeSet::new();
        self.state.object_dependents(object_ids, conn_id, &mut seen)
    }
975
    /// Converts a [`FullItemName`] into the [`FullNameV1`] shape used by the
    /// audit log.
    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
        FullNameV1 {
            database: name.database.to_string(),
            schema: name.schema.clone(),
            item: name.item.clone(),
        }
    }
983
984    pub fn find_available_cluster_name(&self, name: &str) -> String {
985        let mut i = 0;
986        let mut candidate = name.to_string();
987        while self.state.clusters_by_name.contains_key(&candidate) {
988            i += 1;
989            candidate = format!("{}{}", name, i);
990        }
991        candidate
992    }
993
994    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
995        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
996            self.cluster_replica_sizes()
997                .enabled_allocations()
998                .map(|a| a.0.to_owned())
999                .collect::<Vec<_>>()
1000        } else {
1001            self.system_config().allowed_cluster_replica_sizes()
1002        }
1003    }
1004
    /// Concretizes a durable replica location into an in-memory
    /// [`ReplicaLocation`], constrained by `allowed_sizes` and
    /// `allowed_availability_zones`.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }
1014
    /// Returns an error unless `size` is a valid replica size according to
    /// `allowed_sizes`.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }
1022
    /// Returns the map of known cluster replica size allocations.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1026
    /// Returns the privileges of an object by its ID.
    ///
    /// Returns `None` for object kinds that carry no privilege set of their
    /// own (cluster replicas and roles).
    pub fn get_privileges(
        &self,
        id: &SystemObjectId,
        conn_id: &ConnectionId,
    ) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::Object(id) => match id {
                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
                ObjectId::Schema((database_spec, schema_spec)) => Some(
                    self.get_schema(database_spec, schema_spec, conn_id)
                        .privileges(),
                ),
                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
                // Replicas and roles have no privileges attached.
                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
            },
            SystemObjectId::System => Some(&self.state.system_privileges),
        }
    }
1048
    /// Advances the upper of the durable catalog state to `new_upper`.
    #[mz_ore::instrument(level = "debug")]
    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
        Ok(self.storage().await.advance_upper(new_upper).await?)
    }

    /// Return the ids of all log sources the given object depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }

    /// Serializes the catalog's in-memory state.
    ///
    /// There are no guarantees about the format of the serialized state, except
    /// that the serialized state for two identical catalogs will compare
    /// identically.
    pub fn dump(&self) -> Result<CatalogDump, Error> {
        Ok(CatalogDump::new(self.state.dump(None)?))
    }
1067
    /// Checks the [`Catalog`]s internal consistency.
    ///
    /// Returns a JSON object describing the inconsistencies, if there are any.
    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
        self.state.check_consistency().map_err(|inconsistencies| {
            // Serializing the report should never fail; degrade to a plain
            // string instead of panicking if it somehow does.
            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
                serde_json::Value::String("failed to serialize inconsistencies".to_string())
            })
        })
    }
1078
    /// Returns the catalog's configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }

    /// Returns an iterator over all catalog entries.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }

    /// Returns an iterator over all user-created connections.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created tables.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created sources.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created sinks.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created materialized views.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created secrets.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1116
    /// Returns the [`NetworkPolicy`] with the given ID.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }

    /// Returns the [`NetworkPolicy`] named `name`, if it exists.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }

    /// Returns an iterator over all clusters.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }

    /// Returns the [`Cluster`] with the given ID.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }

    /// Returns the [`Cluster`] with the given ID, if it exists.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }

    /// Returns an iterator over all user-created clusters.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }
1140
    /// Returns the [`ClusterReplica`] identified by `cluster_id` and
    /// `replica_id`.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }

    /// Like [`Catalog::get_cluster_replica`], but returns `None` if the
    /// replica does not exist.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }

    /// Returns an iterator over the user replicas of all user clusters.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }
1161
    /// Returns an iterator over all databases.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }

    /// Returns an iterator over all user-created roles.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }

    /// Returns an iterator over all user-created continual tasks.
    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
    }
1177
1178    pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1179        self.state
1180            .network_policies_by_id
1181            .iter()
1182            .filter(|(id, _)| id.is_user())
1183            .map(|(_, policy)| policy)
1184    }
1185
    /// Returns the system-wide privileges.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }

    /// Returns an iterator over all default privileges, grouped by the
    /// object they apply to.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1200
    /// Packs a builtin table update for item `id` with the given `diff` and
    /// resolves it against the catalog state.
    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
        self.state
            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
    }

    /// Packs a builtin table update for a storage usage `event` with the
    /// given `diff` and resolves it against the catalog state.
    pub fn pack_storage_usage_update(
        &self,
        event: VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate {
        self.state
            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
    }
1214
    /// Returns the current system configuration variables.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }

    /// Returns a mutable reference to the system configuration variables.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }

    /// Returns an error if `role_id` refers to a reserved role.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }

    /// Returns an error unless `role_id` refers to a grantable role.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }

    /// Returns an error if `role_id` refers to a system role.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }

    /// Returns an error if `role_id` refers to a predefined role.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }

    /// Returns an error if `network_policy_id` refers to a reserved network
    /// policy.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1246
    /// Returns an error if `object_id` refers to a system-owned (read-only)
    /// object; otherwise returns `Ok(())`.
    ///
    /// `conn_id` is only used to resolve names for error messages and
    /// connection-scoped schemas.
    pub fn ensure_not_reserved_object(
        &self,
        object_id: &ObjectId,
        conn_id: &ConnectionId,
    ) -> Result<(), Error> {
        match object_id {
            ObjectId::Cluster(cluster_id) => {
                if cluster_id.is_system() {
                    let cluster = self.get_cluster(*cluster_id);
                    Err(Error::new(ErrorKind::ReadOnlyCluster(
                        cluster.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                if replica_id.is_system() {
                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
                        replica.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::Database(database_id) => {
                if database_id.is_system() {
                    let database = self.get_database(database_id);
                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
                        database.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::Schema((database_spec, schema_spec)) => {
                if schema_spec.is_system() {
                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
                        schema.name().schema.clone(),
                    )))
                } else {
                    Ok(())
                }
            }
            // Roles have their own reservation rules.
            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
            ObjectId::Item(item_id) => {
                if item_id.is_system() {
                    let item = self.get_entry(item_id);
                    let name = self.resolve_full_name(item.name(), Some(conn_id));
                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
                } else {
                    Ok(())
                }
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                self.ensure_not_reserved_network_policy(network_policy_id)
            }
        }
    }
1308
    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
    ///
    /// Parses `create_sql` back into a [`Plan`] and its [`ResolvedIds`].
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1318
    /// Cache global and, optionally, local expressions for the given `GlobalId`.
    ///
    /// This takes the required plans and metainfo from the catalog and expects that they were
    /// previously stored via [`Catalog::set_optimized_plan`], [`Catalog::set_physical_plan`], and
    /// [`Catalog::set_dataflow_metainfo`].
    ///
    /// If any of the three artifacts is missing this soft-panics (or logs)
    /// and caches nothing.
    pub(crate) fn cache_expressions(
        &self,
        id: GlobalId,
        local_mir: Option<OptimizedMirRelationExpr>,
        optimizer_features: OptimizerFeatures,
    ) {
        let Some(mut global_mir) = self.try_get_optimized_plan(&id).cloned() else {
            soft_panic_or_log!("optimized plan missing for ID {id}");
            return;
        };
        let Some(mut physical_plan) = self.try_get_physical_plan(&id).cloned() else {
            soft_panic_or_log!("physical plan missing for ID {id}");
            return;
        };
        let Some(dataflow_metainfos) = self.try_get_dataflow_metainfo(&id).cloned() else {
            soft_panic_or_log!("dataflow metainfo missing for ID {id}");
            return;
        };

        // Make sure we're not caching the result of timestamp selection, as it will almost
        // certainly be wrong if we re-install the dataflow at a later time.
        global_mir.as_of = None;
        global_mir.until = Default::default();
        physical_plan.as_of = None;
        physical_plan.until = Default::default();

        // Local expressions are optional; only cache them when provided.
        let mut local_exprs = Vec::new();
        if let Some(local_mir) = local_mir {
            local_exprs.push((
                id,
                LocalExpressions {
                    local_mir,
                    optimizer_features: optimizer_features.clone(),
                },
            ));
        }
        let global_exprs = vec![(
            id,
            GlobalExpressions {
                global_mir,
                physical_plan,
                dataflow_metainfos,
                optimizer_features,
            },
        )];
        // NOTE(review): the returned future is dropped without being awaited —
        // presumably `update_expression_cache` performs or spawns the work
        // eagerly; confirm that dropping the future does not cancel the update.
        let _fut = self.update_expression_cache(local_exprs, global_exprs, Default::default());
    }
1371
1372    pub(crate) fn update_expression_cache<'a, 'b>(
1373        &'a self,
1374        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1375        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1376        invalidate_ids: BTreeSet<GlobalId>,
1377    ) -> BoxFuture<'b, ()> {
1378        if let Some(expr_cache) = &self.expr_cache_handle {
1379            expr_cache
1380                .update(
1381                    new_local_expressions,
1382                    new_global_expressions,
1383                    invalidate_ids,
1384                )
1385                .boxed()
1386        } else {
1387            async {}.boxed()
1388        }
1389    }
1390
    /// Listen for and apply all unconsumed updates to the durable catalog state.
    ///
    /// Returns the builtin table updates and parsed state updates that
    /// resulted from applying the durable updates to the in-memory state.
    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
    // `#[cfg(test)]` annotation.
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        let updates = self.storage().await.sync_to_current_updates().await?;
        // No expression cache is available here, so apply with the cache closed.
        let (builtin_table_updates, catalog_updates) = self
            .state
            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
            .await;
        Ok((builtin_table_updates, catalog_updates))
    }
1411}
1412
1413pub fn is_reserved_name(name: &str) -> bool {
1414    BUILTIN_PREFIXES
1415        .iter()
1416        .any(|prefix| name.starts_with(prefix))
1417}
1418
/// Reports whether `name` is reserved for roles: either a builtin-prefixed
/// name or the public role.
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}

/// Reports whether `name` is the public role's name.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1426
/// Maps a SQL catalog item type to its audit-log [`ObjectType`].
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1430
/// Maps a [`CommentObjectId`] to the audit-log [`ObjectType`] of the object
/// the comment is attached to.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1452
/// Maps a SQL [`ObjectType`](mz_sql::catalog::ObjectType) to its audit-log
/// counterpart.
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1458
/// Maps a [`SystemObjectType`] to its audit-log [`ObjectType`] counterpart.
pub(crate) fn system_object_type_to_audit_object_type(
    system_type: &SystemObjectType,
) -> ObjectType {
    match system_type {
        SystemObjectType::Object(object_type) => match object_type {
            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
            mz_sql::catalog::ObjectType::View => ObjectType::View,
            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
        },
        SystemObjectType::System => ObjectType::System,
    }
}
1485
/// Whether a privilege update grants or revokes privileges.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    Grant,
    Revoke,
}
1491
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    /// Maps a privilege update variant to the corresponding statement
    /// response.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}

impl From<UpdatePrivilegeVariant> for EventType {
    /// Maps a privilege update variant to the corresponding audit event type.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1509
impl ConnCatalog<'_> {
    /// Resolves `name` to the qualified name of an item.
    fn resolve_item_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_item(name).map(|entry| entry.name())
    }

    /// Resolves `name` to the qualified name of a function.
    fn resolve_function_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_function(name).map(|entry| entry.name())
    }

    /// Resolves `name` to the qualified name of a type.
    fn resolve_type_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_type(name).map(|entry| entry.name())
    }
}
1532
1533impl ExprHumanizer for ConnCatalog<'_> {
    /// Renders `id` as its fully qualified item name, if the ID is known.
    fn humanize_id(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).to_string())
    }

    /// Renders `id` as just the unqualified item name, if the ID is known.
    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(entry.name().item.clone())
    }

    /// Renders `id` as the parts of its fully qualified name, if the ID is
    /// known.
    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).into_parts())
    }
1548
    /// Renders `typ` as human-readable SQL type text, preferring the
    /// minimally qualified catalog name for custom types.
    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        use SqlScalarType::*;

        match typ {
            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
            // Custom list/map types are printed under their catalog name.
            List {
                custom_id: Some(item_id),
                ..
            }
            | Map {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            List { element_type, .. } => {
                format!(
                    "{} list",
                    self.humanize_sql_scalar_type(element_type, postgres_compat)
                )
            }
            Map { value_type, .. } => format!(
                "map[{}=>{}]",
                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
                self.humanize_sql_scalar_type(value_type, postgres_compat)
            ),
            // Custom record types are likewise printed under their catalog name.
            Record {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            Record { fields, .. } => format!(
                "record({})",
                fields
                    .iter()
                    .map(|f| format!(
                        "{}: {}",
                        f.0,
                        self.humanize_sql_column_type(&f.1, postgres_compat)
                    ))
                    .join(",")
            ),
            PgLegacyChar => "\"char\"".into(),
            // Non-postgres-compat spellings for char/varchar with lengths.
            Char { length } if !postgres_compat => match length {
                None => "char".into(),
                Some(length) => format!("char({})", length.into_u32()),
            },
            VarChar { max_length } if !postgres_compat => match max_length {
                None => "varchar".into(),
                Some(length) => format!("varchar({})", length.into_u32()),
            },
            UInt16 => "uint2".into(),
            UInt32 => "uint4".into(),
            UInt64 => "uint8".into(),
            ty => {
                let pgrepr_type = mz_pgrepr::Type::from(ty);
                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());

                let res = if self
                    .effective_search_path(true)
                    .iter()
                    .any(|(_, schema)| schema == &pg_catalog_schema)
                {
                    pgrepr_type.name().to_string()
                } else {
                    // If PG_CATALOG_SCHEMA is not in search path, you need
                    // qualified object name to refer to type.
                    let name = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: pg_catalog_schema,
                        },
                        item: pgrepr_type.name().to_string(),
                    };
                    self.resolve_full_name(&name).to_string()
                };
                res
            }
        }
    }
1632
    /// Returns the column names of the relation identified by `id`, or `None`
    /// if the catalog has no entry or no relation description for it.
    ///
    /// For an index, the names returned are those of the indexed ("on")
    /// relation's columns, permuted into the index's column order via
    /// `mz_expr::permutation_for_arrangement`; index columns with no
    /// corresponding "on" column are left as empty strings.
    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;

        match entry.index() {
            Some(index) => {
                // Collect the column names of the relation the index is on.
                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
                let mut on_names = on_desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect::<Vec<_>>();

                // `p` maps each "on" column position to its position in the
                // arrangement.
                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());

                // Init ix_names with unknown column names. Unknown columns are
                // represented as an empty String and rendered as `#c` by the
                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
                let mut ix_names = vec![String::new(); ix_arity];

                // Apply the permutation by swapping on_names with ix_names.
                for (on_pos, ix_pos) in p.into_iter().enumerate() {
                    let on_name = on_names.get_mut(on_pos).expect("on_name");
                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
                    std::mem::swap(on_name, ix_name);
                }

                Some(ix_names) // Return the updated ix_names vector.
            }
            None => {
                // Not an index: read the names straight off the relation
                // description for `id` itself.
                let desc = self.state.try_get_desc_by_global_id(&id)?;
                let column_names = desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect();

                Some(column_names)
            }
        }
    }
1672
1673    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1674        let desc = self.state.try_get_desc_by_global_id(&id)?;
1675        Some(desc.get_name(column).to_string())
1676    }
1677
1678    fn id_exists(&self, id: GlobalId) -> bool {
1679        self.state.entry_by_global_id.contains_key(&id)
1680    }
1681}
1682
1683impl SessionCatalog for ConnCatalog<'_> {
1684    fn active_role_id(&self) -> &RoleId {
1685        &self.role_id
1686    }
1687
1688    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1689        self.prepared_statements
1690            .as_ref()
1691            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1692            .flatten()
1693    }
1694
1695    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1696        self.portals
1697            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1698    }
1699
1700    fn active_database(&self) -> Option<&DatabaseId> {
1701        self.database.as_ref()
1702    }
1703
1704    fn active_cluster(&self) -> &str {
1705        &self.cluster
1706    }
1707
1708    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1709        &self.search_path
1710    }
1711
1712    fn resolve_database(
1713        &self,
1714        database_name: &str,
1715    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1716        Ok(self.state.resolve_database(database_name)?)
1717    }
1718
1719    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1720        self.state
1721            .database_by_id
1722            .get(id)
1723            .expect("database doesn't exist")
1724    }
1725
1726    // `as` is ok to use to cast to a trait object.
1727    #[allow(clippy::as_conversions)]
1728    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1729        self.state
1730            .database_by_id
1731            .values()
1732            .map(|database| database as &dyn CatalogDatabase)
1733            .collect()
1734    }
1735
1736    fn resolve_schema(
1737        &self,
1738        database_name: Option<&str>,
1739        schema_name: &str,
1740    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1741        Ok(self.state.resolve_schema(
1742            self.database.as_ref(),
1743            database_name,
1744            schema_name,
1745            &self.conn_id,
1746        )?)
1747    }
1748
1749    fn resolve_schema_in_database(
1750        &self,
1751        database_spec: &ResolvedDatabaseSpecifier,
1752        schema_name: &str,
1753    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1754        Ok(self
1755            .state
1756            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1757    }
1758
1759    fn get_schema(
1760        &self,
1761        database_spec: &ResolvedDatabaseSpecifier,
1762        schema_spec: &SchemaSpecifier,
1763    ) -> &dyn CatalogSchema {
1764        self.state
1765            .get_schema(database_spec, schema_spec, &self.conn_id)
1766    }
1767
1768    // `as` is ok to use to cast to a trait object.
1769    #[allow(clippy::as_conversions)]
1770    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1771        self.get_databases()
1772            .into_iter()
1773            .flat_map(|database| database.schemas().into_iter())
1774            .chain(
1775                self.state
1776                    .ambient_schemas_by_id
1777                    .values()
1778                    .chain(self.state.temporary_schemas.values())
1779                    .map(|schema| schema as &dyn CatalogSchema),
1780            )
1781            .collect()
1782    }
1783
1784    fn get_mz_internal_schema_id(&self) -> SchemaId {
1785        self.state().get_mz_internal_schema_id()
1786    }
1787
1788    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1789        self.state().get_mz_unsafe_schema_id()
1790    }
1791
1792    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1793        self.state.is_system_schema_specifier(schema)
1794    }
1795
1796    fn resolve_role(
1797        &self,
1798        role_name: &str,
1799    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1800        match self.state.try_get_role_by_name(role_name) {
1801            Some(role) => Ok(role),
1802            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1803        }
1804    }
1805
1806    fn resolve_network_policy(
1807        &self,
1808        policy_name: &str,
1809    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1810        match self.state.try_get_network_policy_by_name(policy_name) {
1811            Some(policy) => Ok(policy),
1812            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1813        }
1814    }
1815
1816    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1817        Some(self.state.roles_by_id.get(id)?)
1818    }
1819
1820    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1821        self.state.get_role(id)
1822    }
1823
1824    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1825        // `as` is ok to use to cast to a trait object.
1826        #[allow(clippy::as_conversions)]
1827        self.state
1828            .roles_by_id
1829            .values()
1830            .map(|role| role as &dyn CatalogRole)
1831            .collect()
1832    }
1833
1834    fn mz_system_role_id(&self) -> RoleId {
1835        MZ_SYSTEM_ROLE_ID
1836    }
1837
1838    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1839        self.state.collect_role_membership(id)
1840    }
1841
1842    fn get_network_policy(
1843        &self,
1844        id: &NetworkPolicyId,
1845    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1846        self.state.get_network_policy(id)
1847    }
1848
1849    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1850        // `as` is ok to use to cast to a trait object.
1851        #[allow(clippy::as_conversions)]
1852        self.state
1853            .network_policies_by_id
1854            .values()
1855            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1856            .collect()
1857    }
1858
1859    fn resolve_cluster(
1860        &self,
1861        cluster_name: Option<&str>,
1862    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1863        Ok(self
1864            .state
1865            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1866    }
1867
1868    fn resolve_cluster_replica(
1869        &self,
1870        cluster_replica_name: &QualifiedReplica,
1871    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1872        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1873    }
1874
1875    fn resolve_item(
1876        &self,
1877        name: &PartialItemName,
1878    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1879        let r = self.state.resolve_entry(
1880            self.database.as_ref(),
1881            &self.effective_search_path(true),
1882            name,
1883            &self.conn_id,
1884        )?;
1885        if self.unresolvable_ids.contains(&r.id()) {
1886            Err(SqlCatalogError::UnknownItem(name.to_string()))
1887        } else {
1888            Ok(r)
1889        }
1890    }
1891
1892    fn resolve_function(
1893        &self,
1894        name: &PartialItemName,
1895    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1896        let r = self.state.resolve_function(
1897            self.database.as_ref(),
1898            &self.effective_search_path(false),
1899            name,
1900            &self.conn_id,
1901        )?;
1902
1903        if self.unresolvable_ids.contains(&r.id()) {
1904            Err(SqlCatalogError::UnknownFunction {
1905                name: name.to_string(),
1906                alternative: None,
1907            })
1908        } else {
1909            Ok(r)
1910        }
1911    }
1912
1913    fn resolve_type(
1914        &self,
1915        name: &PartialItemName,
1916    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1917        let r = self.state.resolve_type(
1918            self.database.as_ref(),
1919            &self.effective_search_path(false),
1920            name,
1921            &self.conn_id,
1922        )?;
1923
1924        if self.unresolvable_ids.contains(&r.id()) {
1925            Err(SqlCatalogError::UnknownType {
1926                name: name.to_string(),
1927            })
1928        } else {
1929            Ok(r)
1930        }
1931    }
1932
1933    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1934        self.state.get_system_type(name)
1935    }
1936
1937    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1938        Some(self.state.try_get_entry(id)?)
1939    }
1940
1941    fn try_get_item_by_global_id(
1942        &self,
1943        id: &GlobalId,
1944    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1945        let entry = self.state.try_get_entry_by_global_id(id)?;
1946        let entry = match &entry.item {
1947            CatalogItem::Table(table) => {
1948                let (version, _gid) = table
1949                    .collections
1950                    .iter()
1951                    .find(|(_version, gid)| *gid == id)
1952                    .expect("catalog out of sync, mismatched GlobalId");
1953                entry.at_version(RelationVersionSelector::Specific(*version))
1954            }
1955            _ => entry.at_version(RelationVersionSelector::Latest),
1956        };
1957        Some(entry)
1958    }
1959
1960    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1961        self.state.get_entry(id)
1962    }
1963
1964    fn get_item_by_global_id(
1965        &self,
1966        id: &GlobalId,
1967    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1968        let entry = self.state.get_entry_by_global_id(id);
1969        let entry = match &entry.item {
1970            CatalogItem::Table(table) => {
1971                let (version, _gid) = table
1972                    .collections
1973                    .iter()
1974                    .find(|(_version, gid)| *gid == id)
1975                    .expect("catalog out of sync, mismatched GlobalId");
1976                entry.at_version(RelationVersionSelector::Specific(*version))
1977            }
1978            _ => entry.at_version(RelationVersionSelector::Latest),
1979        };
1980        entry
1981    }
1982
1983    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
1984        self.get_schemas()
1985            .into_iter()
1986            .flat_map(|schema| schema.item_ids())
1987            .map(|id| self.get_item(&id))
1988            .collect()
1989    }
1990
1991    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1992        self.state
1993            .get_item_by_name(name, &self.conn_id)
1994            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1995    }
1996
1997    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1998        self.state
1999            .get_type_by_name(name, &self.conn_id)
2000            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2001    }
2002
2003    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2004        &self.state.clusters_by_id[&id]
2005    }
2006
2007    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2008        self.state
2009            .clusters_by_id
2010            .values()
2011            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2012            .collect()
2013    }
2014
2015    fn get_cluster_replica(
2016        &self,
2017        cluster_id: ClusterId,
2018        replica_id: ReplicaId,
2019    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2020        let cluster = self.get_cluster(cluster_id);
2021        cluster.replica(replica_id)
2022    }
2023
2024    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2025        self.get_clusters()
2026            .into_iter()
2027            .flat_map(|cluster| cluster.replicas().into_iter())
2028            .collect()
2029    }
2030
2031    fn get_system_privileges(&self) -> &PrivilegeMap {
2032        &self.state.system_privileges
2033    }
2034
2035    fn get_default_privileges(
2036        &self,
2037    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2038        self.state
2039            .default_privileges
2040            .iter()
2041            .map(|(object, acl_items)| (object, acl_items.collect()))
2042            .collect()
2043    }
2044
2045    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2046        self.state.find_available_name(name, &self.conn_id)
2047    }
2048
2049    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2050        self.state.resolve_full_name(name, Some(&self.conn_id))
2051    }
2052
2053    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2054        self.state.resolve_full_schema_name(name)
2055    }
2056
2057    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2058        self.state.get_entry_by_global_id(global_id).id()
2059    }
2060
2061    fn resolve_global_id(
2062        &self,
2063        item_id: &CatalogItemId,
2064        version: RelationVersionSelector,
2065    ) -> GlobalId {
2066        self.state
2067            .get_entry(item_id)
2068            .at_version(version)
2069            .global_id()
2070    }
2071
2072    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2073        self.state.config()
2074    }
2075
2076    fn now(&self) -> EpochMillis {
2077        (self.state.config().now)()
2078    }
2079
2080    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2081        self.state.aws_privatelink_availability_zones.clone()
2082    }
2083
2084    fn system_vars(&self) -> &SystemVars {
2085        &self.state.system_configuration
2086    }
2087
2088    fn system_vars_mut(&mut self) -> &mut SystemVars {
2089        Arc::make_mut(&mut self.state.to_mut().system_configuration)
2090    }
2091
2092    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2093        self.state().get_owner_id(id, self.conn_id())
2094    }
2095
2096    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2097        match id {
2098            SystemObjectId::System => Some(&self.state.system_privileges),
2099            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2100                Some(self.get_cluster(*id).privileges())
2101            }
2102            SystemObjectId::Object(ObjectId::Database(id)) => {
2103                Some(self.get_database(id).privileges())
2104            }
2105            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2106                // For temporary schemas that haven't been created yet (lazy creation),
2107                // we return None - the RBAC check will need to handle this case.
2108                self.state
2109                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2110                    .map(|schema| schema.privileges())
2111            }
2112            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2113            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2114                Some(self.get_network_policy(id).privileges())
2115            }
2116            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2117            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2118        }
2119    }
2120
2121    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2122        let mut seen = BTreeSet::new();
2123        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2124    }
2125
2126    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2127        let mut seen = BTreeSet::new();
2128        self.state.item_dependents(id, &mut seen)
2129    }
2130
2131    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2132        rbac::all_object_privileges(object_type)
2133    }
2134
2135    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2136        self.state.get_object_type(object_id)
2137    }
2138
2139    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2140        self.state.get_system_object_type(id)
2141    }
2142
2143    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2144    /// the object.
2145    ///
2146    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2147    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2148    /// only for error msgs and other humanizations.
2149    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2150        if qualified_name.qualifiers.schema_spec.is_temporary() {
2151            // All bets are off. Just give up and return the qualified name as is.
2152            // TODO: Figure out what's going on with temporary objects.
2153
2154            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2155            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2156            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2157            // which we don't have a simple repro.
2158            return qualified_name.item.clone().into();
2159        }
2160
2161        let database_id = match &qualified_name.qualifiers.database_spec {
2162            ResolvedDatabaseSpecifier::Ambient => None,
2163            ResolvedDatabaseSpecifier::Id(id)
2164                if self.database.is_some() && self.database == Some(*id) =>
2165            {
2166                None
2167            }
2168            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2169        };
2170
2171        let schema_spec = if database_id.is_none()
2172            && self.resolve_item_name(&PartialItemName {
2173                database: None,
2174                schema: None,
2175                item: qualified_name.item.clone(),
2176            }) == Ok(qualified_name)
2177            || self.resolve_function_name(&PartialItemName {
2178                database: None,
2179                schema: None,
2180                item: qualified_name.item.clone(),
2181            }) == Ok(qualified_name)
2182            || self.resolve_type_name(&PartialItemName {
2183                database: None,
2184                schema: None,
2185                item: qualified_name.item.clone(),
2186            }) == Ok(qualified_name)
2187        {
2188            None
2189        } else {
2190            // If `search_path` does not contain `full_name.schema`, the
2191            // `PartialName` must contain it.
2192            Some(qualified_name.qualifiers.schema_spec.clone())
2193        };
2194
2195        let res = PartialItemName {
2196            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2197            schema: schema_spec.map(|spec| {
2198                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2199                    .name()
2200                    .schema
2201                    .clone()
2202            }),
2203            item: qualified_name.item.clone(),
2204        };
2205        assert!(
2206            self.resolve_item_name(&res) == Ok(qualified_name)
2207                || self.resolve_function_name(&res) == Ok(qualified_name)
2208                || self.resolve_type_name(&res) == Ok(qualified_name)
2209        );
2210        res
2211    }
2212
2213    fn add_notice(&self, notice: PlanNotice) {
2214        let _ = self.notices_tx.send(notice.into());
2215    }
2216
2217    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2218        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2219        self.state.comments.get_object_comments(comment_id)
2220    }
2221
2222    fn is_cluster_size_cc(&self, size: &str) -> bool {
2223        self.state
2224            .cluster_replica_sizes
2225            .0
2226            .get(size)
2227            .map_or(false, |a| a.is_cc)
2228    }
2229}
2230
2231#[cfg(test)]
2232mod tests {
2233    use std::collections::{BTreeMap, BTreeSet};
2234    use std::sync::Arc;
2235    use std::{env, iter};
2236
2237    use itertools::Itertools;
2238    use mz_catalog::memory::objects::CatalogItem;
2239    use tokio_postgres::NoTls;
2240    use tokio_postgres::types::Type;
2241    use uuid::Uuid;
2242
2243    use mz_catalog::SYSTEM_CONN_ID;
2244    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2245    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2246    use mz_controller_types::{ClusterId, ReplicaId};
2247    use mz_expr::MirScalarExpr;
2248    use mz_ore::now::to_datetime;
2249    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2250    use mz_persist_client::PersistClient;
2251    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2252    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2253    use mz_repr::role_id::RoleId;
2254    use mz_repr::{
2255        CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2256        SqlScalarType, Timestamp,
2257    };
2258    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2259    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2260    use mz_sql::names::{
2261        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2262        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2263    };
2264    use mz_sql::plan::{
2265        CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2266        QueryLifetime, Scope, StatementContext,
2267    };
2268    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2269    use mz_sql::session::vars::{SystemVars, VarInput};
2270
2271    use crate::catalog::state::LocalExpressionCache;
2272    use crate::catalog::{Catalog, Op};
2273    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2274    use crate::session::Session;
2275
2276    /// System sessions have an empty `search_path` so it's necessary to
2277    /// schema-qualify all referenced items.
2278    ///
2279    /// Dummy (and ostensibly client) sessions contain system schemas in their
2280    /// search paths, so do not require schema qualification on system objects such
2281    /// as types.
2282    #[mz_ore::test(tokio::test)]
2283    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2284    async fn test_minimal_qualification() {
2285        Catalog::with_debug(|catalog| async move {
2286            struct TestCase {
2287                input: QualifiedItemName,
2288                system_output: PartialItemName,
2289                normal_output: PartialItemName,
2290            }
2291
2292            let test_cases = vec![
2293                TestCase {
2294                    input: QualifiedItemName {
2295                        qualifiers: ItemQualifiers {
2296                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2297                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2298                        },
2299                        item: "numeric".to_string(),
2300                    },
2301                    system_output: PartialItemName {
2302                        database: None,
2303                        schema: None,
2304                        item: "numeric".to_string(),
2305                    },
2306                    normal_output: PartialItemName {
2307                        database: None,
2308                        schema: None,
2309                        item: "numeric".to_string(),
2310                    },
2311                },
2312                TestCase {
2313                    input: QualifiedItemName {
2314                        qualifiers: ItemQualifiers {
2315                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2316                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2317                        },
2318                        item: "mz_array_types".to_string(),
2319                    },
2320                    system_output: PartialItemName {
2321                        database: None,
2322                        schema: None,
2323                        item: "mz_array_types".to_string(),
2324                    },
2325                    normal_output: PartialItemName {
2326                        database: None,
2327                        schema: None,
2328                        item: "mz_array_types".to_string(),
2329                    },
2330                },
2331            ];
2332
2333            for tc in test_cases {
2334                assert_eq!(
2335                    catalog
2336                        .for_system_session()
2337                        .minimal_qualification(&tc.input),
2338                    tc.system_output
2339                );
2340                assert_eq!(
2341                    catalog
2342                        .for_session(&Session::dummy())
2343                        .minimal_qualification(&tc.input),
2344                    tc.normal_output
2345                );
2346            }
2347            catalog.expire().await;
2348        })
2349        .await
2350    }
2351
2352    #[mz_ore::test(tokio::test)]
2353    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2354    async fn test_catalog_revision() {
2355        let persist_client = PersistClient::new_for_tests().await;
2356        let organization_id = Uuid::new_v4();
2357        let bootstrap_args = test_bootstrap_args();
2358        {
2359            let mut catalog = Catalog::open_debug_catalog(
2360                persist_client.clone(),
2361                organization_id.clone(),
2362                &bootstrap_args,
2363            )
2364            .await
2365            .expect("unable to open debug catalog");
2366            assert_eq!(catalog.transient_revision(), 1);
2367            let commit_ts = catalog.current_upper().await;
2368            catalog
2369                .transact(
2370                    None,
2371                    commit_ts,
2372                    None,
2373                    vec![Op::CreateDatabase {
2374                        name: "test".to_string(),
2375                        owner_id: MZ_SYSTEM_ROLE_ID,
2376                    }],
2377                )
2378                .await
2379                .expect("failed to transact");
2380            assert_eq!(catalog.transient_revision(), 2);
2381            catalog.expire().await;
2382        }
2383        {
2384            let catalog =
2385                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2386                    .await
2387                    .expect("unable to open debug catalog");
2388            // Re-opening the same catalog resets the transient_revision to 1.
2389            assert_eq!(catalog.transient_revision(), 1);
2390            catalog.expire().await;
2391        }
2392    }
2393
2394    #[mz_ore::test(tokio::test)]
2395    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2396    async fn test_effective_search_path() {
2397        Catalog::with_debug(|catalog| async move {
2398            let mz_catalog_schema = (
2399                ResolvedDatabaseSpecifier::Ambient,
2400                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2401            );
2402            let pg_catalog_schema = (
2403                ResolvedDatabaseSpecifier::Ambient,
2404                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2405            );
2406            let mz_temp_schema = (
2407                ResolvedDatabaseSpecifier::Ambient,
2408                SchemaSpecifier::Temporary,
2409            );
2410
2411            // Behavior with the default search_schema (public)
2412            let session = Session::dummy();
2413            let conn_catalog = catalog.for_session(&session);
2414            assert_ne!(
2415                conn_catalog.effective_search_path(false),
2416                conn_catalog.search_path
2417            );
2418            assert_ne!(
2419                conn_catalog.effective_search_path(true),
2420                conn_catalog.search_path
2421            );
2422            assert_eq!(
2423                conn_catalog.effective_search_path(false),
2424                vec![
2425                    mz_catalog_schema.clone(),
2426                    pg_catalog_schema.clone(),
2427                    conn_catalog.search_path[0].clone()
2428                ]
2429            );
2430            assert_eq!(
2431                conn_catalog.effective_search_path(true),
2432                vec![
2433                    mz_temp_schema.clone(),
2434                    mz_catalog_schema.clone(),
2435                    pg_catalog_schema.clone(),
2436                    conn_catalog.search_path[0].clone()
2437                ]
2438            );
2439
2440            // missing schemas are added when missing
2441            let mut session = Session::dummy();
2442            session
2443                .vars_mut()
2444                .set(
2445                    &SystemVars::new(),
2446                    "search_path",
2447                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2448                    false,
2449                )
2450                .expect("failed to set search_path");
2451            let conn_catalog = catalog.for_session(&session);
2452            assert_ne!(
2453                conn_catalog.effective_search_path(false),
2454                conn_catalog.search_path
2455            );
2456            assert_ne!(
2457                conn_catalog.effective_search_path(true),
2458                conn_catalog.search_path
2459            );
2460            assert_eq!(
2461                conn_catalog.effective_search_path(false),
2462                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2463            );
2464            assert_eq!(
2465                conn_catalog.effective_search_path(true),
2466                vec![
2467                    mz_temp_schema.clone(),
2468                    mz_catalog_schema.clone(),
2469                    pg_catalog_schema.clone()
2470                ]
2471            );
2472
2473            let mut session = Session::dummy();
2474            session
2475                .vars_mut()
2476                .set(
2477                    &SystemVars::new(),
2478                    "search_path",
2479                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2480                    false,
2481                )
2482                .expect("failed to set search_path");
2483            let conn_catalog = catalog.for_session(&session);
2484            assert_ne!(
2485                conn_catalog.effective_search_path(false),
2486                conn_catalog.search_path
2487            );
2488            assert_ne!(
2489                conn_catalog.effective_search_path(true),
2490                conn_catalog.search_path
2491            );
2492            assert_eq!(
2493                conn_catalog.effective_search_path(false),
2494                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2495            );
2496            assert_eq!(
2497                conn_catalog.effective_search_path(true),
2498                vec![
2499                    mz_temp_schema.clone(),
2500                    pg_catalog_schema.clone(),
2501                    mz_catalog_schema.clone()
2502                ]
2503            );
2504
2505            let mut session = Session::dummy();
2506            session
2507                .vars_mut()
2508                .set(
2509                    &SystemVars::new(),
2510                    "search_path",
2511                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2512                    false,
2513                )
2514                .expect("failed to set search_path");
2515            let conn_catalog = catalog.for_session(&session);
2516            assert_ne!(
2517                conn_catalog.effective_search_path(false),
2518                conn_catalog.search_path
2519            );
2520            assert_ne!(
2521                conn_catalog.effective_search_path(true),
2522                conn_catalog.search_path
2523            );
2524            assert_eq!(
2525                conn_catalog.effective_search_path(false),
2526                vec![
2527                    mz_catalog_schema.clone(),
2528                    pg_catalog_schema.clone(),
2529                    mz_temp_schema.clone()
2530                ]
2531            );
2532            assert_eq!(
2533                conn_catalog.effective_search_path(true),
2534                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2535            );
2536            catalog.expire().await;
2537        })
2538        .await
2539    }
2540
2541    #[mz_ore::test(tokio::test)]
2542    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2543    async fn test_normalized_create() {
2544        use mz_ore::collections::CollectionExt;
2545        Catalog::with_debug(|catalog| async move {
2546            let conn_catalog = catalog.for_system_session();
2547            let scx = &mut StatementContext::new(None, &conn_catalog);
2548
2549            let parsed = mz_sql_parser::parser::parse_statements(
2550                "create view public.foo as select 1 as bar",
2551            )
2552            .expect("")
2553            .into_element()
2554            .ast;
2555
2556            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2557
2558            // Ensure that all identifiers are quoted.
2559            assert_eq!(
2560                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2561                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2562            );
2563            catalog.expire().await;
2564        })
2565        .await;
2566    }
2567
2568    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2569    #[mz_ore::test(tokio::test)]
2570    #[cfg_attr(miri, ignore)] // slow
2571    async fn test_large_catalog_item() {
2572        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2573        let column = ", 1";
2574        let view_def_size = view_def.bytes().count();
2575        let column_size = column.bytes().count();
2576        let column_count =
2577            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2578        let columns = iter::repeat(column).take(column_count).join("");
2579        let create_sql = format!("{view_def}{columns})");
2580        let create_sql_check = create_sql.clone();
2581        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2582        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2583            &create_sql
2584        ));
2585
2586        let persist_client = PersistClient::new_for_tests().await;
2587        let organization_id = Uuid::new_v4();
2588        let id = CatalogItemId::User(1);
2589        let gid = GlobalId::User(1);
2590        let bootstrap_args = test_bootstrap_args();
2591        {
2592            let mut catalog = Catalog::open_debug_catalog(
2593                persist_client.clone(),
2594                organization_id.clone(),
2595                &bootstrap_args,
2596            )
2597            .await
2598            .expect("unable to open debug catalog");
2599            let item = catalog
2600                .state()
2601                .deserialize_item(
2602                    gid,
2603                    &create_sql,
2604                    &BTreeMap::new(),
2605                    &mut LocalExpressionCache::Closed,
2606                    None,
2607                )
2608                .expect("unable to parse view");
2609            let commit_ts = catalog.current_upper().await;
2610            catalog
2611                .transact(
2612                    None,
2613                    commit_ts,
2614                    None,
2615                    vec![Op::CreateItem {
2616                        item,
2617                        name: QualifiedItemName {
2618                            qualifiers: ItemQualifiers {
2619                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2620                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2621                            },
2622                            item: "v".to_string(),
2623                        },
2624                        id,
2625                        owner_id: MZ_SYSTEM_ROLE_ID,
2626                    }],
2627                )
2628                .await
2629                .expect("failed to transact");
2630            catalog.expire().await;
2631        }
2632        {
2633            let catalog =
2634                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2635                    .await
2636                    .expect("unable to open debug catalog");
2637            let view = catalog.get_entry(&id);
2638            assert_eq!("v", view.name.item);
2639            match &view.item {
2640                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2641                item => panic!("expected view, got {}", item.typ()),
2642            }
2643            catalog.expire().await;
2644        }
2645    }
2646
2647    #[mz_ore::test(tokio::test)]
2648    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2649    async fn test_object_type() {
2650        Catalog::with_debug(|catalog| async move {
2651            let conn_catalog = catalog.for_system_session();
2652
2653            assert_eq!(
2654                mz_sql::catalog::ObjectType::ClusterReplica,
2655                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2656                    ClusterId::user(1).expect("1 is a valid ID"),
2657                    ReplicaId::User(1)
2658                )))
2659            );
2660            assert_eq!(
2661                mz_sql::catalog::ObjectType::Role,
2662                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2663            );
2664            catalog.expire().await;
2665        })
2666        .await;
2667    }
2668
2669    #[mz_ore::test(tokio::test)]
2670    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2671    async fn test_get_privileges() {
2672        Catalog::with_debug(|catalog| async move {
2673            let conn_catalog = catalog.for_system_session();
2674
2675            assert_eq!(
2676                None,
2677                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2678                    ClusterId::user(1).expect("1 is a valid ID"),
2679                    ReplicaId::User(1),
2680                ))))
2681            );
2682            assert_eq!(
2683                None,
2684                conn_catalog
2685                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2686            );
2687            catalog.expire().await;
2688        })
2689        .await;
2690    }
2691
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn verify_builtin_descs() {
        // Verifies that, for every relation-like builtin, the `RelationDesc`
        // the catalog resolves at runtime matches the statically declared one.
        Catalog::with_debug(|catalog| async move {
            let conn_catalog = catalog.for_system_session();

            let builtins_cfg = BuiltinsConfig {
                include_continual_tasks: true,
            };
            for builtin in BUILTINS::iter(&builtins_cfg) {
                // Only relation-like builtins declare a static desc; skip the
                // rest (logs, types, funcs, indexes, connections, ...).
                let (schema, name, expected_desc) = match builtin {
                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
                    Builtin::Log(_)
                    | Builtin::Type(_)
                    | Builtin::Func(_)
                    | Builtin::ContinualTask(_)
                    | Builtin::Index(_)
                    | Builtin::Connection(_) => continue,
                };
                let item = conn_catalog
                    .resolve_item(&PartialItemName {
                        database: None,
                        schema: Some(schema.to_string()),
                        item: name.to_string(),
                    })
                    .expect("unable to resolve item")
                    .at_version(RelationVersionSelector::Latest);

                let actual_desc = item.relation_desc().expect("invalid item type");
                // Compare column-by-column first so a failure names the exact
                // offending column. `zip_eq` panics if the column counts differ.
                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
                {
                    assert_eq!(
                        actual_name, expected_name,
                        "item {schema}.{name} column {index} name did not match its expected name"
                    );
                    assert_eq!(
                        actual_typ, expected_typ,
                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
                    );
                }
                // Then compare the full descs, covering any desc state beyond
                // the per-column names and types checked above.
                assert_eq!(
                    &*actual_desc, expected_desc,
                    "item {schema}.{name} did not match its expected RelationDesc"
                );
            }
            catalog.expire().await;
        })
        .await
    }
2745
    // Connect to a running Postgres server and verify that our builtin
    // types and functions match it, in addition to some other things.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_compare_builtins_postgres() {
        async fn inner(catalog: Catalog) {
            // Verify that all builtin functions:
            // - have a unique OID
            // - if they have a postgres counterpart (same oid) then they have matching name
            let (client, connection) = tokio_postgres::connect(
                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
                NoTls,
            )
            .await
            .expect("failed to connect to Postgres");

            // The `connection` object drives the socket I/O; run it on a
            // background task so queries on `client` can make progress.
            task::spawn(|| "compare_builtin_postgres", async move {
                if let Err(e) = connection.await {
                    panic!("connection error: {}", e);
                }
            });

            // Subset of a `pg_proc` row that this test compares against.
            struct PgProc {
                name: String,
                arg_oids: Vec<u32>,
                ret_oid: Option<u32>,
                ret_set: bool,
            }

            // Subset of a `pg_type` row that this test compares against.
            struct PgType {
                name: String,
                ty: String,
                elem: u32,
                array: u32,
                input: u32,
                receive: u32,
            }

            // Subset of a `pg_operator` row that this test compares against.
            struct PgOper {
                oprresult: u32,
                name: String,
            }

            // Snapshot all of Postgres's functions, keyed by OID.
            let pg_proc: BTreeMap<_, _> = client
                .query(
                    "SELECT
                    p.oid,
                    proname,
                    proargtypes,
                    prorettype,
                    proretset
                FROM pg_proc p
                JOIN pg_namespace n ON p.pronamespace = n.oid",
                    &[],
                )
                .await
                .expect("pg query failed")
                .into_iter()
                .map(|row| {
                    let oid: u32 = row.get("oid");
                    let pg_proc = PgProc {
                        name: row.get("proname"),
                        arg_oids: row.get("proargtypes"),
                        ret_oid: row.get("prorettype"),
                        ret_set: row.get("proretset"),
                    };
                    (oid, pg_proc)
                })
                .collect();

            // Snapshot all of Postgres's types, keyed by OID.
            let pg_type: BTreeMap<_, _> = client
                .query(
                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
                    &[],
                )
                .await
                .expect("pg query failed")
                .into_iter()
                .map(|row| {
                    let oid: u32 = row.get("oid");
                    let pg_type = PgType {
                        name: row.get("typname"),
                        ty: row.get("typtype"),
                        elem: row.get("typelem"),
                        array: row.get("typarray"),
                        input: row.get("typinput"),
                        receive: row.get("typreceive"),
                    };
                    (oid, pg_type)
                })
                .collect();

            // Snapshot all of Postgres's operators, keyed by OID.
            let pg_oper: BTreeMap<_, _> = client
                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
                .await
                .expect("pg query failed")
                .into_iter()
                .map(|row| {
                    let oid: u32 = row.get("oid");
                    let pg_oper = PgOper {
                        name: row.get("oprname"),
                        oprresult: row.get("oprresult"),
                    };
                    (oid, pg_oper)
                })
                .collect();

            let conn_catalog = catalog.for_system_session();
            // Resolves a pg_catalog type name to the OID Materialize assigned it.
            let resolve_type_oid = |item: &str| {
                conn_catalog
                    .resolve_type(&PartialItemName {
                        database: None,
                        // All functions we check exist in PG, so the types must, as
                        // well
                        schema: Some(PG_CATALOG_SCHEMA.into()),
                        item: item.to_string(),
                    })
                    .expect("unable to resolve type")
                    .oid()
            };

            // OIDs of every builtin function implementation, used below to
            // check that typinput/typreceive references name real builtins.
            let func_oids: BTreeSet<_> = BUILTINS::funcs()
                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
                .collect();

            // Every OID seen so far, to enforce global uniqueness across
            // types, functions, and operators.
            let mut all_oids = BTreeSet::new();

            // A function to determine if two oids are equivalent enough for these tests. We don't
            // support some types, so map exceptions here.
            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
                [
                    // We don't support NAME.
                    (Type::NAME, Type::TEXT),
                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
                    // We don't support time with time zone.
                    (Type::TIME, Type::TIMETZ),
                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
                ]
                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
            );
            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
            ]);
            // Compares type OIDs `a` and `b` up to the equivalences declared
            // above; `fn_oid` identifies the function whose return type may be
            // exempted entirely via `ignore_return_types`.
            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
                if ignore_return_types.contains(&fn_oid) {
                    return true;
                }
                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
                    return true;
                }
                a == b
            };

            let builtins_cfg = BuiltinsConfig {
                include_continual_tasks: true,
            };
            for builtin in BUILTINS::iter(&builtins_cfg) {
                match builtin {
                    Builtin::Type(ty) => {
                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);

                        if ty.oid >= FIRST_MATERIALIZE_OID {
                            // High OIDs are reserved in Materialize and don't have
                            // PostgreSQL counterparts.
                            continue;
                        }

                        // For types that have a PostgreSQL counterpart, verify that
                        // the name and oid match.
                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
                        });
                        assert_eq!(
                            ty.name, pg_ty.name,
                            "oid {} has name {} in postgres; expected {}",
                            ty.oid, pg_ty.name, ty.name,
                        );

                        // Verify the typinput/typreceive function OIDs match
                        // Postgres's; missing pg_metadata is treated as 0
                        // (i.e. no such function), matching pg's convention.
                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
                            None => (0, 0),
                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
                        };
                        assert_eq!(
                            typinput_oid, pg_ty.input,
                            "type {} has typinput OID {:?} in mz but {:?} in pg",
                            ty.name, typinput_oid, pg_ty.input,
                        );
                        assert_eq!(
                            typreceive_oid, pg_ty.receive,
                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
                            ty.name, typreceive_oid, pg_ty.receive,
                        );
                        // A nonzero typinput/typreceive must point at a real
                        // builtin function implementation.
                        if typinput_oid != 0 {
                            assert!(
                                func_oids.contains(&typinput_oid),
                                "type {} has typinput OID {} that does not exist in pg_proc",
                                ty.name,
                                typinput_oid,
                            );
                        }
                        if typreceive_oid != 0 {
                            assert!(
                                func_oids.contains(&typreceive_oid),
                                "type {} has typreceive OID {} that does not exist in pg_proc",
                                ty.name,
                                typreceive_oid,
                            );
                        }

                        // Ensure the type matches.
                        match &ty.details.typ {
                            CatalogType::Array { element_reference } => {
                                // Find the builtin type the array's element
                                // reference names, then compare element OIDs.
                                let elem_ty = BUILTINS::iter(&builtins_cfg)
                                    .filter_map(|builtin| match builtin {
                                        Builtin::Type(ty @ BuiltinType { name, .. })
                                            if element_reference == name =>
                                        {
                                            Some(ty)
                                        }
                                        _ => None,
                                    })
                                    .next();
                                let elem_ty = match elem_ty {
                                    Some(ty) => ty,
                                    None => {
                                        panic!("{} is unexpectedly not a type", element_reference)
                                    }
                                };
                                assert_eq!(
                                    pg_ty.elem, elem_ty.oid,
                                    "type {} has mismatched element OIDs",
                                    ty.name
                                )
                            }
                            CatalogType::Pseudo => {
                                assert_eq!(
                                    pg_ty.ty, "p",
                                    "type {} is not a pseudo type as expected",
                                    ty.name
                                )
                            }
                            CatalogType::Range { .. } => {
                                assert_eq!(
                                    pg_ty.ty, "r",
                                    "type {} is not a range type as expected",
                                    ty.name
                                );
                            }
                            _ => {
                                assert_eq!(
                                    pg_ty.ty, "b",
                                    "type {} is not a base type as expected",
                                    ty.name
                                )
                            }
                        }

                        // Ensure the array type reference is correct.
                        let schema = catalog
                            .resolve_schema_in_database(
                                &ResolvedDatabaseSpecifier::Ambient,
                                ty.schema,
                                &SYSTEM_CONN_ID,
                            )
                            .expect("unable to resolve schema");
                        let allocated_type = catalog
                            .resolve_type(
                                None,
                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
                                &PartialItemName {
                                    database: None,
                                    schema: Some(schema.name().schema.clone()),
                                    item: ty.name.to_string(),
                                },
                                &SYSTEM_CONN_ID,
                            )
                            .expect("unable to resolve type");
                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
                            ty
                        } else {
                            panic!("unexpectedly not a type")
                        };
                        match ty.details.array_id {
                            Some(array_id) => {
                                let array_ty = catalog.get_entry(&array_id);
                                assert_eq!(
                                    pg_ty.array, array_ty.oid,
                                    "type {} has mismatched array OIDs",
                                    allocated_type.name.item,
                                );
                            }
                            None => assert_eq!(
                                pg_ty.array, 0,
                                "type {} does not have an array type in mz but does in pg",
                                allocated_type.name.item,
                            ),
                        }
                    }
                    Builtin::Func(func) => {
                        for imp in func.inner.func_impls() {
                            assert!(
                                all_oids.insert(imp.oid),
                                "{} reused oid {}",
                                func.name,
                                imp.oid
                            );

                            assert!(
                                imp.oid < FIRST_USER_OID,
                                "built-in function {} erroneously has OID in user space ({})",
                                func.name,
                                imp.oid,
                            );

                            // For functions that have a postgres counterpart, verify that the name and
                            // oid match.
                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
                                continue;
                            } else {
                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
                                    panic!(
                                        "pg_proc missing function {}: oid {}",
                                        func.name, imp.oid
                                    )
                                })
                            };
                            assert_eq!(
                                func.name, pg_fn.name,
                                "funcs with oid {} don't match names: {} in mz, {} in pg",
                                imp.oid, func.name, pg_fn.name
                            );

                            // Complain, but don't fail, if argument oids don't match.
                            // TODO: make these match.
                            let imp_arg_oids = imp
                                .arg_typs
                                .iter()
                                .map(|item| resolve_type_oid(item))
                                .collect::<Vec<_>>();

                            if imp_arg_oids != pg_fn.arg_oids {
                                println!(
                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
                                );
                            }

                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);

                            // Return types must match up to the declared
                            // equivalences (NAME/TEXT, TIME/TIMETZ, ...).
                            assert!(
                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
                                imp.oid,
                                func.name,
                                imp_return_oid,
                                pg_fn.ret_oid
                            );

                            assert_eq!(
                                imp.return_is_set, pg_fn.ret_set,
                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
                            );
                        }
                    }
                    _ => (),
                }
            }

            // Operators are checked the same way: unique OIDs, and matching
            // name/result type for those with a pinned Postgres counterpart.
            for (op, func) in OP_IMPLS.iter() {
                for imp in func.func_impls() {
                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);

                    // For operators that have a postgres counterpart, verify that the name and oid match.
                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
                        continue;
                    } else {
                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
                        })
                    };

                    assert_eq!(*op, pg_op.name);

                    let imp_return_oid =
                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
                    if imp_return_oid != pg_op.oprresult {
                        panic!(
                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
                            imp.oid, op, imp_return_oid, pg_op.oprresult
                        );
                    }
                }
            }
            catalog.expire().await;
        }

        Catalog::with_debug(inner).await
    }
3145
    // Execute all builtin functions with all combinations of arguments from interesting datums.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_smoketest_all_builtins() {
        // Enumerates every scalar builtin (functions and operators), builds the
        // cartesian product of "interesting" datums for each argument type, and
        // spawns one blocking task per argument combination that runs
        // `smoketest_fn`. Returns the task handles so the caller can await them.
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            // Shared across all spawned tasks; each task gets its own `Arc` clone.
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            // Maps a type name (as it appears in function signatures) to its OID
            // via the system catalog.
            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Extracted during planning; always panics when executed.
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege", // > 3 s each
                "has_type_privilege",  // > 3 s each
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            // All named builtin functions, followed by all operator implementations.
            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are smoke-tested here.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                'outer: for imp in impls {
                    let details = imp.details();
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        // Skip implementations whose argument types have no
                        // pgrepr counterpart (we can't build datums for them).
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // One candidate-datum list per argument position: NULL plus
                    // each type's "interesting" values (boundaries etc.).
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    // Skip nullary fns.
                    if datums.is_empty() {
                        continue;
                    }

                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    // `None` when the return type has no pgrepr counterpart; in
                    // that case `smoketest_fn` skips the return-type checks.
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // Odometer over the per-argument datum lists: `idxs[i]` is
                    // the current choice for argument `i`. The loop ends when
                    // the most-significant digit overflows (idxs[0] == len).
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        // Wrap each datum as an already-coerced literal so the
                        // planner treats them as typed arguments.
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable label used both as task name and in
                        // panic messages, e.g. `abs(-1) (oid: 1397)`.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        // spawn_blocking: planning/lowering/eval is CPU-bound
                        // synchronous work, so keep it off the async executor.
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance to the next datum combination.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                if i == 0 {
                                    // Most-significant digit overflowed: leave
                                    // idxs[0] at len so the outer while exits.
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        // Spawn all smoke tests against a debug catalog, then await each one so
        // any panic inside a task fails this test.
        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        for handle in handles {
            handle.await;
        }
    }
3291
    /// Smoke-tests a single scalar-function implementation on one concrete
    /// argument combination.
    ///
    /// Plans the call via `op`, lowers it to MIR, evaluates it, and — when the
    /// function has a pgrepr-representable return type (`return_styp` is
    /// `Some`) — checks several consistency properties:
    /// - the evaluated datum is an instance of the MIR-inferred type;
    /// - eliminable casts produce the same datum whether or not cast
    ///   elimination is enabled during lowering;
    /// - `introduces_nulls`/`propagates_nulls` agree with the inferred
    ///   nullability;
    /// - `MirScalarExpr::reduce` (constant folding) agrees with direct
    ///   evaluation on both datum and scalar type.
    ///
    /// Eval errors are ignored (many datum combinations are invalid inputs);
    /// the point is that nothing panics and the checks above hold when
    /// evaluation succeeds.
    ///
    /// `name` is `&&str` because the caller's iterator yields `&&str` items;
    /// `call_name` is a pre-formatted label used in panic messages.
    fn smoketest_fn(
        name: &&str,
        call_name: String,
        op: &Operation<HirScalarExpr>,
        imp: &FuncImpl<HirScalarExpr>,
        args: Vec<Datum<'_>>,
        catalog: Arc<Catalog>,
        scalars: Vec<CoercibleScalarExpr>,
        return_styp: Option<SqlScalarType>,
    ) {
        // Minimal planning scaffolding: a one-shot query context with
        // aggregates/subqueries/parameters/windows all disallowed, since we
        // only plan bare scalar calls.
        let conn_catalog = catalog.for_system_session();
        let pcx = PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
        let ecx = ExprContext {
            qcx: &qcx,
            name: "smoketest",
            scope: &Scope::empty(),
            relation_type: &SqlRelationType::empty(),
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let arena = RowArena::new();
        let mut session = Session::dummy();
        session
            .start_transaction(to_datetime(0), None, None)
            .expect("must succeed");
        // Used to resolve/prepare unmaterialized functions before evaluation.
        let prep_style = ExprPrepOneShot {
            logical_time: EvalTime::Time(Timestamp::MIN),
            session: &session,
            catalog_state: &catalog.state,
        };

        // Execute the function as much as possible, ensuring no panics occur, but
        // otherwise ignoring eval errors. We also do various other checks.
        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
        if let Ok(hir) = res {
            // If the call is an eliminable cast, also lower it WITH cast
            // elimination disabled and record the resulting datum, so we can
            // later assert both lowerings evaluate identically.
            let uneliminated_result_row = {
                if let HirScalarExpr::CallUnary { func, .. } = &hir
                    && func.is_eliminable_cast()
                {
                    let mut uneliminated_mir = hir
                        .clone()
                        .lower_uncorrelated(HirToMirConfig {
                            enable_cast_elimination: false,
                            ..catalog.system_config().into()
                        })
                        .expect("lowering eliminable cast should always succeed");
                    prep_style
                        .prep_scalar_expr(&mut uneliminated_mir)
                        .expect("must succeed");

                    // Pack the row, to avoid lifetime issues with the MIR we lowered here
                    uneliminated_mir
                        .eval(&[], &arena)
                        .ok()
                        .map(|datum| Row::pack([datum]))
                } else {
                    None
                }
            };

            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
                // Populate unmaterialized functions.
                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");

                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
                    if let Some(return_styp) = return_styp {
                        let mir_typ = mir.typ(&[]);
                        // MIR type inference should be consistent with the type
                        // we get from the catalog.
                        soft_assert_eq_or_log!(
                            mir_typ.scalar_type,
                            (&return_styp).into(),
                            "MIR type did not match the catalog type (cast elimination/repr type error)"
                        );
                        // The following will check not just that the scalar type
                        // is ok, but also catches if the function returned a null
                        // but the MIR type inference said "non-nullable".
                        if !eval_result_datum.is_instance_of(&mir_typ) {
                            panic!(
                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
                            );
                        }
                        // Check the consistency of `is_eliminable_cast`---we should get the same datum either way.
                        if let Some(row) = uneliminated_result_row {
                            let uneliminated_result_datum = row.unpack_first();
                            assert_eq!(
                                uneliminated_result_datum, eval_result_datum,
                                "datums should not change if cast is eliminable"
                            );
                        }
                        // Check the consistency of `introduces_nulls` and
                        // `propagates_nulls` with `MirScalarExpr::typ`.
                        if let Some((introduces_nulls, propagates_nulls)) =
                            call_introduces_propagates_nulls(&mir)
                        {
                            if introduces_nulls {
                                // If the function introduces_nulls, then the return
                                // type should always be nullable, regardless of
                                // the nullability of the input types.
                                assert!(
                                    mir_typ.nullable,
                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                    name, args, mir, mir_typ.nullable
                                );
                            } else {
                                let any_input_null = args.iter().any(|arg| arg.is_null());
                                if !any_input_null {
                                    // No NULL inputs and the function does not
                                    // introduce nulls: output must be non-nullable.
                                    assert!(
                                        !mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                } else if propagates_nulls {
                                    // propagates_nulls means the optimizer short-circuits
                                    // all-null inputs, so the output must be nullable.
                                    assert!(
                                        mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                }
                                // When propagates_nulls is false, the output may still
                                // be nullable if a non-nullable parameter received a null
                                // input (per-position null rejection). The is_instance_of
                                // check above ensures type consistency.
                            }
                        }
                        // Check that `MirScalarExpr::reduce` yields the same result
                        // as the real evaluation.
                        let mut reduced = mir.clone();
                        reduced.reduce(&[]);
                        match reduced {
                            MirScalarExpr::Literal(reduce_result, ctyp) => {
                                match reduce_result {
                                    Ok(reduce_result_row) => {
                                        let reduce_result_datum = reduce_result_row.unpack_first();
                                        assert_eq!(
                                            reduce_result_datum,
                                            eval_result_datum,
                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                        // Let's check that the types also match.
                                        // (We are not checking nullability here,
                                        // because it's ok when we know a more
                                        // precise nullability after actually
                                        // evaluating a function than before.)
                                        assert_eq!(
                                            ctyp.scalar_type,
                                            mir_typ.scalar_type,
                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                    }
                                    Err(..) => {} // It's ok, we might have given invalid args to the function
                                }
                            }
                            _ => unreachable!(
                                "all args are literals, so should have reduced to a literal"
                            ),
                        }
                    }
                }
            }
        }
    }
3474
3475    /// If the given MirScalarExpr
3476    ///  - is a function call, and
3477    ///  - all arguments are literals
3478    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3479    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3480        match mir_func_call {
3481            MirScalarExpr::CallUnary { func, expr } => {
3482                if expr.is_literal() {
3483                    Some((func.introduces_nulls(), func.propagates_nulls()))
3484                } else {
3485                    None
3486                }
3487            }
3488            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3489                if expr1.is_literal() && expr2.is_literal() {
3490                    Some((func.introduces_nulls(), func.propagates_nulls()))
3491                } else {
3492                    None
3493                }
3494            }
3495            MirScalarExpr::CallVariadic { func, exprs } => {
3496                if exprs.iter().all(|arg| arg.is_literal()) {
3497                    Some((func.introduces_nulls(), func.propagates_nulls()))
3498                } else {
3499                    None
3500                }
3501            }
3502            _ => None,
3503        }
3504    }
3505
3506    // Make sure pg views don't use types that only exist in Materialize.
3507    #[mz_ore::test(tokio::test)]
3508    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3509    async fn test_pg_views_forbidden_types() {
3510        Catalog::with_debug(|catalog| async move {
3511            let conn_catalog = catalog.for_system_session();
3512
3513            for view in BUILTINS::views().filter(|view| {
3514                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3515            }) {
3516                let item = conn_catalog
3517                    .resolve_item(&PartialItemName {
3518                        database: None,
3519                        schema: Some(view.schema.to_string()),
3520                        item: view.name.to_string(),
3521                    })
3522                    .expect("unable to resolve view")
3523                    // TODO(alter_table)
3524                    .at_version(RelationVersionSelector::Latest);
3525                let full_name = conn_catalog.resolve_full_name(item.name());
3526                let desc = item.relation_desc().expect("invalid item type");
3527                for col_type in desc.iter_types() {
3528                    match &col_type.scalar_type {
3529                        typ @ SqlScalarType::UInt16
3530                        | typ @ SqlScalarType::UInt32
3531                        | typ @ SqlScalarType::UInt64
3532                        | typ @ SqlScalarType::MzTimestamp
3533                        | typ @ SqlScalarType::List { .. }
3534                        | typ @ SqlScalarType::Map { .. }
3535                        | typ @ SqlScalarType::MzAclItem => {
3536                            panic!("{typ:?} type found in {full_name}");
3537                        }
3538                        SqlScalarType::AclItem
3539                        | SqlScalarType::Bool
3540                        | SqlScalarType::Int16
3541                        | SqlScalarType::Int32
3542                        | SqlScalarType::Int64
3543                        | SqlScalarType::Float32
3544                        | SqlScalarType::Float64
3545                        | SqlScalarType::Numeric { .. }
3546                        | SqlScalarType::Date
3547                        | SqlScalarType::Time
3548                        | SqlScalarType::Timestamp { .. }
3549                        | SqlScalarType::TimestampTz { .. }
3550                        | SqlScalarType::Interval
3551                        | SqlScalarType::PgLegacyChar
3552                        | SqlScalarType::Bytes
3553                        | SqlScalarType::String
3554                        | SqlScalarType::Char { .. }
3555                        | SqlScalarType::VarChar { .. }
3556                        | SqlScalarType::Jsonb
3557                        | SqlScalarType::Uuid
3558                        | SqlScalarType::Array(_)
3559                        | SqlScalarType::Record { .. }
3560                        | SqlScalarType::Oid
3561                        | SqlScalarType::RegProc
3562                        | SqlScalarType::RegType
3563                        | SqlScalarType::RegClass
3564                        | SqlScalarType::Int2Vector
3565                        | SqlScalarType::Range { .. }
3566                        | SqlScalarType::PgLegacyName => {}
3567                    }
3568                }
3569            }
3570            catalog.expire().await;
3571        })
3572        .await
3573    }
3574
3575    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3576    // introspection relations.
3577    #[mz_ore::test(tokio::test)]
3578    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3579    async fn test_mz_introspection_builtins() {
3580        Catalog::with_debug(|catalog| async move {
3581            let conn_catalog = catalog.for_system_session();
3582
3583            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3584            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3585
3586            for entry in catalog.entries() {
3587                let schema_spec = entry.name().qualifiers.schema_spec;
3588                let introspection_deps = catalog.introspection_dependencies(entry.id);
3589                if introspection_deps.is_empty() {
3590                    assert!(
3591                        schema_spec != introspection_schema_spec,
3592                        "entry does not depend on introspection sources but is in \
3593                         `mz_introspection`: {}",
3594                        conn_catalog.resolve_full_name(entry.name()),
3595                    );
3596                } else {
3597                    assert!(
3598                        schema_spec == introspection_schema_spec,
3599                        "entry depends on introspection sources but is not in \
3600                         `mz_introspection`: {}",
3601                        conn_catalog.resolve_full_name(entry.name()),
3602                    );
3603                }
3604            }
3605        })
3606        .await
3607    }
3608
3609    #[mz_ore::test(tokio::test)]
3610    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3611    async fn test_multi_subscriber_catalog() {
3612        let persist_client = PersistClient::new_for_tests().await;
3613        let bootstrap_args = test_bootstrap_args();
3614        let organization_id = Uuid::new_v4();
3615        let db_name = "DB";
3616
3617        let mut writer_catalog = Catalog::open_debug_catalog(
3618            persist_client.clone(),
3619            organization_id.clone(),
3620            &bootstrap_args,
3621        )
3622        .await
3623        .expect("open_debug_catalog");
3624        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3625            persist_client.clone(),
3626            organization_id.clone(),
3627            &bootstrap_args,
3628        )
3629        .await
3630        .expect("open_debug_read_only_catalog");
3631        assert_err!(writer_catalog.resolve_database(db_name));
3632        assert_err!(read_only_catalog.resolve_database(db_name));
3633
3634        let commit_ts = writer_catalog.current_upper().await;
3635        writer_catalog
3636            .transact(
3637                None,
3638                commit_ts,
3639                None,
3640                vec![Op::CreateDatabase {
3641                    name: db_name.to_string(),
3642                    owner_id: MZ_SYSTEM_ROLE_ID,
3643                }],
3644            )
3645            .await
3646            .expect("failed to transact");
3647
3648        let write_db = writer_catalog
3649            .resolve_database(db_name)
3650            .expect("resolve_database");
3651        read_only_catalog
3652            .sync_to_current_updates()
3653            .await
3654            .expect("sync_to_current_updates");
3655        let read_db = read_only_catalog
3656            .resolve_database(db_name)
3657            .expect("resolve_database");
3658
3659        assert_eq!(write_db, read_db);
3660
3661        let writer_catalog_fencer =
3662            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3663                .await
3664                .expect("open_debug_catalog for fencer");
3665        let fencer_db = writer_catalog_fencer
3666            .resolve_database(db_name)
3667            .expect("resolve_database for fencer");
3668        assert_eq!(fencer_db, read_db);
3669
3670        let write_fence_err = writer_catalog
3671            .sync_to_current_updates()
3672            .await
3673            .expect_err("sync_to_current_updates for fencer");
3674        assert!(matches!(
3675            write_fence_err,
3676            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3677        ));
3678        let read_fence_err = read_only_catalog
3679            .sync_to_current_updates()
3680            .await
3681            .expect_err("sync_to_current_updates after fencer");
3682        assert!(matches!(
3683            read_fence_err,
3684            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3685        ));
3686
3687        writer_catalog.expire().await;
3688        read_only_catalog.expire().await;
3689        writer_catalog_fencer.expire().await;
3690    }
3691}