Skip to main content

mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_persist_client::PersistClient;
55use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
56use mz_repr::explain::ExprHumanizer;
57use mz_repr::namespaces::MZ_TEMP_SCHEMA;
58use mz_repr::network_policy_id::NetworkPolicyId;
59use mz_repr::optimize::OptimizerFeatures;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
66    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
67    SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_transform::dataflow::DataflowMetainfo;
83use mz_transform::notice::OptimizerNotice;
84use tokio::sync::MutexGuard;
85use tokio::sync::mpsc::UnboundedSender;
86use uuid::Uuid;
87
88// DO NOT add any more imports from `crate` outside of `crate::catalog`.
89pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
90pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
91pub use crate::catalog::state::CatalogState;
92pub use crate::catalog::transact::{
93    DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
94};
95use crate::command::CatalogDump;
96use crate::coord::TargetCluster;
97#[cfg(test)]
98use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
99use crate::session::{Portal, PreparedStatement, Session};
100use crate::util::ResultExt;
101use crate::{AdapterError, AdapterNotice, ExecuteResponse};
102
103mod builtin_table_updates;
104pub(crate) mod consistency;
105mod migrate;
106
107mod apply;
108mod open;
109mod state;
110mod timeline;
111mod transact;
112
113/// A `Catalog` keeps track of the SQL objects known to the planner.
114///
115/// For each object, it keeps track of both forward and reverse dependencies:
116/// i.e., which objects are depended upon by the object, and which objects
117/// depend upon the object. It enforces the SQL rules around dropping: an object
118/// cannot be dropped until all of the objects that depend upon it are dropped.
119/// It also enforces uniqueness of names.
120///
121/// SQL mandates a hierarchy of exactly three layers. A catalog contains
122/// databases, databases contain schemas, and schemas contain catalog items,
123/// like sources, sinks, view, and indexes.
124///
125/// To the outside world, databases, schemas, and items are all identified by
126/// name. Items can be referred to by their [`FullItemName`], which fully and
127/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
128/// database name and/or the schema name. Partial names can be converted into
129/// full names via a complicated resolution process documented by the
130/// [`CatalogState::resolve`] method.
131///
132/// The catalog also maintains special "ambient schemas": virtual schemas,
133/// implicitly present in all databases, that house various system views.
134/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
135#[derive(Debug)]
136pub struct Catalog {
137    state: CatalogState,
138    expr_cache_handle: Option<ExpressionCacheHandle>,
139    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
140    transient_revision: u64,
141}
142
143// Implement our own Clone because derive can't unless S is Clone, which it's
144// not (hence the Arc).
145impl Clone for Catalog {
146    fn clone(&self) -> Self {
147        Self {
148            state: self.state.clone(),
149            expr_cache_handle: self.expr_cache_handle.clone(),
150            storage: Arc::clone(&self.storage),
151            transient_revision: self.transient_revision,
152        }
153    }
154}
155
156impl Catalog {
157    /// Set the optimized plan for the item identified by `id`.
158    ///
159    /// # Panics
160    /// If the item is not an `Index`, `MaterializedView`, or
161    /// `ContinualTask`.
162    #[mz_ore::instrument(level = "trace")]
163    pub fn set_optimized_plan(
164        &mut self,
165        id: GlobalId,
166        plan: DataflowDescription<OptimizedMirRelationExpr>,
167    ) {
168        self.state.set_optimized_plan(id, plan);
169    }
170
171    /// Set the physical plan for the item identified by `id`.
172    ///
173    /// # Panics
174    /// If the item is not an `Index`, `MaterializedView`, or
175    /// `ContinualTask`.
176    #[mz_ore::instrument(level = "trace")]
177    pub fn set_physical_plan(
178        &mut self,
179        id: GlobalId,
180        plan: DataflowDescription<mz_compute_types::plan::Plan>,
181    ) {
182        self.state.set_physical_plan(id, plan);
183    }
184
185    /// Try to get the optimized plan for the item identified by `id`.
186    #[mz_ore::instrument(level = "trace")]
187    pub fn try_get_optimized_plan(
188        &self,
189        id: &GlobalId,
190    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
191        let entry = self.state.try_get_entry_by_global_id(id)?;
192        entry.item().optimized_plan().map(AsRef::as_ref)
193    }
194
195    /// Try to get the physical plan for the item identified by `id`.
196    #[mz_ore::instrument(level = "trace")]
197    pub fn try_get_physical_plan(
198        &self,
199        id: &GlobalId,
200    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
201        let entry = self.state.try_get_entry_by_global_id(id)?;
202        entry.item().physical_plan().map(AsRef::as_ref)
203    }
204
205    /// Set the `DataflowMetainfo` for the item identified by `id`.
206    ///
207    /// # Panics
208    /// If the item is not an `Index`, `MaterializedView`, or
209    /// `ContinualTask`.
210    #[mz_ore::instrument(level = "trace")]
211    pub fn set_dataflow_metainfo(
212        &mut self,
213        id: GlobalId,
214        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
215    ) {
216        self.state.set_dataflow_metainfo(id, metainfo);
217    }
218
219    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
220    #[mz_ore::instrument(level = "trace")]
221    pub fn try_get_dataflow_metainfo(
222        &self,
223        id: &GlobalId,
224    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
225        let entry = self.state.try_get_entry_by_global_id(id)?;
226        entry.item().dataflow_metainfo()
227    }
228}
229
230#[derive(Debug)]
231pub struct ConnCatalog<'a> {
232    state: Cow<'a, CatalogState>,
233    /// Because we don't have any way of removing items from the catalog
234    /// temporarily, we allow the ConnCatalog to pretend that a set of items
235    /// don't exist during resolution.
236    ///
237    /// This feature is necessary to allow re-planning of statements, which is
238    /// either incredibly useful or required when altering item definitions.
239    ///
240    /// Note that uses of this field should be used by short-lived
241    /// catalogs.
242    unresolvable_ids: BTreeSet<CatalogItemId>,
243    conn_id: ConnectionId,
244    cluster: String,
245    database: Option<DatabaseId>,
246    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
247    role_id: RoleId,
248    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
249    portals: Option<&'a BTreeMap<String, Portal>>,
250    notices_tx: UnboundedSender<AdapterNotice>,
251}
252
253impl ConnCatalog<'_> {
254    pub fn conn_id(&self) -> &ConnectionId {
255        &self.conn_id
256    }
257
258    pub fn state(&self) -> &CatalogState {
259        &*self.state
260    }
261
262    /// Prevent planning from resolving item with the provided ID. Instead,
263    /// return an error as if the item did not exist.
264    ///
265    /// This feature is meant exclusively to permit re-planning statements
266    /// during update operations and should not be used otherwise given its
267    /// extremely "powerful" semantics.
268    ///
269    /// # Panics
270    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
271    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
272        assert_eq!(
273            self.role_id, MZ_SYSTEM_ROLE_ID,
274            "only the system role can mark IDs unresolvable",
275        );
276        self.unresolvable_ids.insert(id);
277    }
278
279    /// Returns the schemas:
280    /// - mz_catalog
281    /// - pg_catalog
282    /// - temp (if requested)
283    /// - all schemas from the session's search_path var that exist
284    pub fn effective_search_path(
285        &self,
286        include_temp_schema: bool,
287    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
288        self.state
289            .effective_search_path(&self.search_path, include_temp_schema)
290    }
291}
292
293impl ConnectionResolver for ConnCatalog<'_> {
294    fn resolve_connection(
295        &self,
296        id: CatalogItemId,
297    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
298        self.state().resolve_connection(id)
299    }
300}
301
302impl Catalog {
303    /// Returns the catalog's transient revision, which starts at 1 and is
304    /// incremented on every change. This is not persisted to disk, and will
305    /// restart on every load.
306    pub fn transient_revision(&self) -> u64 {
307        self.transient_revision
308    }
309
310    /// Creates a debug catalog from the current
311    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
312    /// like in tests.
313    ///
314    /// WARNING! This function can arbitrarily fail because it does not make any
315    /// effort to adjust the catalog's contents' structure or semantics to the
316    /// currently running version, i.e. it does not apply any migrations.
317    ///
318    /// This function must not be called in production contexts. Use
319    /// [`Catalog::open`] with appropriately set configuration parameters
320    /// instead.
321    pub async fn with_debug<F, Fut, T>(f: F) -> T
322    where
323        F: FnOnce(Catalog) -> Fut,
324        Fut: Future<Output = T>,
325    {
326        let persist_client = PersistClient::new_for_tests().await;
327        let organization_id = Uuid::new_v4();
328        let bootstrap_args = test_bootstrap_args();
329        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
330            .await
331            .expect("can open debug catalog");
332        f(catalog).await
333    }
334
335    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
336    /// in progress.
337    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
338    where
339        F: FnOnce(Catalog) -> Fut,
340        Fut: Future<Output = T>,
341    {
342        let persist_client = PersistClient::new_for_tests().await;
343        let organization_id = Uuid::new_v4();
344        let bootstrap_args = test_bootstrap_args();
345        let mut catalog =
346            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
347                .await
348                .expect("can open debug catalog");
349
350        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
351        let now = SYSTEM_TIME.clone();
352        let openable_storage = TestCatalogStateBuilder::new(persist_client)
353            .with_organization_id(organization_id)
354            .with_default_deploy_generation()
355            .build()
356            .await
357            .expect("can create durable catalog");
358        let mut storage = openable_storage
359            .open(now().into(), &bootstrap_args)
360            .await
361            .expect("can open durable catalog")
362            .0;
363        // Drain updates.
364        let _ = storage
365            .sync_to_current_updates()
366            .await
367            .expect("can sync to current updates");
368        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
369
370        f(catalog).await
371    }
372
373    /// Opens a debug catalog.
374    ///
375    /// See [`Catalog::with_debug`].
376    pub async fn open_debug_catalog(
377        persist_client: PersistClient,
378        organization_id: Uuid,
379        bootstrap_args: &BootstrapArgs,
380    ) -> Result<Catalog, anyhow::Error> {
381        let now = SYSTEM_TIME.clone();
382        let environment_id = None;
383        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
384            .with_organization_id(organization_id)
385            .with_default_deploy_generation()
386            .build()
387            .await?;
388        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
389        let system_parameter_defaults = BTreeMap::default();
390        Self::open_debug_catalog_inner(
391            persist_client,
392            storage,
393            now,
394            environment_id,
395            &DUMMY_BUILD_INFO,
396            system_parameter_defaults,
397            bootstrap_args,
398            None,
399        )
400        .await
401    }
402
403    /// Opens a read only debug persist backed catalog defined by `persist_client` and
404    /// `organization_id`.
405    ///
406    /// See [`Catalog::with_debug`].
407    pub async fn open_debug_read_only_catalog(
408        persist_client: PersistClient,
409        organization_id: Uuid,
410        bootstrap_args: &BootstrapArgs,
411    ) -> Result<Catalog, anyhow::Error> {
412        let now = SYSTEM_TIME.clone();
413        let environment_id = None;
414        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
415            .with_organization_id(organization_id)
416            .build()
417            .await?;
418        let storage = openable_storage
419            .open_read_only(&test_bootstrap_args())
420            .await?;
421        let system_parameter_defaults = BTreeMap::default();
422        Self::open_debug_catalog_inner(
423            persist_client,
424            storage,
425            now,
426            environment_id,
427            &DUMMY_BUILD_INFO,
428            system_parameter_defaults,
429            bootstrap_args,
430            None,
431        )
432        .await
433    }
434
435    /// Opens a read only debug persist backed catalog defined by `persist_client` and
436    /// `organization_id`.
437    ///
438    /// See [`Catalog::with_debug`].
439    pub async fn open_debug_read_only_persist_catalog_config(
440        persist_client: PersistClient,
441        now: NowFn,
442        environment_id: EnvironmentId,
443        system_parameter_defaults: BTreeMap<String, String>,
444        build_info: &'static BuildInfo,
445        bootstrap_args: &BootstrapArgs,
446        enable_expression_cache_override: Option<bool>,
447    ) -> Result<Catalog, anyhow::Error> {
448        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
449            .with_organization_id(environment_id.organization_id())
450            .with_version(
451                build_info
452                    .version
453                    .parse()
454                    .expect("build version is parseable"),
455            )
456            .build()
457            .await?;
458        let storage = openable_storage.open_read_only(bootstrap_args).await?;
459        Self::open_debug_catalog_inner(
460            persist_client,
461            storage,
462            now,
463            Some(environment_id),
464            build_info,
465            system_parameter_defaults,
466            bootstrap_args,
467            enable_expression_cache_override,
468        )
469        .await
470    }
471
472    async fn open_debug_catalog_inner(
473        persist_client: PersistClient,
474        storage: Box<dyn DurableCatalogState>,
475        now: NowFn,
476        environment_id: Option<EnvironmentId>,
477        build_info: &'static BuildInfo,
478        system_parameter_defaults: BTreeMap<String, String>,
479        bootstrap_args: &BootstrapArgs,
480        enable_expression_cache_override: Option<bool>,
481    ) -> Result<Catalog, anyhow::Error> {
482        let metrics_registry = &MetricsRegistry::new();
483        let secrets_reader = Arc::new(InMemorySecretsController::new());
484        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
485        // debugging/testing.
486        let previous_ts = now().into();
487        let replica_size = &bootstrap_args.default_cluster_replica_size;
488        let read_only = false;
489
490        let OpenCatalogResult {
491            catalog,
492            migrated_storage_collections_0dt: _,
493            new_builtin_collections: _,
494            builtin_table_updates: _,
495            cached_global_exprs: _,
496            uncached_local_exprs: _,
497        } = Catalog::open(Config {
498            storage,
499            metrics_registry,
500            state: StateConfig {
501                unsafe_mode: true,
502                all_features: false,
503                build_info,
504                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
505                read_only,
506                now,
507                boot_ts: previous_ts,
508                skip_migrations: true,
509                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
510                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
511                    size: replica_size.clone(),
512                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
513                },
514                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
515                    size: replica_size.clone(),
516                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
517                },
518                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
519                    size: replica_size.clone(),
520                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
521                },
522                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
523                    size: replica_size.clone(),
524                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
525                },
526                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
527                    size: replica_size.clone(),
528                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
529                },
530                system_parameter_defaults,
531                remote_system_parameters: None,
532                availability_zones: vec![],
533                egress_addresses: vec![],
534                aws_principal_context: None,
535                aws_privatelink_availability_zones: None,
536                http_host_name: None,
537                connection_context: ConnectionContext::for_tests(secrets_reader),
538                builtin_item_migration_config: BuiltinItemMigrationConfig {
539                    persist_client: persist_client.clone(),
540                    read_only,
541                    force_migration: None,
542                },
543                persist_client,
544                enable_expression_cache_override,
545                helm_chart_version: None,
546                external_login_password_mz_system: None,
547                license_key: ValidatedLicenseKey::for_tests(),
548            },
549        })
550        .await?;
551        Ok(catalog)
552    }
553
554    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
555        self.state.for_session(session)
556    }
557
558    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
559        self.state.for_sessionless_user(role_id)
560    }
561
562    pub fn for_system_session(&self) -> ConnCatalog<'_> {
563        self.state.for_system_session()
564    }
565
566    async fn storage<'a>(
567        &'a self,
568    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
569        self.storage.lock().await
570    }
571
572    pub async fn current_upper(&self) -> mz_repr::Timestamp {
573        self.storage().await.current_upper().await
574    }
575
576    pub async fn allocate_user_id(
577        &self,
578        commit_ts: mz_repr::Timestamp,
579    ) -> Result<(CatalogItemId, GlobalId), Error> {
580        self.storage()
581            .await
582            .allocate_user_id(commit_ts)
583            .await
584            .maybe_terminate("allocating user ids")
585            .err_into()
586    }
587
588    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
589    pub async fn allocate_user_ids(
590        &self,
591        amount: u64,
592        commit_ts: mz_repr::Timestamp,
593    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
594        self.storage()
595            .await
596            .allocate_user_ids(amount, commit_ts)
597            .await
598            .maybe_terminate("allocating user ids")
599            .err_into()
600    }
601
602    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
603        let commit_ts = self.storage().await.current_upper().await;
604        self.allocate_user_id(commit_ts).await
605    }
606
607    /// Get the next user item ID without allocating it.
608    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
609        self.storage()
610            .await
611            .get_next_user_item_id()
612            .await
613            .err_into()
614    }
615
616    #[cfg(test)]
617    pub async fn allocate_system_id(
618        &self,
619        commit_ts: mz_repr::Timestamp,
620    ) -> Result<(CatalogItemId, GlobalId), Error> {
621        use mz_ore::collections::CollectionExt;
622
623        let mut storage = self.storage().await;
624        let mut txn = storage.transaction().await?;
625        let id = txn
626            .allocate_system_item_ids(1)
627            .maybe_terminate("allocating system ids")?
628            .into_element();
629        // Drain transaction.
630        let _ = txn.get_and_commit_op_updates();
631        txn.commit(commit_ts).await?;
632        Ok(id)
633    }
634
635    /// Get the next system item ID without allocating it.
636    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
637        self.storage()
638            .await
639            .get_next_system_item_id()
640            .await
641            .err_into()
642    }
643
644    pub async fn allocate_user_cluster_id(
645        &self,
646        commit_ts: mz_repr::Timestamp,
647    ) -> Result<ClusterId, Error> {
648        self.storage()
649            .await
650            .allocate_user_cluster_id(commit_ts)
651            .await
652            .maybe_terminate("allocating user cluster ids")
653            .err_into()
654    }
655
656    /// Get the next system replica id without allocating it.
657    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
658        self.storage()
659            .await
660            .get_next_system_replica_id()
661            .await
662            .err_into()
663    }
664
665    /// Get the next user replica id without allocating it.
666    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
667        self.storage()
668            .await
669            .get_next_user_replica_id()
670            .await
671            .err_into()
672    }
673
674    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
675        self.state.resolve_database(database_name)
676    }
677
678    pub fn resolve_schema(
679        &self,
680        current_database: Option<&DatabaseId>,
681        database_name: Option<&str>,
682        schema_name: &str,
683        conn_id: &ConnectionId,
684    ) -> Result<&Schema, SqlCatalogError> {
685        self.state
686            .resolve_schema(current_database, database_name, schema_name, conn_id)
687    }
688
689    pub fn resolve_schema_in_database(
690        &self,
691        database_spec: &ResolvedDatabaseSpecifier,
692        schema_name: &str,
693        conn_id: &ConnectionId,
694    ) -> Result<&Schema, SqlCatalogError> {
695        self.state
696            .resolve_schema_in_database(database_spec, schema_name, conn_id)
697    }
698
699    pub fn resolve_replica_in_cluster(
700        &self,
701        cluster_id: &ClusterId,
702        replica_name: &str,
703    ) -> Result<&ClusterReplica, SqlCatalogError> {
704        self.state
705            .resolve_replica_in_cluster(cluster_id, replica_name)
706    }
707
708    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
709        self.state.resolve_system_schema(name)
710    }
711
712    pub fn resolve_search_path(
713        &self,
714        session: &Session,
715    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
716        self.state.resolve_search_path(session)
717    }
718
719    /// Resolves `name` to a non-function [`CatalogEntry`].
720    pub fn resolve_entry(
721        &self,
722        current_database: Option<&DatabaseId>,
723        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
724        name: &PartialItemName,
725        conn_id: &ConnectionId,
726    ) -> Result<&CatalogEntry, SqlCatalogError> {
727        self.state
728            .resolve_entry(current_database, search_path, name, conn_id)
729    }
730
731    /// Resolves a `BuiltinTable`.
732    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
733        self.state.resolve_builtin_table(builtin)
734    }
735
736    /// Resolves a `BuiltinLog`.
737    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
738        self.state.resolve_builtin_log(builtin).0
739    }
740
741    /// Resolves a `BuiltinSource`.
742    pub fn resolve_builtin_storage_collection(
743        &self,
744        builtin: &'static BuiltinSource,
745    ) -> CatalogItemId {
746        self.state.resolve_builtin_source(builtin)
747    }
748
749    /// Resolves `name` to a function [`CatalogEntry`].
750    pub fn resolve_function(
751        &self,
752        current_database: Option<&DatabaseId>,
753        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
754        name: &PartialItemName,
755        conn_id: &ConnectionId,
756    ) -> Result<&CatalogEntry, SqlCatalogError> {
757        self.state
758            .resolve_function(current_database, search_path, name, conn_id)
759    }
760
761    /// Resolves `name` to a type [`CatalogEntry`].
762    pub fn resolve_type(
763        &self,
764        current_database: Option<&DatabaseId>,
765        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
766        name: &PartialItemName,
767        conn_id: &ConnectionId,
768    ) -> Result<&CatalogEntry, SqlCatalogError> {
769        self.state
770            .resolve_type(current_database, search_path, name, conn_id)
771    }
772
773    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
774        self.state.resolve_cluster(name)
775    }
776
777    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
778    ///
779    /// # Panics
780    /// * If the [`BuiltinCluster`] doesn't exist.
781    ///
782    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
783        self.state.resolve_builtin_cluster(cluster)
784    }
785
786    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
787        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
788    }
789
790    /// Resolves a [`Cluster`] for a TargetCluster.
791    pub fn resolve_target_cluster(
792        &self,
793        target_cluster: TargetCluster,
794        session: &Session,
795    ) -> Result<&Cluster, AdapterError> {
796        match target_cluster {
797            TargetCluster::CatalogServer => {
798                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
799            }
800            TargetCluster::Active => self.active_cluster(session),
801            TargetCluster::Transaction(cluster_id) => self
802                .try_get_cluster(cluster_id)
803                .ok_or(AdapterError::ConcurrentClusterDrop),
804        }
805    }
806
807    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
808        // TODO(benesch): this check here is not sufficiently protective. It'd
809        // be very easy for a code path to accidentally avoid this check by
810        // calling `resolve_cluster(session.vars().cluster())`.
811        if session.user().name != SYSTEM_USER.name
812            && session.user().name != SUPPORT_USER.name
813            && session.vars().cluster() == SYSTEM_USER.name
814        {
815            coord_bail!(
816                "system cluster '{}' cannot execute user queries",
817                SYSTEM_USER.name
818            );
819        }
820        let cluster = self.resolve_cluster(session.vars().cluster())?;
821        Ok(cluster)
822    }
823
824    pub fn state(&self) -> &CatalogState {
825        &self.state
826    }
827
828    pub fn resolve_full_name(
829        &self,
830        name: &QualifiedItemName,
831        conn_id: Option<&ConnectionId>,
832    ) -> FullItemName {
833        self.state.resolve_full_name(name, conn_id)
834    }
835
836    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
837        self.state.try_get_entry(id)
838    }
839
840    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
841        self.state.try_get_entry_by_global_id(id)
842    }
843
844    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
845        self.state.get_entry(id)
846    }
847
848    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
849        self.state.get_entry_by_global_id(id)
850    }
851
852    pub fn get_global_ids<'a>(
853        &'a self,
854        id: &CatalogItemId,
855    ) -> impl Iterator<Item = GlobalId> + use<'a> {
856        self.get_entry(id).global_ids()
857    }
858
859    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
860        self.get_entry_by_global_id(id).id()
861    }
862
863    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
864        let item = self.try_get_entry_by_global_id(id)?;
865        Some(item.id())
866    }
867
868    pub fn get_schema(
869        &self,
870        database_spec: &ResolvedDatabaseSpecifier,
871        schema_spec: &SchemaSpecifier,
872        conn_id: &ConnectionId,
873    ) -> &Schema {
874        self.state.get_schema(database_spec, schema_spec, conn_id)
875    }
876
877    pub fn try_get_schema(
878        &self,
879        database_spec: &ResolvedDatabaseSpecifier,
880        schema_spec: &SchemaSpecifier,
881        conn_id: &ConnectionId,
882    ) -> Option<&Schema> {
883        self.state
884            .try_get_schema(database_spec, schema_spec, conn_id)
885    }
886
887    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
888        self.state.get_mz_catalog_schema_id()
889    }
890
891    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
892        self.state.get_pg_catalog_schema_id()
893    }
894
895    pub fn get_information_schema_id(&self) -> SchemaId {
896        self.state.get_information_schema_id()
897    }
898
899    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
900        self.state.get_mz_internal_schema_id()
901    }
902
903    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
904        self.state.get_mz_introspection_schema_id()
905    }
906
907    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
908        self.state.get_mz_unsafe_schema_id()
909    }
910
911    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
912        self.state.system_schema_ids()
913    }
914
915    pub fn get_database(&self, id: &DatabaseId) -> &Database {
916        self.state.get_database(id)
917    }
918
919    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
920        self.state.try_get_role(id)
921    }
922
923    pub fn get_role(&self, id: &RoleId) -> &Role {
924        self.state.get_role(id)
925    }
926
927    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
928        self.state.try_get_role_by_name(role_name)
929    }
930
931    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
932        self.state.try_get_role_auth_by_id(id)
933    }
934
935    /// Creates a new schema in the `Catalog` for temporary items
936    /// indicated by the TEMPORARY or TEMP keywords.
937    pub fn create_temporary_schema(
938        &mut self,
939        conn_id: &ConnectionId,
940        owner_id: RoleId,
941    ) -> Result<(), Error> {
942        self.state.create_temporary_schema(conn_id, owner_id)
943    }
944
945    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
946        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
947        self.state
948            .temporary_schemas
949            .get(conn_id)
950            .map(|schema| schema.items.contains_key(item_name))
951            .unwrap_or(false)
952    }
953
954    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
955    /// Returns Ok if conn_id's temp schema does not exist.
956    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
957        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
958            return Ok(());
959        };
960        if !schema.items.is_empty() {
961            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
962        }
963        Ok(())
964    }
965
966    pub(crate) fn object_dependents(
967        &self,
968        object_ids: &Vec<ObjectId>,
969        conn_id: &ConnectionId,
970    ) -> Vec<ObjectId> {
971        let mut seen = BTreeSet::new();
972        self.state.object_dependents(object_ids, conn_id, &mut seen)
973    }
974
975    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
976        FullNameV1 {
977            database: name.database.to_string(),
978            schema: name.schema.clone(),
979            item: name.item.clone(),
980        }
981    }
982
983    pub fn find_available_cluster_name(&self, name: &str) -> String {
984        let mut i = 0;
985        let mut candidate = name.to_string();
986        while self.state.clusters_by_name.contains_key(&candidate) {
987            i += 1;
988            candidate = format!("{}{}", name, i);
989        }
990        candidate
991    }
992
993    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
994        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
995            self.cluster_replica_sizes()
996                .enabled_allocations()
997                .map(|a| a.0.to_owned())
998                .collect::<Vec<_>>()
999        } else {
1000            self.system_config().allowed_cluster_replica_sizes()
1001        }
1002    }
1003
1004    pub fn concretize_replica_location(
1005        &self,
1006        location: mz_catalog::durable::ReplicaLocation,
1007        allowed_sizes: &Vec<String>,
1008        allowed_availability_zones: Option<&[String]>,
1009    ) -> Result<ReplicaLocation, Error> {
1010        self.state
1011            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1012    }
1013
1014    pub(crate) fn ensure_valid_replica_size(
1015        &self,
1016        allowed_sizes: &[String],
1017        size: &String,
1018    ) -> Result<(), Error> {
1019        self.state.ensure_valid_replica_size(allowed_sizes, size)
1020    }
1021
1022    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1023        &self.state.cluster_replica_sizes
1024    }
1025
1026    /// Returns the privileges of an object by its ID.
1027    pub fn get_privileges(
1028        &self,
1029        id: &SystemObjectId,
1030        conn_id: &ConnectionId,
1031    ) -> Option<&PrivilegeMap> {
1032        match id {
1033            SystemObjectId::Object(id) => match id {
1034                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1035                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1036                ObjectId::Schema((database_spec, schema_spec)) => Some(
1037                    self.get_schema(database_spec, schema_spec, conn_id)
1038                        .privileges(),
1039                ),
1040                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1041                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1042                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1043            },
1044            SystemObjectId::System => Some(&self.state.system_privileges),
1045        }
1046    }
1047
1048    #[mz_ore::instrument(level = "debug")]
1049    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1050        Ok(self.storage().await.advance_upper(new_upper).await?)
1051    }
1052
1053    /// Return the ids of all log sources the given object depends on.
1054    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1055        self.state.introspection_dependencies(id)
1056    }
1057
1058    /// Serializes the catalog's in-memory state.
1059    ///
1060    /// There are no guarantees about the format of the serialized state, except
1061    /// that the serialized state for two identical catalogs will compare
1062    /// identically.
1063    pub fn dump(&self) -> Result<CatalogDump, Error> {
1064        Ok(CatalogDump::new(self.state.dump(None)?))
1065    }
1066
1067    /// Checks the [`Catalog`]s internal consistency.
1068    ///
1069    /// Returns a JSON object describing the inconsistencies, if there are any.
1070    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1071        self.state.check_consistency().map_err(|inconsistencies| {
1072            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1073                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1074            })
1075        })
1076    }
1077
1078    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1079        self.state.config()
1080    }
1081
1082    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1083        self.state.entry_by_id.values()
1084    }
1085
1086    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1087        self.entries()
1088            .filter(|entry| entry.is_connection() && entry.id().is_user())
1089    }
1090
1091    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1092        self.entries()
1093            .filter(|entry| entry.is_table() && entry.id().is_user())
1094    }
1095
1096    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1097        self.entries()
1098            .filter(|entry| entry.is_source() && entry.id().is_user())
1099    }
1100
1101    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1102        self.entries()
1103            .filter(|entry| entry.is_sink() && entry.id().is_user())
1104    }
1105
1106    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1107        self.entries()
1108            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1109    }
1110
1111    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1112        self.entries()
1113            .filter(|entry| entry.is_secret() && entry.id().is_user())
1114    }
1115
1116    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1117        self.state.get_network_policy(&network_policy_id)
1118    }
1119
1120    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1121        self.state.try_get_network_policy_by_name(name)
1122    }
1123
1124    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1125        self.state.clusters_by_id.values()
1126    }
1127
1128    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1129        self.state.get_cluster(cluster_id)
1130    }
1131
1132    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1133        self.state.try_get_cluster(cluster_id)
1134    }
1135
1136    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1137        self.clusters().filter(|cluster| cluster.id.is_user())
1138    }
1139
1140    pub fn get_cluster_replica(
1141        &self,
1142        cluster_id: ClusterId,
1143        replica_id: ReplicaId,
1144    ) -> &ClusterReplica {
1145        self.state.get_cluster_replica(cluster_id, replica_id)
1146    }
1147
1148    pub fn try_get_cluster_replica(
1149        &self,
1150        cluster_id: ClusterId,
1151        replica_id: ReplicaId,
1152    ) -> Option<&ClusterReplica> {
1153        self.state.try_get_cluster_replica(cluster_id, replica_id)
1154    }
1155
1156    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1157        self.user_clusters()
1158            .flat_map(|cluster| cluster.user_replicas())
1159    }
1160
1161    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1162        self.state.database_by_id.values()
1163    }
1164
1165    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1166        self.state
1167            .roles_by_id
1168            .values()
1169            .filter(|role| role.is_user())
1170    }
1171
1172    pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1173        self.state
1174            .network_policies_by_id
1175            .iter()
1176            .filter(|(id, _)| id.is_user())
1177            .map(|(_, policy)| policy)
1178    }
1179
1180    pub fn system_privileges(&self) -> &PrivilegeMap {
1181        &self.state.system_privileges
1182    }
1183
1184    pub fn default_privileges(
1185        &self,
1186    ) -> impl Iterator<
1187        Item = (
1188            &DefaultPrivilegeObject,
1189            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1190        ),
1191    > {
1192        self.state.default_privileges.iter()
1193    }
1194
1195    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1196        self.state
1197            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1198    }
1199
1200    pub fn pack_storage_usage_update(
1201        &self,
1202        event: VersionedStorageUsage,
1203        diff: Diff,
1204    ) -> BuiltinTableUpdate {
1205        self.state
1206            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1207    }
1208
1209    pub fn system_config(&self) -> &SystemVars {
1210        self.state.system_config()
1211    }
1212
1213    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1214        self.state.system_config_mut()
1215    }
1216
1217    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1218        self.state.ensure_not_reserved_role(role_id)
1219    }
1220
1221    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1222        self.state.ensure_grantable_role(role_id)
1223    }
1224
1225    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1226        self.state.ensure_not_system_role(role_id)
1227    }
1228
1229    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1230        self.state.ensure_not_predefined_role(role_id)
1231    }
1232
1233    pub fn ensure_not_reserved_network_policy(
1234        &self,
1235        network_policy_id: &NetworkPolicyId,
1236    ) -> Result<(), Error> {
1237        self.state
1238            .ensure_not_reserved_network_policy(network_policy_id)
1239    }
1240
1241    pub fn ensure_not_reserved_object(
1242        &self,
1243        object_id: &ObjectId,
1244        conn_id: &ConnectionId,
1245    ) -> Result<(), Error> {
1246        match object_id {
1247            ObjectId::Cluster(cluster_id) => {
1248                if cluster_id.is_system() {
1249                    let cluster = self.get_cluster(*cluster_id);
1250                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1251                        cluster.name().to_string(),
1252                    )))
1253                } else {
1254                    Ok(())
1255                }
1256            }
1257            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1258                if replica_id.is_system() {
1259                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1260                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1261                        replica.name().to_string(),
1262                    )))
1263                } else {
1264                    Ok(())
1265                }
1266            }
1267            ObjectId::Database(database_id) => {
1268                if database_id.is_system() {
1269                    let database = self.get_database(database_id);
1270                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1271                        database.name().to_string(),
1272                    )))
1273                } else {
1274                    Ok(())
1275                }
1276            }
1277            ObjectId::Schema((database_spec, schema_spec)) => {
1278                if schema_spec.is_system() {
1279                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1280                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1281                        schema.name().schema.clone(),
1282                    )))
1283                } else {
1284                    Ok(())
1285                }
1286            }
1287            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1288            ObjectId::Item(item_id) => {
1289                if item_id.is_system() {
1290                    let item = self.get_entry(item_id);
1291                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1292                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1293                } else {
1294                    Ok(())
1295                }
1296            }
1297            ObjectId::NetworkPolicy(network_policy_id) => {
1298                self.ensure_not_reserved_network_policy(network_policy_id)
1299            }
1300        }
1301    }
1302
1303    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1304    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1305        &mut self,
1306        create_sql: &str,
1307        force_if_exists_skip: bool,
1308    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1309        self.state
1310            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1311    }
1312
1313    /// Cache global and, optionally, local expressions for the given
1314    /// `GlobalId`.
1315    ///
1316    /// Takes the plans and metainfo directly as parameters (rather than
1317    /// fishing them out of catalog state), so this can be called **before**
1318    /// the catalog transaction that creates the item. Returns the future
1319    /// returned by [`Catalog::update_expression_cache`]; callers should
1320    /// `.await` it before the catalog transaction commits, so the durable
1321    /// expression cache is observed to contain the entries by the time any
1322    /// other process (or a subsequent bootstrap on this process) reads them.
1323    pub(crate) fn cache_expressions(
1324        &self,
1325        id: GlobalId,
1326        local_mir: Option<OptimizedMirRelationExpr>,
1327        mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1328        mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1329        dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1330        optimizer_features: OptimizerFeatures,
1331    ) -> BoxFuture<'static, ()> {
1332        // Make sure we're not caching the result of timestamp selection, as
1333        // it will almost certainly be wrong if we re-install the dataflow at
1334        // a later time.
1335        global_mir.as_of = None;
1336        global_mir.until = Default::default();
1337        physical_plan.as_of = None;
1338        physical_plan.until = Default::default();
1339
1340        let mut local_exprs = Vec::new();
1341        if let Some(local_mir) = local_mir {
1342            local_exprs.push((
1343                id,
1344                LocalExpressions {
1345                    local_mir,
1346                    optimizer_features: optimizer_features.clone(),
1347                },
1348            ));
1349        }
1350        let global_exprs = vec![(
1351            id,
1352            GlobalExpressions {
1353                global_mir,
1354                physical_plan,
1355                dataflow_metainfos,
1356                optimizer_features,
1357            },
1358        )];
1359        self.update_expression_cache(local_exprs, global_exprs, Default::default())
1360    }
1361
1362    pub(crate) fn update_expression_cache<'a, 'b>(
1363        &'a self,
1364        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1365        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1366        invalidate_ids: BTreeSet<GlobalId>,
1367    ) -> BoxFuture<'b, ()> {
1368        if let Some(expr_cache) = &self.expr_cache_handle {
1369            expr_cache
1370                .update(
1371                    new_local_expressions,
1372                    new_global_expressions,
1373                    invalidate_ids,
1374                )
1375                .boxed()
1376        } else {
1377            async {}.boxed()
1378        }
1379    }
1380
1381    /// Listen for and apply all unconsumed updates to the durable catalog state.
1382    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1383    // `#[cfg(test)]` annotation.
1384    #[cfg(test)]
1385    async fn sync_to_current_updates(
1386        &mut self,
1387    ) -> Result<
1388        (
1389            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1390            Vec<ParsedStateUpdate>,
1391        ),
1392        CatalogError,
1393    > {
1394        let updates = self.storage().await.sync_to_current_updates().await?;
1395        let (builtin_table_updates, catalog_updates) = self
1396            .state
1397            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1398            .await;
1399        Ok((builtin_table_updates, catalog_updates))
1400    }
1401}
1402
1403pub fn is_reserved_name(name: &str) -> bool {
1404    BUILTIN_PREFIXES
1405        .iter()
1406        .any(|prefix| name.starts_with(prefix))
1407}
1408
1409pub fn is_reserved_role_name(name: &str) -> bool {
1410    is_reserved_name(name) || is_public_role(name)
1411}
1412
1413pub fn is_public_role(name: &str) -> bool {
1414    name == &*PUBLIC_ROLE_NAME
1415}
1416
1417pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1418    object_type_to_audit_object_type(sql_type.into())
1419}
1420
1421pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1422    match id {
1423        CommentObjectId::Table(_) => ObjectType::Table,
1424        CommentObjectId::View(_) => ObjectType::View,
1425        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1426        CommentObjectId::Source(_) => ObjectType::Source,
1427        CommentObjectId::Sink(_) => ObjectType::Sink,
1428        CommentObjectId::Index(_) => ObjectType::Index,
1429        CommentObjectId::Func(_) => ObjectType::Func,
1430        CommentObjectId::Connection(_) => ObjectType::Connection,
1431        CommentObjectId::Type(_) => ObjectType::Type,
1432        CommentObjectId::Secret(_) => ObjectType::Secret,
1433        CommentObjectId::Role(_) => ObjectType::Role,
1434        CommentObjectId::Database(_) => ObjectType::Database,
1435        CommentObjectId::Schema(_) => ObjectType::Schema,
1436        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1437        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1438        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1439    }
1440}
1441
1442pub(crate) fn object_type_to_audit_object_type(
1443    object_type: mz_sql::catalog::ObjectType,
1444) -> ObjectType {
1445    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1446}
1447
1448pub(crate) fn system_object_type_to_audit_object_type(
1449    system_type: &SystemObjectType,
1450) -> ObjectType {
1451    match system_type {
1452        SystemObjectType::Object(object_type) => match object_type {
1453            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1454            mz_sql::catalog::ObjectType::View => ObjectType::View,
1455            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1456            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1457            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1458            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1459            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1460            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1461            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1462            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1463            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1464            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1465            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1466            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1467            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1468            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1469        },
1470        SystemObjectType::System => ObjectType::System,
1471    }
1472}
1473
1474#[derive(Debug, Copy, Clone)]
1475pub enum UpdatePrivilegeVariant {
1476    Grant,
1477    Revoke,
1478}
1479
1480impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1481    fn from(variant: UpdatePrivilegeVariant) -> Self {
1482        match variant {
1483            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1484            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1485        }
1486    }
1487}
1488
1489impl From<UpdatePrivilegeVariant> for EventType {
1490    fn from(variant: UpdatePrivilegeVariant) -> Self {
1491        match variant {
1492            UpdatePrivilegeVariant::Grant => EventType::Grant,
1493            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1494        }
1495    }
1496}
1497
1498impl ConnCatalog<'_> {
1499    fn resolve_item_name(
1500        &self,
1501        name: &PartialItemName,
1502    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1503        self.resolve_item(name).map(|entry| entry.name())
1504    }
1505
1506    fn resolve_function_name(
1507        &self,
1508        name: &PartialItemName,
1509    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1510        self.resolve_function(name).map(|entry| entry.name())
1511    }
1512
1513    fn resolve_type_name(
1514        &self,
1515        name: &PartialItemName,
1516    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1517        self.resolve_type(name).map(|entry| entry.name())
1518    }
1519}
1520
1521impl ExprHumanizer for ConnCatalog<'_> {
1522    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1523        let entry = self.state.try_get_entry_by_global_id(&id)?;
1524        Some(self.resolve_full_name(entry.name()).to_string())
1525    }
1526
1527    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1528        let entry = self.state.try_get_entry_by_global_id(&id)?;
1529        Some(entry.name().item.clone())
1530    }
1531
1532    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1533        let entry = self.state.try_get_entry_by_global_id(&id)?;
1534        Some(self.resolve_full_name(entry.name()).into_parts())
1535    }
1536
1537    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1538        use SqlScalarType::*;
1539
1540        match typ {
1541            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1542            List {
1543                custom_id: Some(item_id),
1544                ..
1545            }
1546            | Map {
1547                custom_id: Some(item_id),
1548                ..
1549            } => {
1550                let item = self.get_item(item_id);
1551                self.minimal_qualification(item.name()).to_string()
1552            }
1553            List { element_type, .. } => {
1554                format!(
1555                    "{} list",
1556                    self.humanize_sql_scalar_type(element_type, postgres_compat)
1557                )
1558            }
1559            Map { value_type, .. } => format!(
1560                "map[{}=>{}]",
1561                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1562                self.humanize_sql_scalar_type(value_type, postgres_compat)
1563            ),
1564            Record {
1565                custom_id: Some(item_id),
1566                ..
1567            } => {
1568                let item = self.get_item(item_id);
1569                self.minimal_qualification(item.name()).to_string()
1570            }
1571            Record { fields, .. } => format!(
1572                "record({})",
1573                fields
1574                    .iter()
1575                    .map(|f| format!(
1576                        "{}: {}",
1577                        f.0,
1578                        self.humanize_sql_column_type(&f.1, postgres_compat)
1579                    ))
1580                    .join(",")
1581            ),
1582            PgLegacyChar => "\"char\"".into(),
1583            Char { length } if !postgres_compat => match length {
1584                None => "char".into(),
1585                Some(length) => format!("char({})", length.into_u32()),
1586            },
1587            VarChar { max_length } if !postgres_compat => match max_length {
1588                None => "varchar".into(),
1589                Some(length) => format!("varchar({})", length.into_u32()),
1590            },
1591            UInt16 => "uint2".into(),
1592            UInt32 => "uint4".into(),
1593            UInt64 => "uint8".into(),
1594            ty => {
1595                let pgrepr_type = mz_pgrepr::Type::from(ty);
1596                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1597
1598                let res = if self
1599                    .effective_search_path(true)
1600                    .iter()
1601                    .any(|(_, schema)| schema == &pg_catalog_schema)
1602                {
1603                    pgrepr_type.name().to_string()
1604                } else {
1605                    // If PG_CATALOG_SCHEMA is not in search path, you need
1606                    // qualified object name to refer to type.
1607                    let name = QualifiedItemName {
1608                        qualifiers: ItemQualifiers {
1609                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1610                            schema_spec: pg_catalog_schema,
1611                        },
1612                        item: pgrepr_type.name().to_string(),
1613                    };
1614                    self.resolve_full_name(&name).to_string()
1615                };
1616                res
1617            }
1618        }
1619    }
1620
1621    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1622        let entry = self.state.try_get_entry_by_global_id(&id)?;
1623
1624        match entry.index() {
1625            Some(index) => {
1626                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1627                let mut on_names = on_desc
1628                    .iter_names()
1629                    .map(|col_name| col_name.to_string())
1630                    .collect::<Vec<_>>();
1631
1632                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1633
1634                // Init ix_names with unknown column names. Unknown columns are
1635                // represented as an empty String and rendered as `#c` by the
1636                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1637                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1638                let mut ix_names = vec![String::new(); ix_arity];
1639
1640                // Apply the permutation by swapping on_names with ix_names.
1641                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1642                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1643                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1644                    std::mem::swap(on_name, ix_name);
1645                }
1646
1647                Some(ix_names) // Return the updated ix_names vector.
1648            }
1649            None => {
1650                let desc = self.state.try_get_desc_by_global_id(&id)?;
1651                let column_names = desc
1652                    .iter_names()
1653                    .map(|col_name| col_name.to_string())
1654                    .collect();
1655
1656                Some(column_names)
1657            }
1658        }
1659    }
1660
1661    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1662        let desc = self.state.try_get_desc_by_global_id(&id)?;
1663        Some(desc.get_name(column).to_string())
1664    }
1665
1666    fn id_exists(&self, id: GlobalId) -> bool {
1667        self.state.entry_by_global_id.contains_key(&id)
1668    }
1669}
1670
1671impl SessionCatalog for ConnCatalog<'_> {
1672    fn active_role_id(&self) -> &RoleId {
1673        &self.role_id
1674    }
1675
1676    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1677        self.prepared_statements
1678            .as_ref()
1679            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1680            .flatten()
1681    }
1682
1683    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1684        self.portals
1685            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1686    }
1687
1688    fn active_database(&self) -> Option<&DatabaseId> {
1689        self.database.as_ref()
1690    }
1691
1692    fn active_cluster(&self) -> &str {
1693        &self.cluster
1694    }
1695
1696    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1697        &self.search_path
1698    }
1699
1700    fn resolve_database(
1701        &self,
1702        database_name: &str,
1703    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1704        Ok(self.state.resolve_database(database_name)?)
1705    }
1706
1707    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1708        self.state
1709            .database_by_id
1710            .get(id)
1711            .expect("database doesn't exist")
1712    }
1713
1714    // `as` is ok to use to cast to a trait object.
1715    #[allow(clippy::as_conversions)]
1716    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1717        self.state
1718            .database_by_id
1719            .values()
1720            .map(|database| database as &dyn CatalogDatabase)
1721            .collect()
1722    }
1723
1724    fn resolve_schema(
1725        &self,
1726        database_name: Option<&str>,
1727        schema_name: &str,
1728    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1729        Ok(self.state.resolve_schema(
1730            self.database.as_ref(),
1731            database_name,
1732            schema_name,
1733            &self.conn_id,
1734        )?)
1735    }
1736
1737    fn resolve_schema_in_database(
1738        &self,
1739        database_spec: &ResolvedDatabaseSpecifier,
1740        schema_name: &str,
1741    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1742        Ok(self
1743            .state
1744            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1745    }
1746
1747    fn get_schema(
1748        &self,
1749        database_spec: &ResolvedDatabaseSpecifier,
1750        schema_spec: &SchemaSpecifier,
1751    ) -> &dyn CatalogSchema {
1752        self.state
1753            .get_schema(database_spec, schema_spec, &self.conn_id)
1754    }
1755
1756    // `as` is ok to use to cast to a trait object.
1757    #[allow(clippy::as_conversions)]
1758    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1759        self.get_databases()
1760            .into_iter()
1761            .flat_map(|database| database.schemas().into_iter())
1762            .chain(
1763                self.state
1764                    .ambient_schemas_by_id
1765                    .values()
1766                    .chain(self.state.temporary_schemas.values())
1767                    .map(|schema| schema as &dyn CatalogSchema),
1768            )
1769            .collect()
1770    }
1771
1772    fn get_mz_internal_schema_id(&self) -> SchemaId {
1773        self.state().get_mz_internal_schema_id()
1774    }
1775
1776    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1777        self.state().get_mz_unsafe_schema_id()
1778    }
1779
1780    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1781        self.state.is_system_schema_specifier(schema)
1782    }
1783
1784    fn resolve_role(
1785        &self,
1786        role_name: &str,
1787    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1788        match self.state.try_get_role_by_name(role_name) {
1789            Some(role) => Ok(role),
1790            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1791        }
1792    }
1793
1794    fn resolve_network_policy(
1795        &self,
1796        policy_name: &str,
1797    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1798        match self.state.try_get_network_policy_by_name(policy_name) {
1799            Some(policy) => Ok(policy),
1800            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1801        }
1802    }
1803
1804    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1805        Some(self.state.roles_by_id.get(id)?)
1806    }
1807
1808    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1809        self.state.get_role(id)
1810    }
1811
1812    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1813        // `as` is ok to use to cast to a trait object.
1814        #[allow(clippy::as_conversions)]
1815        self.state
1816            .roles_by_id
1817            .values()
1818            .map(|role| role as &dyn CatalogRole)
1819            .collect()
1820    }
1821
1822    fn mz_system_role_id(&self) -> RoleId {
1823        MZ_SYSTEM_ROLE_ID
1824    }
1825
1826    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1827        self.state.collect_role_membership(id)
1828    }
1829
1830    fn get_network_policy(
1831        &self,
1832        id: &NetworkPolicyId,
1833    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1834        self.state.get_network_policy(id)
1835    }
1836
1837    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1838        // `as` is ok to use to cast to a trait object.
1839        #[allow(clippy::as_conversions)]
1840        self.state
1841            .network_policies_by_id
1842            .values()
1843            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1844            .collect()
1845    }
1846
1847    fn resolve_cluster(
1848        &self,
1849        cluster_name: Option<&str>,
1850    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1851        Ok(self
1852            .state
1853            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1854    }
1855
1856    fn resolve_cluster_replica(
1857        &self,
1858        cluster_replica_name: &QualifiedReplica,
1859    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1860        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1861    }
1862
1863    fn resolve_item(
1864        &self,
1865        name: &PartialItemName,
1866    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1867        let r = self.state.resolve_entry(
1868            self.database.as_ref(),
1869            &self.effective_search_path(true),
1870            name,
1871            &self.conn_id,
1872        )?;
1873        if self.unresolvable_ids.contains(&r.id()) {
1874            Err(SqlCatalogError::UnknownItem(name.to_string()))
1875        } else {
1876            Ok(r)
1877        }
1878    }
1879
1880    fn resolve_function(
1881        &self,
1882        name: &PartialItemName,
1883    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1884        let r = self.state.resolve_function(
1885            self.database.as_ref(),
1886            &self.effective_search_path(false),
1887            name,
1888            &self.conn_id,
1889        )?;
1890
1891        if self.unresolvable_ids.contains(&r.id()) {
1892            Err(SqlCatalogError::UnknownFunction {
1893                name: name.to_string(),
1894                alternative: None,
1895            })
1896        } else {
1897            Ok(r)
1898        }
1899    }
1900
1901    fn resolve_type(
1902        &self,
1903        name: &PartialItemName,
1904    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1905        let r = self.state.resolve_type(
1906            self.database.as_ref(),
1907            &self.effective_search_path(false),
1908            name,
1909            &self.conn_id,
1910        )?;
1911
1912        if self.unresolvable_ids.contains(&r.id()) {
1913            Err(SqlCatalogError::UnknownType {
1914                name: name.to_string(),
1915            })
1916        } else {
1917            Ok(r)
1918        }
1919    }
1920
1921    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1922        self.state.get_system_type(name)
1923    }
1924
1925    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1926        Some(self.state.try_get_entry(id)?)
1927    }
1928
1929    fn try_get_item_by_global_id(
1930        &self,
1931        id: &GlobalId,
1932    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1933        let entry = self.state.try_get_entry_by_global_id(id)?;
1934        let entry = match &entry.item {
1935            CatalogItem::Table(table) => {
1936                let (version, _gid) = table
1937                    .collections
1938                    .iter()
1939                    .find(|(_version, gid)| *gid == id)
1940                    .expect("catalog out of sync, mismatched GlobalId");
1941                entry.at_version(RelationVersionSelector::Specific(*version))
1942            }
1943            _ => entry.at_version(RelationVersionSelector::Latest),
1944        };
1945        Some(entry)
1946    }
1947
1948    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1949        self.state.get_entry(id)
1950    }
1951
1952    fn get_item_by_global_id(
1953        &self,
1954        id: &GlobalId,
1955    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1956        let entry = self.state.get_entry_by_global_id(id);
1957        let entry = match &entry.item {
1958            CatalogItem::Table(table) => {
1959                let (version, _gid) = table
1960                    .collections
1961                    .iter()
1962                    .find(|(_version, gid)| *gid == id)
1963                    .expect("catalog out of sync, mismatched GlobalId");
1964                entry.at_version(RelationVersionSelector::Specific(*version))
1965            }
1966            _ => entry.at_version(RelationVersionSelector::Latest),
1967        };
1968        entry
1969    }
1970
1971    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
1972        self.get_schemas()
1973            .into_iter()
1974            .flat_map(|schema| schema.item_ids())
1975            .map(|id| self.get_item(&id))
1976            .collect()
1977    }
1978
1979    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1980        self.state
1981            .get_item_by_name(name, &self.conn_id)
1982            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1983    }
1984
1985    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1986        self.state
1987            .get_type_by_name(name, &self.conn_id)
1988            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1989    }
1990
1991    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
1992        &self.state.clusters_by_id[&id]
1993    }
1994
1995    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
1996        self.state
1997            .clusters_by_id
1998            .values()
1999            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2000            .collect()
2001    }
2002
2003    fn get_cluster_replica(
2004        &self,
2005        cluster_id: ClusterId,
2006        replica_id: ReplicaId,
2007    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2008        let cluster = self.get_cluster(cluster_id);
2009        cluster.replica(replica_id)
2010    }
2011
2012    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2013        self.get_clusters()
2014            .into_iter()
2015            .flat_map(|cluster| cluster.replicas().into_iter())
2016            .collect()
2017    }
2018
2019    fn get_system_privileges(&self) -> &PrivilegeMap {
2020        &self.state.system_privileges
2021    }
2022
2023    fn get_default_privileges(
2024        &self,
2025    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2026        self.state
2027            .default_privileges
2028            .iter()
2029            .map(|(object, acl_items)| (object, acl_items.collect()))
2030            .collect()
2031    }
2032
2033    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2034        self.state.find_available_name(name, &self.conn_id)
2035    }
2036
2037    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2038        self.state.resolve_full_name(name, Some(&self.conn_id))
2039    }
2040
2041    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2042        self.state.resolve_full_schema_name(name)
2043    }
2044
2045    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2046        self.state.get_entry_by_global_id(global_id).id()
2047    }
2048
2049    fn resolve_global_id(
2050        &self,
2051        item_id: &CatalogItemId,
2052        version: RelationVersionSelector,
2053    ) -> GlobalId {
2054        self.state
2055            .get_entry(item_id)
2056            .at_version(version)
2057            .global_id()
2058    }
2059
2060    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2061        self.state.config()
2062    }
2063
2064    fn now(&self) -> EpochMillis {
2065        (self.state.config().now)()
2066    }
2067
2068    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2069        self.state.aws_privatelink_availability_zones.clone()
2070    }
2071
2072    fn system_vars(&self) -> &SystemVars {
2073        &self.state.system_configuration
2074    }
2075
2076    fn system_vars_mut(&mut self) -> &mut SystemVars {
2077        Arc::make_mut(&mut self.state.to_mut().system_configuration)
2078    }
2079
2080    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2081        self.state().get_owner_id(id, self.conn_id())
2082    }
2083
2084    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2085        match id {
2086            SystemObjectId::System => Some(&self.state.system_privileges),
2087            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2088                Some(self.get_cluster(*id).privileges())
2089            }
2090            SystemObjectId::Object(ObjectId::Database(id)) => {
2091                Some(self.get_database(id).privileges())
2092            }
2093            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2094                // For temporary schemas that haven't been created yet (lazy creation),
2095                // we return None - the RBAC check will need to handle this case.
2096                self.state
2097                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2098                    .map(|schema| schema.privileges())
2099            }
2100            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2101            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2102                Some(self.get_network_policy(id).privileges())
2103            }
2104            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2105            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2106        }
2107    }
2108
2109    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2110        let mut seen = BTreeSet::new();
2111        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2112    }
2113
2114    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2115        let mut seen = BTreeSet::new();
2116        self.state.item_dependents(id, &mut seen)
2117    }
2118
2119    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2120        rbac::all_object_privileges(object_type)
2121    }
2122
2123    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2124        self.state.get_object_type(object_id)
2125    }
2126
2127    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2128        self.state.get_system_object_type(id)
2129    }
2130
2131    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2132    /// the object.
2133    ///
2134    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2135    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2136    /// only for error msgs and other humanizations.
2137    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2138        if qualified_name.qualifiers.schema_spec.is_temporary() {
2139            // All bets are off. Just give up and return the qualified name as is.
2140            // TODO: Figure out what's going on with temporary objects.
2141
2142            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2143            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2144            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2145            // which we don't have a simple repro.
2146            return qualified_name.item.clone().into();
2147        }
2148
2149        let database_id = match &qualified_name.qualifiers.database_spec {
2150            ResolvedDatabaseSpecifier::Ambient => None,
2151            ResolvedDatabaseSpecifier::Id(id)
2152                if self.database.is_some() && self.database == Some(*id) =>
2153            {
2154                None
2155            }
2156            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2157        };
2158
2159        let schema_spec = if database_id.is_none()
2160            && self.resolve_item_name(&PartialItemName {
2161                database: None,
2162                schema: None,
2163                item: qualified_name.item.clone(),
2164            }) == Ok(qualified_name)
2165            || self.resolve_function_name(&PartialItemName {
2166                database: None,
2167                schema: None,
2168                item: qualified_name.item.clone(),
2169            }) == Ok(qualified_name)
2170            || self.resolve_type_name(&PartialItemName {
2171                database: None,
2172                schema: None,
2173                item: qualified_name.item.clone(),
2174            }) == Ok(qualified_name)
2175        {
2176            None
2177        } else {
2178            // If `search_path` does not contain `full_name.schema`, the
2179            // `PartialName` must contain it.
2180            Some(qualified_name.qualifiers.schema_spec.clone())
2181        };
2182
2183        let res = PartialItemName {
2184            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2185            schema: schema_spec.map(|spec| {
2186                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2187                    .name()
2188                    .schema
2189                    .clone()
2190            }),
2191            item: qualified_name.item.clone(),
2192        };
2193        assert!(
2194            self.resolve_item_name(&res) == Ok(qualified_name)
2195                || self.resolve_function_name(&res) == Ok(qualified_name)
2196                || self.resolve_type_name(&res) == Ok(qualified_name)
2197        );
2198        res
2199    }
2200
2201    fn add_notice(&self, notice: PlanNotice) {
2202        let _ = self.notices_tx.send(notice.into());
2203    }
2204
2205    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2206        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2207        self.state.comments.get_object_comments(comment_id)
2208    }
2209
2210    fn is_cluster_size_cc(&self, size: &str) -> bool {
2211        self.state
2212            .cluster_replica_sizes
2213            .0
2214            .get(size)
2215            .map_or(false, |a| a.is_cc)
2216    }
2217}
2218
2219#[cfg(test)]
2220mod tests {
2221    use std::collections::{BTreeMap, BTreeSet};
2222    use std::sync::Arc;
2223    use std::{env, iter};
2224
2225    use itertools::Itertools;
2226    use mz_catalog::memory::objects::CatalogItem;
2227    use tokio_postgres::NoTls;
2228    use tokio_postgres::types::Type;
2229    use uuid::Uuid;
2230
2231    use mz_catalog::SYSTEM_CONN_ID;
2232    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2233    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2234    use mz_controller_types::{ClusterId, ReplicaId};
2235    use mz_expr::MirScalarExpr;
2236    use mz_ore::now::to_datetime;
2237    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2238    use mz_persist_client::PersistClient;
2239    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2240    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2241    use mz_repr::role_id::RoleId;
2242    use mz_repr::{
2243        CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2244        SqlScalarType, Timestamp,
2245    };
2246    use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2247    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2248    use mz_sql::names::{
2249        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2250        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2251    };
2252    use mz_sql::plan::{
2253        CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2254        QueryLifetime, Scope, StatementContext,
2255    };
2256    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2257    use mz_sql::session::vars::{SystemVars, VarInput};
2258
2259    use crate::catalog::state::LocalExpressionCache;
2260    use crate::catalog::{Catalog, Op};
2261    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2262    use crate::session::Session;
2263
2264    /// System sessions have an empty `search_path` so it's necessary to
2265    /// schema-qualify all referenced items.
2266    ///
2267    /// Dummy (and ostensibly client) sessions contain system schemas in their
2268    /// search paths, so do not require schema qualification on system objects such
2269    /// as types.
2270    #[mz_ore::test(tokio::test)]
2271    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2272    async fn test_minimal_qualification() {
2273        Catalog::with_debug(|catalog| async move {
2274            struct TestCase {
2275                input: QualifiedItemName,
2276                system_output: PartialItemName,
2277                normal_output: PartialItemName,
2278            }
2279
2280            let test_cases = vec![
2281                TestCase {
2282                    input: QualifiedItemName {
2283                        qualifiers: ItemQualifiers {
2284                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2285                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2286                        },
2287                        item: "numeric".to_string(),
2288                    },
2289                    system_output: PartialItemName {
2290                        database: None,
2291                        schema: None,
2292                        item: "numeric".to_string(),
2293                    },
2294                    normal_output: PartialItemName {
2295                        database: None,
2296                        schema: None,
2297                        item: "numeric".to_string(),
2298                    },
2299                },
2300                TestCase {
2301                    input: QualifiedItemName {
2302                        qualifiers: ItemQualifiers {
2303                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2304                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2305                        },
2306                        item: "mz_array_types".to_string(),
2307                    },
2308                    system_output: PartialItemName {
2309                        database: None,
2310                        schema: None,
2311                        item: "mz_array_types".to_string(),
2312                    },
2313                    normal_output: PartialItemName {
2314                        database: None,
2315                        schema: None,
2316                        item: "mz_array_types".to_string(),
2317                    },
2318                },
2319            ];
2320
2321            for tc in test_cases {
2322                assert_eq!(
2323                    catalog
2324                        .for_system_session()
2325                        .minimal_qualification(&tc.input),
2326                    tc.system_output
2327                );
2328                assert_eq!(
2329                    catalog
2330                        .for_session(&Session::dummy())
2331                        .minimal_qualification(&tc.input),
2332                    tc.normal_output
2333                );
2334            }
2335            catalog.expire().await;
2336        })
2337        .await
2338    }
2339
2340    #[mz_ore::test(tokio::test)]
2341    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2342    async fn test_catalog_revision() {
2343        let persist_client = PersistClient::new_for_tests().await;
2344        let organization_id = Uuid::new_v4();
2345        let bootstrap_args = test_bootstrap_args();
2346        {
2347            let mut catalog = Catalog::open_debug_catalog(
2348                persist_client.clone(),
2349                organization_id.clone(),
2350                &bootstrap_args,
2351            )
2352            .await
2353            .expect("unable to open debug catalog");
2354            assert_eq!(catalog.transient_revision(), 1);
2355            let commit_ts = catalog.current_upper().await;
2356            catalog
2357                .transact(
2358                    None,
2359                    commit_ts,
2360                    None,
2361                    vec![Op::CreateDatabase {
2362                        name: "test".to_string(),
2363                        owner_id: MZ_SYSTEM_ROLE_ID,
2364                    }],
2365                )
2366                .await
2367                .expect("failed to transact");
2368            assert_eq!(catalog.transient_revision(), 2);
2369            catalog.expire().await;
2370        }
2371        {
2372            let catalog =
2373                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2374                    .await
2375                    .expect("unable to open debug catalog");
2376            // Re-opening the same catalog resets the transient_revision to 1.
2377            assert_eq!(catalog.transient_revision(), 1);
2378            catalog.expire().await;
2379        }
2380    }
2381
2382    #[mz_ore::test(tokio::test)]
2383    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2384    async fn test_effective_search_path() {
2385        Catalog::with_debug(|catalog| async move {
2386            let mz_catalog_schema = (
2387                ResolvedDatabaseSpecifier::Ambient,
2388                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2389            );
2390            let pg_catalog_schema = (
2391                ResolvedDatabaseSpecifier::Ambient,
2392                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2393            );
2394            let mz_temp_schema = (
2395                ResolvedDatabaseSpecifier::Ambient,
2396                SchemaSpecifier::Temporary,
2397            );
2398
2399            // Behavior with the default search_schema (public)
2400            let session = Session::dummy();
2401            let conn_catalog = catalog.for_session(&session);
2402            assert_ne!(
2403                conn_catalog.effective_search_path(false),
2404                conn_catalog.search_path
2405            );
2406            assert_ne!(
2407                conn_catalog.effective_search_path(true),
2408                conn_catalog.search_path
2409            );
2410            assert_eq!(
2411                conn_catalog.effective_search_path(false),
2412                vec![
2413                    mz_catalog_schema.clone(),
2414                    pg_catalog_schema.clone(),
2415                    conn_catalog.search_path[0].clone()
2416                ]
2417            );
2418            assert_eq!(
2419                conn_catalog.effective_search_path(true),
2420                vec![
2421                    mz_temp_schema.clone(),
2422                    mz_catalog_schema.clone(),
2423                    pg_catalog_schema.clone(),
2424                    conn_catalog.search_path[0].clone()
2425                ]
2426            );
2427
2428            // missing schemas are added when missing
2429            let mut session = Session::dummy();
2430            session
2431                .vars_mut()
2432                .set(
2433                    &SystemVars::new(),
2434                    "search_path",
2435                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2436                    false,
2437                )
2438                .expect("failed to set search_path");
2439            let conn_catalog = catalog.for_session(&session);
2440            assert_ne!(
2441                conn_catalog.effective_search_path(false),
2442                conn_catalog.search_path
2443            );
2444            assert_ne!(
2445                conn_catalog.effective_search_path(true),
2446                conn_catalog.search_path
2447            );
2448            assert_eq!(
2449                conn_catalog.effective_search_path(false),
2450                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2451            );
2452            assert_eq!(
2453                conn_catalog.effective_search_path(true),
2454                vec![
2455                    mz_temp_schema.clone(),
2456                    mz_catalog_schema.clone(),
2457                    pg_catalog_schema.clone()
2458                ]
2459            );
2460
2461            let mut session = Session::dummy();
2462            session
2463                .vars_mut()
2464                .set(
2465                    &SystemVars::new(),
2466                    "search_path",
2467                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2468                    false,
2469                )
2470                .expect("failed to set search_path");
2471            let conn_catalog = catalog.for_session(&session);
2472            assert_ne!(
2473                conn_catalog.effective_search_path(false),
2474                conn_catalog.search_path
2475            );
2476            assert_ne!(
2477                conn_catalog.effective_search_path(true),
2478                conn_catalog.search_path
2479            );
2480            assert_eq!(
2481                conn_catalog.effective_search_path(false),
2482                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2483            );
2484            assert_eq!(
2485                conn_catalog.effective_search_path(true),
2486                vec![
2487                    mz_temp_schema.clone(),
2488                    pg_catalog_schema.clone(),
2489                    mz_catalog_schema.clone()
2490                ]
2491            );
2492
2493            let mut session = Session::dummy();
2494            session
2495                .vars_mut()
2496                .set(
2497                    &SystemVars::new(),
2498                    "search_path",
2499                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2500                    false,
2501                )
2502                .expect("failed to set search_path");
2503            let conn_catalog = catalog.for_session(&session);
2504            assert_ne!(
2505                conn_catalog.effective_search_path(false),
2506                conn_catalog.search_path
2507            );
2508            assert_ne!(
2509                conn_catalog.effective_search_path(true),
2510                conn_catalog.search_path
2511            );
2512            assert_eq!(
2513                conn_catalog.effective_search_path(false),
2514                vec![
2515                    mz_catalog_schema.clone(),
2516                    pg_catalog_schema.clone(),
2517                    mz_temp_schema.clone()
2518                ]
2519            );
2520            assert_eq!(
2521                conn_catalog.effective_search_path(true),
2522                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2523            );
2524            catalog.expire().await;
2525        })
2526        .await
2527    }
2528
2529    #[mz_ore::test(tokio::test)]
2530    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2531    async fn test_normalized_create() {
2532        use mz_ore::collections::CollectionExt;
2533        Catalog::with_debug(|catalog| async move {
2534            let conn_catalog = catalog.for_system_session();
2535            let scx = &mut StatementContext::new(None, &conn_catalog);
2536
2537            let parsed = mz_sql_parser::parser::parse_statements(
2538                "create view public.foo as select 1 as bar",
2539            )
2540            .expect("")
2541            .into_element()
2542            .ast;
2543
2544            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2545
2546            // Ensure that all identifiers are quoted.
2547            assert_eq!(
2548                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2549                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2550            );
2551            catalog.expire().await;
2552        })
2553        .await;
2554    }
2555
2556    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2557    #[mz_ore::test(tokio::test)]
2558    #[cfg_attr(miri, ignore)] // slow
2559    async fn test_large_catalog_item() {
2560        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2561        let column = ", 1";
2562        let view_def_size = view_def.bytes().count();
2563        let column_size = column.bytes().count();
2564        let column_count =
2565            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2566        let columns = iter::repeat(column).take(column_count).join("");
2567        let create_sql = format!("{view_def}{columns})");
2568        let create_sql_check = create_sql.clone();
2569        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2570        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2571            &create_sql
2572        ));
2573
2574        let persist_client = PersistClient::new_for_tests().await;
2575        let organization_id = Uuid::new_v4();
2576        let id = CatalogItemId::User(1);
2577        let gid = GlobalId::User(1);
2578        let bootstrap_args = test_bootstrap_args();
2579        {
2580            let mut catalog = Catalog::open_debug_catalog(
2581                persist_client.clone(),
2582                organization_id.clone(),
2583                &bootstrap_args,
2584            )
2585            .await
2586            .expect("unable to open debug catalog");
2587            let item = catalog
2588                .state()
2589                .deserialize_item(
2590                    gid,
2591                    &create_sql,
2592                    &BTreeMap::new(),
2593                    &mut LocalExpressionCache::Closed,
2594                    None,
2595                )
2596                .expect("unable to parse view");
2597            let commit_ts = catalog.current_upper().await;
2598            catalog
2599                .transact(
2600                    None,
2601                    commit_ts,
2602                    None,
2603                    vec![Op::CreateItem {
2604                        item,
2605                        name: QualifiedItemName {
2606                            qualifiers: ItemQualifiers {
2607                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2608                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2609                            },
2610                            item: "v".to_string(),
2611                        },
2612                        id,
2613                        owner_id: MZ_SYSTEM_ROLE_ID,
2614                    }],
2615                )
2616                .await
2617                .expect("failed to transact");
2618            catalog.expire().await;
2619        }
2620        {
2621            let catalog =
2622                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2623                    .await
2624                    .expect("unable to open debug catalog");
2625            let view = catalog.get_entry(&id);
2626            assert_eq!("v", view.name.item);
2627            match &view.item {
2628                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2629                item => panic!("expected view, got {}", item.typ()),
2630            }
2631            catalog.expire().await;
2632        }
2633    }
2634
2635    #[mz_ore::test(tokio::test)]
2636    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2637    async fn test_object_type() {
2638        Catalog::with_debug(|catalog| async move {
2639            let conn_catalog = catalog.for_system_session();
2640
2641            assert_eq!(
2642                mz_sql::catalog::ObjectType::ClusterReplica,
2643                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2644                    ClusterId::user(1).expect("1 is a valid ID"),
2645                    ReplicaId::User(1)
2646                )))
2647            );
2648            assert_eq!(
2649                mz_sql::catalog::ObjectType::Role,
2650                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2651            );
2652            catalog.expire().await;
2653        })
2654        .await;
2655    }
2656
2657    #[mz_ore::test(tokio::test)]
2658    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2659    async fn test_get_privileges() {
2660        Catalog::with_debug(|catalog| async move {
2661            let conn_catalog = catalog.for_system_session();
2662
2663            assert_eq!(
2664                None,
2665                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2666                    ClusterId::user(1).expect("1 is a valid ID"),
2667                    ReplicaId::User(1),
2668                ))))
2669            );
2670            assert_eq!(
2671                None,
2672                conn_catalog
2673                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2674            );
2675            catalog.expire().await;
2676        })
2677        .await;
2678    }
2679
2680    #[mz_ore::test(tokio::test)]
2681    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2682    async fn verify_builtin_descs() {
2683        Catalog::with_debug(|catalog| async move {
2684            let conn_catalog = catalog.for_system_session();
2685
2686            for builtin in BUILTINS::iter() {
2687                let (schema, name, expected_desc) = match builtin {
2688                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2689                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2690                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2691                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2692                    Builtin::Log(_)
2693                    | Builtin::Type(_)
2694                    | Builtin::Func(_)
2695                    | Builtin::Index(_)
2696                    | Builtin::Connection(_) => continue,
2697                };
2698                let item = conn_catalog
2699                    .resolve_item(&PartialItemName {
2700                        database: None,
2701                        schema: Some(schema.to_string()),
2702                        item: name.to_string(),
2703                    })
2704                    .expect("unable to resolve item")
2705                    .at_version(RelationVersionSelector::Latest);
2706
2707                let actual_desc = item.relation_desc().expect("invalid item type");
2708                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2709                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2710                {
2711                    assert_eq!(
2712                        actual_name, expected_name,
2713                        "item {schema}.{name} column {index} name did not match its expected name"
2714                    );
2715                    assert_eq!(
2716                        actual_typ, expected_typ,
2717                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2718                    );
2719                }
2720                assert_eq!(
2721                    &*actual_desc, expected_desc,
2722                    "item {schema}.{name} did not match its expected RelationDesc"
2723                );
2724            }
2725            catalog.expire().await;
2726        })
2727        .await
2728    }
2729
2730    // Connect to a running Postgres server and verify that our builtin
2731    // types and functions match it, in addition to some other things.
2732    #[mz_ore::test(tokio::test)]
2733    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2734    async fn test_compare_builtins_postgres() {
2735        async fn inner(catalog: Catalog) {
2736            // Verify that all builtin functions:
2737            // - have a unique OID
2738            // - if they have a postgres counterpart (same oid) then they have matching name
2739            let (client, connection) = tokio_postgres::connect(
2740                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2741                NoTls,
2742            )
2743            .await
2744            .expect("failed to connect to Postgres");
2745
2746            task::spawn(|| "compare_builtin_postgres", async move {
2747                if let Err(e) = connection.await {
2748                    panic!("connection error: {}", e);
2749                }
2750            });
2751
2752            struct PgProc {
2753                name: String,
2754                arg_oids: Vec<u32>,
2755                ret_oid: Option<u32>,
2756                ret_set: bool,
2757            }
2758
2759            struct PgType {
2760                name: String,
2761                ty: String,
2762                elem: u32,
2763                array: u32,
2764                input: u32,
2765                receive: u32,
2766            }
2767
2768            struct PgOper {
2769                oprresult: u32,
2770                name: String,
2771            }
2772
2773            let pg_proc: BTreeMap<_, _> = client
2774                .query(
2775                    "SELECT
2776                    p.oid,
2777                    proname,
2778                    proargtypes,
2779                    prorettype,
2780                    proretset
2781                FROM pg_proc p
2782                JOIN pg_namespace n ON p.pronamespace = n.oid",
2783                    &[],
2784                )
2785                .await
2786                .expect("pg query failed")
2787                .into_iter()
2788                .map(|row| {
2789                    let oid: u32 = row.get("oid");
2790                    let pg_proc = PgProc {
2791                        name: row.get("proname"),
2792                        arg_oids: row.get("proargtypes"),
2793                        ret_oid: row.get("prorettype"),
2794                        ret_set: row.get("proretset"),
2795                    };
2796                    (oid, pg_proc)
2797                })
2798                .collect();
2799
2800            let pg_type: BTreeMap<_, _> = client
2801                .query(
2802                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2803                    &[],
2804                )
2805                .await
2806                .expect("pg query failed")
2807                .into_iter()
2808                .map(|row| {
2809                    let oid: u32 = row.get("oid");
2810                    let pg_type = PgType {
2811                        name: row.get("typname"),
2812                        ty: row.get("typtype"),
2813                        elem: row.get("typelem"),
2814                        array: row.get("typarray"),
2815                        input: row.get("typinput"),
2816                        receive: row.get("typreceive"),
2817                    };
2818                    (oid, pg_type)
2819                })
2820                .collect();
2821
2822            let pg_oper: BTreeMap<_, _> = client
2823                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2824                .await
2825                .expect("pg query failed")
2826                .into_iter()
2827                .map(|row| {
2828                    let oid: u32 = row.get("oid");
2829                    let pg_oper = PgOper {
2830                        name: row.get("oprname"),
2831                        oprresult: row.get("oprresult"),
2832                    };
2833                    (oid, pg_oper)
2834                })
2835                .collect();
2836
2837            let conn_catalog = catalog.for_system_session();
2838            let resolve_type_oid = |item: &str| {
2839                conn_catalog
2840                    .resolve_type(&PartialItemName {
2841                        database: None,
2842                        // All functions we check exist in PG, so the types must, as
2843                        // well
2844                        schema: Some(PG_CATALOG_SCHEMA.into()),
2845                        item: item.to_string(),
2846                    })
2847                    .expect("unable to resolve type")
2848                    .oid()
2849            };
2850
2851            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2852                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2853                .collect();
2854
2855            let mut all_oids = BTreeSet::new();
2856
2857            // A function to determine if two oids are equivalent enough for these tests. We don't
2858            // support some types, so map exceptions here.
2859            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2860                [
2861                    // We don't support NAME.
2862                    (Type::NAME, Type::TEXT),
2863                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2864                    // We don't support time with time zone.
2865                    (Type::TIME, Type::TIMETZ),
2866                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2867                ]
2868                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2869            );
2870            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2871                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2872            ]);
2873            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2874                if ignore_return_types.contains(&fn_oid) {
2875                    return true;
2876                }
2877                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2878                    return true;
2879                }
2880                a == b
2881            };
2882
2883            for builtin in BUILTINS::iter() {
2884                match builtin {
2885                    Builtin::Type(ty) => {
2886                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2887
2888                        if ty.oid >= FIRST_MATERIALIZE_OID {
2889                            // High OIDs are reserved in Materialize and don't have
2890                            // PostgreSQL counterparts.
2891                            continue;
2892                        }
2893
2894                        // For types that have a PostgreSQL counterpart, verify that
2895                        // the name and oid match.
2896                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2897                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2898                        });
2899                        assert_eq!(
2900                            ty.name, pg_ty.name,
2901                            "oid {} has name {} in postgres; expected {}",
2902                            ty.oid, pg_ty.name, ty.name,
2903                        );
2904
2905                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2906                            None => (0, 0),
2907                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2908                        };
2909                        assert_eq!(
2910                            typinput_oid, pg_ty.input,
2911                            "type {} has typinput OID {:?} in mz but {:?} in pg",
2912                            ty.name, typinput_oid, pg_ty.input,
2913                        );
2914                        assert_eq!(
2915                            typreceive_oid, pg_ty.receive,
2916                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
2917                            ty.name, typreceive_oid, pg_ty.receive,
2918                        );
2919                        if typinput_oid != 0 {
2920                            assert!(
2921                                func_oids.contains(&typinput_oid),
2922                                "type {} has typinput OID {} that does not exist in pg_proc",
2923                                ty.name,
2924                                typinput_oid,
2925                            );
2926                        }
2927                        if typreceive_oid != 0 {
2928                            assert!(
2929                                func_oids.contains(&typreceive_oid),
2930                                "type {} has typreceive OID {} that does not exist in pg_proc",
2931                                ty.name,
2932                                typreceive_oid,
2933                            );
2934                        }
2935
2936                        // Ensure the type matches.
2937                        match &ty.details.typ {
2938                            CatalogType::Array { element_reference } => {
2939                                let elem_ty = BUILTINS::iter()
2940                                    .filter_map(|builtin| match builtin {
2941                                        Builtin::Type(ty @ BuiltinType { name, .. })
2942                                            if element_reference == name =>
2943                                        {
2944                                            Some(ty)
2945                                        }
2946                                        _ => None,
2947                                    })
2948                                    .next();
2949                                let elem_ty = match elem_ty {
2950                                    Some(ty) => ty,
2951                                    None => {
2952                                        panic!("{} is unexpectedly not a type", element_reference)
2953                                    }
2954                                };
2955                                assert_eq!(
2956                                    pg_ty.elem, elem_ty.oid,
2957                                    "type {} has mismatched element OIDs",
2958                                    ty.name
2959                                )
2960                            }
2961                            CatalogType::Pseudo => {
2962                                assert_eq!(
2963                                    pg_ty.ty, "p",
2964                                    "type {} is not a pseudo type as expected",
2965                                    ty.name
2966                                )
2967                            }
2968                            CatalogType::Range { .. } => {
2969                                assert_eq!(
2970                                    pg_ty.ty, "r",
2971                                    "type {} is not a range type as expected",
2972                                    ty.name
2973                                );
2974                            }
2975                            _ => {
2976                                assert_eq!(
2977                                    pg_ty.ty, "b",
2978                                    "type {} is not a base type as expected",
2979                                    ty.name
2980                                )
2981                            }
2982                        }
2983
2984                        // Ensure the array type reference is correct.
2985                        let schema = catalog
2986                            .resolve_schema_in_database(
2987                                &ResolvedDatabaseSpecifier::Ambient,
2988                                ty.schema,
2989                                &SYSTEM_CONN_ID,
2990                            )
2991                            .expect("unable to resolve schema");
2992                        let allocated_type = catalog
2993                            .resolve_type(
2994                                None,
2995                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
2996                                &PartialItemName {
2997                                    database: None,
2998                                    schema: Some(schema.name().schema.clone()),
2999                                    item: ty.name.to_string(),
3000                                },
3001                                &SYSTEM_CONN_ID,
3002                            )
3003                            .expect("unable to resolve type");
3004                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3005                            ty
3006                        } else {
3007                            panic!("unexpectedly not a type")
3008                        };
3009                        match ty.details.array_id {
3010                            Some(array_id) => {
3011                                let array_ty = catalog.get_entry(&array_id);
3012                                assert_eq!(
3013                                    pg_ty.array, array_ty.oid,
3014                                    "type {} has mismatched array OIDs",
3015                                    allocated_type.name.item,
3016                                );
3017                            }
3018                            None => assert_eq!(
3019                                pg_ty.array, 0,
3020                                "type {} does not have an array type in mz but does in pg",
3021                                allocated_type.name.item,
3022                            ),
3023                        }
3024                    }
3025                    Builtin::Func(func) => {
3026                        for imp in func.inner.func_impls() {
3027                            assert!(
3028                                all_oids.insert(imp.oid),
3029                                "{} reused oid {}",
3030                                func.name,
3031                                imp.oid
3032                            );
3033
3034                            assert!(
3035                                imp.oid < FIRST_USER_OID,
3036                                "built-in function {} erroneously has OID in user space ({})",
3037                                func.name,
3038                                imp.oid,
3039                            );
3040
3041                            // For functions that have a postgres counterpart, verify that the name and
3042                            // oid match.
3043                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3044                                continue;
3045                            } else {
3046                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3047                                    panic!(
3048                                        "pg_proc missing function {}: oid {}",
3049                                        func.name, imp.oid
3050                                    )
3051                                })
3052                            };
3053                            assert_eq!(
3054                                func.name, pg_fn.name,
3055                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3056                                imp.oid, func.name, pg_fn.name
3057                            );
3058
3059                            // Complain, but don't fail, if argument oids don't match.
3060                            // TODO: make these match.
3061                            let imp_arg_oids = imp
3062                                .arg_typs
3063                                .iter()
3064                                .map(|item| resolve_type_oid(item))
3065                                .collect::<Vec<_>>();
3066
3067                            if imp_arg_oids != pg_fn.arg_oids {
3068                                println!(
3069                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3070                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3071                                );
3072                            }
3073
3074                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3075
3076                            assert!(
3077                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3078                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3079                                imp.oid,
3080                                func.name,
3081                                imp_return_oid,
3082                                pg_fn.ret_oid
3083                            );
3084
3085                            assert_eq!(
3086                                imp.return_is_set, pg_fn.ret_set,
3087                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3088                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3089                            );
3090                        }
3091                    }
3092                    _ => (),
3093                }
3094            }
3095
3096            for (op, func) in OP_IMPLS.iter() {
3097                for imp in func.func_impls() {
3098                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3099
3100                    // For operators that have a postgres counterpart, verify that the name and oid match.
3101                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3102                        continue;
3103                    } else {
3104                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3105                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3106                        })
3107                    };
3108
3109                    assert_eq!(*op, pg_op.name);
3110
3111                    let imp_return_oid =
3112                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3113                    if imp_return_oid != pg_op.oprresult {
3114                        panic!(
3115                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3116                            imp.oid, op, imp_return_oid, pg_op.oprresult
3117                        );
3118                    }
3119                }
3120            }
3121            catalog.expire().await;
3122        }
3123
3124        Catalog::with_debug(inner).await
3125    }
3126
3127    // Execute all builtin functions with all combinations of arguments from interesting datums.
3128    #[mz_ore::test(tokio::test)]
3129    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3130    async fn test_smoketest_all_builtins() {
3131        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3132            let catalog = Arc::new(catalog);
3133            let conn_catalog = catalog.for_system_session();
3134
3135            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3136            let mut handles = Vec::new();
3137
3138            // Extracted during planning; always panics when executed.
3139            let ignore_names = BTreeSet::from([
3140                "avg",
3141                "avg_internal_v1",
3142                "bool_and",
3143                "bool_or",
3144                "has_table_privilege", // > 3 s each
3145                "has_type_privilege",  // > 3 s each
3146                "mod",
3147                "mz_panic",
3148                "mz_sleep",
3149                "pow",
3150                "stddev_pop",
3151                "stddev_samp",
3152                "stddev",
3153                "var_pop",
3154                "var_samp",
3155                "variance",
3156            ]);
3157
3158            let fns = BUILTINS::funcs()
3159                .map(|func| (&func.name, func.inner))
3160                .chain(OP_IMPLS.iter());
3161
3162            for (name, func) in fns {
3163                if ignore_names.contains(name) {
3164                    continue;
3165                }
3166                let Func::Scalar(impls) = func else {
3167                    continue;
3168                };
3169
3170                'outer: for imp in impls {
3171                    let details = imp.details();
3172                    let mut styps = Vec::new();
3173                    for item in details.arg_typs.iter() {
3174                        let oid = resolve_type_oid(item);
3175                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3176                            continue 'outer;
3177                        };
3178                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3179                    }
3180                    let datums = styps
3181                        .iter()
3182                        .map(|styp| {
3183                            let mut datums = vec![Datum::Null];
3184                            datums.extend(styp.interesting_datums());
3185                            datums
3186                        })
3187                        .collect::<Vec<_>>();
3188                    // Skip nullary fns.
3189                    if datums.is_empty() {
3190                        continue;
3191                    }
3192
3193                    let return_oid = details
3194                        .return_typ
3195                        .map(resolve_type_oid)
3196                        .expect("must exist");
3197                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3198                        .ok()
3199                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3200
3201                    let mut idxs = vec![0; datums.len()];
3202                    while idxs[0] < datums[0].len() {
3203                        let mut args = Vec::with_capacity(idxs.len());
3204                        for i in 0..(datums.len()) {
3205                            args.push(datums[i][idxs[i]]);
3206                        }
3207
3208                        let op = &imp.op;
3209                        let scalars = args
3210                            .iter()
3211                            .enumerate()
3212                            .map(|(i, datum)| {
3213                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3214                                    datum.clone(),
3215                                    styps[i].clone(),
3216                                ))
3217                            })
3218                            .collect();
3219
3220                        let call_name = format!(
3221                            "{name}({}) (oid: {})",
3222                            args.iter()
3223                                .map(|d| d.to_string())
3224                                .collect::<Vec<_>>()
3225                                .join(", "),
3226                            imp.oid
3227                        );
3228                        let catalog = Arc::clone(&catalog);
3229                        let call_name_fn = call_name.clone();
3230                        let return_styp = return_styp.clone();
3231                        let handle = task::spawn_blocking(
3232                            || call_name,
3233                            move || {
3234                                smoketest_fn(
3235                                    name,
3236                                    call_name_fn,
3237                                    op,
3238                                    imp,
3239                                    args,
3240                                    catalog,
3241                                    scalars,
3242                                    return_styp,
3243                                )
3244                            },
3245                        );
3246                        handles.push(handle);
3247
3248                        // Advance to the next datum combination.
3249                        for i in (0..datums.len()).rev() {
3250                            idxs[i] += 1;
3251                            if idxs[i] >= datums[i].len() {
3252                                if i == 0 {
3253                                    break;
3254                                }
3255                                idxs[i] = 0;
3256                                continue;
3257                            } else {
3258                                break;
3259                            }
3260                        }
3261                    }
3262                }
3263            }
3264            handles
3265        }
3266
3267        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3268        for handle in handles {
3269            handle.await;
3270        }
3271    }
3272
3273    fn smoketest_fn(
3274        name: &&str,
3275        call_name: String,
3276        op: &Operation<HirScalarExpr>,
3277        imp: &FuncImpl<HirScalarExpr>,
3278        args: Vec<Datum<'_>>,
3279        catalog: Arc<Catalog>,
3280        scalars: Vec<CoercibleScalarExpr>,
3281        return_styp: Option<SqlScalarType>,
3282    ) {
3283        let conn_catalog = catalog.for_system_session();
3284        let pcx = PlanContext::zero();
3285        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3286        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3287        let ecx = ExprContext {
3288            qcx: &qcx,
3289            name: "smoketest",
3290            scope: &Scope::empty(),
3291            relation_type: &SqlRelationType::empty(),
3292            allow_aggregates: false,
3293            allow_subqueries: false,
3294            allow_parameters: false,
3295            allow_windows: false,
3296        };
3297        let arena = RowArena::new();
3298        let mut session = Session::dummy();
3299        session
3300            .start_transaction(to_datetime(0), None, None)
3301            .expect("must succeed");
3302        let prep_style = ExprPrepOneShot {
3303            logical_time: EvalTime::Time(Timestamp::MIN),
3304            session: &session,
3305            catalog_state: &catalog.state,
3306        };
3307
3308        // Execute the function as much as possible, ensuring no panics occur, but
3309        // otherwise ignoring eval errors. We also do various other checks.
3310        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3311        if let Ok(hir) = res {
3312            let uneliminated_result_row = {
3313                if let HirScalarExpr::CallUnary { func, .. } = &hir
3314                    && func.is_eliminable_cast()
3315                {
3316                    let mut uneliminated_mir = hir
3317                        .clone()
3318                        .lower_uncorrelated(HirToMirConfig {
3319                            enable_cast_elimination: false,
3320                            ..catalog.system_config().into()
3321                        })
3322                        .expect("lowering eliminable cast should always succeed");
3323                    prep_style
3324                        .prep_scalar_expr(&mut uneliminated_mir)
3325                        .expect("must succeed");
3326
3327                    // Pack the row, to avoid lifetime issues with the MIR we lowered here
3328                    uneliminated_mir
3329                        .eval(&[], &arena)
3330                        .ok()
3331                        .map(|datum| Row::pack([datum]))
3332                } else {
3333                    None
3334                }
3335            };
3336
3337            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3338                // Populate unmaterialized functions.
3339                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3340
3341                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3342                    if let Some(return_styp) = return_styp {
3343                        let mir_typ = mir.typ(&[]);
3344                        // MIR type inference should be consistent with the type
3345                        // we get from the catalog.
3346                        soft_assert_eq_or_log!(
3347                            mir_typ.scalar_type,
3348                            (&return_styp).into(),
3349                            "MIR type did not match the catalog type (cast elimination/repr type error)"
3350                        );
3351                        // The following will check not just that the scalar type
3352                        // is ok, but also catches if the function returned a null
3353                        // but the MIR type inference said "non-nullable".
3354                        if !eval_result_datum.is_instance_of(&mir_typ) {
3355                            panic!(
3356                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3357                            );
3358                        }
3359                        // Check the consistency of `is_eliminable_cast`---we should get the same datum either way.
3360                        if let Some(row) = uneliminated_result_row {
3361                            let uneliminated_result_datum = row.unpack_first();
3362                            assert_eq!(
3363                                uneliminated_result_datum, eval_result_datum,
3364                                "datums should not change if cast is eliminable"
3365                            );
3366                        }
3367                        // Check the consistency of `introduces_nulls` and
3368                        // `propagates_nulls` with `MirScalarExpr::typ`.
3369                        if let Some((introduces_nulls, propagates_nulls)) =
3370                            call_introduces_propagates_nulls(&mir)
3371                        {
3372                            if introduces_nulls {
3373                                // If the function introduces_nulls, then the return
3374                                // type should always be nullable, regardless of
3375                                // the nullability of the input types.
3376                                assert!(
3377                                    mir_typ.nullable,
3378                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3379                                    name, args, mir, mir_typ.nullable
3380                                );
3381                            } else {
3382                                let any_input_null = args.iter().any(|arg| arg.is_null());
3383                                if !any_input_null {
3384                                    assert!(
3385                                        !mir_typ.nullable,
3386                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3387                                        name, args, mir, mir_typ.nullable
3388                                    );
3389                                } else if propagates_nulls {
3390                                    // propagates_nulls means the optimizer short-circuits
3391                                    // all-null inputs, so the output must be nullable.
3392                                    assert!(
3393                                        mir_typ.nullable,
3394                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3395                                        name, args, mir, mir_typ.nullable
3396                                    );
3397                                }
3398                                // When propagates_nulls is false, the output may still
3399                                // be nullable if a non-nullable parameter received a null
3400                                // input (per-position null rejection). The is_instance_of
3401                                // check above ensures type consistency.
3402                            }
3403                        }
3404                        // Check that `MirScalarExpr::reduce` yields the same result
3405                        // as the real evaluation.
3406                        let mut reduced = mir.clone();
3407                        reduced.reduce(&[]);
3408                        match reduced {
3409                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3410                                match reduce_result {
3411                                    Ok(reduce_result_row) => {
3412                                        let reduce_result_datum = reduce_result_row.unpack_first();
3413                                        assert_eq!(
3414                                            reduce_result_datum,
3415                                            eval_result_datum,
3416                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3417                                            name,
3418                                            args,
3419                                            mir,
3420                                            eval_result_datum,
3421                                            mir_typ.scalar_type,
3422                                            reduce_result_datum,
3423                                            ctyp.scalar_type
3424                                        );
3425                                        // Let's check that the types also match.
3426                                        // (We are not checking nullability here,
3427                                        // because it's ok when we know a more
3428                                        // precise nullability after actually
3429                                        // evaluating a function than before.)
3430                                        assert_eq!(
3431                                            ctyp.scalar_type,
3432                                            mir_typ.scalar_type,
3433                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3434                                            name,
3435                                            args,
3436                                            mir,
3437                                            eval_result_datum,
3438                                            mir_typ.scalar_type,
3439                                            reduce_result_datum,
3440                                            ctyp.scalar_type
3441                                        );
3442                                    }
3443                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3444                                }
3445                            }
3446                            _ => unreachable!(
3447                                "all args are literals, so should have reduced to a literal"
3448                            ),
3449                        }
3450                    }
3451                }
3452            }
3453        }
3454    }
3455
3456    /// If the given MirScalarExpr
3457    ///  - is a function call, and
3458    ///  - all arguments are literals
3459    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3460    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3461        match mir_func_call {
3462            MirScalarExpr::CallUnary { func, expr } => {
3463                if expr.is_literal() {
3464                    Some((func.introduces_nulls(), func.propagates_nulls()))
3465                } else {
3466                    None
3467                }
3468            }
3469            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3470                if expr1.is_literal() && expr2.is_literal() {
3471                    Some((func.introduces_nulls(), func.propagates_nulls()))
3472                } else {
3473                    None
3474                }
3475            }
3476            MirScalarExpr::CallVariadic { func, exprs } => {
3477                if exprs.iter().all(|arg| arg.is_literal()) {
3478                    Some((func.introduces_nulls(), func.propagates_nulls()))
3479                } else {
3480                    None
3481                }
3482            }
3483            _ => None,
3484        }
3485    }
3486
3487    // Make sure pg views don't use types that only exist in Materialize.
3488    #[mz_ore::test(tokio::test)]
3489    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3490    async fn test_pg_views_forbidden_types() {
3491        Catalog::with_debug(|catalog| async move {
3492            let conn_catalog = catalog.for_system_session();
3493
3494            for view in BUILTINS::views().filter(|view| {
3495                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3496            }) {
3497                let item = conn_catalog
3498                    .resolve_item(&PartialItemName {
3499                        database: None,
3500                        schema: Some(view.schema.to_string()),
3501                        item: view.name.to_string(),
3502                    })
3503                    .expect("unable to resolve view")
3504                    // TODO(alter_table)
3505                    .at_version(RelationVersionSelector::Latest);
3506                let full_name = conn_catalog.resolve_full_name(item.name());
3507                let desc = item.relation_desc().expect("invalid item type");
3508                for col_type in desc.iter_types() {
3509                    match &col_type.scalar_type {
3510                        typ @ SqlScalarType::UInt16
3511                        | typ @ SqlScalarType::UInt32
3512                        | typ @ SqlScalarType::UInt64
3513                        | typ @ SqlScalarType::MzTimestamp
3514                        | typ @ SqlScalarType::List { .. }
3515                        | typ @ SqlScalarType::Map { .. }
3516                        | typ @ SqlScalarType::MzAclItem => {
3517                            panic!("{typ:?} type found in {full_name}");
3518                        }
3519                        SqlScalarType::AclItem
3520                        | SqlScalarType::Bool
3521                        | SqlScalarType::Int16
3522                        | SqlScalarType::Int32
3523                        | SqlScalarType::Int64
3524                        | SqlScalarType::Float32
3525                        | SqlScalarType::Float64
3526                        | SqlScalarType::Numeric { .. }
3527                        | SqlScalarType::Date
3528                        | SqlScalarType::Time
3529                        | SqlScalarType::Timestamp { .. }
3530                        | SqlScalarType::TimestampTz { .. }
3531                        | SqlScalarType::Interval
3532                        | SqlScalarType::PgLegacyChar
3533                        | SqlScalarType::Bytes
3534                        | SqlScalarType::String
3535                        | SqlScalarType::Char { .. }
3536                        | SqlScalarType::VarChar { .. }
3537                        | SqlScalarType::Jsonb
3538                        | SqlScalarType::Uuid
3539                        | SqlScalarType::Array(_)
3540                        | SqlScalarType::Record { .. }
3541                        | SqlScalarType::Oid
3542                        | SqlScalarType::RegProc
3543                        | SqlScalarType::RegType
3544                        | SqlScalarType::RegClass
3545                        | SqlScalarType::Int2Vector
3546                        | SqlScalarType::Range { .. }
3547                        | SqlScalarType::PgLegacyName => {}
3548                    }
3549                }
3550            }
3551            catalog.expire().await;
3552        })
3553        .await
3554    }
3555
3556    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3557    // introspection relations.
3558    #[mz_ore::test(tokio::test)]
3559    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3560    async fn test_mz_introspection_builtins() {
3561        Catalog::with_debug(|catalog| async move {
3562            let conn_catalog = catalog.for_system_session();
3563
3564            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3565            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3566
3567            for entry in catalog.entries() {
3568                let schema_spec = entry.name().qualifiers.schema_spec;
3569                let introspection_deps = catalog.introspection_dependencies(entry.id);
3570                if introspection_deps.is_empty() {
3571                    assert!(
3572                        schema_spec != introspection_schema_spec,
3573                        "entry does not depend on introspection sources but is in \
3574                         `mz_introspection`: {}",
3575                        conn_catalog.resolve_full_name(entry.name()),
3576                    );
3577                } else {
3578                    assert!(
3579                        schema_spec == introspection_schema_spec,
3580                        "entry depends on introspection sources but is not in \
3581                         `mz_introspection`: {}",
3582                        conn_catalog.resolve_full_name(entry.name()),
3583                    );
3584                }
3585            }
3586        })
3587        .await
3588    }
3589
3590    #[mz_ore::test(tokio::test)]
3591    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3592    async fn test_multi_subscriber_catalog() {
3593        let persist_client = PersistClient::new_for_tests().await;
3594        let bootstrap_args = test_bootstrap_args();
3595        let organization_id = Uuid::new_v4();
3596        let db_name = "DB";
3597
3598        let mut writer_catalog = Catalog::open_debug_catalog(
3599            persist_client.clone(),
3600            organization_id.clone(),
3601            &bootstrap_args,
3602        )
3603        .await
3604        .expect("open_debug_catalog");
3605        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3606            persist_client.clone(),
3607            organization_id.clone(),
3608            &bootstrap_args,
3609        )
3610        .await
3611        .expect("open_debug_read_only_catalog");
3612        assert_err!(writer_catalog.resolve_database(db_name));
3613        assert_err!(read_only_catalog.resolve_database(db_name));
3614
3615        let commit_ts = writer_catalog.current_upper().await;
3616        writer_catalog
3617            .transact(
3618                None,
3619                commit_ts,
3620                None,
3621                vec![Op::CreateDatabase {
3622                    name: db_name.to_string(),
3623                    owner_id: MZ_SYSTEM_ROLE_ID,
3624                }],
3625            )
3626            .await
3627            .expect("failed to transact");
3628
3629        let write_db = writer_catalog
3630            .resolve_database(db_name)
3631            .expect("resolve_database");
3632        read_only_catalog
3633            .sync_to_current_updates()
3634            .await
3635            .expect("sync_to_current_updates");
3636        let read_db = read_only_catalog
3637            .resolve_database(db_name)
3638            .expect("resolve_database");
3639
3640        assert_eq!(write_db, read_db);
3641
3642        let writer_catalog_fencer =
3643            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3644                .await
3645                .expect("open_debug_catalog for fencer");
3646        let fencer_db = writer_catalog_fencer
3647            .resolve_database(db_name)
3648            .expect("resolve_database for fencer");
3649        assert_eq!(fencer_db, read_db);
3650
3651        let write_fence_err = writer_catalog
3652            .sync_to_current_updates()
3653            .await
3654            .expect_err("sync_to_current_updates for fencer");
3655        assert!(matches!(
3656            write_fence_err,
3657            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3658        ));
3659        let read_fence_err = read_only_catalog
3660            .sync_to_current_updates()
3661            .await
3662            .expect_err("sync_to_current_updates after fencer");
3663        assert!(matches!(
3664            read_fence_err,
3665            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3666        ));
3667
3668        writer_catalog.expire().await;
3669        read_only_catalog.expire().await;
3670        writer_catalog_fencer.expire().await;
3671    }
3672}