Skip to main content

mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, STORAGE_USAGE_ID_ALLOC_KEY, TestCatalogStateBuilder,
39    test_bootstrap_args,
40};
41use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
42use mz_catalog::memory::error::{Error, ErrorKind};
43use mz_catalog::memory::objects::{
44    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
45    NetworkPolicy, Role, RoleAuth, Schema,
46};
47use mz_compute_types::dataflows::DataflowDescription;
48use mz_controller::clusters::ReplicaLocation;
49use mz_controller_types::{ClusterId, ReplicaId};
50use mz_expr::OptimizedMirRelationExpr;
51use mz_license_keys::ValidatedLicenseKey;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68    SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89// DO NOT add any more imports from `crate` outside of `crate::catalog`.
90pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94    DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114/// A `Catalog` keeps track of the SQL objects known to the planner.
115///
116/// For each object, it keeps track of both forward and reverse dependencies:
117/// i.e., which objects are depended upon by the object, and which objects
118/// depend upon the object. It enforces the SQL rules around dropping: an object
119/// cannot be dropped until all of the objects that depend upon it are dropped.
120/// It also enforces uniqueness of names.
121///
122/// SQL mandates a hierarchy of exactly three layers. A catalog contains
123/// databases, databases contain schemas, and schemas contain catalog items,
124/// like sources, sinks, view, and indexes.
125///
126/// To the outside world, databases, schemas, and items are all identified by
127/// name. Items can be referred to by their [`FullItemName`], which fully and
128/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
129/// database name and/or the schema name. Partial names can be converted into
130/// full names via a complicated resolution process documented by the
131/// [`CatalogState::resolve`] method.
132///
133/// The catalog also maintains special "ambient schemas": virtual schemas,
134/// implicitly present in all databases, that house various system views.
135/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
136#[derive(Debug)]
137pub struct Catalog {
138    state: CatalogState,
139    expr_cache_handle: Option<ExpressionCacheHandle>,
140    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141    transient_revision: u64,
142}
143
144// Implement our own Clone because derive can't unless S is Clone, which it's
145// not (hence the Arc).
146impl Clone for Catalog {
147    fn clone(&self) -> Self {
148        Self {
149            state: self.state.clone(),
150            expr_cache_handle: self.expr_cache_handle.clone(),
151            storage: Arc::clone(&self.storage),
152            transient_revision: self.transient_revision,
153        }
154    }
155}
156
157impl Catalog {
158    /// Set the optimized plan for the item identified by `id`.
159    ///
160    /// # Panics
161    /// If the item is not an `Index`, `MaterializedView`, or
162    /// `ContinualTask`.
163    #[mz_ore::instrument(level = "trace")]
164    pub fn set_optimized_plan(
165        &mut self,
166        id: GlobalId,
167        plan: DataflowDescription<OptimizedMirRelationExpr>,
168    ) {
169        self.state.set_optimized_plan(id, plan);
170    }
171
172    /// Set the physical plan for the item identified by `id`.
173    ///
174    /// # Panics
175    /// If the item is not an `Index`, `MaterializedView`, or
176    /// `ContinualTask`.
177    #[mz_ore::instrument(level = "trace")]
178    pub fn set_physical_plan(
179        &mut self,
180        id: GlobalId,
181        plan: DataflowDescription<mz_compute_types::plan::Plan>,
182    ) {
183        self.state.set_physical_plan(id, plan);
184    }
185
186    /// Try to get the optimized plan for the item identified by `id`.
187    #[mz_ore::instrument(level = "trace")]
188    pub fn try_get_optimized_plan(
189        &self,
190        id: &GlobalId,
191    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192        let entry = self.state.try_get_entry_by_global_id(id)?;
193        entry.item().optimized_plan().map(AsRef::as_ref)
194    }
195
196    /// Try to get the physical plan for the item identified by `id`.
197    #[mz_ore::instrument(level = "trace")]
198    pub fn try_get_physical_plan(
199        &self,
200        id: &GlobalId,
201    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202        let entry = self.state.try_get_entry_by_global_id(id)?;
203        entry.item().physical_plan().map(AsRef::as_ref)
204    }
205
206    /// Set the `DataflowMetainfo` for the item identified by `id`.
207    ///
208    /// # Panics
209    /// If the item is not an `Index`, `MaterializedView`, or
210    /// `ContinualTask`.
211    #[mz_ore::instrument(level = "trace")]
212    pub fn set_dataflow_metainfo(
213        &mut self,
214        id: GlobalId,
215        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216    ) {
217        self.state.set_dataflow_metainfo(id, metainfo);
218    }
219
220    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
221    #[mz_ore::instrument(level = "trace")]
222    pub fn try_get_dataflow_metainfo(
223        &self,
224        id: &GlobalId,
225    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226        let entry = self.state.try_get_entry_by_global_id(id)?;
227        entry.item().dataflow_metainfo()
228    }
229}
230
231#[derive(Debug)]
232pub struct ConnCatalog<'a> {
233    state: Cow<'a, CatalogState>,
234    /// Because we don't have any way of removing items from the catalog
235    /// temporarily, we allow the ConnCatalog to pretend that a set of items
236    /// don't exist during resolution.
237    ///
238    /// This feature is necessary to allow re-planning of statements, which is
239    /// either incredibly useful or required when altering item definitions.
240    ///
241    /// Note that uses of this field should be used by short-lived
242    /// catalogs.
243    unresolvable_ids: BTreeSet<CatalogItemId>,
244    conn_id: ConnectionId,
245    cluster: String,
246    database: Option<DatabaseId>,
247    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
248    role_id: RoleId,
249    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
250    portals: Option<&'a BTreeMap<String, Portal>>,
251    notices_tx: UnboundedSender<AdapterNotice>,
252    restrict_to_user_objects: bool,
253}
254
255impl ConnCatalog<'_> {
256    pub fn conn_id(&self) -> &ConnectionId {
257        &self.conn_id
258    }
259
260    pub fn state(&self) -> &CatalogState {
261        &*self.state
262    }
263
264    /// Prevent planning from resolving item with the provided ID. Instead,
265    /// return an error as if the item did not exist.
266    ///
267    /// This feature is meant exclusively to permit re-planning statements
268    /// during update operations and should not be used otherwise given its
269    /// extremely "powerful" semantics.
270    ///
271    /// # Panics
272    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
273    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
274        assert_eq!(
275            self.role_id, MZ_SYSTEM_ROLE_ID,
276            "only the system role can mark IDs unresolvable",
277        );
278        self.unresolvable_ids.insert(id);
279    }
280
281    /// Returns the schemas:
282    /// - mz_catalog
283    /// - pg_catalog
284    /// - temp (if requested)
285    /// - all schemas from the session's search_path var that exist
286    pub fn effective_search_path(
287        &self,
288        include_temp_schema: bool,
289    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
290        self.state
291            .effective_search_path(&self.search_path, include_temp_schema)
292    }
293}
294
295impl ConnectionResolver for ConnCatalog<'_> {
296    fn resolve_connection(
297        &self,
298        id: CatalogItemId,
299    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
300        self.state().resolve_connection(id)
301    }
302}
303
304impl Catalog {
305    /// Returns the catalog's transient revision, which starts at 1 and is
306    /// incremented on every change. This is not persisted to disk, and will
307    /// restart on every load.
308    pub fn transient_revision(&self) -> u64 {
309        self.transient_revision
310    }
311
312    /// Creates a debug catalog from the current
313    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
314    /// like in tests.
315    ///
316    /// WARNING! This function can arbitrarily fail because it does not make any
317    /// effort to adjust the catalog's contents' structure or semantics to the
318    /// currently running version, i.e. it does not apply any migrations.
319    ///
320    /// This function must not be called in production contexts. Use
321    /// [`Catalog::open`] with appropriately set configuration parameters
322    /// instead.
323    pub async fn with_debug<F, Fut, T>(f: F) -> T
324    where
325        F: FnOnce(Catalog) -> Fut,
326        Fut: Future<Output = T>,
327    {
328        let persist_client = PersistClient::new_for_tests().await;
329        let organization_id = Uuid::new_v4();
330        let bootstrap_args = test_bootstrap_args();
331        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
332            .await
333            .expect("can open debug catalog");
334        f(catalog).await
335    }
336
337    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
338    /// in progress.
339    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
340    where
341        F: FnOnce(Catalog) -> Fut,
342        Fut: Future<Output = T>,
343    {
344        let persist_client = PersistClient::new_for_tests().await;
345        let organization_id = Uuid::new_v4();
346        let bootstrap_args = test_bootstrap_args();
347        let mut catalog =
348            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
349                .await
350                .expect("can open debug catalog");
351
352        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
353        let now = SYSTEM_TIME.clone();
354        let openable_storage = TestCatalogStateBuilder::new(persist_client)
355            .with_organization_id(organization_id)
356            .with_default_deploy_generation()
357            .build()
358            .await
359            .expect("can create durable catalog");
360        let mut storage = openable_storage
361            .open(now().into(), &bootstrap_args)
362            .await
363            .expect("can open durable catalog")
364            .0;
365        // Drain updates.
366        let _ = storage
367            .sync_to_current_updates()
368            .await
369            .expect("can sync to current updates");
370        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
371
372        f(catalog).await
373    }
374
375    /// Opens a debug catalog.
376    ///
377    /// See [`Catalog::with_debug`].
378    pub async fn open_debug_catalog(
379        persist_client: PersistClient,
380        organization_id: Uuid,
381        bootstrap_args: &BootstrapArgs,
382    ) -> Result<Catalog, anyhow::Error> {
383        let now = SYSTEM_TIME.clone();
384        let environment_id = None;
385        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
386            .with_organization_id(organization_id)
387            .with_default_deploy_generation()
388            .build()
389            .await?;
390        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
391        let system_parameter_defaults = BTreeMap::default();
392        Self::open_debug_catalog_inner(
393            persist_client,
394            storage,
395            now,
396            environment_id,
397            &DUMMY_BUILD_INFO,
398            system_parameter_defaults,
399            bootstrap_args,
400            None,
401        )
402        .await
403    }
404
405    /// Opens a read only debug persist backed catalog defined by `persist_client` and
406    /// `organization_id`.
407    ///
408    /// See [`Catalog::with_debug`].
409    pub async fn open_debug_read_only_catalog(
410        persist_client: PersistClient,
411        organization_id: Uuid,
412        bootstrap_args: &BootstrapArgs,
413    ) -> Result<Catalog, anyhow::Error> {
414        let now = SYSTEM_TIME.clone();
415        let environment_id = None;
416        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
417            .with_organization_id(organization_id)
418            .build()
419            .await?;
420        let storage = openable_storage
421            .open_read_only(&test_bootstrap_args())
422            .await?;
423        let system_parameter_defaults = BTreeMap::default();
424        Self::open_debug_catalog_inner(
425            persist_client,
426            storage,
427            now,
428            environment_id,
429            &DUMMY_BUILD_INFO,
430            system_parameter_defaults,
431            bootstrap_args,
432            None,
433        )
434        .await
435    }
436
437    /// Opens a read only debug persist backed catalog defined by `persist_client` and
438    /// `organization_id`.
439    ///
440    /// See [`Catalog::with_debug`].
441    pub async fn open_debug_read_only_persist_catalog_config(
442        persist_client: PersistClient,
443        now: NowFn,
444        environment_id: EnvironmentId,
445        system_parameter_defaults: BTreeMap<String, String>,
446        build_info: &'static BuildInfo,
447        bootstrap_args: &BootstrapArgs,
448        enable_expression_cache_override: Option<bool>,
449    ) -> Result<Catalog, anyhow::Error> {
450        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
451            .with_organization_id(environment_id.organization_id())
452            .with_version(
453                build_info
454                    .version
455                    .parse()
456                    .expect("build version is parseable"),
457            )
458            .build()
459            .await?;
460        let storage = openable_storage.open_read_only(bootstrap_args).await?;
461        Self::open_debug_catalog_inner(
462            persist_client,
463            storage,
464            now,
465            Some(environment_id),
466            build_info,
467            system_parameter_defaults,
468            bootstrap_args,
469            enable_expression_cache_override,
470        )
471        .await
472    }
473
474    async fn open_debug_catalog_inner(
475        persist_client: PersistClient,
476        storage: Box<dyn DurableCatalogState>,
477        now: NowFn,
478        environment_id: Option<EnvironmentId>,
479        build_info: &'static BuildInfo,
480        system_parameter_defaults: BTreeMap<String, String>,
481        bootstrap_args: &BootstrapArgs,
482        enable_expression_cache_override: Option<bool>,
483    ) -> Result<Catalog, anyhow::Error> {
484        let metrics_registry = &MetricsRegistry::new();
485        let secrets_reader = Arc::new(InMemorySecretsController::new());
486        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
487        // debugging/testing.
488        let previous_ts = now().into();
489        let replica_size = &bootstrap_args.default_cluster_replica_size;
490        let read_only = false;
491
492        let OpenCatalogResult {
493            catalog,
494            migrated_storage_collections_0dt: _,
495            new_builtin_collections: _,
496            builtin_table_updates: _,
497            cached_global_exprs: _,
498            uncached_local_exprs: _,
499        } = Catalog::open(Config {
500            storage,
501            metrics_registry,
502            state: StateConfig {
503                unsafe_mode: true,
504                all_features: false,
505                build_info,
506                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
507                read_only,
508                now,
509                boot_ts: previous_ts,
510                skip_migrations: true,
511                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
512                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
513                    size: replica_size.clone(),
514                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
515                },
516                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
517                    size: replica_size.clone(),
518                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
519                },
520                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
521                    size: replica_size.clone(),
522                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
523                },
524                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
525                    size: replica_size.clone(),
526                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
527                },
528                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
529                    size: replica_size.clone(),
530                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
531                },
532                system_parameter_defaults,
533                remote_system_parameters: None,
534                availability_zones: vec![],
535                egress_addresses: vec![],
536                aws_principal_context: None,
537                aws_privatelink_availability_zones: None,
538                http_host_name: None,
539                connection_context: ConnectionContext::for_tests(secrets_reader),
540                builtin_item_migration_config: BuiltinItemMigrationConfig {
541                    persist_client: persist_client.clone(),
542                    read_only,
543                    force_migration: None,
544                },
545                persist_client,
546                enable_expression_cache_override,
547                helm_chart_version: None,
548                external_login_password_mz_system: None,
549                license_key: ValidatedLicenseKey::for_tests(),
550            },
551        })
552        .await?;
553        Ok(catalog)
554    }
555
556    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
557        self.state.for_session(session)
558    }
559
560    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
561        self.state.for_sessionless_user(role_id)
562    }
563
564    pub fn for_system_session(&self) -> ConnCatalog<'_> {
565        self.state.for_system_session()
566    }
567
568    async fn storage<'a>(
569        &'a self,
570    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
571        self.storage.lock().await
572    }
573
574    pub async fn current_upper(&self) -> mz_repr::Timestamp {
575        self.storage().await.current_upper().await
576    }
577
578    pub async fn allocate_user_id(
579        &self,
580        commit_ts: mz_repr::Timestamp,
581    ) -> Result<(CatalogItemId, GlobalId), Error> {
582        self.storage()
583            .await
584            .allocate_user_id(commit_ts)
585            .await
586            .maybe_terminate("allocating user ids")
587            .err_into()
588    }
589
590    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
591    pub async fn allocate_user_ids(
592        &self,
593        amount: u64,
594        commit_ts: mz_repr::Timestamp,
595    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
596        self.storage()
597            .await
598            .allocate_user_ids(amount, commit_ts)
599            .await
600            .maybe_terminate("allocating user ids")
601            .err_into()
602    }
603
604    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
605        let commit_ts = self.storage().await.current_upper().await;
606        self.allocate_user_id(commit_ts).await
607    }
608
609    /// Allocates a single durable id for a storage usage collection batch.
610    ///
611    /// Bumps the durable `STORAGE_USAGE_ID_ALLOC_KEY` allocator by one and
612    /// returns the previous value. The bump is committed at `commit_ts`.
613    /// One id is shared by every row produced by a collection cycle (see
614    /// `Coordinator::storage_usage_update`), so the durable cost is one
615    /// allocator round-trip per cycle, not per shard.
616    pub async fn allocate_storage_usage_id(
617        &self,
618        commit_ts: mz_repr::Timestamp,
619    ) -> Result<u64, Error> {
620        use mz_ore::collections::CollectionExt;
621
622        self.storage()
623            .await
624            .allocate_id(STORAGE_USAGE_ID_ALLOC_KEY, 1, commit_ts)
625            .await
626            .maybe_terminate("allocating storage usage id")
627            .map(|ids| ids.into_element())
628            .err_into()
629    }
630
631    /// Get the next user item ID without allocating it.
632    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
633        self.storage()
634            .await
635            .get_next_user_item_id()
636            .await
637            .err_into()
638    }
639
640    #[cfg(test)]
641    pub async fn allocate_system_id(
642        &self,
643        commit_ts: mz_repr::Timestamp,
644    ) -> Result<(CatalogItemId, GlobalId), Error> {
645        use mz_ore::collections::CollectionExt;
646
647        let mut storage = self.storage().await;
648        let mut txn = storage.transaction().await?;
649        let id = txn
650            .allocate_system_item_ids(1)
651            .maybe_terminate("allocating system ids")?
652            .into_element();
653        // Drain transaction.
654        let _ = txn.get_and_commit_op_updates();
655        txn.commit(commit_ts).await?;
656        Ok(id)
657    }
658
659    /// Get the next system item ID without allocating it.
660    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
661        self.storage()
662            .await
663            .get_next_system_item_id()
664            .await
665            .err_into()
666    }
667
668    pub async fn allocate_user_cluster_id(
669        &self,
670        commit_ts: mz_repr::Timestamp,
671    ) -> Result<ClusterId, Error> {
672        self.storage()
673            .await
674            .allocate_user_cluster_id(commit_ts)
675            .await
676            .maybe_terminate("allocating user cluster ids")
677            .err_into()
678    }
679
680    /// Get the next system replica id without allocating it.
681    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
682        self.storage()
683            .await
684            .get_next_system_replica_id()
685            .await
686            .err_into()
687    }
688
689    /// Get the next user replica id without allocating it.
690    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
691        self.storage()
692            .await
693            .get_next_user_replica_id()
694            .await
695            .err_into()
696    }
697
698    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
699        self.state.resolve_database(database_name)
700    }
701
702    pub fn resolve_schema(
703        &self,
704        current_database: Option<&DatabaseId>,
705        database_name: Option<&str>,
706        schema_name: &str,
707        conn_id: &ConnectionId,
708    ) -> Result<&Schema, SqlCatalogError> {
709        self.state
710            .resolve_schema(current_database, database_name, schema_name, conn_id)
711    }
712
713    pub fn resolve_schema_in_database(
714        &self,
715        database_spec: &ResolvedDatabaseSpecifier,
716        schema_name: &str,
717        conn_id: &ConnectionId,
718    ) -> Result<&Schema, SqlCatalogError> {
719        self.state
720            .resolve_schema_in_database(database_spec, schema_name, conn_id)
721    }
722
723    pub fn resolve_replica_in_cluster(
724        &self,
725        cluster_id: &ClusterId,
726        replica_name: &str,
727    ) -> Result<&ClusterReplica, SqlCatalogError> {
728        self.state
729            .resolve_replica_in_cluster(cluster_id, replica_name)
730    }
731
732    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
733        self.state.resolve_system_schema(name)
734    }
735
736    pub fn resolve_search_path(
737        &self,
738        session: &Session,
739    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
740        self.state.resolve_search_path(session)
741    }
742
743    /// Resolves `name` to a non-function [`CatalogEntry`].
744    pub fn resolve_entry(
745        &self,
746        current_database: Option<&DatabaseId>,
747        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
748        name: &PartialItemName,
749        conn_id: &ConnectionId,
750    ) -> Result<&CatalogEntry, SqlCatalogError> {
751        self.state
752            .resolve_entry(current_database, search_path, name, conn_id)
753    }
754
755    /// Resolves a `BuiltinTable`.
756    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
757        self.state.resolve_builtin_table(builtin)
758    }
759
760    /// Resolves a `BuiltinLog`.
761    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
762        self.state.resolve_builtin_log(builtin).0
763    }
764
765    /// Resolves a `BuiltinSource`.
766    pub fn resolve_builtin_storage_collection(
767        &self,
768        builtin: &'static BuiltinSource,
769    ) -> CatalogItemId {
770        self.state.resolve_builtin_source(builtin)
771    }
772
773    /// Resolves `name` to a function [`CatalogEntry`].
774    pub fn resolve_function(
775        &self,
776        current_database: Option<&DatabaseId>,
777        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
778        name: &PartialItemName,
779        conn_id: &ConnectionId,
780    ) -> Result<&CatalogEntry, SqlCatalogError> {
781        self.state
782            .resolve_function(current_database, search_path, name, conn_id)
783    }
784
785    /// Resolves `name` to a type [`CatalogEntry`].
786    pub fn resolve_type(
787        &self,
788        current_database: Option<&DatabaseId>,
789        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
790        name: &PartialItemName,
791        conn_id: &ConnectionId,
792    ) -> Result<&CatalogEntry, SqlCatalogError> {
793        self.state
794            .resolve_type(current_database, search_path, name, conn_id)
795    }
796
797    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
798        self.state.resolve_cluster(name)
799    }
800
801    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
802    ///
803    /// # Panics
804    /// * If the [`BuiltinCluster`] doesn't exist.
805    ///
806    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
807        self.state.resolve_builtin_cluster(cluster)
808    }
809
810    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
811        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
812    }
813
814    /// Resolves a [`Cluster`] for a TargetCluster.
815    pub fn resolve_target_cluster(
816        &self,
817        target_cluster: TargetCluster,
818        session: &Session,
819    ) -> Result<&Cluster, AdapterError> {
820        match target_cluster {
821            TargetCluster::CatalogServer => {
822                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
823            }
824            TargetCluster::Active => self.active_cluster(session),
825            TargetCluster::Transaction(cluster_id) => self
826                .try_get_cluster(cluster_id)
827                .ok_or(AdapterError::ConcurrentClusterDrop),
828        }
829    }
830
831    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
832        // TODO(benesch): this check here is not sufficiently protective. It'd
833        // be very easy for a code path to accidentally avoid this check by
834        // calling `resolve_cluster(session.vars().cluster())`.
835        if session.user().name != SYSTEM_USER.name
836            && session.user().name != SUPPORT_USER.name
837            && session.vars().cluster() == SYSTEM_USER.name
838        {
839            coord_bail!(
840                "system cluster '{}' cannot execute user queries",
841                SYSTEM_USER.name
842            );
843        }
844        let cluster = self.resolve_cluster(session.vars().cluster())?;
845        Ok(cluster)
846    }
847
848    pub fn state(&self) -> &CatalogState {
849        &self.state
850    }
851
852    pub fn resolve_full_name(
853        &self,
854        name: &QualifiedItemName,
855        conn_id: Option<&ConnectionId>,
856    ) -> FullItemName {
857        self.state.resolve_full_name(name, conn_id)
858    }
859
860    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
861        self.state.try_get_entry(id)
862    }
863
864    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
865        self.state.try_get_entry_by_global_id(id)
866    }
867
868    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
869        self.state.get_entry(id)
870    }
871
872    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
873        self.state.get_entry_by_global_id(id)
874    }
875
876    pub fn get_global_ids<'a>(
877        &'a self,
878        id: &CatalogItemId,
879    ) -> impl Iterator<Item = GlobalId> + use<'a> {
880        self.get_entry(id).global_ids()
881    }
882
883    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
884        self.get_entry_by_global_id(id).id()
885    }
886
887    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
888        let item = self.try_get_entry_by_global_id(id)?;
889        Some(item.id())
890    }
891
892    pub fn get_schema(
893        &self,
894        database_spec: &ResolvedDatabaseSpecifier,
895        schema_spec: &SchemaSpecifier,
896        conn_id: &ConnectionId,
897    ) -> &Schema {
898        self.state.get_schema(database_spec, schema_spec, conn_id)
899    }
900
901    pub fn try_get_schema(
902        &self,
903        database_spec: &ResolvedDatabaseSpecifier,
904        schema_spec: &SchemaSpecifier,
905        conn_id: &ConnectionId,
906    ) -> Option<&Schema> {
907        self.state
908            .try_get_schema(database_spec, schema_spec, conn_id)
909    }
910
911    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
912        self.state.get_mz_catalog_schema_id()
913    }
914
915    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
916        self.state.get_pg_catalog_schema_id()
917    }
918
919    pub fn get_information_schema_id(&self) -> SchemaId {
920        self.state.get_information_schema_id()
921    }
922
923    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
924        self.state.get_mz_internal_schema_id()
925    }
926
927    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
928        self.state.get_mz_introspection_schema_id()
929    }
930
931    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
932        self.state.get_mz_unsafe_schema_id()
933    }
934
935    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
936        self.state.system_schema_ids()
937    }
938
939    pub fn get_database(&self, id: &DatabaseId) -> &Database {
940        self.state.get_database(id)
941    }
942
943    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
944        self.state.try_get_role(id)
945    }
946
947    pub fn get_role(&self, id: &RoleId) -> &Role {
948        self.state.get_role(id)
949    }
950
951    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
952        self.state.try_get_role_by_name(role_name)
953    }
954
955    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
956        self.state.try_get_role_auth_by_id(id)
957    }
958
959    /// Creates a new schema in the `Catalog` for temporary items
960    /// indicated by the TEMPORARY or TEMP keywords.
961    pub fn create_temporary_schema(
962        &mut self,
963        conn_id: &ConnectionId,
964        owner_id: RoleId,
965    ) -> Result<(), Error> {
966        self.state.create_temporary_schema(conn_id, owner_id)
967    }
968
969    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
970        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
971        self.state
972            .temporary_schemas
973            .get(conn_id)
974            .map(|schema| schema.items.contains_key(item_name))
975            .unwrap_or(false)
976    }
977
978    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
979    /// Returns Ok if conn_id's temp schema does not exist.
980    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
981        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
982            return Ok(());
983        };
984        if !schema.items.is_empty() {
985            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
986        }
987        Ok(())
988    }
989
990    pub(crate) fn object_dependents(
991        &self,
992        object_ids: &Vec<ObjectId>,
993        conn_id: &ConnectionId,
994    ) -> Vec<ObjectId> {
995        let mut seen = BTreeSet::new();
996        self.state.object_dependents(object_ids, conn_id, &mut seen)
997    }
998
999    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1000        FullNameV1 {
1001            database: name.database.to_string(),
1002            schema: name.schema.clone(),
1003            item: name.item.clone(),
1004        }
1005    }
1006
1007    pub fn find_available_cluster_name(&self, name: &str) -> String {
1008        let mut i = 0;
1009        let mut candidate = name.to_string();
1010        while self.state.clusters_by_name.contains_key(&candidate) {
1011            i += 1;
1012            candidate = format!("{}{}", name, i);
1013        }
1014        candidate
1015    }
1016
1017    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1018        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1019            self.cluster_replica_sizes()
1020                .enabled_allocations()
1021                .map(|a| a.0.to_owned())
1022                .collect::<Vec<_>>()
1023        } else {
1024            self.system_config().allowed_cluster_replica_sizes()
1025        }
1026    }
1027
1028    pub fn concretize_replica_location(
1029        &self,
1030        location: mz_catalog::durable::ReplicaLocation,
1031        allowed_sizes: &Vec<String>,
1032        allowed_availability_zones: Option<&[String]>,
1033    ) -> Result<ReplicaLocation, Error> {
1034        self.state
1035            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1036    }
1037
1038    pub(crate) fn ensure_valid_replica_size(
1039        &self,
1040        allowed_sizes: &[String],
1041        size: &String,
1042    ) -> Result<(), Error> {
1043        self.state.ensure_valid_replica_size(allowed_sizes, size)
1044    }
1045
1046    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1047        &self.state.cluster_replica_sizes
1048    }
1049
1050    /// Returns the privileges of an object by its ID.
1051    pub fn get_privileges(
1052        &self,
1053        id: &SystemObjectId,
1054        conn_id: &ConnectionId,
1055    ) -> Option<&PrivilegeMap> {
1056        match id {
1057            SystemObjectId::Object(id) => match id {
1058                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1059                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1060                ObjectId::Schema((database_spec, schema_spec)) => Some(
1061                    self.get_schema(database_spec, schema_spec, conn_id)
1062                        .privileges(),
1063                ),
1064                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1065                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1066                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1067            },
1068            SystemObjectId::System => Some(&self.state.system_privileges),
1069        }
1070    }
1071
1072    #[mz_ore::instrument(level = "debug")]
1073    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1074        Ok(self.storage().await.advance_upper(new_upper).await?)
1075    }
1076
1077    /// Return the ids of all log sources the given object depends on.
1078    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1079        self.state.introspection_dependencies(id)
1080    }
1081
1082    /// Serializes the catalog's in-memory state.
1083    ///
1084    /// There are no guarantees about the format of the serialized state, except
1085    /// that the serialized state for two identical catalogs will compare
1086    /// identically.
1087    pub fn dump(&self) -> Result<CatalogDump, Error> {
1088        Ok(CatalogDump::new(self.state.dump(None)?))
1089    }
1090
1091    /// Checks the [`Catalog`]s internal consistency.
1092    ///
1093    /// Returns a JSON object describing the inconsistencies, if there are any.
1094    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1095        self.state.check_consistency().map_err(|inconsistencies| {
1096            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1097                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1098            })
1099        })
1100    }
1101
1102    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1103        self.state.config()
1104    }
1105
1106    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1107        self.state.entry_by_id.values()
1108    }
1109
1110    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1111        self.entries()
1112            .filter(|entry| entry.is_connection() && entry.id().is_user())
1113    }
1114
1115    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1116        self.entries()
1117            .filter(|entry| entry.is_table() && entry.id().is_user())
1118    }
1119
1120    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1121        self.entries()
1122            .filter(|entry| entry.is_source() && entry.id().is_user())
1123    }
1124
1125    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1126        self.entries()
1127            .filter(|entry| entry.is_sink() && entry.id().is_user())
1128    }
1129
1130    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1131        self.entries()
1132            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1133    }
1134
1135    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1136        self.entries()
1137            .filter(|entry| entry.is_secret() && entry.id().is_user())
1138    }
1139
1140    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1141        self.state.get_network_policy(&network_policy_id)
1142    }
1143
1144    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1145        self.state.try_get_network_policy_by_name(name)
1146    }
1147
1148    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1149        self.state.clusters_by_id.values()
1150    }
1151
1152    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1153        self.state.get_cluster(cluster_id)
1154    }
1155
1156    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1157        self.state.try_get_cluster(cluster_id)
1158    }
1159
1160    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1161        self.clusters().filter(|cluster| cluster.id.is_user())
1162    }
1163
1164    pub fn get_cluster_replica(
1165        &self,
1166        cluster_id: ClusterId,
1167        replica_id: ReplicaId,
1168    ) -> &ClusterReplica {
1169        self.state.get_cluster_replica(cluster_id, replica_id)
1170    }
1171
1172    pub fn try_get_cluster_replica(
1173        &self,
1174        cluster_id: ClusterId,
1175        replica_id: ReplicaId,
1176    ) -> Option<&ClusterReplica> {
1177        self.state.try_get_cluster_replica(cluster_id, replica_id)
1178    }
1179
1180    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1181        self.user_clusters()
1182            .flat_map(|cluster| cluster.user_replicas())
1183    }
1184
1185    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1186        self.state.database_by_id.values()
1187    }
1188
1189    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1190        self.state
1191            .roles_by_id
1192            .values()
1193            .filter(|role| role.is_user())
1194    }
1195
1196    pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1197        self.state
1198            .network_policies_by_id
1199            .iter()
1200            .filter(|(id, _)| id.is_user())
1201            .map(|(_, policy)| policy)
1202    }
1203
1204    pub fn system_privileges(&self) -> &PrivilegeMap {
1205        &self.state.system_privileges
1206    }
1207
1208    pub fn default_privileges(
1209        &self,
1210    ) -> impl Iterator<
1211        Item = (
1212            &DefaultPrivilegeObject,
1213            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1214        ),
1215    > {
1216        self.state.default_privileges.iter()
1217    }
1218
1219    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1220        self.state
1221            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1222    }
1223
1224    pub fn pack_storage_usage_update(
1225        &self,
1226        event: VersionedStorageUsage,
1227        diff: Diff,
1228    ) -> BuiltinTableUpdate {
1229        self.state
1230            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1231    }
1232
1233    pub fn system_config(&self) -> &SystemVars {
1234        self.state.system_config()
1235    }
1236
1237    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1238        self.state.system_config_mut()
1239    }
1240
1241    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1242        self.state.ensure_not_reserved_role(role_id)
1243    }
1244
1245    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1246        self.state.ensure_grantable_role(role_id)
1247    }
1248
1249    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1250        self.state.ensure_not_system_role(role_id)
1251    }
1252
1253    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1254        self.state.ensure_not_predefined_role(role_id)
1255    }
1256
1257    pub fn ensure_not_reserved_network_policy(
1258        &self,
1259        network_policy_id: &NetworkPolicyId,
1260    ) -> Result<(), Error> {
1261        self.state
1262            .ensure_not_reserved_network_policy(network_policy_id)
1263    }
1264
1265    pub fn ensure_not_reserved_object(
1266        &self,
1267        object_id: &ObjectId,
1268        conn_id: &ConnectionId,
1269    ) -> Result<(), Error> {
1270        match object_id {
1271            ObjectId::Cluster(cluster_id) => {
1272                if cluster_id.is_system() {
1273                    let cluster = self.get_cluster(*cluster_id);
1274                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1275                        cluster.name().to_string(),
1276                    )))
1277                } else {
1278                    Ok(())
1279                }
1280            }
1281            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1282                if replica_id.is_system() {
1283                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1284                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1285                        replica.name().to_string(),
1286                    )))
1287                } else {
1288                    Ok(())
1289                }
1290            }
1291            ObjectId::Database(database_id) => {
1292                if database_id.is_system() {
1293                    let database = self.get_database(database_id);
1294                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1295                        database.name().to_string(),
1296                    )))
1297                } else {
1298                    Ok(())
1299                }
1300            }
1301            ObjectId::Schema((database_spec, schema_spec)) => {
1302                if schema_spec.is_system() {
1303                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1304                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1305                        schema.name().schema.clone(),
1306                    )))
1307                } else {
1308                    Ok(())
1309                }
1310            }
1311            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1312            ObjectId::Item(item_id) => {
1313                if item_id.is_system() {
1314                    let item = self.get_entry(item_id);
1315                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1316                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1317                } else {
1318                    Ok(())
1319                }
1320            }
1321            ObjectId::NetworkPolicy(network_policy_id) => {
1322                self.ensure_not_reserved_network_policy(network_policy_id)
1323            }
1324        }
1325    }
1326
1327    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1328    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1329        &mut self,
1330        create_sql: &str,
1331        force_if_exists_skip: bool,
1332    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1333        self.state
1334            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1335    }
1336
1337    /// Cache global and, optionally, local expressions for the given
1338    /// `GlobalId`.
1339    ///
1340    /// Takes the plans and metainfo directly as parameters (rather than
1341    /// fishing them out of catalog state), so this can be called **before**
1342    /// the catalog transaction that creates the item. Returns the future
1343    /// returned by [`Catalog::update_expression_cache`]; callers should
1344    /// `.await` it before the catalog transaction commits, so the durable
1345    /// expression cache is observed to contain the entries by the time any
1346    /// other process (or a subsequent bootstrap on this process) reads them.
1347    pub(crate) fn cache_expressions(
1348        &self,
1349        id: GlobalId,
1350        local_mir: Option<OptimizedMirRelationExpr>,
1351        mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1352        mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1353        dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1354        optimizer_features: OptimizerFeatures,
1355    ) -> BoxFuture<'static, ()> {
1356        // Make sure we're not caching the result of timestamp selection, as
1357        // it will almost certainly be wrong if we re-install the dataflow at
1358        // a later time.
1359        global_mir.as_of = None;
1360        global_mir.until = Default::default();
1361        physical_plan.as_of = None;
1362        physical_plan.until = Default::default();
1363
1364        let mut local_exprs = Vec::new();
1365        if let Some(local_mir) = local_mir {
1366            local_exprs.push((
1367                id,
1368                LocalExpressions {
1369                    local_mir,
1370                    optimizer_features: optimizer_features.clone(),
1371                },
1372            ));
1373        }
1374        let global_exprs = vec![(
1375            id,
1376            GlobalExpressions {
1377                global_mir,
1378                physical_plan,
1379                dataflow_metainfos,
1380                optimizer_features,
1381            },
1382        )];
1383        self.update_expression_cache(local_exprs, global_exprs, Default::default())
1384    }
1385
1386    pub(crate) fn update_expression_cache<'a, 'b>(
1387        &'a self,
1388        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1389        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1390        invalidate_ids: BTreeSet<GlobalId>,
1391    ) -> BoxFuture<'b, ()> {
1392        if let Some(expr_cache) = &self.expr_cache_handle {
1393            expr_cache
1394                .update(
1395                    new_local_expressions,
1396                    new_global_expressions,
1397                    invalidate_ids,
1398                )
1399                .boxed()
1400        } else {
1401            async {}.boxed()
1402        }
1403    }
1404
1405    /// Listen for and apply all unconsumed updates to the durable catalog state.
1406    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1407    // `#[cfg(test)]` annotation.
1408    #[cfg(test)]
1409    async fn sync_to_current_updates(
1410        &mut self,
1411    ) -> Result<
1412        (
1413            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1414            Vec<ParsedStateUpdate>,
1415        ),
1416        CatalogError,
1417    > {
1418        let updates = self.storage().await.sync_to_current_updates().await?;
1419        let (builtin_table_updates, catalog_updates) = self
1420            .state
1421            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1422            .await;
1423        Ok((builtin_table_updates, catalog_updates))
1424    }
1425}
1426
1427pub fn is_reserved_name(name: &str) -> bool {
1428    BUILTIN_PREFIXES
1429        .iter()
1430        .any(|prefix| name.starts_with(prefix))
1431}
1432
1433pub fn is_reserved_role_name(name: &str) -> bool {
1434    is_reserved_name(name) || is_public_role(name)
1435}
1436
1437pub fn is_public_role(name: &str) -> bool {
1438    name == &*PUBLIC_ROLE_NAME
1439}
1440
1441pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1442    object_type_to_audit_object_type(sql_type.into())
1443}
1444
1445pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1446    match id {
1447        CommentObjectId::Table(_) => ObjectType::Table,
1448        CommentObjectId::View(_) => ObjectType::View,
1449        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1450        CommentObjectId::Source(_) => ObjectType::Source,
1451        CommentObjectId::Sink(_) => ObjectType::Sink,
1452        CommentObjectId::Index(_) => ObjectType::Index,
1453        CommentObjectId::Func(_) => ObjectType::Func,
1454        CommentObjectId::Connection(_) => ObjectType::Connection,
1455        CommentObjectId::Type(_) => ObjectType::Type,
1456        CommentObjectId::Secret(_) => ObjectType::Secret,
1457        CommentObjectId::Role(_) => ObjectType::Role,
1458        CommentObjectId::Database(_) => ObjectType::Database,
1459        CommentObjectId::Schema(_) => ObjectType::Schema,
1460        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1461        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1462        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1463    }
1464}
1465
1466pub(crate) fn object_type_to_audit_object_type(
1467    object_type: mz_sql::catalog::ObjectType,
1468) -> ObjectType {
1469    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1470}
1471
1472pub(crate) fn system_object_type_to_audit_object_type(
1473    system_type: &SystemObjectType,
1474) -> ObjectType {
1475    match system_type {
1476        SystemObjectType::Object(object_type) => match object_type {
1477            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1478            mz_sql::catalog::ObjectType::View => ObjectType::View,
1479            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1480            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1481            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1482            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1483            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1484            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1485            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1486            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1487            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1488            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1489            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1490            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1491            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1492            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1493        },
1494        SystemObjectType::System => ObjectType::System,
1495    }
1496}
1497
1498#[derive(Debug, Copy, Clone)]
1499pub enum UpdatePrivilegeVariant {
1500    Grant,
1501    Revoke,
1502}
1503
1504impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1505    fn from(variant: UpdatePrivilegeVariant) -> Self {
1506        match variant {
1507            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1508            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1509        }
1510    }
1511}
1512
1513impl From<UpdatePrivilegeVariant> for EventType {
1514    fn from(variant: UpdatePrivilegeVariant) -> Self {
1515        match variant {
1516            UpdatePrivilegeVariant::Grant => EventType::Grant,
1517            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1518        }
1519    }
1520}
1521
1522impl ConnCatalog<'_> {
1523    fn resolve_item_name(
1524        &self,
1525        name: &PartialItemName,
1526    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1527        self.resolve_item(name).map(|entry| entry.name())
1528    }
1529
1530    fn resolve_function_name(
1531        &self,
1532        name: &PartialItemName,
1533    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1534        self.resolve_function(name).map(|entry| entry.name())
1535    }
1536
1537    fn resolve_type_name(
1538        &self,
1539        name: &PartialItemName,
1540    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1541        self.resolve_type(name).map(|entry| entry.name())
1542    }
1543}
1544
1545impl ExprHumanizer for ConnCatalog<'_> {
1546    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1547        let entry = self.state.try_get_entry_by_global_id(&id)?;
1548        Some(self.resolve_full_name(entry.name()).to_string())
1549    }
1550
1551    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1552        let entry = self.state.try_get_entry_by_global_id(&id)?;
1553        Some(entry.name().item.clone())
1554    }
1555
1556    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1557        let entry = self.state.try_get_entry_by_global_id(&id)?;
1558        Some(self.resolve_full_name(entry.name()).into_parts())
1559    }
1560
1561    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1562        use SqlScalarType::*;
1563
1564        match typ {
1565            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1566            List {
1567                custom_id: Some(item_id),
1568                ..
1569            }
1570            | Map {
1571                custom_id: Some(item_id),
1572                ..
1573            } => {
1574                let item = self.get_item(item_id);
1575                self.minimal_qualification(item.name()).to_string()
1576            }
1577            List { element_type, .. } => {
1578                format!(
1579                    "{} list",
1580                    self.humanize_sql_scalar_type(element_type, postgres_compat)
1581                )
1582            }
1583            Map { value_type, .. } => format!(
1584                "map[{}=>{}]",
1585                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1586                self.humanize_sql_scalar_type(value_type, postgres_compat)
1587            ),
1588            Record {
1589                custom_id: Some(item_id),
1590                ..
1591            } => {
1592                let item = self.get_item(item_id);
1593                self.minimal_qualification(item.name()).to_string()
1594            }
1595            Record { fields, .. } => format!(
1596                "record({})",
1597                fields
1598                    .iter()
1599                    .map(|f| format!(
1600                        "{}: {}",
1601                        f.0,
1602                        self.humanize_sql_column_type(&f.1, postgres_compat)
1603                    ))
1604                    .join(",")
1605            ),
1606            PgLegacyChar => "\"char\"".into(),
1607            Char { length } if !postgres_compat => match length {
1608                None => "char".into(),
1609                Some(length) => format!("char({})", length.into_u32()),
1610            },
1611            VarChar { max_length } if !postgres_compat => match max_length {
1612                None => "varchar".into(),
1613                Some(length) => format!("varchar({})", length.into_u32()),
1614            },
1615            UInt16 => "uint2".into(),
1616            UInt32 => "uint4".into(),
1617            UInt64 => "uint8".into(),
1618            ty => {
1619                let pgrepr_type = mz_pgrepr::Type::from(ty);
1620                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1621
1622                let res = if self
1623                    .effective_search_path(true)
1624                    .iter()
1625                    .any(|(_, schema)| schema == &pg_catalog_schema)
1626                {
1627                    pgrepr_type.name().to_string()
1628                } else {
1629                    // If PG_CATALOG_SCHEMA is not in search path, you need
1630                    // qualified object name to refer to type.
1631                    let name = QualifiedItemName {
1632                        qualifiers: ItemQualifiers {
1633                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1634                            schema_spec: pg_catalog_schema,
1635                        },
1636                        item: pgrepr_type.name().to_string(),
1637                    };
1638                    self.resolve_full_name(&name).to_string()
1639                };
1640                res
1641            }
1642        }
1643    }
1644
1645    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1646        let entry = self.state.try_get_entry_by_global_id(&id)?;
1647
1648        match entry.index() {
1649            Some(index) => {
1650                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1651                let mut on_names = on_desc
1652                    .iter_names()
1653                    .map(|col_name| col_name.to_string())
1654                    .collect::<Vec<_>>();
1655
1656                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1657
1658                // Init ix_names with unknown column names. Unknown columns are
1659                // represented as an empty String and rendered as `#c` by the
1660                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1661                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1662                let mut ix_names = vec![String::new(); ix_arity];
1663
1664                // Apply the permutation by swapping on_names with ix_names.
1665                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1666                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1667                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1668                    std::mem::swap(on_name, ix_name);
1669                }
1670
1671                Some(ix_names) // Return the updated ix_names vector.
1672            }
1673            None => {
1674                let desc = self.state.try_get_desc_by_global_id(&id)?;
1675                let column_names = desc
1676                    .iter_names()
1677                    .map(|col_name| col_name.to_string())
1678                    .collect();
1679
1680                Some(column_names)
1681            }
1682        }
1683    }
1684
1685    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1686        let desc = self.state.try_get_desc_by_global_id(&id)?;
1687        Some(desc.get_name(column).to_string())
1688    }
1689
1690    fn id_exists(&self, id: GlobalId) -> bool {
1691        self.state.entry_by_global_id.contains_key(&id)
1692    }
1693}
1694
1695impl SessionCatalog for ConnCatalog<'_> {
1696    fn active_role_id(&self) -> &RoleId {
1697        &self.role_id
1698    }
1699
1700    fn restrict_to_user_objects(&self) -> bool {
1701        self.restrict_to_user_objects
1702    }
1703
1704    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1705        self.prepared_statements
1706            .as_ref()
1707            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1708            .flatten()
1709    }
1710
1711    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1712        self.portals
1713            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1714    }
1715
1716    fn active_database(&self) -> Option<&DatabaseId> {
1717        self.database.as_ref()
1718    }
1719
1720    fn active_cluster(&self) -> &str {
1721        &self.cluster
1722    }
1723
1724    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1725        &self.search_path
1726    }
1727
1728    fn resolve_database(
1729        &self,
1730        database_name: &str,
1731    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1732        Ok(self.state.resolve_database(database_name)?)
1733    }
1734
1735    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1736        self.state
1737            .database_by_id
1738            .get(id)
1739            .expect("database doesn't exist")
1740    }
1741
1742    // `as` is ok to use to cast to a trait object.
1743    #[allow(clippy::as_conversions)]
1744    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1745        self.state
1746            .database_by_id
1747            .values()
1748            .map(|database| database as &dyn CatalogDatabase)
1749            .collect()
1750    }
1751
1752    fn resolve_schema(
1753        &self,
1754        database_name: Option<&str>,
1755        schema_name: &str,
1756    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1757        Ok(self.state.resolve_schema(
1758            self.database.as_ref(),
1759            database_name,
1760            schema_name,
1761            &self.conn_id,
1762        )?)
1763    }
1764
1765    fn resolve_schema_in_database(
1766        &self,
1767        database_spec: &ResolvedDatabaseSpecifier,
1768        schema_name: &str,
1769    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1770        Ok(self
1771            .state
1772            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1773    }
1774
1775    fn get_schema(
1776        &self,
1777        database_spec: &ResolvedDatabaseSpecifier,
1778        schema_spec: &SchemaSpecifier,
1779    ) -> &dyn CatalogSchema {
1780        self.state
1781            .get_schema(database_spec, schema_spec, &self.conn_id)
1782    }
1783
1784    // `as` is ok to use to cast to a trait object.
1785    #[allow(clippy::as_conversions)]
1786    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1787        self.get_databases()
1788            .into_iter()
1789            .flat_map(|database| database.schemas().into_iter())
1790            .chain(
1791                self.state
1792                    .ambient_schemas_by_id
1793                    .values()
1794                    .chain(self.state.temporary_schemas.values())
1795                    .map(|schema| schema as &dyn CatalogSchema),
1796            )
1797            .collect()
1798    }
1799
1800    fn get_mz_internal_schema_id(&self) -> SchemaId {
1801        self.state().get_mz_internal_schema_id()
1802    }
1803
1804    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1805        self.state().get_mz_unsafe_schema_id()
1806    }
1807
1808    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1809        self.state.is_system_schema_specifier(schema)
1810    }
1811
1812    fn resolve_role(
1813        &self,
1814        role_name: &str,
1815    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1816        match self.state.try_get_role_by_name(role_name) {
1817            Some(role) => Ok(role),
1818            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1819        }
1820    }
1821
1822    fn resolve_network_policy(
1823        &self,
1824        policy_name: &str,
1825    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1826        match self.state.try_get_network_policy_by_name(policy_name) {
1827            Some(policy) => Ok(policy),
1828            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1829        }
1830    }
1831
1832    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1833        Some(self.state.roles_by_id.get(id)?)
1834    }
1835
1836    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1837        self.state.get_role(id)
1838    }
1839
1840    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1841        // `as` is ok to use to cast to a trait object.
1842        #[allow(clippy::as_conversions)]
1843        self.state
1844            .roles_by_id
1845            .values()
1846            .map(|role| role as &dyn CatalogRole)
1847            .collect()
1848    }
1849
1850    fn mz_system_role_id(&self) -> RoleId {
1851        MZ_SYSTEM_ROLE_ID
1852    }
1853
1854    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1855        self.state.collect_role_membership(id)
1856    }
1857
1858    fn get_network_policy(
1859        &self,
1860        id: &NetworkPolicyId,
1861    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1862        self.state.get_network_policy(id)
1863    }
1864
1865    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1866        // `as` is ok to use to cast to a trait object.
1867        #[allow(clippy::as_conversions)]
1868        self.state
1869            .network_policies_by_id
1870            .values()
1871            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1872            .collect()
1873    }
1874
1875    fn resolve_cluster(
1876        &self,
1877        cluster_name: Option<&str>,
1878    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1879        Ok(self
1880            .state
1881            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1882    }
1883
1884    fn resolve_cluster_replica(
1885        &self,
1886        cluster_replica_name: &QualifiedReplica,
1887    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1888        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1889    }
1890
1891    fn resolve_item(
1892        &self,
1893        name: &PartialItemName,
1894    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1895        let r = self.state.resolve_entry(
1896            self.database.as_ref(),
1897            &self.effective_search_path(true),
1898            name,
1899            &self.conn_id,
1900        )?;
1901        if self.unresolvable_ids.contains(&r.id()) {
1902            Err(SqlCatalogError::UnknownItem(name.to_string()))
1903        } else {
1904            Ok(r)
1905        }
1906    }
1907
1908    fn resolve_function(
1909        &self,
1910        name: &PartialItemName,
1911    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1912        let r = self.state.resolve_function(
1913            self.database.as_ref(),
1914            &self.effective_search_path(false),
1915            name,
1916            &self.conn_id,
1917        )?;
1918
1919        if self.unresolvable_ids.contains(&r.id()) {
1920            Err(SqlCatalogError::UnknownFunction {
1921                name: name.to_string(),
1922                alternative: None,
1923            })
1924        } else {
1925            Ok(r)
1926        }
1927    }
1928
1929    fn resolve_type(
1930        &self,
1931        name: &PartialItemName,
1932    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1933        let r = self.state.resolve_type(
1934            self.database.as_ref(),
1935            &self.effective_search_path(false),
1936            name,
1937            &self.conn_id,
1938        )?;
1939
1940        if self.unresolvable_ids.contains(&r.id()) {
1941            Err(SqlCatalogError::UnknownType {
1942                name: name.to_string(),
1943            })
1944        } else {
1945            Ok(r)
1946        }
1947    }
1948
1949    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1950        self.state.get_system_type(name)
1951    }
1952
1953    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1954        Some(self.state.try_get_entry(id)?)
1955    }
1956
1957    fn try_get_item_by_global_id(
1958        &self,
1959        id: &GlobalId,
1960    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1961        let entry = self.state.try_get_entry_by_global_id(id)?;
1962        let entry = match &entry.item {
1963            CatalogItem::Table(table) => {
1964                let (version, _gid) = table
1965                    .collections
1966                    .iter()
1967                    .find(|(_version, gid)| *gid == id)
1968                    .expect("catalog out of sync, mismatched GlobalId");
1969                entry.at_version(RelationVersionSelector::Specific(*version))
1970            }
1971            _ => entry.at_version(RelationVersionSelector::Latest),
1972        };
1973        Some(entry)
1974    }
1975
1976    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1977        self.state.get_entry(id)
1978    }
1979
1980    fn get_item_by_global_id(
1981        &self,
1982        id: &GlobalId,
1983    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1984        let entry = self.state.get_entry_by_global_id(id);
1985        let entry = match &entry.item {
1986            CatalogItem::Table(table) => {
1987                let (version, _gid) = table
1988                    .collections
1989                    .iter()
1990                    .find(|(_version, gid)| *gid == id)
1991                    .expect("catalog out of sync, mismatched GlobalId");
1992                entry.at_version(RelationVersionSelector::Specific(*version))
1993            }
1994            _ => entry.at_version(RelationVersionSelector::Latest),
1995        };
1996        entry
1997    }
1998
1999    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2000        self.get_schemas()
2001            .into_iter()
2002            .flat_map(|schema| schema.item_ids())
2003            .map(|id| self.get_item(&id))
2004            .collect()
2005    }
2006
2007    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2008        self.state
2009            .get_item_by_name(name, &self.conn_id)
2010            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2011    }
2012
2013    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2014        self.state
2015            .get_type_by_name(name, &self.conn_id)
2016            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2017    }
2018
2019    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2020        &self.state.clusters_by_id[&id]
2021    }
2022
2023    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2024        self.state
2025            .clusters_by_id
2026            .values()
2027            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2028            .collect()
2029    }
2030
2031    fn get_cluster_replica(
2032        &self,
2033        cluster_id: ClusterId,
2034        replica_id: ReplicaId,
2035    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2036        let cluster = self.get_cluster(cluster_id);
2037        cluster.replica(replica_id)
2038    }
2039
2040    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2041        self.get_clusters()
2042            .into_iter()
2043            .flat_map(|cluster| cluster.replicas().into_iter())
2044            .collect()
2045    }
2046
2047    fn get_system_privileges(&self) -> &PrivilegeMap {
2048        &self.state.system_privileges
2049    }
2050
2051    fn get_default_privileges(
2052        &self,
2053    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2054        self.state
2055            .default_privileges
2056            .iter()
2057            .map(|(object, acl_items)| (object, acl_items.collect()))
2058            .collect()
2059    }
2060
2061    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2062        self.state.find_available_name(name, &self.conn_id)
2063    }
2064
2065    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2066        self.state.resolve_full_name(name, Some(&self.conn_id))
2067    }
2068
2069    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2070        self.state.resolve_full_schema_name(name)
2071    }
2072
2073    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2074        self.state.get_entry_by_global_id(global_id).id()
2075    }
2076
2077    fn resolve_global_id(
2078        &self,
2079        item_id: &CatalogItemId,
2080        version: RelationVersionSelector,
2081    ) -> GlobalId {
2082        self.state
2083            .get_entry(item_id)
2084            .at_version(version)
2085            .global_id()
2086    }
2087
2088    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2089        self.state.config()
2090    }
2091
2092    fn now(&self) -> EpochMillis {
2093        (self.state.config().now)()
2094    }
2095
2096    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2097        self.state.aws_privatelink_availability_zones.clone()
2098    }
2099
2100    fn system_vars(&self) -> &SystemVars {
2101        &self.state.system_configuration
2102    }
2103
2104    fn system_vars_mut(&mut self) -> &mut SystemVars {
2105        Arc::make_mut(&mut self.state.to_mut().system_configuration)
2106    }
2107
2108    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2109        self.state().get_owner_id(id, self.conn_id())
2110    }
2111
2112    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2113        match id {
2114            SystemObjectId::System => Some(&self.state.system_privileges),
2115            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2116                Some(self.get_cluster(*id).privileges())
2117            }
2118            SystemObjectId::Object(ObjectId::Database(id)) => {
2119                Some(self.get_database(id).privileges())
2120            }
2121            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2122                // For temporary schemas that haven't been created yet (lazy creation),
2123                // we return None - the RBAC check will need to handle this case.
2124                self.state
2125                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2126                    .map(|schema| schema.privileges())
2127            }
2128            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2129            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2130                Some(self.get_network_policy(id).privileges())
2131            }
2132            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2133            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2134        }
2135    }
2136
2137    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2138        let mut seen = BTreeSet::new();
2139        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2140    }
2141
2142    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2143        let mut seen = BTreeSet::new();
2144        self.state.item_dependents(id, &mut seen)
2145    }
2146
2147    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2148        rbac::all_object_privileges(object_type)
2149    }
2150
2151    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2152        self.state.get_object_type(object_id)
2153    }
2154
2155    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2156        self.state.get_system_object_type(id)
2157    }
2158
2159    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2160    /// the object.
2161    ///
2162    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2163    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2164    /// only for error msgs and other humanizations.
2165    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2166        if qualified_name.qualifiers.schema_spec.is_temporary() {
2167            // All bets are off. Just give up and return the qualified name as is.
2168            // TODO: Figure out what's going on with temporary objects.
2169
2170            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2171            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2172            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2173            // which we don't have a simple repro.
2174            return qualified_name.item.clone().into();
2175        }
2176
2177        let database_id = match &qualified_name.qualifiers.database_spec {
2178            ResolvedDatabaseSpecifier::Ambient => None,
2179            ResolvedDatabaseSpecifier::Id(id)
2180                if self.database.is_some() && self.database == Some(*id) =>
2181            {
2182                None
2183            }
2184            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2185        };
2186
2187        let schema_spec = if database_id.is_none()
2188            && self.resolve_item_name(&PartialItemName {
2189                database: None,
2190                schema: None,
2191                item: qualified_name.item.clone(),
2192            }) == Ok(qualified_name)
2193            || self.resolve_function_name(&PartialItemName {
2194                database: None,
2195                schema: None,
2196                item: qualified_name.item.clone(),
2197            }) == Ok(qualified_name)
2198            || self.resolve_type_name(&PartialItemName {
2199                database: None,
2200                schema: None,
2201                item: qualified_name.item.clone(),
2202            }) == Ok(qualified_name)
2203        {
2204            None
2205        } else {
2206            // If `search_path` does not contain `full_name.schema`, the
2207            // `PartialName` must contain it.
2208            Some(qualified_name.qualifiers.schema_spec.clone())
2209        };
2210
2211        let res = PartialItemName {
2212            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2213            schema: schema_spec.map(|spec| {
2214                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2215                    .name()
2216                    .schema
2217                    .clone()
2218            }),
2219            item: qualified_name.item.clone(),
2220        };
2221        assert!(
2222            self.resolve_item_name(&res) == Ok(qualified_name)
2223                || self.resolve_function_name(&res) == Ok(qualified_name)
2224                || self.resolve_type_name(&res) == Ok(qualified_name)
2225        );
2226        res
2227    }
2228
2229    fn add_notice(&self, notice: PlanNotice) {
2230        let _ = self.notices_tx.send(notice.into());
2231    }
2232
2233    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2234        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2235        self.state.comments.get_object_comments(comment_id)
2236    }
2237
2238    fn is_cluster_size_cc(&self, size: &str) -> bool {
2239        self.state
2240            .cluster_replica_sizes
2241            .0
2242            .get(size)
2243            .map_or(false, |a| a.is_cc)
2244    }
2245}
2246
2247#[cfg(test)]
2248mod tests {
2249    use std::collections::{BTreeMap, BTreeSet};
2250    use std::sync::Arc;
2251    use std::{env, iter};
2252
2253    use itertools::Itertools;
2254    use mz_catalog::memory::objects::CatalogItem;
2255    use tokio_postgres::NoTls;
2256    use tokio_postgres::types::Type;
2257    use uuid::Uuid;
2258
2259    use mz_catalog::SYSTEM_CONN_ID;
2260    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2261    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2262    use mz_controller_types::{ClusterId, ReplicaId};
2263    use mz_expr::{Eval, MirScalarExpr};
2264    use mz_ore::now::to_datetime;
2265    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2266    use mz_persist_client::PersistClient;
2267    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2268    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2269    use mz_repr::role_id::RoleId;
2270    use mz_repr::{
2271        CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2272        SqlScalarType, Timestamp,
2273    };
2274    use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2275    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2276    use mz_sql::names::{
2277        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2278        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2279    };
2280    use mz_sql::plan::{
2281        CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2282        QueryLifetime, Scope, StatementContext,
2283    };
2284    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2285    use mz_sql::session::vars::{SystemVars, VarInput};
2286
2287    use crate::catalog::state::LocalExpressionCache;
2288    use crate::catalog::{Catalog, Op};
2289    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2290    use crate::session::Session;
2291
2292    /// System sessions have an empty `search_path` so it's necessary to
2293    /// schema-qualify all referenced items.
2294    ///
2295    /// Dummy (and ostensibly client) sessions contain system schemas in their
2296    /// search paths, so do not require schema qualification on system objects such
2297    /// as types.
2298    #[mz_ore::test(tokio::test)]
2299    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2300    async fn test_minimal_qualification() {
2301        Catalog::with_debug(|catalog| async move {
2302            struct TestCase {
2303                input: QualifiedItemName,
2304                system_output: PartialItemName,
2305                normal_output: PartialItemName,
2306            }
2307
2308            let test_cases = vec![
2309                TestCase {
2310                    input: QualifiedItemName {
2311                        qualifiers: ItemQualifiers {
2312                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2313                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2314                        },
2315                        item: "numeric".to_string(),
2316                    },
2317                    system_output: PartialItemName {
2318                        database: None,
2319                        schema: None,
2320                        item: "numeric".to_string(),
2321                    },
2322                    normal_output: PartialItemName {
2323                        database: None,
2324                        schema: None,
2325                        item: "numeric".to_string(),
2326                    },
2327                },
2328                TestCase {
2329                    input: QualifiedItemName {
2330                        qualifiers: ItemQualifiers {
2331                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2332                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2333                        },
2334                        item: "mz_array_types".to_string(),
2335                    },
2336                    system_output: PartialItemName {
2337                        database: None,
2338                        schema: None,
2339                        item: "mz_array_types".to_string(),
2340                    },
2341                    normal_output: PartialItemName {
2342                        database: None,
2343                        schema: None,
2344                        item: "mz_array_types".to_string(),
2345                    },
2346                },
2347            ];
2348
2349            for tc in test_cases {
2350                assert_eq!(
2351                    catalog
2352                        .for_system_session()
2353                        .minimal_qualification(&tc.input),
2354                    tc.system_output
2355                );
2356                assert_eq!(
2357                    catalog
2358                        .for_session(&Session::dummy())
2359                        .minimal_qualification(&tc.input),
2360                    tc.normal_output
2361                );
2362            }
2363            catalog.expire().await;
2364        })
2365        .await
2366    }
2367
2368    #[mz_ore::test(tokio::test)]
2369    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2370    async fn test_catalog_revision() {
2371        let persist_client = PersistClient::new_for_tests().await;
2372        let organization_id = Uuid::new_v4();
2373        let bootstrap_args = test_bootstrap_args();
2374        {
2375            let mut catalog = Catalog::open_debug_catalog(
2376                persist_client.clone(),
2377                organization_id.clone(),
2378                &bootstrap_args,
2379            )
2380            .await
2381            .expect("unable to open debug catalog");
2382            assert_eq!(catalog.transient_revision(), 1);
2383            let commit_ts = catalog.current_upper().await;
2384            catalog
2385                .transact(
2386                    None,
2387                    commit_ts,
2388                    None,
2389                    vec![Op::CreateDatabase {
2390                        name: "test".to_string(),
2391                        owner_id: MZ_SYSTEM_ROLE_ID,
2392                    }],
2393                )
2394                .await
2395                .expect("failed to transact");
2396            assert_eq!(catalog.transient_revision(), 2);
2397            catalog.expire().await;
2398        }
2399        {
2400            let catalog =
2401                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2402                    .await
2403                    .expect("unable to open debug catalog");
2404            // Re-opening the same catalog resets the transient_revision to 1.
2405            assert_eq!(catalog.transient_revision(), 1);
2406            catalog.expire().await;
2407        }
2408    }
2409
2410    #[mz_ore::test(tokio::test)]
2411    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2412    async fn test_effective_search_path() {
2413        Catalog::with_debug(|catalog| async move {
2414            let mz_catalog_schema = (
2415                ResolvedDatabaseSpecifier::Ambient,
2416                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2417            );
2418            let pg_catalog_schema = (
2419                ResolvedDatabaseSpecifier::Ambient,
2420                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2421            );
2422            let mz_temp_schema = (
2423                ResolvedDatabaseSpecifier::Ambient,
2424                SchemaSpecifier::Temporary,
2425            );
2426
2427            // Behavior with the default search_schema (public)
2428            let session = Session::dummy();
2429            let conn_catalog = catalog.for_session(&session);
2430            assert_ne!(
2431                conn_catalog.effective_search_path(false),
2432                conn_catalog.search_path
2433            );
2434            assert_ne!(
2435                conn_catalog.effective_search_path(true),
2436                conn_catalog.search_path
2437            );
2438            assert_eq!(
2439                conn_catalog.effective_search_path(false),
2440                vec![
2441                    mz_catalog_schema.clone(),
2442                    pg_catalog_schema.clone(),
2443                    conn_catalog.search_path[0].clone()
2444                ]
2445            );
2446            assert_eq!(
2447                conn_catalog.effective_search_path(true),
2448                vec![
2449                    mz_temp_schema.clone(),
2450                    mz_catalog_schema.clone(),
2451                    pg_catalog_schema.clone(),
2452                    conn_catalog.search_path[0].clone()
2453                ]
2454            );
2455
2456            // missing schemas are added when missing
2457            let mut session = Session::dummy();
2458            session
2459                .vars_mut()
2460                .set(
2461                    &SystemVars::new(),
2462                    "search_path",
2463                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2464                    false,
2465                )
2466                .expect("failed to set search_path");
2467            let conn_catalog = catalog.for_session(&session);
2468            assert_ne!(
2469                conn_catalog.effective_search_path(false),
2470                conn_catalog.search_path
2471            );
2472            assert_ne!(
2473                conn_catalog.effective_search_path(true),
2474                conn_catalog.search_path
2475            );
2476            assert_eq!(
2477                conn_catalog.effective_search_path(false),
2478                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2479            );
2480            assert_eq!(
2481                conn_catalog.effective_search_path(true),
2482                vec![
2483                    mz_temp_schema.clone(),
2484                    mz_catalog_schema.clone(),
2485                    pg_catalog_schema.clone()
2486                ]
2487            );
2488
2489            let mut session = Session::dummy();
2490            session
2491                .vars_mut()
2492                .set(
2493                    &SystemVars::new(),
2494                    "search_path",
2495                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2496                    false,
2497                )
2498                .expect("failed to set search_path");
2499            let conn_catalog = catalog.for_session(&session);
2500            assert_ne!(
2501                conn_catalog.effective_search_path(false),
2502                conn_catalog.search_path
2503            );
2504            assert_ne!(
2505                conn_catalog.effective_search_path(true),
2506                conn_catalog.search_path
2507            );
2508            assert_eq!(
2509                conn_catalog.effective_search_path(false),
2510                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2511            );
2512            assert_eq!(
2513                conn_catalog.effective_search_path(true),
2514                vec![
2515                    mz_temp_schema.clone(),
2516                    pg_catalog_schema.clone(),
2517                    mz_catalog_schema.clone()
2518                ]
2519            );
2520
2521            let mut session = Session::dummy();
2522            session
2523                .vars_mut()
2524                .set(
2525                    &SystemVars::new(),
2526                    "search_path",
2527                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2528                    false,
2529                )
2530                .expect("failed to set search_path");
2531            let conn_catalog = catalog.for_session(&session);
2532            assert_ne!(
2533                conn_catalog.effective_search_path(false),
2534                conn_catalog.search_path
2535            );
2536            assert_ne!(
2537                conn_catalog.effective_search_path(true),
2538                conn_catalog.search_path
2539            );
2540            assert_eq!(
2541                conn_catalog.effective_search_path(false),
2542                vec![
2543                    mz_catalog_schema.clone(),
2544                    pg_catalog_schema.clone(),
2545                    mz_temp_schema.clone()
2546                ]
2547            );
2548            assert_eq!(
2549                conn_catalog.effective_search_path(true),
2550                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2551            );
2552            catalog.expire().await;
2553        })
2554        .await
2555    }
2556
2557    #[mz_ore::test(tokio::test)]
2558    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2559    async fn test_normalized_create() {
2560        use mz_ore::collections::CollectionExt;
2561        Catalog::with_debug(|catalog| async move {
2562            let conn_catalog = catalog.for_system_session();
2563            let scx = &mut StatementContext::new(None, &conn_catalog);
2564
2565            let parsed = mz_sql_parser::parser::parse_statements(
2566                "create view public.foo as select 1 as bar",
2567            )
2568            .expect("")
2569            .into_element()
2570            .ast;
2571
2572            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2573
2574            // Ensure that all identifiers are quoted.
2575            assert_eq!(
2576                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2577                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2578            );
2579            catalog.expire().await;
2580        })
2581        .await;
2582    }
2583
2584    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2585    #[mz_ore::test(tokio::test)]
2586    #[cfg_attr(miri, ignore)] // slow
2587    async fn test_large_catalog_item() {
2588        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2589        let column = ", 1";
2590        let view_def_size = view_def.bytes().count();
2591        let column_size = column.bytes().count();
2592        let column_count =
2593            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2594        let columns = iter::repeat(column).take(column_count).join("");
2595        let create_sql = format!("{view_def}{columns})");
2596        let create_sql_check = create_sql.clone();
2597        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2598        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2599            &create_sql
2600        ));
2601
2602        let persist_client = PersistClient::new_for_tests().await;
2603        let organization_id = Uuid::new_v4();
2604        let id = CatalogItemId::User(1);
2605        let gid = GlobalId::User(1);
2606        let bootstrap_args = test_bootstrap_args();
2607        {
2608            let mut catalog = Catalog::open_debug_catalog(
2609                persist_client.clone(),
2610                organization_id.clone(),
2611                &bootstrap_args,
2612            )
2613            .await
2614            .expect("unable to open debug catalog");
2615            let item = catalog
2616                .state()
2617                .deserialize_item(
2618                    gid,
2619                    &create_sql,
2620                    &BTreeMap::new(),
2621                    &mut LocalExpressionCache::Closed,
2622                    None,
2623                )
2624                .expect("unable to parse view");
2625            let commit_ts = catalog.current_upper().await;
2626            catalog
2627                .transact(
2628                    None,
2629                    commit_ts,
2630                    None,
2631                    vec![Op::CreateItem {
2632                        item,
2633                        name: QualifiedItemName {
2634                            qualifiers: ItemQualifiers {
2635                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2636                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2637                            },
2638                            item: "v".to_string(),
2639                        },
2640                        id,
2641                        owner_id: MZ_SYSTEM_ROLE_ID,
2642                    }],
2643                )
2644                .await
2645                .expect("failed to transact");
2646            catalog.expire().await;
2647        }
2648        {
2649            let catalog =
2650                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2651                    .await
2652                    .expect("unable to open debug catalog");
2653            let view = catalog.get_entry(&id);
2654            assert_eq!("v", view.name.item);
2655            match &view.item {
2656                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2657                item => panic!("expected view, got {}", item.typ()),
2658            }
2659            catalog.expire().await;
2660        }
2661    }
2662
2663    #[mz_ore::test(tokio::test)]
2664    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2665    async fn test_object_type() {
2666        Catalog::with_debug(|catalog| async move {
2667            let conn_catalog = catalog.for_system_session();
2668
2669            assert_eq!(
2670                mz_sql::catalog::ObjectType::ClusterReplica,
2671                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2672                    ClusterId::user(1).expect("1 is a valid ID"),
2673                    ReplicaId::User(1)
2674                )))
2675            );
2676            assert_eq!(
2677                mz_sql::catalog::ObjectType::Role,
2678                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2679            );
2680            catalog.expire().await;
2681        })
2682        .await;
2683    }
2684
2685    #[mz_ore::test(tokio::test)]
2686    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2687    async fn test_get_privileges() {
2688        Catalog::with_debug(|catalog| async move {
2689            let conn_catalog = catalog.for_system_session();
2690
2691            assert_eq!(
2692                None,
2693                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2694                    ClusterId::user(1).expect("1 is a valid ID"),
2695                    ReplicaId::User(1),
2696                ))))
2697            );
2698            assert_eq!(
2699                None,
2700                conn_catalog
2701                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2702            );
2703            catalog.expire().await;
2704        })
2705        .await;
2706    }
2707
2708    #[mz_ore::test(tokio::test)]
2709    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2710    async fn verify_builtin_descs() {
2711        Catalog::with_debug(|catalog| async move {
2712            let conn_catalog = catalog.for_system_session();
2713
2714            for builtin in BUILTINS::iter() {
2715                let (schema, name, expected_desc) = match builtin {
2716                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2717                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2718                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2719                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2720                    Builtin::Log(_)
2721                    | Builtin::Type(_)
2722                    | Builtin::Func(_)
2723                    | Builtin::Index(_)
2724                    | Builtin::Connection(_) => continue,
2725                };
2726                let item = conn_catalog
2727                    .resolve_item(&PartialItemName {
2728                        database: None,
2729                        schema: Some(schema.to_string()),
2730                        item: name.to_string(),
2731                    })
2732                    .expect("unable to resolve item")
2733                    .at_version(RelationVersionSelector::Latest);
2734
2735                let actual_desc = item.relation_desc().expect("invalid item type");
2736                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2737                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2738                {
2739                    assert_eq!(
2740                        actual_name, expected_name,
2741                        "item {schema}.{name} column {index} name did not match its expected name"
2742                    );
2743                    assert_eq!(
2744                        actual_typ, expected_typ,
2745                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2746                    );
2747                }
2748                assert_eq!(
2749                    &*actual_desc, expected_desc,
2750                    "item {schema}.{name} did not match its expected RelationDesc"
2751                );
2752            }
2753            catalog.expire().await;
2754        })
2755        .await
2756    }
2757
2758    // Connect to a running Postgres server and verify that our builtin
2759    // types and functions match it, in addition to some other things.
2760    #[mz_ore::test(tokio::test)]
2761    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2762    async fn test_compare_builtins_postgres() {
2763        async fn inner(catalog: Catalog) {
2764            // Verify that all builtin functions:
2765            // - have a unique OID
2766            // - if they have a postgres counterpart (same oid) then they have matching name
2767            let (client, connection) = tokio_postgres::connect(
2768                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2769                NoTls,
2770            )
2771            .await
2772            .expect("failed to connect to Postgres");
2773
2774            task::spawn(|| "compare_builtin_postgres", async move {
2775                if let Err(e) = connection.await {
2776                    panic!("connection error: {}", e);
2777                }
2778            });
2779
2780            struct PgProc {
2781                name: String,
2782                arg_oids: Vec<u32>,
2783                ret_oid: Option<u32>,
2784                ret_set: bool,
2785            }
2786
2787            struct PgType {
2788                name: String,
2789                ty: String,
2790                elem: u32,
2791                array: u32,
2792                input: u32,
2793                receive: u32,
2794            }
2795
2796            struct PgOper {
2797                oprresult: u32,
2798                name: String,
2799            }
2800
2801            let pg_proc: BTreeMap<_, _> = client
2802                .query(
2803                    "SELECT
2804                    p.oid,
2805                    proname,
2806                    proargtypes,
2807                    prorettype,
2808                    proretset
2809                FROM pg_proc p
2810                JOIN pg_namespace n ON p.pronamespace = n.oid",
2811                    &[],
2812                )
2813                .await
2814                .expect("pg query failed")
2815                .into_iter()
2816                .map(|row| {
2817                    let oid: u32 = row.get("oid");
2818                    let pg_proc = PgProc {
2819                        name: row.get("proname"),
2820                        arg_oids: row.get("proargtypes"),
2821                        ret_oid: row.get("prorettype"),
2822                        ret_set: row.get("proretset"),
2823                    };
2824                    (oid, pg_proc)
2825                })
2826                .collect();
2827
2828            let pg_type: BTreeMap<_, _> = client
2829                .query(
2830                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2831                    &[],
2832                )
2833                .await
2834                .expect("pg query failed")
2835                .into_iter()
2836                .map(|row| {
2837                    let oid: u32 = row.get("oid");
2838                    let pg_type = PgType {
2839                        name: row.get("typname"),
2840                        ty: row.get("typtype"),
2841                        elem: row.get("typelem"),
2842                        array: row.get("typarray"),
2843                        input: row.get("typinput"),
2844                        receive: row.get("typreceive"),
2845                    };
2846                    (oid, pg_type)
2847                })
2848                .collect();
2849
2850            let pg_oper: BTreeMap<_, _> = client
2851                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2852                .await
2853                .expect("pg query failed")
2854                .into_iter()
2855                .map(|row| {
2856                    let oid: u32 = row.get("oid");
2857                    let pg_oper = PgOper {
2858                        name: row.get("oprname"),
2859                        oprresult: row.get("oprresult"),
2860                    };
2861                    (oid, pg_oper)
2862                })
2863                .collect();
2864
2865            let conn_catalog = catalog.for_system_session();
2866            let resolve_type_oid = |item: &str| {
2867                conn_catalog
2868                    .resolve_type(&PartialItemName {
2869                        database: None,
2870                        // All functions we check exist in PG, so the types must, as
2871                        // well
2872                        schema: Some(PG_CATALOG_SCHEMA.into()),
2873                        item: item.to_string(),
2874                    })
2875                    .expect("unable to resolve type")
2876                    .oid()
2877            };
2878
2879            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2880                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2881                .collect();
2882
2883            let mut all_oids = BTreeSet::new();
2884
2885            // A function to determine if two oids are equivalent enough for these tests. We don't
2886            // support some types, so map exceptions here.
2887            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2888                [
2889                    // We don't support NAME.
2890                    (Type::NAME, Type::TEXT),
2891                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2892                    // We don't support time with time zone.
2893                    (Type::TIME, Type::TIMETZ),
2894                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2895                ]
2896                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2897            );
2898            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2899                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2900            ]);
2901            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2902                if ignore_return_types.contains(&fn_oid) {
2903                    return true;
2904                }
2905                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2906                    return true;
2907                }
2908                a == b
2909            };
2910
2911            for builtin in BUILTINS::iter() {
2912                match builtin {
2913                    Builtin::Type(ty) => {
2914                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2915
2916                        if ty.oid >= FIRST_MATERIALIZE_OID {
2917                            // High OIDs are reserved in Materialize and don't have
2918                            // PostgreSQL counterparts.
2919                            continue;
2920                        }
2921
2922                        // For types that have a PostgreSQL counterpart, verify that
2923                        // the name and oid match.
2924                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2925                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2926                        });
2927                        assert_eq!(
2928                            ty.name, pg_ty.name,
2929                            "oid {} has name {} in postgres; expected {}",
2930                            ty.oid, pg_ty.name, ty.name,
2931                        );
2932
2933                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2934                            None => (0, 0),
2935                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2936                        };
2937                        assert_eq!(
2938                            typinput_oid, pg_ty.input,
2939                            "type {} has typinput OID {:?} in mz but {:?} in pg",
2940                            ty.name, typinput_oid, pg_ty.input,
2941                        );
2942                        assert_eq!(
2943                            typreceive_oid, pg_ty.receive,
2944                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
2945                            ty.name, typreceive_oid, pg_ty.receive,
2946                        );
2947                        if typinput_oid != 0 {
2948                            assert!(
2949                                func_oids.contains(&typinput_oid),
2950                                "type {} has typinput OID {} that does not exist in pg_proc",
2951                                ty.name,
2952                                typinput_oid,
2953                            );
2954                        }
2955                        if typreceive_oid != 0 {
2956                            assert!(
2957                                func_oids.contains(&typreceive_oid),
2958                                "type {} has typreceive OID {} that does not exist in pg_proc",
2959                                ty.name,
2960                                typreceive_oid,
2961                            );
2962                        }
2963
2964                        // Ensure the type matches.
2965                        match &ty.details.typ {
2966                            CatalogType::Array { element_reference } => {
2967                                let elem_ty = BUILTINS::iter()
2968                                    .filter_map(|builtin| match builtin {
2969                                        Builtin::Type(ty @ BuiltinType { name, .. })
2970                                            if element_reference == name =>
2971                                        {
2972                                            Some(ty)
2973                                        }
2974                                        _ => None,
2975                                    })
2976                                    .next();
2977                                let elem_ty = match elem_ty {
2978                                    Some(ty) => ty,
2979                                    None => {
2980                                        panic!("{} is unexpectedly not a type", element_reference)
2981                                    }
2982                                };
2983                                assert_eq!(
2984                                    pg_ty.elem, elem_ty.oid,
2985                                    "type {} has mismatched element OIDs",
2986                                    ty.name
2987                                )
2988                            }
2989                            CatalogType::Pseudo => {
2990                                assert_eq!(
2991                                    pg_ty.ty, "p",
2992                                    "type {} is not a pseudo type as expected",
2993                                    ty.name
2994                                )
2995                            }
2996                            CatalogType::Range { .. } => {
2997                                assert_eq!(
2998                                    pg_ty.ty, "r",
2999                                    "type {} is not a range type as expected",
3000                                    ty.name
3001                                );
3002                            }
3003                            _ => {
3004                                assert_eq!(
3005                                    pg_ty.ty, "b",
3006                                    "type {} is not a base type as expected",
3007                                    ty.name
3008                                )
3009                            }
3010                        }
3011
3012                        // Ensure the array type reference is correct.
3013                        let schema = catalog
3014                            .resolve_schema_in_database(
3015                                &ResolvedDatabaseSpecifier::Ambient,
3016                                ty.schema,
3017                                &SYSTEM_CONN_ID,
3018                            )
3019                            .expect("unable to resolve schema");
3020                        let allocated_type = catalog
3021                            .resolve_type(
3022                                None,
3023                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3024                                &PartialItemName {
3025                                    database: None,
3026                                    schema: Some(schema.name().schema.clone()),
3027                                    item: ty.name.to_string(),
3028                                },
3029                                &SYSTEM_CONN_ID,
3030                            )
3031                            .expect("unable to resolve type");
3032                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3033                            ty
3034                        } else {
3035                            panic!("unexpectedly not a type")
3036                        };
3037                        match ty.details.array_id {
3038                            Some(array_id) => {
3039                                let array_ty = catalog.get_entry(&array_id);
3040                                assert_eq!(
3041                                    pg_ty.array, array_ty.oid,
3042                                    "type {} has mismatched array OIDs",
3043                                    allocated_type.name.item,
3044                                );
3045                            }
3046                            None => assert_eq!(
3047                                pg_ty.array, 0,
3048                                "type {} does not have an array type in mz but does in pg",
3049                                allocated_type.name.item,
3050                            ),
3051                        }
3052                    }
3053                    Builtin::Func(func) => {
3054                        for imp in func.inner.func_impls() {
3055                            assert!(
3056                                all_oids.insert(imp.oid),
3057                                "{} reused oid {}",
3058                                func.name,
3059                                imp.oid
3060                            );
3061
3062                            assert!(
3063                                imp.oid < FIRST_USER_OID,
3064                                "built-in function {} erroneously has OID in user space ({})",
3065                                func.name,
3066                                imp.oid,
3067                            );
3068
3069                            // For functions that have a postgres counterpart, verify that the name and
3070                            // oid match.
3071                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3072                                continue;
3073                            } else {
3074                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3075                                    panic!(
3076                                        "pg_proc missing function {}: oid {}",
3077                                        func.name, imp.oid
3078                                    )
3079                                })
3080                            };
3081                            assert_eq!(
3082                                func.name, pg_fn.name,
3083                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3084                                imp.oid, func.name, pg_fn.name
3085                            );
3086
3087                            // Complain, but don't fail, if argument oids don't match.
3088                            // TODO: make these match.
3089                            let imp_arg_oids = imp
3090                                .arg_typs
3091                                .iter()
3092                                .map(|item| resolve_type_oid(item))
3093                                .collect::<Vec<_>>();
3094
3095                            if imp_arg_oids != pg_fn.arg_oids {
3096                                println!(
3097                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3098                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3099                                );
3100                            }
3101
3102                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3103
3104                            assert!(
3105                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3106                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3107                                imp.oid,
3108                                func.name,
3109                                imp_return_oid,
3110                                pg_fn.ret_oid
3111                            );
3112
3113                            assert_eq!(
3114                                imp.return_is_set, pg_fn.ret_set,
3115                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3116                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3117                            );
3118                        }
3119                    }
3120                    _ => (),
3121                }
3122            }
3123
3124            for (op, func) in OP_IMPLS.iter() {
3125                for imp in func.func_impls() {
3126                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3127
3128                    // For operators that have a postgres counterpart, verify that the name and oid match.
3129                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3130                        continue;
3131                    } else {
3132                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3133                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3134                        })
3135                    };
3136
3137                    assert_eq!(*op, pg_op.name);
3138
3139                    let imp_return_oid =
3140                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3141                    if imp_return_oid != pg_op.oprresult {
3142                        panic!(
3143                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3144                            imp.oid, op, imp_return_oid, pg_op.oprresult
3145                        );
3146                    }
3147                }
3148            }
3149            catalog.expire().await;
3150        }
3151
3152        Catalog::with_debug(inner).await
3153    }
3154
3155    // Execute all builtin functions with all combinations of arguments from interesting datums.
3156    #[mz_ore::test(tokio::test)]
3157    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3158    async fn test_smoketest_all_builtins() {
3159        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3160            let catalog = Arc::new(catalog);
3161            let conn_catalog = catalog.for_system_session();
3162
3163            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3164            let mut handles = Vec::new();
3165
3166            // Extracted during planning; always panics when executed.
3167            let ignore_names = BTreeSet::from([
3168                "avg",
3169                "avg_internal_v1",
3170                "bool_and",
3171                "bool_or",
3172                "has_table_privilege", // > 3 s each
3173                "has_type_privilege",  // > 3 s each
3174                "mod",
3175                "mz_panic",
3176                "mz_sleep",
3177                "pow",
3178                "stddev_pop",
3179                "stddev_samp",
3180                "stddev",
3181                "var_pop",
3182                "var_samp",
3183                "variance",
3184            ]);
3185
3186            let fns = BUILTINS::funcs()
3187                .map(|func| (&func.name, func.inner))
3188                .chain(OP_IMPLS.iter());
3189
3190            for (name, func) in fns {
3191                if ignore_names.contains(name) {
3192                    continue;
3193                }
3194                let Func::Scalar(impls) = func else {
3195                    continue;
3196                };
3197
3198                'outer: for imp in impls {
3199                    let details = imp.details();
3200                    let mut styps = Vec::new();
3201                    for item in details.arg_typs.iter() {
3202                        let oid = resolve_type_oid(item);
3203                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3204                            continue 'outer;
3205                        };
3206                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3207                    }
3208                    let datums = styps
3209                        .iter()
3210                        .map(|styp| {
3211                            let mut datums = vec![Datum::Null];
3212                            datums.extend(styp.interesting_datums());
3213                            datums
3214                        })
3215                        .collect::<Vec<_>>();
3216                    // Skip nullary fns.
3217                    if datums.is_empty() {
3218                        continue;
3219                    }
3220
3221                    let return_oid = details
3222                        .return_typ
3223                        .map(resolve_type_oid)
3224                        .expect("must exist");
3225                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3226                        .ok()
3227                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3228
3229                    let mut idxs = vec![0; datums.len()];
3230                    while idxs[0] < datums[0].len() {
3231                        let mut args = Vec::with_capacity(idxs.len());
3232                        for i in 0..(datums.len()) {
3233                            args.push(datums[i][idxs[i]]);
3234                        }
3235
3236                        let op = &imp.op;
3237                        let scalars = args
3238                            .iter()
3239                            .enumerate()
3240                            .map(|(i, datum)| {
3241                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3242                                    datum.clone(),
3243                                    styps[i].clone(),
3244                                ))
3245                            })
3246                            .collect();
3247
3248                        let call_name = format!(
3249                            "{name}({}) (oid: {})",
3250                            args.iter()
3251                                .map(|d| d.to_string())
3252                                .collect::<Vec<_>>()
3253                                .join(", "),
3254                            imp.oid
3255                        );
3256                        let catalog = Arc::clone(&catalog);
3257                        let call_name_fn = call_name.clone();
3258                        let return_styp = return_styp.clone();
3259                        let handle = task::spawn_blocking(
3260                            || call_name,
3261                            move || {
3262                                smoketest_fn(
3263                                    name,
3264                                    call_name_fn,
3265                                    op,
3266                                    imp,
3267                                    args,
3268                                    catalog,
3269                                    scalars,
3270                                    return_styp,
3271                                )
3272                            },
3273                        );
3274                        handles.push(handle);
3275
3276                        // Advance to the next datum combination.
3277                        for i in (0..datums.len()).rev() {
3278                            idxs[i] += 1;
3279                            if idxs[i] >= datums[i].len() {
3280                                if i == 0 {
3281                                    break;
3282                                }
3283                                idxs[i] = 0;
3284                                continue;
3285                            } else {
3286                                break;
3287                            }
3288                        }
3289                    }
3290                }
3291            }
3292            handles
3293        }
3294
3295        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3296        for handle in handles {
3297            handle.await;
3298        }
3299    }
3300
3301    fn smoketest_fn(
3302        name: &&str,
3303        call_name: String,
3304        op: &Operation<HirScalarExpr>,
3305        imp: &FuncImpl<HirScalarExpr>,
3306        args: Vec<Datum<'_>>,
3307        catalog: Arc<Catalog>,
3308        scalars: Vec<CoercibleScalarExpr>,
3309        return_styp: Option<SqlScalarType>,
3310    ) {
3311        let conn_catalog = catalog.for_system_session();
3312        let pcx = PlanContext::zero();
3313        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3314        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3315        let ecx = ExprContext {
3316            qcx: &qcx,
3317            name: "smoketest",
3318            scope: &Scope::empty(),
3319            relation_type: &SqlRelationType::empty(),
3320            allow_aggregates: false,
3321            allow_subqueries: false,
3322            allow_parameters: false,
3323            allow_windows: false,
3324        };
3325        let arena = RowArena::new();
3326        let mut session = Session::dummy();
3327        session
3328            .start_transaction(to_datetime(0), None, None)
3329            .expect("must succeed");
3330        let prep_style = ExprPrepOneShot {
3331            logical_time: EvalTime::Time(Timestamp::MIN),
3332            session: &session,
3333            catalog_state: &catalog.state,
3334        };
3335
3336        // Execute the function as much as possible, ensuring no panics occur, but
3337        // otherwise ignoring eval errors. We also do various other checks.
3338        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3339        if let Ok(hir) = res {
3340            let uneliminated_result_row = {
3341                if let HirScalarExpr::CallUnary { func, .. } = &hir
3342                    && func.is_eliminable_cast()
3343                {
3344                    let mut uneliminated_mir = hir
3345                        .clone()
3346                        .lower_uncorrelated(HirToMirConfig {
3347                            enable_cast_elimination: false,
3348                            ..catalog.system_config().into()
3349                        })
3350                        .expect("lowering eliminable cast should always succeed");
3351                    prep_style
3352                        .prep_scalar_expr(&mut uneliminated_mir)
3353                        .expect("must succeed");
3354
3355                    // Pack the row, to avoid lifetime issues with the MIR we lowered here
3356                    uneliminated_mir
3357                        .eval(&[], &arena)
3358                        .ok()
3359                        .map(|datum| Row::pack([datum]))
3360                } else {
3361                    None
3362                }
3363            };
3364
3365            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3366                // Populate unmaterialized functions.
3367                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3368
3369                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3370                    if let Some(return_styp) = return_styp {
3371                        let mir_typ = mir.typ(&[]);
3372                        // MIR type inference should be consistent with the type
3373                        // we get from the catalog.
3374                        soft_assert_eq_or_log!(
3375                            mir_typ.scalar_type,
3376                            (&return_styp).into(),
3377                            "MIR type did not match the catalog type (cast elimination/repr type error)"
3378                        );
3379                        // The following will check not just that the scalar type
3380                        // is ok, but also catches if the function returned a null
3381                        // but the MIR type inference said "non-nullable".
3382                        if !eval_result_datum.is_instance_of(&mir_typ) {
3383                            panic!(
3384                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3385                            );
3386                        }
3387                        // Check the consistency of `is_eliminable_cast`---we should get the same datum either way.
3388                        if let Some(row) = uneliminated_result_row {
3389                            let uneliminated_result_datum = row.unpack_first();
3390                            assert_eq!(
3391                                uneliminated_result_datum, eval_result_datum,
3392                                "datums should not change if cast is eliminable"
3393                            );
3394                        }
3395                        // Check the consistency of `introduces_nulls` and
3396                        // `propagates_nulls` with `MirScalarExpr::typ`.
3397                        if let Some((introduces_nulls, propagates_nulls)) =
3398                            call_introduces_propagates_nulls(&mir)
3399                        {
3400                            if introduces_nulls {
3401                                // If the function introduces_nulls, then the return
3402                                // type should always be nullable, regardless of
3403                                // the nullability of the input types.
3404                                assert!(
3405                                    mir_typ.nullable,
3406                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3407                                    name, args, mir, mir_typ.nullable
3408                                );
3409                            } else {
3410                                let any_input_null = args.iter().any(|arg| arg.is_null());
3411                                if !any_input_null {
3412                                    assert!(
3413                                        !mir_typ.nullable,
3414                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3415                                        name, args, mir, mir_typ.nullable
3416                                    );
3417                                } else if propagates_nulls {
3418                                    // propagates_nulls means the optimizer short-circuits
3419                                    // all-null inputs, so the output must be nullable.
3420                                    assert!(
3421                                        mir_typ.nullable,
3422                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3423                                        name, args, mir, mir_typ.nullable
3424                                    );
3425                                }
3426                                // When propagates_nulls is false, the output may still
3427                                // be nullable if a non-nullable parameter received a null
3428                                // input (per-position null rejection). The is_instance_of
3429                                // check above ensures type consistency.
3430                            }
3431                        }
3432                        // Check that `MirScalarExpr::reduce` yields the same result
3433                        // as the real evaluation.
3434                        let mut reduced = mir.clone();
3435                        reduced.reduce(&[]);
3436                        match reduced {
3437                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3438                                match reduce_result {
3439                                    Ok(reduce_result_row) => {
3440                                        let reduce_result_datum = reduce_result_row.unpack_first();
3441                                        assert_eq!(
3442                                            reduce_result_datum,
3443                                            eval_result_datum,
3444                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3445                                            name,
3446                                            args,
3447                                            mir,
3448                                            eval_result_datum,
3449                                            mir_typ.scalar_type,
3450                                            reduce_result_datum,
3451                                            ctyp.scalar_type
3452                                        );
3453                                        // Let's check that the types also match.
3454                                        // (We are not checking nullability here,
3455                                        // because it's ok when we know a more
3456                                        // precise nullability after actually
3457                                        // evaluating a function than before.)
3458                                        assert_eq!(
3459                                            ctyp.scalar_type,
3460                                            mir_typ.scalar_type,
3461                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3462                                            name,
3463                                            args,
3464                                            mir,
3465                                            eval_result_datum,
3466                                            mir_typ.scalar_type,
3467                                            reduce_result_datum,
3468                                            ctyp.scalar_type
3469                                        );
3470                                    }
3471                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3472                                }
3473                            }
3474                            _ => unreachable!(
3475                                "all args are literals, so should have reduced to a literal"
3476                            ),
3477                        }
3478                    }
3479                }
3480            }
3481        }
3482    }
3483
3484    /// If the given MirScalarExpr
3485    ///  - is a function call, and
3486    ///  - all arguments are literals
3487    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3488    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3489        match mir_func_call {
3490            MirScalarExpr::CallUnary { func, expr } => {
3491                if expr.is_literal() {
3492                    Some((func.introduces_nulls(), func.propagates_nulls()))
3493                } else {
3494                    None
3495                }
3496            }
3497            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3498                if expr1.is_literal() && expr2.is_literal() {
3499                    Some((func.introduces_nulls(), func.propagates_nulls()))
3500                } else {
3501                    None
3502                }
3503            }
3504            MirScalarExpr::CallVariadic { func, exprs } => {
3505                if exprs.iter().all(|arg| arg.is_literal()) {
3506                    Some((func.introduces_nulls(), func.propagates_nulls()))
3507                } else {
3508                    None
3509                }
3510            }
3511            _ => None,
3512        }
3513    }
3514
3515    // Make sure pg views don't use types that only exist in Materialize.
3516    #[mz_ore::test(tokio::test)]
3517    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3518    async fn test_pg_views_forbidden_types() {
3519        Catalog::with_debug(|catalog| async move {
3520            let conn_catalog = catalog.for_system_session();
3521
3522            for view in BUILTINS::views().filter(|view| {
3523                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3524            }) {
3525                let item = conn_catalog
3526                    .resolve_item(&PartialItemName {
3527                        database: None,
3528                        schema: Some(view.schema.to_string()),
3529                        item: view.name.to_string(),
3530                    })
3531                    .expect("unable to resolve view")
3532                    // TODO(alter_table)
3533                    .at_version(RelationVersionSelector::Latest);
3534                let full_name = conn_catalog.resolve_full_name(item.name());
3535                let desc = item.relation_desc().expect("invalid item type");
3536                for col_type in desc.iter_types() {
3537                    match &col_type.scalar_type {
3538                        typ @ SqlScalarType::UInt16
3539                        | typ @ SqlScalarType::UInt32
3540                        | typ @ SqlScalarType::UInt64
3541                        | typ @ SqlScalarType::MzTimestamp
3542                        | typ @ SqlScalarType::List { .. }
3543                        | typ @ SqlScalarType::Map { .. }
3544                        | typ @ SqlScalarType::MzAclItem => {
3545                            panic!("{typ:?} type found in {full_name}");
3546                        }
3547                        SqlScalarType::AclItem
3548                        | SqlScalarType::Bool
3549                        | SqlScalarType::Int16
3550                        | SqlScalarType::Int32
3551                        | SqlScalarType::Int64
3552                        | SqlScalarType::Float32
3553                        | SqlScalarType::Float64
3554                        | SqlScalarType::Numeric { .. }
3555                        | SqlScalarType::Date
3556                        | SqlScalarType::Time
3557                        | SqlScalarType::Timestamp { .. }
3558                        | SqlScalarType::TimestampTz { .. }
3559                        | SqlScalarType::Interval
3560                        | SqlScalarType::PgLegacyChar
3561                        | SqlScalarType::Bytes
3562                        | SqlScalarType::String
3563                        | SqlScalarType::Char { .. }
3564                        | SqlScalarType::VarChar { .. }
3565                        | SqlScalarType::Jsonb
3566                        | SqlScalarType::Uuid
3567                        | SqlScalarType::Array(_)
3568                        | SqlScalarType::Record { .. }
3569                        | SqlScalarType::Oid
3570                        | SqlScalarType::RegProc
3571                        | SqlScalarType::RegType
3572                        | SqlScalarType::RegClass
3573                        | SqlScalarType::Int2Vector
3574                        | SqlScalarType::Range { .. }
3575                        | SqlScalarType::PgLegacyName => {}
3576                    }
3577                }
3578            }
3579            catalog.expire().await;
3580        })
3581        .await
3582    }
3583
3584    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3585    // introspection relations.
3586    #[mz_ore::test(tokio::test)]
3587    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3588    async fn test_mz_introspection_builtins() {
3589        Catalog::with_debug(|catalog| async move {
3590            let conn_catalog = catalog.for_system_session();
3591
3592            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3593            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3594
3595            for entry in catalog.entries() {
3596                let schema_spec = entry.name().qualifiers.schema_spec;
3597                let introspection_deps = catalog.introspection_dependencies(entry.id);
3598                if introspection_deps.is_empty() {
3599                    assert!(
3600                        schema_spec != introspection_schema_spec,
3601                        "entry does not depend on introspection sources but is in \
3602                         `mz_introspection`: {}",
3603                        conn_catalog.resolve_full_name(entry.name()),
3604                    );
3605                } else {
3606                    assert!(
3607                        schema_spec == introspection_schema_spec,
3608                        "entry depends on introspection sources but is not in \
3609                         `mz_introspection`: {}",
3610                        conn_catalog.resolve_full_name(entry.name()),
3611                    );
3612                }
3613            }
3614        })
3615        .await
3616    }
3617
3618    #[mz_ore::test(tokio::test)]
3619    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3620    async fn test_multi_subscriber_catalog() {
3621        let persist_client = PersistClient::new_for_tests().await;
3622        let bootstrap_args = test_bootstrap_args();
3623        let organization_id = Uuid::new_v4();
3624        let db_name = "DB";
3625
3626        let mut writer_catalog = Catalog::open_debug_catalog(
3627            persist_client.clone(),
3628            organization_id.clone(),
3629            &bootstrap_args,
3630        )
3631        .await
3632        .expect("open_debug_catalog");
3633        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3634            persist_client.clone(),
3635            organization_id.clone(),
3636            &bootstrap_args,
3637        )
3638        .await
3639        .expect("open_debug_read_only_catalog");
3640        assert_err!(writer_catalog.resolve_database(db_name));
3641        assert_err!(read_only_catalog.resolve_database(db_name));
3642
3643        let commit_ts = writer_catalog.current_upper().await;
3644        writer_catalog
3645            .transact(
3646                None,
3647                commit_ts,
3648                None,
3649                vec![Op::CreateDatabase {
3650                    name: db_name.to_string(),
3651                    owner_id: MZ_SYSTEM_ROLE_ID,
3652                }],
3653            )
3654            .await
3655            .expect("failed to transact");
3656
3657        let write_db = writer_catalog
3658            .resolve_database(db_name)
3659            .expect("resolve_database");
3660        read_only_catalog
3661            .sync_to_current_updates()
3662            .await
3663            .expect("sync_to_current_updates");
3664        let read_db = read_only_catalog
3665            .resolve_database(db_name)
3666            .expect("resolve_database");
3667
3668        assert_eq!(write_db, read_db);
3669
3670        let writer_catalog_fencer =
3671            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3672                .await
3673                .expect("open_debug_catalog for fencer");
3674        let fencer_db = writer_catalog_fencer
3675            .resolve_database(db_name)
3676            .expect("resolve_database for fencer");
3677        assert_eq!(fencer_db, read_db);
3678
3679        let write_fence_err = writer_catalog
3680            .sync_to_current_updates()
3681            .await
3682            .expect_err("sync_to_current_updates for fencer");
3683        assert!(matches!(
3684            write_fence_err,
3685            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3686        ));
3687        let read_fence_err = read_only_catalog
3688            .sync_to_current_updates()
3689            .await
3690            .expect_err("sync_to_current_updates after fencer");
3691        assert!(matches!(
3692            read_fence_err,
3693            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3694        ));
3695
3696        writer_catalog.expire().await;
3697        read_only_catalog.expire().await;
3698        writer_catalog_fencer.expire().await;
3699    }
3700}