Skip to main content

mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, STORAGE_USAGE_ID_ALLOC_KEY, TestCatalogStateBuilder,
39    test_bootstrap_args,
40};
41use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
42use mz_catalog::memory::error::{Error, ErrorKind};
43use mz_catalog::memory::objects::{
44    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
45    NetworkPolicy, Role, RoleAuth, Schema,
46};
47use mz_compute_types::dataflows::DataflowDescription;
48use mz_controller::clusters::ReplicaLocation;
49use mz_controller_types::{ClusterId, ReplicaId};
50use mz_expr::OptimizedMirRelationExpr;
51use mz_license_keys::ValidatedLicenseKey;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68    SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89// DO NOT add any more imports from `crate` outside of `crate::catalog`.
90pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94    DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114/// A `Catalog` keeps track of the SQL objects known to the planner.
115///
116/// For each object, it keeps track of both forward and reverse dependencies:
117/// i.e., which objects are depended upon by the object, and which objects
118/// depend upon the object. It enforces the SQL rules around dropping: an object
119/// cannot be dropped until all of the objects that depend upon it are dropped.
120/// It also enforces uniqueness of names.
121///
122/// SQL mandates a hierarchy of exactly three layers. A catalog contains
123/// databases, databases contain schemas, and schemas contain catalog items,
124/// like sources, sinks, view, and indexes.
125///
126/// To the outside world, databases, schemas, and items are all identified by
127/// name. Items can be referred to by their [`FullItemName`], which fully and
128/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
129/// database name and/or the schema name. Partial names can be converted into
130/// full names via a complicated resolution process documented by the
131/// [`CatalogState::resolve`] method.
132///
133/// The catalog also maintains special "ambient schemas": virtual schemas,
134/// implicitly present in all databases, that house various system views.
135/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
136#[derive(Debug)]
137pub struct Catalog {
138    state: CatalogState,
139    expr_cache_handle: Option<ExpressionCacheHandle>,
140    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141    transient_revision: u64,
142}
143
144// Implement our own Clone because derive can't unless S is Clone, which it's
145// not (hence the Arc).
146impl Clone for Catalog {
147    fn clone(&self) -> Self {
148        Self {
149            state: self.state.clone(),
150            expr_cache_handle: self.expr_cache_handle.clone(),
151            storage: Arc::clone(&self.storage),
152            transient_revision: self.transient_revision,
153        }
154    }
155}
156
157impl Catalog {
158    /// Set the optimized plan for the item identified by `id`.
159    ///
160    /// # Panics
161    /// If the item is not an `Index`, `MaterializedView`, or
162    /// `ContinualTask`.
163    #[mz_ore::instrument(level = "trace")]
164    pub fn set_optimized_plan(
165        &mut self,
166        id: GlobalId,
167        plan: DataflowDescription<OptimizedMirRelationExpr>,
168    ) {
169        self.state.set_optimized_plan(id, plan);
170    }
171
172    /// Set the physical plan for the item identified by `id`.
173    ///
174    /// # Panics
175    /// If the item is not an `Index`, `MaterializedView`, or
176    /// `ContinualTask`.
177    #[mz_ore::instrument(level = "trace")]
178    pub fn set_physical_plan(
179        &mut self,
180        id: GlobalId,
181        plan: DataflowDescription<mz_compute_types::plan::Plan>,
182    ) {
183        self.state.set_physical_plan(id, plan);
184    }
185
186    /// Try to get the optimized plan for the item identified by `id`.
187    #[mz_ore::instrument(level = "trace")]
188    pub fn try_get_optimized_plan(
189        &self,
190        id: &GlobalId,
191    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192        let entry = self.state.try_get_entry_by_global_id(id)?;
193        entry.item().optimized_plan().map(AsRef::as_ref)
194    }
195
196    /// Try to get the physical plan for the item identified by `id`.
197    #[mz_ore::instrument(level = "trace")]
198    pub fn try_get_physical_plan(
199        &self,
200        id: &GlobalId,
201    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202        let entry = self.state.try_get_entry_by_global_id(id)?;
203        entry.item().physical_plan().map(AsRef::as_ref)
204    }
205
206    /// Set the `DataflowMetainfo` for the item identified by `id`.
207    ///
208    /// # Panics
209    /// If the item is not an `Index`, `MaterializedView`, or
210    /// `ContinualTask`.
211    #[mz_ore::instrument(level = "trace")]
212    pub fn set_dataflow_metainfo(
213        &mut self,
214        id: GlobalId,
215        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216    ) {
217        self.state.set_dataflow_metainfo(id, metainfo);
218    }
219
220    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
221    #[mz_ore::instrument(level = "trace")]
222    pub fn try_get_dataflow_metainfo(
223        &self,
224        id: &GlobalId,
225    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226        let entry = self.state.try_get_entry_by_global_id(id)?;
227        entry.item().dataflow_metainfo()
228    }
229}
230
231#[derive(Debug)]
232pub struct ConnCatalog<'a> {
233    state: Cow<'a, CatalogState>,
234    /// Because we don't have any way of removing items from the catalog
235    /// temporarily, we allow the ConnCatalog to pretend that a set of items
236    /// don't exist during resolution.
237    ///
238    /// This feature is necessary to allow re-planning of statements, which is
239    /// either incredibly useful or required when altering item definitions.
240    ///
241    /// Note that uses of this field should be used by short-lived
242    /// catalogs.
243    unresolvable_ids: BTreeSet<CatalogItemId>,
244    conn_id: ConnectionId,
245    cluster: String,
246    database: Option<DatabaseId>,
247    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
248    role_id: RoleId,
249    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
250    portals: Option<&'a BTreeMap<String, Portal>>,
251    notices_tx: UnboundedSender<AdapterNotice>,
252    restrict_to_user_objects: bool,
253}
254
255impl ConnCatalog<'_> {
256    pub fn conn_id(&self) -> &ConnectionId {
257        &self.conn_id
258    }
259
260    pub fn state(&self) -> &CatalogState {
261        &*self.state
262    }
263
264    /// Prevent planning from resolving item with the provided ID. Instead,
265    /// return an error as if the item did not exist.
266    ///
267    /// This feature is meant exclusively to permit re-planning statements
268    /// during update operations and should not be used otherwise given its
269    /// extremely "powerful" semantics.
270    ///
271    /// # Panics
272    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
273    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
274        assert_eq!(
275            self.role_id, MZ_SYSTEM_ROLE_ID,
276            "only the system role can mark IDs unresolvable",
277        );
278        self.unresolvable_ids.insert(id);
279    }
280
281    /// Returns the schemas:
282    /// - mz_catalog
283    /// - pg_catalog
284    /// - temp (if requested)
285    /// - all schemas from the session's search_path var that exist
286    pub fn effective_search_path(
287        &self,
288        include_temp_schema: bool,
289    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
290        self.state
291            .effective_search_path(&self.search_path, include_temp_schema)
292    }
293}
294
295impl ConnectionResolver for ConnCatalog<'_> {
296    fn resolve_connection(
297        &self,
298        id: CatalogItemId,
299    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
300        self.state().resolve_connection(id)
301    }
302}
303
304impl Catalog {
305    /// Returns the catalog's transient revision, which starts at 1 and is
306    /// incremented on every change. This is not persisted to disk, and will
307    /// restart on every load.
308    pub fn transient_revision(&self) -> u64 {
309        self.transient_revision
310    }
311
312    /// Creates a debug catalog from the current
313    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
314    /// like in tests.
315    ///
316    /// WARNING! This function can arbitrarily fail because it does not make any
317    /// effort to adjust the catalog's contents' structure or semantics to the
318    /// currently running version, i.e. it does not apply any migrations.
319    ///
320    /// This function must not be called in production contexts. Use
321    /// [`Catalog::open`] with appropriately set configuration parameters
322    /// instead.
323    pub async fn with_debug<F, Fut, T>(f: F) -> T
324    where
325        F: FnOnce(Catalog) -> Fut,
326        Fut: Future<Output = T>,
327    {
328        let persist_client = PersistClient::new_for_tests().await;
329        let organization_id = Uuid::new_v4();
330        let bootstrap_args = test_bootstrap_args();
331        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
332            .await
333            .expect("can open debug catalog");
334        f(catalog).await
335    }
336
337    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
338    /// in progress.
339    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
340    where
341        F: FnOnce(Catalog) -> Fut,
342        Fut: Future<Output = T>,
343    {
344        let persist_client = PersistClient::new_for_tests().await;
345        let organization_id = Uuid::new_v4();
346        let bootstrap_args = test_bootstrap_args();
347        let mut catalog =
348            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
349                .await
350                .expect("can open debug catalog");
351
352        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
353        let now = SYSTEM_TIME.clone();
354        let openable_storage = TestCatalogStateBuilder::new(persist_client)
355            .with_organization_id(organization_id)
356            .with_default_deploy_generation()
357            .build()
358            .await
359            .expect("can create durable catalog");
360        let mut storage = openable_storage
361            .open(now().into(), &bootstrap_args)
362            .await
363            .expect("can open durable catalog")
364            .0;
365        // Drain updates.
366        let _ = storage
367            .sync_to_current_updates()
368            .await
369            .expect("can sync to current updates");
370        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
371
372        f(catalog).await
373    }
374
375    /// Opens a debug catalog.
376    ///
377    /// See [`Catalog::with_debug`].
378    pub async fn open_debug_catalog(
379        persist_client: PersistClient,
380        organization_id: Uuid,
381        bootstrap_args: &BootstrapArgs,
382    ) -> Result<Catalog, anyhow::Error> {
383        let now = SYSTEM_TIME.clone();
384        let environment_id = None;
385        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
386            .with_organization_id(organization_id)
387            .with_default_deploy_generation()
388            .build()
389            .await?;
390        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
391        let system_parameter_defaults = BTreeMap::default();
392        Self::open_debug_catalog_inner(
393            persist_client,
394            storage,
395            now,
396            environment_id,
397            &DUMMY_BUILD_INFO,
398            system_parameter_defaults,
399            bootstrap_args,
400            None,
401        )
402        .await
403    }
404
405    /// Opens a read only debug persist backed catalog defined by `persist_client` and
406    /// `organization_id`.
407    ///
408    /// See [`Catalog::with_debug`].
409    pub async fn open_debug_read_only_catalog(
410        persist_client: PersistClient,
411        organization_id: Uuid,
412        bootstrap_args: &BootstrapArgs,
413    ) -> Result<Catalog, anyhow::Error> {
414        let now = SYSTEM_TIME.clone();
415        let environment_id = None;
416        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
417            .with_organization_id(organization_id)
418            .build()
419            .await?;
420        let storage = openable_storage
421            .open_read_only(&test_bootstrap_args())
422            .await?;
423        let system_parameter_defaults = BTreeMap::default();
424        Self::open_debug_catalog_inner(
425            persist_client,
426            storage,
427            now,
428            environment_id,
429            &DUMMY_BUILD_INFO,
430            system_parameter_defaults,
431            bootstrap_args,
432            None,
433        )
434        .await
435    }
436
437    /// Opens a read only debug persist backed catalog defined by `persist_client` and
438    /// `organization_id`.
439    ///
440    /// See [`Catalog::with_debug`].
441    pub async fn open_debug_read_only_persist_catalog_config(
442        persist_client: PersistClient,
443        now: NowFn,
444        environment_id: EnvironmentId,
445        system_parameter_defaults: BTreeMap<String, String>,
446        build_info: &'static BuildInfo,
447        bootstrap_args: &BootstrapArgs,
448        enable_expression_cache_override: Option<bool>,
449    ) -> Result<Catalog, anyhow::Error> {
450        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
451            .with_organization_id(environment_id.organization_id())
452            .with_version(
453                build_info
454                    .version
455                    .parse()
456                    .expect("build version is parseable"),
457            )
458            .build()
459            .await?;
460        let storage = openable_storage.open_read_only(bootstrap_args).await?;
461        Self::open_debug_catalog_inner(
462            persist_client,
463            storage,
464            now,
465            Some(environment_id),
466            build_info,
467            system_parameter_defaults,
468            bootstrap_args,
469            enable_expression_cache_override,
470        )
471        .await
472    }
473
474    async fn open_debug_catalog_inner(
475        persist_client: PersistClient,
476        storage: Box<dyn DurableCatalogState>,
477        now: NowFn,
478        environment_id: Option<EnvironmentId>,
479        build_info: &'static BuildInfo,
480        system_parameter_defaults: BTreeMap<String, String>,
481        bootstrap_args: &BootstrapArgs,
482        enable_expression_cache_override: Option<bool>,
483    ) -> Result<Catalog, anyhow::Error> {
484        let metrics_registry = &MetricsRegistry::new();
485        let secrets_reader = Arc::new(InMemorySecretsController::new());
486        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
487        // debugging/testing.
488        let previous_ts = now().into();
489        let replica_size = &bootstrap_args.default_cluster_replica_size;
490        let read_only = false;
491
492        let OpenCatalogResult {
493            catalog,
494            migrated_storage_collections_0dt: _,
495            new_builtin_collections: _,
496            builtin_table_updates: _,
497            cached_global_exprs: _,
498            uncached_local_exprs: _,
499        } = Catalog::open(Config {
500            storage,
501            metrics_registry,
502            state: StateConfig {
503                unsafe_mode: true,
504                all_features: false,
505                build_info,
506                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
507                read_only,
508                now,
509                boot_ts: previous_ts,
510                skip_migrations: true,
511                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
512                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
513                    size: replica_size.clone(),
514                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
515                },
516                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
517                    size: replica_size.clone(),
518                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
519                },
520                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
521                    size: replica_size.clone(),
522                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
523                },
524                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
525                    size: replica_size.clone(),
526                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
527                },
528                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
529                    size: replica_size.clone(),
530                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
531                },
532                system_parameter_defaults,
533                remote_system_parameters: None,
534                availability_zones: vec![],
535                egress_addresses: vec![],
536                aws_principal_context: None,
537                aws_privatelink_availability_zones: None,
538                http_host_name: None,
539                connection_context: ConnectionContext::for_tests(secrets_reader),
540                builtin_item_migration_config: BuiltinItemMigrationConfig {
541                    persist_client: persist_client.clone(),
542                    read_only,
543                    force_migration: None,
544                },
545                persist_client,
546                enable_expression_cache_override,
547                helm_chart_version: None,
548                external_login_password_mz_system: None,
549                license_key: ValidatedLicenseKey::for_tests(),
550            },
551        })
552        .await?;
553        Ok(catalog)
554    }
555
556    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
557        self.state.for_session(session)
558    }
559
560    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
561        self.state.for_sessionless_user(role_id)
562    }
563
564    pub fn for_system_session(&self) -> ConnCatalog<'_> {
565        self.state.for_system_session()
566    }
567
568    async fn storage<'a>(
569        &'a self,
570    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
571        self.storage.lock().await
572    }
573
574    pub async fn current_upper(&self) -> mz_repr::Timestamp {
575        self.storage().await.current_upper().await
576    }
577
578    pub async fn allocate_user_id(
579        &self,
580        commit_ts: mz_repr::Timestamp,
581    ) -> Result<(CatalogItemId, GlobalId), Error> {
582        self.storage()
583            .await
584            .allocate_user_id(commit_ts)
585            .await
586            .maybe_terminate("allocating user ids")
587            .err_into()
588    }
589
590    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
591    pub async fn allocate_user_ids(
592        &self,
593        amount: u64,
594        commit_ts: mz_repr::Timestamp,
595    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
596        self.storage()
597            .await
598            .allocate_user_ids(amount, commit_ts)
599            .await
600            .maybe_terminate("allocating user ids")
601            .err_into()
602    }
603
604    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
605        let commit_ts = self.storage().await.current_upper().await;
606        self.allocate_user_id(commit_ts).await
607    }
608
609    /// Allocates a single durable id for a storage usage collection batch.
610    ///
611    /// Bumps the durable `STORAGE_USAGE_ID_ALLOC_KEY` allocator by one and
612    /// returns the previous value. The bump is committed at `commit_ts`.
613    /// One id is shared by every row produced by a collection cycle (see
614    /// `Coordinator::storage_usage_update`), so the durable cost is one
615    /// allocator round-trip per cycle, not per shard.
616    pub async fn allocate_storage_usage_id(
617        &self,
618        commit_ts: mz_repr::Timestamp,
619    ) -> Result<u64, Error> {
620        use mz_ore::collections::CollectionExt;
621
622        self.storage()
623            .await
624            .allocate_id(STORAGE_USAGE_ID_ALLOC_KEY, 1, commit_ts)
625            .await
626            .maybe_terminate("allocating storage usage id")
627            .map(|ids| ids.into_element())
628            .err_into()
629    }
630
631    /// Get the next user item ID without allocating it.
632    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
633        self.storage()
634            .await
635            .get_next_user_item_id()
636            .await
637            .err_into()
638    }
639
640    #[cfg(test)]
641    pub async fn allocate_system_id(
642        &self,
643        commit_ts: mz_repr::Timestamp,
644    ) -> Result<(CatalogItemId, GlobalId), Error> {
645        use mz_ore::collections::CollectionExt;
646
647        let mut storage = self.storage().await;
648        let mut txn = storage.transaction().await?;
649        let id = txn
650            .allocate_system_item_ids(1)
651            .maybe_terminate("allocating system ids")?
652            .into_element();
653        // Drain transaction.
654        let _ = txn.get_and_commit_op_updates();
655        txn.commit(commit_ts).await?;
656        Ok(id)
657    }
658
659    /// Get the next system item ID without allocating it.
660    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
661        self.storage()
662            .await
663            .get_next_system_item_id()
664            .await
665            .err_into()
666    }
667
668    pub async fn allocate_user_cluster_id(
669        &self,
670        commit_ts: mz_repr::Timestamp,
671    ) -> Result<ClusterId, Error> {
672        self.storage()
673            .await
674            .allocate_user_cluster_id(commit_ts)
675            .await
676            .maybe_terminate("allocating user cluster ids")
677            .err_into()
678    }
679
680    /// Allocate `amount` many user replica IDs. See
681    /// [`DurableCatalogState::allocate_user_replica_ids`].
682    pub async fn allocate_user_replica_ids(
683        &self,
684        amount: u64,
685        commit_ts: mz_repr::Timestamp,
686    ) -> Result<Vec<ReplicaId>, Error> {
687        self.storage()
688            .await
689            .allocate_user_replica_ids(amount, commit_ts)
690            .await
691            .maybe_terminate("allocating user replica ids")
692            .err_into()
693    }
694
695    /// Allocate `amount` many system replica IDs. See
696    /// [`DurableCatalogState::allocate_system_replica_ids`].
697    pub async fn allocate_system_replica_ids(
698        &self,
699        amount: u64,
700        commit_ts: mz_repr::Timestamp,
701    ) -> Result<Vec<ReplicaId>, Error> {
702        self.storage()
703            .await
704            .allocate_system_replica_ids(amount, commit_ts)
705            .await
706            .maybe_terminate("allocating system replica ids")
707            .err_into()
708    }
709
710    /// Allocate `amount` many replica IDs for `cluster_id`, picking user or
711    /// system IDs based on the cluster's ID type.
712    pub async fn allocate_replica_ids(
713        &self,
714        cluster_id: ClusterId,
715        amount: u64,
716        commit_ts: mz_repr::Timestamp,
717    ) -> Result<Vec<ReplicaId>, Error> {
718        if cluster_id.is_system() {
719            self.allocate_system_replica_ids(amount, commit_ts).await
720        } else {
721            self.allocate_user_replica_ids(amount, commit_ts).await
722        }
723    }
724
725    /// Get the next system replica id without allocating it.
726    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
727        self.storage()
728            .await
729            .get_next_system_replica_id()
730            .await
731            .err_into()
732    }
733
734    /// Get the next user replica id without allocating it.
735    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
736        self.storage()
737            .await
738            .get_next_user_replica_id()
739            .await
740            .err_into()
741    }
742
743    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
744        self.state.resolve_database(database_name)
745    }
746
747    pub fn resolve_schema(
748        &self,
749        current_database: Option<&DatabaseId>,
750        database_name: Option<&str>,
751        schema_name: &str,
752        conn_id: &ConnectionId,
753    ) -> Result<&Schema, SqlCatalogError> {
754        self.state
755            .resolve_schema(current_database, database_name, schema_name, conn_id)
756    }
757
758    pub fn resolve_schema_in_database(
759        &self,
760        database_spec: &ResolvedDatabaseSpecifier,
761        schema_name: &str,
762        conn_id: &ConnectionId,
763    ) -> Result<&Schema, SqlCatalogError> {
764        self.state
765            .resolve_schema_in_database(database_spec, schema_name, conn_id)
766    }
767
768    pub fn resolve_replica_in_cluster(
769        &self,
770        cluster_id: &ClusterId,
771        replica_name: &str,
772    ) -> Result<&ClusterReplica, SqlCatalogError> {
773        self.state
774            .resolve_replica_in_cluster(cluster_id, replica_name)
775    }
776
777    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
778        self.state.resolve_system_schema(name)
779    }
780
781    pub fn resolve_search_path(
782        &self,
783        session: &Session,
784    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
785        self.state.resolve_search_path(session)
786    }
787
788    /// Resolves `name` to a non-function [`CatalogEntry`].
789    pub fn resolve_entry(
790        &self,
791        current_database: Option<&DatabaseId>,
792        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
793        name: &PartialItemName,
794        conn_id: &ConnectionId,
795    ) -> Result<&CatalogEntry, SqlCatalogError> {
796        self.state
797            .resolve_entry(current_database, search_path, name, conn_id)
798    }
799
800    /// Resolves a `BuiltinTable`.
801    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
802        self.state.resolve_builtin_table(builtin)
803    }
804
805    /// Resolves a `BuiltinLog`.
806    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
807        self.state.resolve_builtin_log(builtin).0
808    }
809
    /// Resolves a `BuiltinSource` to its `CatalogItemId`.
    ///
    /// Despite the differing name, this simply calls
    /// `CatalogState::resolve_builtin_source`.
    pub fn resolve_builtin_storage_collection(
        &self,
        builtin: &'static BuiltinSource,
    ) -> CatalogItemId {
        self.state.resolve_builtin_source(builtin)
    }
817
    /// Resolves `name` to a function [`CatalogEntry`].
    ///
    /// `current_database`, `search_path` and `conn_id` are forwarded unchanged to
    /// `CatalogState::resolve_function`, which performs the actual lookup.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_function(current_database, search_path, name, conn_id)
    }
829
    /// Resolves `name` to a type [`CatalogEntry`].
    ///
    /// `current_database`, `search_path` and `conn_id` are forwarded unchanged to
    /// `CatalogState::resolve_type`, which performs the actual lookup.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_type(current_database, search_path, name, conn_id)
    }
841
    /// Resolves the cluster called `name`.
    ///
    /// Thin wrapper around `CatalogState::resolve_cluster`. Note that this performs no
    /// check on who is asking; see the TODO in `active_cluster`.
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        self.state.resolve_cluster(name)
    }
845
    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
    ///
    /// Thin wrapper around `CatalogState::resolve_builtin_cluster`.
    ///
    /// # Panics
    /// * If the [`BuiltinCluster`] doesn't exist.
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        self.state.resolve_builtin_cluster(cluster)
    }
854
855    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
856        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
857    }
858
859    /// Resolves a [`Cluster`] for a TargetCluster.
860    pub fn resolve_target_cluster(
861        &self,
862        target_cluster: TargetCluster,
863        session: &Session,
864    ) -> Result<&Cluster, AdapterError> {
865        match target_cluster {
866            TargetCluster::CatalogServer => {
867                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
868            }
869            TargetCluster::Active => self.active_cluster(session),
870            TargetCluster::Transaction(cluster_id) => self
871                .try_get_cluster(cluster_id)
872                .ok_or(AdapterError::ConcurrentClusterDrop),
873        }
874    }
875
876    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
877        // TODO(benesch): this check here is not sufficiently protective. It'd
878        // be very easy for a code path to accidentally avoid this check by
879        // calling `resolve_cluster(session.vars().cluster())`.
880        if session.user().name != SYSTEM_USER.name
881            && session.user().name != SUPPORT_USER.name
882            && session.vars().cluster() == SYSTEM_USER.name
883        {
884            coord_bail!(
885                "system cluster '{}' cannot execute user queries",
886                SYSTEM_USER.name
887            );
888        }
889        let cluster = self.resolve_cluster(session.vars().cluster())?;
890        Ok(cluster)
891    }
892
    /// Returns a shared reference to the underlying `CatalogState`.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }
896
    /// Converts the qualified item name `name` into a `FullItemName`.
    ///
    /// `conn_id` is optional and is forwarded unchanged to
    /// `CatalogState::resolve_full_name`.
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }
904
    /// Returns the catalog entry for the item `id`, or `None` if
    /// `CatalogState::try_get_entry` finds none.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }
908
    /// Returns the catalog entry associated with the `GlobalId` `id`, or `None` if
    /// `CatalogState::try_get_entry_by_global_id` finds none.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }
912
    /// Returns the catalog entry for the item `id`.
    ///
    /// NOTE(review): unlike `try_get_entry` this returns a bare reference, so a missing
    /// `id` presumably panics inside `CatalogState::get_entry` — confirm.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }
916
    /// Returns the `CatalogCollectionEntry` for the `GlobalId` `id`.
    ///
    /// Note that this returns an owned `CatalogCollectionEntry`, not a `&CatalogEntry`.
    /// NOTE(review): the return type is infallible, so an unknown `id` presumably panics
    /// inside `CatalogState::get_entry_by_global_id` — confirm.
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }
920
921    pub fn get_global_ids<'a>(
922        &'a self,
923        id: &CatalogItemId,
924    ) -> impl Iterator<Item = GlobalId> + use<'a> {
925        self.get_entry(id).global_ids()
926    }
927
928    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
929        self.get_entry_by_global_id(id).id()
930    }
931
932    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
933        let item = self.try_get_entry_by_global_id(id)?;
934        Some(item.id())
935    }
936
    /// Returns the schema identified by `database_spec` and `schema_spec`, as seen by
    /// the connection `conn_id`.
    ///
    /// NOTE(review): the return type is infallible, so a missing schema presumably
    /// panics inside `CatalogState::get_schema` — confirm. Use `try_get_schema` for an
    /// `Option`-returning lookup.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }
945
    /// Returns the schema identified by `database_spec` and `schema_spec`, as seen by
    /// the connection `conn_id`, or `None` if `CatalogState::try_get_schema` finds none.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }
955
    /// Returns the `SchemaId` reported by `CatalogState::get_mz_catalog_schema_id`.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }
959
    /// Returns the `SchemaId` reported by `CatalogState::get_pg_catalog_schema_id`.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }
963
    /// Returns the `SchemaId` reported by `CatalogState::get_information_schema_id`.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }
967
    /// Returns the `SchemaId` reported by `CatalogState::get_mz_internal_schema_id`.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }
971
    /// Returns the `SchemaId` reported by `CatalogState::get_mz_introspection_schema_id`.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }
975
    /// Returns the `SchemaId` reported by `CatalogState::get_mz_unsafe_schema_id`.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }
979
    /// Returns an iterator over the `SchemaId`s yielded by
    /// `CatalogState::system_schema_ids`. The iterator borrows from `self`.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }
983
    /// Returns the database with ID `id`.
    ///
    /// NOTE(review): the return type is infallible, so an unknown `id` presumably panics
    /// inside `CatalogState::get_database` — confirm.
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }
987
    /// Returns the role with ID `id`, or `None` if `CatalogState::try_get_role` finds
    /// none.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }
991
    /// Returns the role with ID `id`.
    ///
    /// NOTE(review): the return type is infallible, so an unknown `id` presumably panics
    /// inside `CatalogState::get_role` — confirm. Use `try_get_role` for an
    /// `Option`-returning lookup.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }
995
    /// Returns the role called `role_name`, or `None` if
    /// `CatalogState::try_get_role_by_name` finds none.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }
999
    /// Returns the `RoleAuth` record for the role with ID `id`, or `None` if
    /// `CatalogState::try_get_role_auth_by_id` finds none.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }
1003
    /// Creates a new schema in the `Catalog` for temporary items
    /// indicated by the TEMPORARY or TEMP keywords.
    ///
    /// The schema is keyed by `conn_id` and owned by `owner_id`; both are forwarded
    /// unchanged to `CatalogState::create_temporary_schema`, whose error (if any) is
    /// returned as-is.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
1013
1014    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1015        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
1016        self.state
1017            .temporary_schemas
1018            .get(conn_id)
1019            .map(|schema| schema.items.contains_key(item_name))
1020            .unwrap_or(false)
1021    }
1022
1023    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1024    /// Returns Ok if conn_id's temp schema does not exist.
1025    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1026        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1027            return Ok(());
1028        };
1029        if !schema.items.is_empty() {
1030            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1031        }
1032        Ok(())
1033    }
1034
1035    pub(crate) fn object_dependents(
1036        &self,
1037        object_ids: &Vec<ObjectId>,
1038        conn_id: &ConnectionId,
1039    ) -> Vec<ObjectId> {
1040        let mut seen = BTreeSet::new();
1041        self.state.object_dependents(object_ids, conn_id, &mut seen)
1042    }
1043
1044    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1045        FullNameV1 {
1046            database: name.database.to_string(),
1047            schema: name.schema.clone(),
1048            item: name.item.clone(),
1049        }
1050    }
1051
1052    pub fn find_available_cluster_name(&self, name: &str) -> String {
1053        let mut i = 0;
1054        let mut candidate = name.to_string();
1055        while self.state.clusters_by_name.contains_key(&candidate) {
1056            i += 1;
1057            candidate = format!("{}{}", name, i);
1058        }
1059        candidate
1060    }
1061
1062    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1063        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1064            self.cluster_replica_sizes()
1065                .enabled_allocations()
1066                .map(|a| a.0.to_owned())
1067                .collect::<Vec<_>>()
1068        } else {
1069            self.system_config().allowed_cluster_replica_sizes()
1070        }
1071    }
1072
    /// Converts a durable `mz_catalog::durable::ReplicaLocation` into the
    /// `ReplicaLocation` type returned by `CatalogState::concretize_replica_location`.
    ///
    /// `allowed_sizes`, `allowed_availability_zones` and `allow_disabled` are forwarded
    /// unchanged; any error is the one produced by the state.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
        allow_disabled: bool,
    ) -> Result<ReplicaLocation, Error> {
        self.state.concretize_replica_location(
            location,
            allowed_sizes,
            allowed_availability_zones,
            allow_disabled,
        )
    }
1087
    /// Validates the replica size `size` against `allowed_sizes`.
    ///
    /// Thin wrapper around `CatalogState::ensure_valid_replica_size`; `allow_disabled`
    /// is forwarded unchanged and any error is the one produced there.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
        allow_disabled: bool,
    ) -> Result<(), Error> {
        self.state
            .ensure_valid_replica_size(allowed_sizes, size, allow_disabled)
    }
1097
    /// Returns the `ClusterReplicaSizeMap` stored in the catalog state.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1101
1102    /// Returns the privileges of an object by its ID.
1103    pub fn get_privileges(
1104        &self,
1105        id: &SystemObjectId,
1106        conn_id: &ConnectionId,
1107    ) -> Option<&PrivilegeMap> {
1108        match id {
1109            SystemObjectId::Object(id) => match id {
1110                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1111                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1112                ObjectId::Schema((database_spec, schema_spec)) => Some(
1113                    self.get_schema(database_spec, schema_spec, conn_id)
1114                        .privileges(),
1115                ),
1116                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1117                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1118                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1119            },
1120            SystemObjectId::System => Some(&self.state.system_privileges),
1121        }
1122    }
1123
1124    #[mz_ore::instrument(level = "debug")]
1125    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1126        Ok(self.storage().await.advance_upper(new_upper).await?)
1127    }
1128
    /// Return the ids of all log sources the given object depends on.
    ///
    /// Thin wrapper around `CatalogState::introspection_dependencies`.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }
1133
1134    /// Serializes the catalog's in-memory state.
1135    ///
1136    /// There are no guarantees about the format of the serialized state, except
1137    /// that the serialized state for two identical catalogs will compare
1138    /// identically.
1139    pub fn dump(&self) -> Result<CatalogDump, Error> {
1140        Ok(CatalogDump::new(self.state.dump(None)?))
1141    }
1142
1143    /// Checks the [`Catalog`]s internal consistency.
1144    ///
1145    /// Returns a JSON object describing the inconsistencies, if there are any.
1146    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1147        self.state.check_consistency().map_err(|inconsistencies| {
1148            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1149                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1150            })
1151        })
1152    }
1153
    /// Returns the `mz_sql::catalog::CatalogConfig` held by the catalog state.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
1157
    /// Iterates over every `CatalogEntry` in the state's `entry_by_id` map, in that
    /// map's iteration order.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }
1161
1162    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1163        self.entries()
1164            .filter(|entry| entry.is_connection() && entry.id().is_user())
1165    }
1166
1167    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1168        self.entries()
1169            .filter(|entry| entry.is_table() && entry.id().is_user())
1170    }
1171
1172    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1173        self.entries()
1174            .filter(|entry| entry.is_source() && entry.id().is_user())
1175    }
1176
1177    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1178        self.entries()
1179            .filter(|entry| entry.is_sink() && entry.id().is_user())
1180    }
1181
1182    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1183        self.entries()
1184            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1185    }
1186
1187    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1188        self.entries()
1189            .filter(|entry| entry.is_secret() && entry.id().is_user())
1190    }
1191
    /// Returns the network policy with ID `network_policy_id`.
    ///
    /// NOTE(review): the return type is infallible, so an unknown ID presumably panics
    /// inside `CatalogState::get_network_policy` — confirm.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }
1195
    /// Returns the network policy called `name`, or `None` if
    /// `CatalogState::try_get_network_policy_by_name` finds none.
    ///
    /// Note that, despite lacking a `try_` prefix, this lookup is fallible.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }
1199
    /// Iterates over every `Cluster` in the state's `clusters_by_id` map, in that map's
    /// iteration order.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }
1203
    /// Returns the cluster with ID `cluster_id`.
    ///
    /// NOTE(review): the return type is infallible, so an unknown ID presumably panics
    /// inside `CatalogState::get_cluster` — confirm. Use `try_get_cluster` for an
    /// `Option`-returning lookup.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }
1207
    /// Returns the cluster with ID `cluster_id`, or `None` if
    /// `CatalogState::try_get_cluster` finds none.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }
1211
1212    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1213        self.clusters().filter(|cluster| cluster.id.is_user())
1214    }
1215
    /// Returns the replica `replica_id` of the cluster `cluster_id`.
    ///
    /// NOTE(review): the return type is infallible, so an unknown cluster or replica
    /// presumably panics inside `CatalogState::get_cluster_replica` — confirm. Use
    /// `try_get_cluster_replica` for an `Option`-returning lookup.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }
1223
    /// Returns the replica `replica_id` of the cluster `cluster_id`, or `None` if
    /// `CatalogState::try_get_cluster_replica` finds none.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }
1231
1232    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1233        self.user_clusters()
1234            .flat_map(|cluster| cluster.user_replicas())
1235    }
1236
    /// Iterates over every `Database` in the state's `database_by_id` map, in that map's
    /// iteration order.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1240
1241    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1242        self.state
1243            .roles_by_id
1244            .values()
1245            .filter(|role| role.is_user())
1246    }
1247
1248    pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1249        self.state
1250            .network_policies_by_id
1251            .iter()
1252            .filter(|(id, _)| id.is_user())
1253            .map(|(_, policy)| policy)
1254    }
1255
    /// Returns the system-wide `PrivilegeMap` stored in the catalog state.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1259
    /// Iterates over the default privileges stored in the catalog state.
    ///
    /// Each item pairs a `DefaultPrivilegeObject` with an iterator over the
    /// `DefaultPrivilegeAclItem`s associated with it.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1270
1271    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1272        self.state
1273            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1274    }
1275
1276    pub fn pack_storage_usage_update(
1277        &self,
1278        event: VersionedStorageUsage,
1279        diff: Diff,
1280    ) -> BuiltinTableUpdate {
1281        self.state
1282            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1283    }
1284
    /// Returns the system configuration variables (`SystemVars`) held by the catalog
    /// state.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1288
    /// Returns a mutable reference to the system configuration variables (`SystemVars`)
    /// held by the catalog state.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1292
    /// Runs `CatalogState::ensure_not_reserved_role` for `role_id` and returns its
    /// result unchanged.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1296
    /// Runs `CatalogState::ensure_grantable_role` for `role_id` and returns its result
    /// unchanged.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1300
    /// Runs `CatalogState::ensure_not_system_role` for `role_id` and returns its result
    /// unchanged.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1304
    /// Runs `CatalogState::ensure_not_predefined_role` for `role_id` and returns its
    /// result unchanged.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1308
    /// Runs `CatalogState::ensure_not_reserved_network_policy` for `network_policy_id`
    /// and returns its result unchanged.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1316
1317    pub fn ensure_not_reserved_object(
1318        &self,
1319        object_id: &ObjectId,
1320        conn_id: &ConnectionId,
1321    ) -> Result<(), Error> {
1322        match object_id {
1323            ObjectId::Cluster(cluster_id) => {
1324                if cluster_id.is_system() {
1325                    let cluster = self.get_cluster(*cluster_id);
1326                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1327                        cluster.name().to_string(),
1328                    )))
1329                } else {
1330                    Ok(())
1331                }
1332            }
1333            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1334                if replica_id.is_system() {
1335                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1336                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1337                        replica.name().to_string(),
1338                    )))
1339                } else {
1340                    Ok(())
1341                }
1342            }
1343            ObjectId::Database(database_id) => {
1344                if database_id.is_system() {
1345                    let database = self.get_database(database_id);
1346                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1347                        database.name().to_string(),
1348                    )))
1349                } else {
1350                    Ok(())
1351                }
1352            }
1353            ObjectId::Schema((database_spec, schema_spec)) => {
1354                if schema_spec.is_system() {
1355                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1356                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1357                        schema.name().schema.clone(),
1358                    )))
1359                } else {
1360                    Ok(())
1361                }
1362            }
1363            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1364            ObjectId::Item(item_id) => {
1365                if item_id.is_system() {
1366                    let item = self.get_entry(item_id);
1367                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1368                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1369                } else {
1370                    Ok(())
1371                }
1372            }
1373            ObjectId::NetworkPolicy(network_policy_id) => {
1374                self.ensure_not_reserved_network_policy(network_policy_id)
1375            }
1376        }
1377    }
1378
    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
    ///
    /// `create_sql` and `force_if_exists_skip` are forwarded unchanged. Requires
    /// `&mut self` because the state method is invoked on the mutable state.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1388
1389    /// Cache global and, optionally, local expressions for the given
1390    /// `GlobalId`.
1391    ///
1392    /// Takes the plans and metainfo directly as parameters (rather than
1393    /// fishing them out of catalog state), so this can be called **before**
1394    /// the catalog transaction that creates the item. Returns the future
1395    /// returned by [`Catalog::update_expression_cache`]; callers should
1396    /// `.await` it before the catalog transaction commits, so the durable
1397    /// expression cache is observed to contain the entries by the time any
1398    /// other process (or a subsequent bootstrap on this process) reads them.
1399    pub(crate) fn cache_expressions(
1400        &self,
1401        id: GlobalId,
1402        local_mir: Option<OptimizedMirRelationExpr>,
1403        mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1404        mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1405        dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1406        optimizer_features: OptimizerFeatures,
1407    ) -> BoxFuture<'static, ()> {
1408        // Make sure we're not caching the result of timestamp selection, as
1409        // it will almost certainly be wrong if we re-install the dataflow at
1410        // a later time.
1411        global_mir.as_of = None;
1412        global_mir.until = Default::default();
1413        physical_plan.as_of = None;
1414        physical_plan.until = Default::default();
1415
1416        let mut local_exprs = Vec::new();
1417        if let Some(local_mir) = local_mir {
1418            local_exprs.push((
1419                id,
1420                LocalExpressions {
1421                    local_mir,
1422                    optimizer_features: optimizer_features.clone(),
1423                },
1424            ));
1425        }
1426        let global_exprs = vec![(
1427            id,
1428            GlobalExpressions {
1429                global_mir,
1430                physical_plan,
1431                dataflow_metainfos,
1432                optimizer_features,
1433            },
1434        )];
1435        self.update_expression_cache(local_exprs, global_exprs, Default::default())
1436    }
1437
1438    pub(crate) fn update_expression_cache<'a, 'b>(
1439        &'a self,
1440        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1441        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1442        invalidate_ids: BTreeSet<GlobalId>,
1443    ) -> BoxFuture<'b, ()> {
1444        if let Some(expr_cache) = &self.expr_cache_handle {
1445            expr_cache
1446                .update(
1447                    new_local_expressions,
1448                    new_global_expressions,
1449                    invalidate_ids,
1450                )
1451                .boxed()
1452        } else {
1453            async {}.boxed()
1454        }
1455    }
1456
1457    /// Listen for and apply all unconsumed updates to the durable catalog state.
1458    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1459    // `#[cfg(test)]` annotation.
1460    #[cfg(test)]
1461    async fn sync_to_current_updates(
1462        &mut self,
1463    ) -> Result<
1464        (
1465            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1466            Vec<ParsedStateUpdate>,
1467        ),
1468        CatalogError,
1469    > {
1470        let updates = self.storage().await.sync_to_current_updates().await?;
1471        let (builtin_table_updates, catalog_updates) = self
1472            .state
1473            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1474            .await;
1475        Ok((builtin_table_updates, catalog_updates))
1476    }
1477}
1478
1479pub fn is_reserved_name(name: &str) -> bool {
1480    BUILTIN_PREFIXES
1481        .iter()
1482        .any(|prefix| name.starts_with(prefix))
1483}
1484
/// Reports whether `name` is reserved as a role name: either it is a reserved name
/// (see `is_reserved_name`) or it is the public role's name (see `is_public_role`).
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1488
/// Reports whether `name` is exactly equal to `PUBLIC_ROLE_NAME`.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1492
1493pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1494    object_type_to_audit_object_type(sql_type.into())
1495}
1496
/// Maps the kind of object a comment is attached to onto the audit log's `ObjectType`.
///
/// Only the variant of `id` matters; the wrapped identifier is ignored. The match is
/// deliberately exhaustive (no wildcard arm) so that adding a `CommentObjectId` variant
/// forces this mapping to be updated.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1517
1518pub(crate) fn object_type_to_audit_object_type(
1519    object_type: mz_sql::catalog::ObjectType,
1520) -> ObjectType {
1521    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1522}
1523
1524pub(crate) fn system_object_type_to_audit_object_type(
1525    system_type: &SystemObjectType,
1526) -> ObjectType {
1527    match system_type {
1528        SystemObjectType::Object(object_type) => match object_type {
1529            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1530            mz_sql::catalog::ObjectType::View => ObjectType::View,
1531            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1532            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1533            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1534            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1535            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1536            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1537            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1538            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1539            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1540            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1541            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1542            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1543            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1544            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1545        },
1546        SystemObjectType::System => ObjectType::System,
1547    }
1548}
1549
/// The direction of a privilege update.
///
/// Converts into the matching `ExecuteResponse` and audit `EventType` through the
/// `From` impls below.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1555
/// Selects the response reported to the client for a privilege update.
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    /// `Grant` maps to `GrantedPrivilege`, `Revoke` to `RevokedPrivilege`.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}
1564
/// Selects the audit log event type recorded for a privilege update.
impl From<UpdatePrivilegeVariant> for EventType {
    /// `Grant` maps to `EventType::Grant`, `Revoke` to `EventType::Revoke`.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1573
1574impl ConnCatalog<'_> {
1575    fn resolve_item_name(
1576        &self,
1577        name: &PartialItemName,
1578    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1579        self.resolve_item(name).map(|entry| entry.name())
1580    }
1581
1582    fn resolve_function_name(
1583        &self,
1584        name: &PartialItemName,
1585    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1586        self.resolve_function(name).map(|entry| entry.name())
1587    }
1588
1589    fn resolve_type_name(
1590        &self,
1591        name: &PartialItemName,
1592    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1593        self.resolve_type(name).map(|entry| entry.name())
1594    }
1595}
1596
1597impl ExprHumanizer for ConnCatalog<'_> {
1598    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1599        let entry = self.state.try_get_entry_by_global_id(&id)?;
1600        Some(self.resolve_full_name(entry.name()).to_string())
1601    }
1602
1603    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1604        let entry = self.state.try_get_entry_by_global_id(&id)?;
1605        Some(entry.name().item.clone())
1606    }
1607
1608    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1609        let entry = self.state.try_get_entry_by_global_id(&id)?;
1610        Some(self.resolve_full_name(entry.name()).into_parts())
1611    }
1612
1613    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1614        use SqlScalarType::*;
1615
1616        match typ {
1617            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1618            List {
1619                custom_id: Some(item_id),
1620                ..
1621            }
1622            | Map {
1623                custom_id: Some(item_id),
1624                ..
1625            } => {
1626                let item = self.get_item(item_id);
1627                self.minimal_qualification(item.name()).to_string()
1628            }
1629            List { element_type, .. } => {
1630                format!(
1631                    "{} list",
1632                    self.humanize_sql_scalar_type(element_type, postgres_compat)
1633                )
1634            }
1635            Map { value_type, .. } => format!(
1636                "map[{}=>{}]",
1637                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1638                self.humanize_sql_scalar_type(value_type, postgres_compat)
1639            ),
1640            Record {
1641                custom_id: Some(item_id),
1642                ..
1643            } => {
1644                let item = self.get_item(item_id);
1645                self.minimal_qualification(item.name()).to_string()
1646            }
1647            Record { fields, .. } => format!(
1648                "record({})",
1649                fields
1650                    .iter()
1651                    .map(|f| format!(
1652                        "{}: {}",
1653                        f.0,
1654                        self.humanize_sql_column_type(&f.1, postgres_compat)
1655                    ))
1656                    .join(",")
1657            ),
1658            PgLegacyChar => "\"char\"".into(),
1659            Char { length } if !postgres_compat => match length {
1660                None => "char".into(),
1661                Some(length) => format!("char({})", length.into_u32()),
1662            },
1663            VarChar { max_length } if !postgres_compat => match max_length {
1664                None => "varchar".into(),
1665                Some(length) => format!("varchar({})", length.into_u32()),
1666            },
1667            UInt16 => "uint2".into(),
1668            UInt32 => "uint4".into(),
1669            UInt64 => "uint8".into(),
1670            ty => {
1671                let pgrepr_type = mz_pgrepr::Type::from(ty);
1672                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1673
1674                let res = if self
1675                    .effective_search_path(true)
1676                    .iter()
1677                    .any(|(_, schema)| schema == &pg_catalog_schema)
1678                {
1679                    pgrepr_type.name().to_string()
1680                } else {
1681                    // If PG_CATALOG_SCHEMA is not in search path, you need
1682                    // qualified object name to refer to type.
1683                    let name = QualifiedItemName {
1684                        qualifiers: ItemQualifiers {
1685                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1686                            schema_spec: pg_catalog_schema,
1687                        },
1688                        item: pgrepr_type.name().to_string(),
1689                    };
1690                    self.resolve_full_name(&name).to_string()
1691                };
1692                res
1693            }
1694        }
1695    }
1696
1697    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1698        let entry = self.state.try_get_entry_by_global_id(&id)?;
1699
1700        match entry.index() {
1701            Some(index) => {
1702                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1703                let mut on_names = on_desc
1704                    .iter_names()
1705                    .map(|col_name| col_name.to_string())
1706                    .collect::<Vec<_>>();
1707
1708                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1709
1710                // Init ix_names with unknown column names. Unknown columns are
1711                // represented as an empty String and rendered as `#c` by the
1712                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1713                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1714                let mut ix_names = vec![String::new(); ix_arity];
1715
1716                // Apply the permutation by swapping on_names with ix_names.
1717                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1718                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1719                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1720                    std::mem::swap(on_name, ix_name);
1721                }
1722
1723                Some(ix_names) // Return the updated ix_names vector.
1724            }
1725            None => {
1726                let desc = self.state.try_get_desc_by_global_id(&id)?;
1727                let column_names = desc
1728                    .iter_names()
1729                    .map(|col_name| col_name.to_string())
1730                    .collect();
1731
1732                Some(column_names)
1733            }
1734        }
1735    }
1736
1737    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1738        let desc = self.state.try_get_desc_by_global_id(&id)?;
1739        Some(desc.get_name(column).to_string())
1740    }
1741
1742    fn id_exists(&self, id: GlobalId) -> bool {
1743        self.state.entry_by_global_id.contains_key(&id)
1744    }
1745}
1746
1747impl SessionCatalog for ConnCatalog<'_> {
1748    fn active_role_id(&self) -> &RoleId {
1749        &self.role_id
1750    }
1751
1752    fn restrict_to_user_objects(&self) -> bool {
1753        self.restrict_to_user_objects
1754    }
1755
1756    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1757        self.prepared_statements
1758            .as_ref()
1759            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1760            .flatten()
1761    }
1762
1763    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1764        self.portals
1765            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1766    }
1767
1768    fn active_database(&self) -> Option<&DatabaseId> {
1769        self.database.as_ref()
1770    }
1771
1772    fn active_cluster(&self) -> &str {
1773        &self.cluster
1774    }
1775
1776    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1777        &self.search_path
1778    }
1779
1780    fn resolve_database(
1781        &self,
1782        database_name: &str,
1783    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1784        Ok(self.state.resolve_database(database_name)?)
1785    }
1786
1787    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1788        self.state
1789            .database_by_id
1790            .get(id)
1791            .expect("database doesn't exist")
1792    }
1793
1794    // `as` is ok to use to cast to a trait object.
1795    #[allow(clippy::as_conversions)]
1796    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1797        self.state
1798            .database_by_id
1799            .values()
1800            .map(|database| database as &dyn CatalogDatabase)
1801            .collect()
1802    }
1803
1804    fn resolve_schema(
1805        &self,
1806        database_name: Option<&str>,
1807        schema_name: &str,
1808    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1809        Ok(self.state.resolve_schema(
1810            self.database.as_ref(),
1811            database_name,
1812            schema_name,
1813            &self.conn_id,
1814        )?)
1815    }
1816
1817    fn resolve_schema_in_database(
1818        &self,
1819        database_spec: &ResolvedDatabaseSpecifier,
1820        schema_name: &str,
1821    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1822        Ok(self
1823            .state
1824            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1825    }
1826
1827    fn get_schema(
1828        &self,
1829        database_spec: &ResolvedDatabaseSpecifier,
1830        schema_spec: &SchemaSpecifier,
1831    ) -> &dyn CatalogSchema {
1832        self.state
1833            .get_schema(database_spec, schema_spec, &self.conn_id)
1834    }
1835
1836    // `as` is ok to use to cast to a trait object.
1837    #[allow(clippy::as_conversions)]
1838    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1839        self.get_databases()
1840            .into_iter()
1841            .flat_map(|database| database.schemas().into_iter())
1842            .chain(
1843                self.state
1844                    .ambient_schemas_by_id
1845                    .values()
1846                    .chain(self.state.temporary_schemas.values())
1847                    .map(|schema| schema as &dyn CatalogSchema),
1848            )
1849            .collect()
1850    }
1851
1852    fn get_mz_internal_schema_id(&self) -> SchemaId {
1853        self.state().get_mz_internal_schema_id()
1854    }
1855
1856    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1857        self.state().get_mz_unsafe_schema_id()
1858    }
1859
1860    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1861        self.state.is_system_schema_specifier(schema)
1862    }
1863
1864    fn resolve_role(
1865        &self,
1866        role_name: &str,
1867    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1868        match self.state.try_get_role_by_name(role_name) {
1869            Some(role) => Ok(role),
1870            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1871        }
1872    }
1873
1874    fn resolve_network_policy(
1875        &self,
1876        policy_name: &str,
1877    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1878        match self.state.try_get_network_policy_by_name(policy_name) {
1879            Some(policy) => Ok(policy),
1880            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1881        }
1882    }
1883
1884    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1885        Some(self.state.roles_by_id.get(id)?)
1886    }
1887
1888    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1889        self.state.get_role(id)
1890    }
1891
1892    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1893        // `as` is ok to use to cast to a trait object.
1894        #[allow(clippy::as_conversions)]
1895        self.state
1896            .roles_by_id
1897            .values()
1898            .map(|role| role as &dyn CatalogRole)
1899            .collect()
1900    }
1901
1902    fn mz_system_role_id(&self) -> RoleId {
1903        MZ_SYSTEM_ROLE_ID
1904    }
1905
1906    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1907        self.state.collect_role_membership(id)
1908    }
1909
1910    fn get_network_policy(
1911        &self,
1912        id: &NetworkPolicyId,
1913    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1914        self.state.get_network_policy(id)
1915    }
1916
1917    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1918        // `as` is ok to use to cast to a trait object.
1919        #[allow(clippy::as_conversions)]
1920        self.state
1921            .network_policies_by_id
1922            .values()
1923            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1924            .collect()
1925    }
1926
1927    fn resolve_cluster(
1928        &self,
1929        cluster_name: Option<&str>,
1930    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1931        Ok(self
1932            .state
1933            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1934    }
1935
1936    fn resolve_cluster_replica(
1937        &self,
1938        cluster_replica_name: &QualifiedReplica,
1939    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1940        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1941    }
1942
1943    fn resolve_item(
1944        &self,
1945        name: &PartialItemName,
1946    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1947        let r = self.state.resolve_entry(
1948            self.database.as_ref(),
1949            &self.effective_search_path(true),
1950            name,
1951            &self.conn_id,
1952        )?;
1953        if self.unresolvable_ids.contains(&r.id()) {
1954            Err(SqlCatalogError::UnknownItem(name.to_string()))
1955        } else {
1956            Ok(r)
1957        }
1958    }
1959
1960    fn resolve_function(
1961        &self,
1962        name: &PartialItemName,
1963    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1964        let r = self.state.resolve_function(
1965            self.database.as_ref(),
1966            &self.effective_search_path(false),
1967            name,
1968            &self.conn_id,
1969        )?;
1970
1971        if self.unresolvable_ids.contains(&r.id()) {
1972            Err(SqlCatalogError::UnknownFunction {
1973                name: name.to_string(),
1974                alternative: None,
1975            })
1976        } else {
1977            Ok(r)
1978        }
1979    }
1980
1981    fn resolve_type(
1982        &self,
1983        name: &PartialItemName,
1984    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1985        let r = self.state.resolve_type(
1986            self.database.as_ref(),
1987            &self.effective_search_path(false),
1988            name,
1989            &self.conn_id,
1990        )?;
1991
1992        if self.unresolvable_ids.contains(&r.id()) {
1993            Err(SqlCatalogError::UnknownType {
1994                name: name.to_string(),
1995            })
1996        } else {
1997            Ok(r)
1998        }
1999    }
2000
2001    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2002        self.state.get_system_type(name)
2003    }
2004
2005    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2006        Some(self.state.try_get_entry(id)?)
2007    }
2008
2009    fn try_get_item_by_global_id(
2010        &self,
2011        id: &GlobalId,
2012    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2013        let entry = self.state.try_get_entry_by_global_id(id)?;
2014        let entry = match &entry.item {
2015            CatalogItem::Table(table) => {
2016                let (version, _gid) = table
2017                    .collections
2018                    .iter()
2019                    .find(|(_version, gid)| *gid == id)
2020                    .expect("catalog out of sync, mismatched GlobalId");
2021                entry.at_version(RelationVersionSelector::Specific(*version))
2022            }
2023            _ => entry.at_version(RelationVersionSelector::Latest),
2024        };
2025        Some(entry)
2026    }
2027
2028    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2029        self.state.get_entry(id)
2030    }
2031
2032    fn get_item_by_global_id(
2033        &self,
2034        id: &GlobalId,
2035    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2036        let entry = self.state.get_entry_by_global_id(id);
2037        let entry = match &entry.item {
2038            CatalogItem::Table(table) => {
2039                let (version, _gid) = table
2040                    .collections
2041                    .iter()
2042                    .find(|(_version, gid)| *gid == id)
2043                    .expect("catalog out of sync, mismatched GlobalId");
2044                entry.at_version(RelationVersionSelector::Specific(*version))
2045            }
2046            _ => entry.at_version(RelationVersionSelector::Latest),
2047        };
2048        entry
2049    }
2050
2051    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2052        self.get_schemas()
2053            .into_iter()
2054            .flat_map(|schema| schema.item_ids())
2055            .map(|id| self.get_item(&id))
2056            .collect()
2057    }
2058
2059    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2060        self.state
2061            .get_item_by_name(name, &self.conn_id)
2062            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2063    }
2064
2065    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2066        self.state
2067            .get_type_by_name(name, &self.conn_id)
2068            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2069    }
2070
2071    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2072        &self.state.clusters_by_id[&id]
2073    }
2074
2075    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2076        self.state
2077            .clusters_by_id
2078            .values()
2079            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2080            .collect()
2081    }
2082
2083    fn get_cluster_replica(
2084        &self,
2085        cluster_id: ClusterId,
2086        replica_id: ReplicaId,
2087    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2088        let cluster = self.get_cluster(cluster_id);
2089        cluster.replica(replica_id)
2090    }
2091
2092    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2093        self.get_clusters()
2094            .into_iter()
2095            .flat_map(|cluster| cluster.replicas().into_iter())
2096            .collect()
2097    }
2098
2099    fn get_system_privileges(&self) -> &PrivilegeMap {
2100        &self.state.system_privileges
2101    }
2102
2103    fn get_default_privileges(
2104        &self,
2105    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2106        self.state
2107            .default_privileges
2108            .iter()
2109            .map(|(object, acl_items)| (object, acl_items.collect()))
2110            .collect()
2111    }
2112
2113    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2114        self.state.find_available_name(name, &self.conn_id)
2115    }
2116
2117    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2118        self.state.resolve_full_name(name, Some(&self.conn_id))
2119    }
2120
2121    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2122        self.state.resolve_full_schema_name(name)
2123    }
2124
2125    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2126        self.state.get_entry_by_global_id(global_id).id()
2127    }
2128
2129    fn resolve_global_id(
2130        &self,
2131        item_id: &CatalogItemId,
2132        version: RelationVersionSelector,
2133    ) -> GlobalId {
2134        self.state
2135            .get_entry(item_id)
2136            .at_version(version)
2137            .global_id()
2138    }
2139
2140    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2141        self.state.config()
2142    }
2143
2144    fn now(&self) -> EpochMillis {
2145        (self.state.config().now)()
2146    }
2147
2148    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2149        self.state.aws_privatelink_availability_zones.clone()
2150    }
2151
2152    fn system_vars(&self) -> &SystemVars {
2153        &self.state.system_configuration
2154    }
2155
2156    fn system_vars_mut(&mut self) -> &mut SystemVars {
2157        Arc::make_mut(&mut self.state.to_mut().system_configuration)
2158    }
2159
2160    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2161        self.state().get_owner_id(id, self.conn_id())
2162    }
2163
2164    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2165        match id {
2166            SystemObjectId::System => Some(&self.state.system_privileges),
2167            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2168                Some(self.get_cluster(*id).privileges())
2169            }
2170            SystemObjectId::Object(ObjectId::Database(id)) => {
2171                Some(self.get_database(id).privileges())
2172            }
2173            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2174                // For temporary schemas that haven't been created yet (lazy creation),
2175                // we return None - the RBAC check will need to handle this case.
2176                self.state
2177                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2178                    .map(|schema| schema.privileges())
2179            }
2180            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2181            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2182                Some(self.get_network_policy(id).privileges())
2183            }
2184            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2185            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2186        }
2187    }
2188
2189    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2190        let mut seen = BTreeSet::new();
2191        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2192    }
2193
2194    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2195        let mut seen = BTreeSet::new();
2196        self.state.item_dependents(id, &mut seen)
2197    }
2198
2199    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2200        rbac::all_object_privileges(object_type)
2201    }
2202
2203    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2204        self.state.get_object_type(object_id)
2205    }
2206
2207    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2208        self.state.get_system_object_type(id)
2209    }
2210
2211    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2212    /// the object.
2213    ///
2214    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2215    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2216    /// only for error msgs and other humanizations.
2217    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2218        if qualified_name.qualifiers.schema_spec.is_temporary() {
2219            // All bets are off. Just give up and return the qualified name as is.
2220            // TODO: Figure out what's going on with temporary objects.
2221
2222            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2223            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2224            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2225            // which we don't have a simple repro.
2226            return qualified_name.item.clone().into();
2227        }
2228
2229        let database_id = match &qualified_name.qualifiers.database_spec {
2230            ResolvedDatabaseSpecifier::Ambient => None,
2231            ResolvedDatabaseSpecifier::Id(id)
2232                if self.database.is_some() && self.database == Some(*id) =>
2233            {
2234                None
2235            }
2236            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2237        };
2238
2239        let schema_spec = if database_id.is_none()
2240            && self.resolve_item_name(&PartialItemName {
2241                database: None,
2242                schema: None,
2243                item: qualified_name.item.clone(),
2244            }) == Ok(qualified_name)
2245            || self.resolve_function_name(&PartialItemName {
2246                database: None,
2247                schema: None,
2248                item: qualified_name.item.clone(),
2249            }) == Ok(qualified_name)
2250            || self.resolve_type_name(&PartialItemName {
2251                database: None,
2252                schema: None,
2253                item: qualified_name.item.clone(),
2254            }) == Ok(qualified_name)
2255        {
2256            None
2257        } else {
2258            // If `search_path` does not contain `full_name.schema`, the
2259            // `PartialName` must contain it.
2260            Some(qualified_name.qualifiers.schema_spec.clone())
2261        };
2262
2263        let res = PartialItemName {
2264            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2265            schema: schema_spec.map(|spec| {
2266                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2267                    .name()
2268                    .schema
2269                    .clone()
2270            }),
2271            item: qualified_name.item.clone(),
2272        };
2273        assert!(
2274            self.resolve_item_name(&res) == Ok(qualified_name)
2275                || self.resolve_function_name(&res) == Ok(qualified_name)
2276                || self.resolve_type_name(&res) == Ok(qualified_name)
2277        );
2278        res
2279    }
2280
2281    fn add_notice(&self, notice: PlanNotice) {
2282        let _ = self.notices_tx.send(notice.into());
2283    }
2284
2285    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2286        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2287        self.state.comments.get_object_comments(comment_id)
2288    }
2289
2290    fn is_cluster_size_cc(&self, size: &str) -> bool {
2291        self.state
2292            .cluster_replica_sizes
2293            .0
2294            .get(size)
2295            .map_or(false, |a| a.is_cc)
2296    }
2297}
2298
2299#[cfg(test)]
2300mod tests {
2301    use std::collections::{BTreeMap, BTreeSet};
2302    use std::sync::Arc;
2303    use std::{env, iter};
2304
2305    use itertools::Itertools;
2306    use mz_catalog::memory::objects::CatalogItem;
2307    use mz_postgres_util::{query, sql};
2308    use tokio_postgres::NoTls;
2309    use tokio_postgres::types::Type;
2310    use uuid::Uuid;
2311
2312    use mz_catalog::SYSTEM_CONN_ID;
2313    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2314    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2315    use mz_controller_types::{ClusterId, ReplicaId};
2316    use mz_expr::{Eval, MirScalarExpr};
2317    use mz_ore::now::to_datetime;
2318    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2319    use mz_persist_client::PersistClient;
2320    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2321    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2322    use mz_repr::role_id::RoleId;
2323    use mz_repr::{
2324        CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2325        SqlScalarType, Timestamp,
2326    };
2327    use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2328    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2329    use mz_sql::names::{
2330        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2331        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2332    };
2333    use mz_sql::plan::{
2334        CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2335        QueryLifetime, Scope, StatementContext,
2336    };
2337    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2338    use mz_sql::session::vars::{SystemVars, VarInput};
2339
2340    use crate::catalog::state::LocalExpressionCache;
2341    use crate::catalog::{Catalog, Op};
2342    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2343    use crate::session::Session;
2344
2345    /// System sessions have an empty `search_path` so it's necessary to
2346    /// schema-qualify all referenced items.
2347    ///
2348    /// Dummy (and ostensibly client) sessions contain system schemas in their
2349    /// search paths, so do not require schema qualification on system objects such
2350    /// as types.
2351    #[mz_ore::test(tokio::test)]
2352    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2353    async fn test_minimal_qualification() {
2354        Catalog::with_debug(|catalog| async move {
2355            struct TestCase {
2356                input: QualifiedItemName,
2357                system_output: PartialItemName,
2358                normal_output: PartialItemName,
2359            }
2360
2361            let test_cases = vec![
2362                TestCase {
2363                    input: QualifiedItemName {
2364                        qualifiers: ItemQualifiers {
2365                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2366                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2367                        },
2368                        item: "numeric".to_string(),
2369                    },
2370                    system_output: PartialItemName {
2371                        database: None,
2372                        schema: None,
2373                        item: "numeric".to_string(),
2374                    },
2375                    normal_output: PartialItemName {
2376                        database: None,
2377                        schema: None,
2378                        item: "numeric".to_string(),
2379                    },
2380                },
2381                TestCase {
2382                    input: QualifiedItemName {
2383                        qualifiers: ItemQualifiers {
2384                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2385                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2386                        },
2387                        item: "mz_array_types".to_string(),
2388                    },
2389                    system_output: PartialItemName {
2390                        database: None,
2391                        schema: None,
2392                        item: "mz_array_types".to_string(),
2393                    },
2394                    normal_output: PartialItemName {
2395                        database: None,
2396                        schema: None,
2397                        item: "mz_array_types".to_string(),
2398                    },
2399                },
2400            ];
2401
2402            for tc in test_cases {
2403                assert_eq!(
2404                    catalog
2405                        .for_system_session()
2406                        .minimal_qualification(&tc.input),
2407                    tc.system_output
2408                );
2409                assert_eq!(
2410                    catalog
2411                        .for_session(&Session::dummy())
2412                        .minimal_qualification(&tc.input),
2413                    tc.normal_output
2414                );
2415            }
2416            catalog.expire().await;
2417        })
2418        .await
2419    }
2420
2421    #[mz_ore::test(tokio::test)]
2422    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2423    async fn test_catalog_revision() {
2424        let persist_client = PersistClient::new_for_tests().await;
2425        let organization_id = Uuid::new_v4();
2426        let bootstrap_args = test_bootstrap_args();
2427        {
2428            let mut catalog = Catalog::open_debug_catalog(
2429                persist_client.clone(),
2430                organization_id.clone(),
2431                &bootstrap_args,
2432            )
2433            .await
2434            .expect("unable to open debug catalog");
2435            assert_eq!(catalog.transient_revision(), 1);
2436            let commit_ts = catalog.current_upper().await;
2437            catalog
2438                .transact(
2439                    None,
2440                    commit_ts,
2441                    None,
2442                    vec![Op::CreateDatabase {
2443                        name: "test".to_string(),
2444                        owner_id: MZ_SYSTEM_ROLE_ID,
2445                    }],
2446                )
2447                .await
2448                .expect("failed to transact");
2449            assert_eq!(catalog.transient_revision(), 2);
2450            catalog.expire().await;
2451        }
2452        {
2453            let catalog =
2454                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2455                    .await
2456                    .expect("unable to open debug catalog");
2457            // Re-opening the same catalog resets the transient_revision to 1.
2458            assert_eq!(catalog.transient_revision(), 1);
2459            catalog.expire().await;
2460        }
2461    }
2462
2463    #[mz_ore::test(tokio::test)]
2464    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2465    async fn test_effective_search_path() {
2466        Catalog::with_debug(|catalog| async move {
2467            let mz_catalog_schema = (
2468                ResolvedDatabaseSpecifier::Ambient,
2469                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2470            );
2471            let pg_catalog_schema = (
2472                ResolvedDatabaseSpecifier::Ambient,
2473                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2474            );
2475            let mz_temp_schema = (
2476                ResolvedDatabaseSpecifier::Ambient,
2477                SchemaSpecifier::Temporary,
2478            );
2479
2480            // Behavior with the default search_schema (public)
2481            let session = Session::dummy();
2482            let conn_catalog = catalog.for_session(&session);
2483            assert_ne!(
2484                conn_catalog.effective_search_path(false),
2485                conn_catalog.search_path
2486            );
2487            assert_ne!(
2488                conn_catalog.effective_search_path(true),
2489                conn_catalog.search_path
2490            );
2491            assert_eq!(
2492                conn_catalog.effective_search_path(false),
2493                vec![
2494                    mz_catalog_schema.clone(),
2495                    pg_catalog_schema.clone(),
2496                    conn_catalog.search_path[0].clone()
2497                ]
2498            );
2499            assert_eq!(
2500                conn_catalog.effective_search_path(true),
2501                vec![
2502                    mz_temp_schema.clone(),
2503                    mz_catalog_schema.clone(),
2504                    pg_catalog_schema.clone(),
2505                    conn_catalog.search_path[0].clone()
2506                ]
2507            );
2508
2509            // missing schemas are added when missing
2510            let mut session = Session::dummy();
2511            session
2512                .vars_mut()
2513                .set(
2514                    &SystemVars::new(),
2515                    "search_path",
2516                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2517                    false,
2518                )
2519                .expect("failed to set search_path");
2520            let conn_catalog = catalog.for_session(&session);
2521            assert_ne!(
2522                conn_catalog.effective_search_path(false),
2523                conn_catalog.search_path
2524            );
2525            assert_ne!(
2526                conn_catalog.effective_search_path(true),
2527                conn_catalog.search_path
2528            );
2529            assert_eq!(
2530                conn_catalog.effective_search_path(false),
2531                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2532            );
2533            assert_eq!(
2534                conn_catalog.effective_search_path(true),
2535                vec![
2536                    mz_temp_schema.clone(),
2537                    mz_catalog_schema.clone(),
2538                    pg_catalog_schema.clone()
2539                ]
2540            );
2541
2542            let mut session = Session::dummy();
2543            session
2544                .vars_mut()
2545                .set(
2546                    &SystemVars::new(),
2547                    "search_path",
2548                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2549                    false,
2550                )
2551                .expect("failed to set search_path");
2552            let conn_catalog = catalog.for_session(&session);
2553            assert_ne!(
2554                conn_catalog.effective_search_path(false),
2555                conn_catalog.search_path
2556            );
2557            assert_ne!(
2558                conn_catalog.effective_search_path(true),
2559                conn_catalog.search_path
2560            );
2561            assert_eq!(
2562                conn_catalog.effective_search_path(false),
2563                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2564            );
2565            assert_eq!(
2566                conn_catalog.effective_search_path(true),
2567                vec![
2568                    mz_temp_schema.clone(),
2569                    pg_catalog_schema.clone(),
2570                    mz_catalog_schema.clone()
2571                ]
2572            );
2573
2574            let mut session = Session::dummy();
2575            session
2576                .vars_mut()
2577                .set(
2578                    &SystemVars::new(),
2579                    "search_path",
2580                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2581                    false,
2582                )
2583                .expect("failed to set search_path");
2584            let conn_catalog = catalog.for_session(&session);
2585            assert_ne!(
2586                conn_catalog.effective_search_path(false),
2587                conn_catalog.search_path
2588            );
2589            assert_ne!(
2590                conn_catalog.effective_search_path(true),
2591                conn_catalog.search_path
2592            );
2593            assert_eq!(
2594                conn_catalog.effective_search_path(false),
2595                vec![
2596                    mz_catalog_schema.clone(),
2597                    pg_catalog_schema.clone(),
2598                    mz_temp_schema.clone()
2599                ]
2600            );
2601            assert_eq!(
2602                conn_catalog.effective_search_path(true),
2603                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2604            );
2605            catalog.expire().await;
2606        })
2607        .await
2608    }
2609
2610    #[mz_ore::test(tokio::test)]
2611    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2612    async fn test_normalized_create() {
2613        use mz_ore::collections::CollectionExt;
2614        Catalog::with_debug(|catalog| async move {
2615            let conn_catalog = catalog.for_system_session();
2616            let scx = &mut StatementContext::new(None, &conn_catalog);
2617
2618            let parsed = mz_sql_parser::parser::parse_statements(
2619                "create view public.foo as select 1 as bar",
2620            )
2621            .expect("")
2622            .into_element()
2623            .ast;
2624
2625            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2626
2627            // Ensure that all identifiers are quoted.
2628            assert_eq!(
2629                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2630                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2631            );
2632            catalog.expire().await;
2633        })
2634        .await;
2635    }
2636
2637    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2638    #[mz_ore::test(tokio::test)]
2639    #[cfg_attr(miri, ignore)] // slow
2640    async fn test_large_catalog_item() {
2641        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2642        let column = ", 1";
2643        let view_def_size = view_def.bytes().count();
2644        let column_size = column.bytes().count();
2645        let column_count =
2646            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2647        let columns = iter::repeat(column).take(column_count).join("");
2648        let create_sql = format!("{view_def}{columns})");
2649        let create_sql_check = create_sql.clone();
2650        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2651        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2652            &create_sql
2653        ));
2654
2655        let persist_client = PersistClient::new_for_tests().await;
2656        let organization_id = Uuid::new_v4();
2657        let id = CatalogItemId::User(1);
2658        let gid = GlobalId::User(1);
2659        let bootstrap_args = test_bootstrap_args();
2660        {
2661            let mut catalog = Catalog::open_debug_catalog(
2662                persist_client.clone(),
2663                organization_id.clone(),
2664                &bootstrap_args,
2665            )
2666            .await
2667            .expect("unable to open debug catalog");
2668            let item = catalog
2669                .state()
2670                .deserialize_item(
2671                    gid,
2672                    &create_sql,
2673                    &BTreeMap::new(),
2674                    &mut LocalExpressionCache::Closed,
2675                    None,
2676                )
2677                .expect("unable to parse view");
2678            let commit_ts = catalog.current_upper().await;
2679            catalog
2680                .transact(
2681                    None,
2682                    commit_ts,
2683                    None,
2684                    vec![Op::CreateItem {
2685                        item,
2686                        name: QualifiedItemName {
2687                            qualifiers: ItemQualifiers {
2688                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2689                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2690                            },
2691                            item: "v".to_string(),
2692                        },
2693                        id,
2694                        owner_id: MZ_SYSTEM_ROLE_ID,
2695                    }],
2696                )
2697                .await
2698                .expect("failed to transact");
2699            catalog.expire().await;
2700        }
2701        {
2702            let catalog =
2703                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2704                    .await
2705                    .expect("unable to open debug catalog");
2706            let view = catalog.get_entry(&id);
2707            assert_eq!("v", view.name.item);
2708            match &view.item {
2709                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2710                item => panic!("expected view, got {}", item.typ()),
2711            }
2712            catalog.expire().await;
2713        }
2714    }
2715
2716    #[mz_ore::test(tokio::test)]
2717    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2718    async fn test_object_type() {
2719        Catalog::with_debug(|catalog| async move {
2720            let conn_catalog = catalog.for_system_session();
2721
2722            assert_eq!(
2723                mz_sql::catalog::ObjectType::ClusterReplica,
2724                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2725                    ClusterId::user(1).expect("1 is a valid ID"),
2726                    ReplicaId::User(1)
2727                )))
2728            );
2729            assert_eq!(
2730                mz_sql::catalog::ObjectType::Role,
2731                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2732            );
2733            catalog.expire().await;
2734        })
2735        .await;
2736    }
2737
2738    #[mz_ore::test(tokio::test)]
2739    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2740    async fn test_get_privileges() {
2741        Catalog::with_debug(|catalog| async move {
2742            let conn_catalog = catalog.for_system_session();
2743
2744            assert_eq!(
2745                None,
2746                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2747                    ClusterId::user(1).expect("1 is a valid ID"),
2748                    ReplicaId::User(1),
2749                ))))
2750            );
2751            assert_eq!(
2752                None,
2753                conn_catalog
2754                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2755            );
2756            catalog.expire().await;
2757        })
2758        .await;
2759    }
2760
2761    #[mz_ore::test(tokio::test)]
2762    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2763    async fn verify_builtin_descs() {
2764        Catalog::with_debug(|catalog| async move {
2765            let conn_catalog = catalog.for_system_session();
2766
2767            for builtin in BUILTINS::iter() {
2768                let (schema, name, expected_desc) = match builtin {
2769                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2770                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2771                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2772                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2773                    Builtin::Log(_)
2774                    | Builtin::Type(_)
2775                    | Builtin::Func(_)
2776                    | Builtin::Index(_)
2777                    | Builtin::Connection(_) => continue,
2778                };
2779                let item = conn_catalog
2780                    .resolve_item(&PartialItemName {
2781                        database: None,
2782                        schema: Some(schema.to_string()),
2783                        item: name.to_string(),
2784                    })
2785                    .expect("unable to resolve item")
2786                    .at_version(RelationVersionSelector::Latest);
2787
2788                let actual_desc = item.relation_desc().expect("invalid item type");
2789                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2790                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2791                {
2792                    assert_eq!(
2793                        actual_name, expected_name,
2794                        "item {schema}.{name} column {index} name did not match its expected name"
2795                    );
2796                    assert_eq!(
2797                        actual_typ, expected_typ,
2798                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2799                    );
2800                }
2801                assert_eq!(
2802                    &*actual_desc, expected_desc,
2803                    "item {schema}.{name} did not match its expected RelationDesc"
2804                );
2805            }
2806            catalog.expire().await;
2807        })
2808        .await
2809    }
2810
2811    // Connect to a running Postgres server and verify that our builtin
2812    // types and functions match it, in addition to some other things.
2813    #[mz_ore::test(tokio::test)]
2814    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2815    async fn test_compare_builtins_postgres() {
2816        async fn inner(catalog: Catalog) {
2817            // Verify that all builtin functions:
2818            // - have a unique OID
2819            // - if they have a postgres counterpart (same oid) then they have matching name
2820            let (client, connection) = tokio_postgres::connect(
2821                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2822                NoTls,
2823            )
2824            .await
2825            .expect("failed to connect to Postgres");
2826
2827            task::spawn(|| "compare_builtin_postgres", async move {
2828                if let Err(e) = connection.await {
2829                    panic!("connection error: {}", e);
2830                }
2831            });
2832
2833            struct PgProc {
2834                name: String,
2835                arg_oids: Vec<u32>,
2836                ret_oid: Option<u32>,
2837                ret_set: bool,
2838            }
2839
2840            struct PgType {
2841                name: String,
2842                ty: String,
2843                elem: u32,
2844                array: u32,
2845                input: u32,
2846                receive: u32,
2847            }
2848
2849            struct PgOper {
2850                oprresult: u32,
2851                name: String,
2852            }
2853
2854            let pg_proc: BTreeMap<_, _> = query(
2855                &client,
2856                sql!(
2857                    "SELECT
2858                    p.oid,
2859                    proname,
2860                    proargtypes,
2861                    prorettype,
2862                    proretset
2863                FROM pg_proc p
2864                JOIN pg_namespace n ON p.pronamespace = n.oid"
2865                ),
2866                &[],
2867            )
2868            .await
2869            .expect("pg query failed")
2870            .into_iter()
2871            .map(|row| {
2872                let oid: u32 = row.get("oid");
2873                let pg_proc = PgProc {
2874                    name: row.get("proname"),
2875                    arg_oids: row.get("proargtypes"),
2876                    ret_oid: row.get("prorettype"),
2877                    ret_set: row.get("proretset"),
2878                };
2879                (oid, pg_proc)
2880            })
2881            .collect();
2882
2883            let pg_type: BTreeMap<_, _> = query(
2884                &client,
2885                sql!(
2886                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type"
2887                ),
2888                &[],
2889            )
2890            .await
2891                .expect("pg query failed")
2892                .into_iter()
2893                .map(|row| {
2894                    let oid: u32 = row.get("oid");
2895                    let pg_type = PgType {
2896                        name: row.get("typname"),
2897                        ty: row.get("typtype"),
2898                        elem: row.get("typelem"),
2899                        array: row.get("typarray"),
2900                        input: row.get("typinput"),
2901                        receive: row.get("typreceive"),
2902                    };
2903                    (oid, pg_type)
2904                })
2905                .collect();
2906
2907            let pg_oper: BTreeMap<_, _> = query(
2908                &client,
2909                sql!("SELECT oid, oprname, oprresult FROM pg_operator"),
2910                &[],
2911            )
2912            .await
2913            .expect("pg query failed")
2914            .into_iter()
2915            .map(|row| {
2916                let oid: u32 = row.get("oid");
2917                let pg_oper = PgOper {
2918                    name: row.get("oprname"),
2919                    oprresult: row.get("oprresult"),
2920                };
2921                (oid, pg_oper)
2922            })
2923            .collect();
2924
2925            let conn_catalog = catalog.for_system_session();
2926            let resolve_type_oid = |item: &str| {
2927                conn_catalog
2928                    .resolve_type(&PartialItemName {
2929                        database: None,
2930                        // All functions we check exist in PG, so the types must, as
2931                        // well
2932                        schema: Some(PG_CATALOG_SCHEMA.into()),
2933                        item: item.to_string(),
2934                    })
2935                    .expect("unable to resolve type")
2936                    .oid()
2937            };
2938
2939            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2940                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2941                .collect();
2942
2943            let mut all_oids = BTreeSet::new();
2944
2945            // A function to determine if two oids are equivalent enough for these tests. We don't
2946            // support some types, so map exceptions here.
2947            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2948                [
2949                    // We don't support NAME.
2950                    (Type::NAME, Type::TEXT),
2951                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2952                    // We don't support time with time zone.
2953                    (Type::TIME, Type::TIMETZ),
2954                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2955                ]
2956                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2957            );
2958            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2959                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2960            ]);
2961            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2962                if ignore_return_types.contains(&fn_oid) {
2963                    return true;
2964                }
2965                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2966                    return true;
2967                }
2968                a == b
2969            };
2970
2971            for builtin in BUILTINS::iter() {
2972                match builtin {
2973                    Builtin::Type(ty) => {
2974                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2975
2976                        if ty.oid >= FIRST_MATERIALIZE_OID {
2977                            // High OIDs are reserved in Materialize and don't have
2978                            // PostgreSQL counterparts.
2979                            continue;
2980                        }
2981
2982                        // For types that have a PostgreSQL counterpart, verify that
2983                        // the name and oid match.
2984                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2985                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2986                        });
2987                        assert_eq!(
2988                            ty.name, pg_ty.name,
2989                            "oid {} has name {} in postgres; expected {}",
2990                            ty.oid, pg_ty.name, ty.name,
2991                        );
2992
2993                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2994                            None => (0, 0),
2995                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2996                        };
2997                        assert_eq!(
2998                            typinput_oid, pg_ty.input,
2999                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3000                            ty.name, typinput_oid, pg_ty.input,
3001                        );
3002                        assert_eq!(
3003                            typreceive_oid, pg_ty.receive,
3004                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3005                            ty.name, typreceive_oid, pg_ty.receive,
3006                        );
3007                        if typinput_oid != 0 {
3008                            assert!(
3009                                func_oids.contains(&typinput_oid),
3010                                "type {} has typinput OID {} that does not exist in pg_proc",
3011                                ty.name,
3012                                typinput_oid,
3013                            );
3014                        }
3015                        if typreceive_oid != 0 {
3016                            assert!(
3017                                func_oids.contains(&typreceive_oid),
3018                                "type {} has typreceive OID {} that does not exist in pg_proc",
3019                                ty.name,
3020                                typreceive_oid,
3021                            );
3022                        }
3023
3024                        // Ensure the type matches.
3025                        match &ty.details.typ {
3026                            CatalogType::Array { element_reference } => {
3027                                let elem_ty = BUILTINS::iter()
3028                                    .filter_map(|builtin| match builtin {
3029                                        Builtin::Type(ty @ BuiltinType { name, .. })
3030                                            if element_reference == name =>
3031                                        {
3032                                            Some(ty)
3033                                        }
3034                                        _ => None,
3035                                    })
3036                                    .next();
3037                                let elem_ty = match elem_ty {
3038                                    Some(ty) => ty,
3039                                    None => {
3040                                        panic!("{} is unexpectedly not a type", element_reference)
3041                                    }
3042                                };
3043                                assert_eq!(
3044                                    pg_ty.elem, elem_ty.oid,
3045                                    "type {} has mismatched element OIDs",
3046                                    ty.name
3047                                )
3048                            }
3049                            CatalogType::Pseudo => {
3050                                assert_eq!(
3051                                    pg_ty.ty, "p",
3052                                    "type {} is not a pseudo type as expected",
3053                                    ty.name
3054                                )
3055                            }
3056                            CatalogType::Range { .. } => {
3057                                assert_eq!(
3058                                    pg_ty.ty, "r",
3059                                    "type {} is not a range type as expected",
3060                                    ty.name
3061                                );
3062                            }
3063                            _ => {
3064                                assert_eq!(
3065                                    pg_ty.ty, "b",
3066                                    "type {} is not a base type as expected",
3067                                    ty.name
3068                                )
3069                            }
3070                        }
3071
3072                        // Ensure the array type reference is correct.
3073                        let schema = catalog
3074                            .resolve_schema_in_database(
3075                                &ResolvedDatabaseSpecifier::Ambient,
3076                                ty.schema,
3077                                &SYSTEM_CONN_ID,
3078                            )
3079                            .expect("unable to resolve schema");
3080                        let allocated_type = catalog
3081                            .resolve_type(
3082                                None,
3083                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3084                                &PartialItemName {
3085                                    database: None,
3086                                    schema: Some(schema.name().schema.clone()),
3087                                    item: ty.name.to_string(),
3088                                },
3089                                &SYSTEM_CONN_ID,
3090                            )
3091                            .expect("unable to resolve type");
3092                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3093                            ty
3094                        } else {
3095                            panic!("unexpectedly not a type")
3096                        };
3097                        match ty.details.array_id {
3098                            Some(array_id) => {
3099                                let array_ty = catalog.get_entry(&array_id);
3100                                assert_eq!(
3101                                    pg_ty.array, array_ty.oid,
3102                                    "type {} has mismatched array OIDs",
3103                                    allocated_type.name.item,
3104                                );
3105                            }
3106                            None => assert_eq!(
3107                                pg_ty.array, 0,
3108                                "type {} does not have an array type in mz but does in pg",
3109                                allocated_type.name.item,
3110                            ),
3111                        }
3112                    }
3113                    Builtin::Func(func) => {
3114                        for imp in func.inner.func_impls() {
3115                            assert!(
3116                                all_oids.insert(imp.oid),
3117                                "{} reused oid {}",
3118                                func.name,
3119                                imp.oid
3120                            );
3121
3122                            assert!(
3123                                imp.oid < FIRST_USER_OID,
3124                                "built-in function {} erroneously has OID in user space ({})",
3125                                func.name,
3126                                imp.oid,
3127                            );
3128
3129                            // For functions that have a postgres counterpart, verify that the name and
3130                            // oid match.
3131                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3132                                continue;
3133                            } else {
3134                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3135                                    panic!(
3136                                        "pg_proc missing function {}: oid {}",
3137                                        func.name, imp.oid
3138                                    )
3139                                })
3140                            };
3141                            assert_eq!(
3142                                func.name, pg_fn.name,
3143                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3144                                imp.oid, func.name, pg_fn.name
3145                            );
3146
3147                            // Complain, but don't fail, if argument oids don't match.
3148                            // TODO: make these match.
3149                            let imp_arg_oids = imp
3150                                .arg_typs
3151                                .iter()
3152                                .map(|item| resolve_type_oid(item))
3153                                .collect::<Vec<_>>();
3154
3155                            if imp_arg_oids != pg_fn.arg_oids {
3156                                println!(
3157                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3158                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3159                                );
3160                            }
3161
3162                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3163
3164                            assert!(
3165                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3166                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3167                                imp.oid,
3168                                func.name,
3169                                imp_return_oid,
3170                                pg_fn.ret_oid
3171                            );
3172
3173                            assert_eq!(
3174                                imp.return_is_set, pg_fn.ret_set,
3175                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3176                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3177                            );
3178                        }
3179                    }
3180                    _ => (),
3181                }
3182            }
3183
3184            for (op, func) in OP_IMPLS.iter() {
3185                for imp in func.func_impls() {
3186                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3187
3188                    // For operators that have a postgres counterpart, verify that the name and oid match.
3189                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3190                        continue;
3191                    } else {
3192                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3193                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3194                        })
3195                    };
3196
3197                    assert_eq!(*op, pg_op.name);
3198
3199                    let imp_return_oid =
3200                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3201                    if imp_return_oid != pg_op.oprresult {
3202                        panic!(
3203                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3204                            imp.oid, op, imp_return_oid, pg_op.oprresult
3205                        );
3206                    }
3207                }
3208            }
3209            catalog.expire().await;
3210        }
3211
3212        Catalog::with_debug(inner).await
3213    }
3214
3215    // Execute all builtin functions with all combinations of arguments from interesting datums.
3216    #[mz_ore::test(tokio::test)]
3217    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3218    async fn test_smoketest_all_builtins() {
3219        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3220            let catalog = Arc::new(catalog);
3221            let conn_catalog = catalog.for_system_session();
3222
3223            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3224            let mut handles = Vec::new();
3225
3226            // Extracted during planning; always panics when executed.
3227            let ignore_names = BTreeSet::from([
3228                "avg",
3229                "avg_internal_v1",
3230                "bool_and",
3231                "bool_or",
3232                "has_table_privilege", // > 3 s each
3233                "has_type_privilege",  // > 3 s each
3234                "mod",
3235                "mz_panic",
3236                "mz_sleep",
3237                "pow",
3238                "stddev_pop",
3239                "stddev_samp",
3240                "stddev",
3241                "var_pop",
3242                "var_samp",
3243                "variance",
3244            ]);
3245
3246            let fns = BUILTINS::funcs()
3247                .map(|func| (&func.name, func.inner))
3248                .chain(OP_IMPLS.iter());
3249
3250            for (name, func) in fns {
3251                if ignore_names.contains(name) {
3252                    continue;
3253                }
3254                let Func::Scalar(impls) = func else {
3255                    continue;
3256                };
3257
3258                'outer: for imp in impls {
3259                    let details = imp.details();
3260                    let mut styps = Vec::new();
3261                    for item in details.arg_typs.iter() {
3262                        let oid = resolve_type_oid(item);
3263                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3264                            continue 'outer;
3265                        };
3266                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3267                    }
3268                    let datums = styps
3269                        .iter()
3270                        .map(|styp| {
3271                            let mut datums = vec![Datum::Null];
3272                            datums.extend(styp.interesting_datums());
3273                            datums
3274                        })
3275                        .collect::<Vec<_>>();
3276                    // Skip nullary fns.
3277                    if datums.is_empty() {
3278                        continue;
3279                    }
3280
3281                    let return_oid = details
3282                        .return_typ
3283                        .map(resolve_type_oid)
3284                        .expect("must exist");
3285                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3286                        .ok()
3287                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3288
3289                    let mut idxs = vec![0; datums.len()];
3290                    while idxs[0] < datums[0].len() {
3291                        let mut args = Vec::with_capacity(idxs.len());
3292                        for i in 0..(datums.len()) {
3293                            args.push(datums[i][idxs[i]]);
3294                        }
3295
3296                        let op = &imp.op;
3297                        let scalars = args
3298                            .iter()
3299                            .enumerate()
3300                            .map(|(i, datum)| {
3301                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3302                                    datum.clone(),
3303                                    styps[i].clone(),
3304                                ))
3305                            })
3306                            .collect();
3307
3308                        let call_name = format!(
3309                            "{name}({}) (oid: {})",
3310                            args.iter()
3311                                .map(|d| d.to_string())
3312                                .collect::<Vec<_>>()
3313                                .join(", "),
3314                            imp.oid
3315                        );
3316                        let catalog = Arc::clone(&catalog);
3317                        let call_name_fn = call_name.clone();
3318                        let return_styp = return_styp.clone();
3319                        let handle = task::spawn_blocking(
3320                            || call_name,
3321                            move || {
3322                                smoketest_fn(
3323                                    name,
3324                                    call_name_fn,
3325                                    op,
3326                                    imp,
3327                                    args,
3328                                    catalog,
3329                                    scalars,
3330                                    return_styp,
3331                                )
3332                            },
3333                        );
3334                        handles.push(handle);
3335
3336                        // Advance to the next datum combination.
3337                        for i in (0..datums.len()).rev() {
3338                            idxs[i] += 1;
3339                            if idxs[i] >= datums[i].len() {
3340                                if i == 0 {
3341                                    break;
3342                                }
3343                                idxs[i] = 0;
3344                                continue;
3345                            } else {
3346                                break;
3347                            }
3348                        }
3349                    }
3350                }
3351            }
3352            handles
3353        }
3354
3355        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3356        for handle in handles {
3357            handle.await;
3358        }
3359    }
3360
3361    fn smoketest_fn(
3362        name: &&str,
3363        call_name: String,
3364        op: &Operation<HirScalarExpr>,
3365        imp: &FuncImpl<HirScalarExpr>,
3366        args: Vec<Datum<'_>>,
3367        catalog: Arc<Catalog>,
3368        scalars: Vec<CoercibleScalarExpr>,
3369        return_styp: Option<SqlScalarType>,
3370    ) {
3371        let conn_catalog = catalog.for_system_session();
3372        let pcx = PlanContext::zero();
3373        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3374        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3375        let ecx = ExprContext {
3376            qcx: &qcx,
3377            name: "smoketest",
3378            scope: &Scope::empty(),
3379            relation_type: &SqlRelationType::empty(),
3380            allow_aggregates: false,
3381            allow_subqueries: false,
3382            allow_parameters: false,
3383            allow_windows: false,
3384        };
3385        let arena = RowArena::new();
3386        let mut session = Session::dummy();
3387        session
3388            .start_transaction(to_datetime(0), None, None)
3389            .expect("must succeed");
3390        let prep_style = ExprPrepOneShot {
3391            logical_time: EvalTime::Time(Timestamp::MIN),
3392            session: &session,
3393            catalog_state: &catalog.state,
3394        };
3395
3396        // Execute the function as much as possible, ensuring no panics occur, but
3397        // otherwise ignoring eval errors. We also do various other checks.
3398        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3399        if let Ok(hir) = res {
3400            let uneliminated_result_row = {
3401                if let HirScalarExpr::CallUnary { func, .. } = &hir
3402                    && func.is_eliminable_cast()
3403                {
3404                    let mut uneliminated_mir = hir
3405                        .clone()
3406                        .lower_uncorrelated(HirToMirConfig {
3407                            enable_cast_elimination: false,
3408                            ..catalog.system_config().into()
3409                        })
3410                        .expect("lowering eliminable cast should always succeed");
3411                    prep_style
3412                        .prep_scalar_expr(&mut uneliminated_mir)
3413                        .expect("must succeed");
3414
3415                    // Pack the row, to avoid lifetime issues with the MIR we lowered here
3416                    uneliminated_mir
3417                        .eval(&[], &arena)
3418                        .ok()
3419                        .map(|datum| Row::pack([datum]))
3420                } else {
3421                    None
3422                }
3423            };
3424
3425            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3426                // Populate unmaterialized functions.
3427                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3428
3429                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3430                    if let Some(return_styp) = return_styp {
3431                        let mir_typ = mir.typ(&[]);
3432                        // MIR type inference should be consistent with the type
3433                        // we get from the catalog.
3434                        soft_assert_eq_or_log!(
3435                            mir_typ.scalar_type,
3436                            (&return_styp).into(),
3437                            "MIR type did not match the catalog type (cast elimination/repr type error)"
3438                        );
3439                        // The following will check not just that the scalar type
3440                        // is ok, but also catches if the function returned a null
3441                        // but the MIR type inference said "non-nullable".
3442                        if !eval_result_datum.is_instance_of(&mir_typ) {
3443                            panic!(
3444                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3445                            );
3446                        }
3447                        // Check the consistency of `is_eliminable_cast`---we should get the same datum either way.
3448                        if let Some(row) = uneliminated_result_row {
3449                            let uneliminated_result_datum = row.unpack_first();
3450                            assert_eq!(
3451                                uneliminated_result_datum, eval_result_datum,
3452                                "datums should not change if cast is eliminable"
3453                            );
3454                        }
3455                        // Check the consistency of `introduces_nulls` and
3456                        // `propagates_nulls` with `MirScalarExpr::typ`.
3457                        if let Some((introduces_nulls, propagates_nulls)) =
3458                            call_introduces_propagates_nulls(&mir)
3459                        {
3460                            if introduces_nulls {
3461                                // If the function introduces_nulls, then the return
3462                                // type should always be nullable, regardless of
3463                                // the nullability of the input types.
3464                                assert!(
3465                                    mir_typ.nullable,
3466                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3467                                    name, args, mir, mir_typ.nullable
3468                                );
3469                            } else {
3470                                let any_input_null = args.iter().any(|arg| arg.is_null());
3471                                if !any_input_null {
3472                                    assert!(
3473                                        !mir_typ.nullable,
3474                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3475                                        name, args, mir, mir_typ.nullable
3476                                    );
3477                                } else if propagates_nulls {
3478                                    // propagates_nulls means the optimizer short-circuits
3479                                    // all-null inputs, so the output must be nullable.
3480                                    assert!(
3481                                        mir_typ.nullable,
3482                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3483                                        name, args, mir, mir_typ.nullable
3484                                    );
3485                                }
3486                                // When propagates_nulls is false, the output may still
3487                                // be nullable if a non-nullable parameter received a null
3488                                // input (per-position null rejection). The is_instance_of
3489                                // check above ensures type consistency.
3490                            }
3491                        }
3492                        // Check that `MirScalarExpr::reduce` yields the same result
3493                        // as the real evaluation.
3494                        let mut reduced = mir.clone();
3495                        reduced.reduce(&[]);
3496                        match reduced {
3497                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3498                                match reduce_result {
3499                                    Ok(reduce_result_row) => {
3500                                        let reduce_result_datum = reduce_result_row.unpack_first();
3501                                        assert_eq!(
3502                                            reduce_result_datum,
3503                                            eval_result_datum,
3504                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3505                                            name,
3506                                            args,
3507                                            mir,
3508                                            eval_result_datum,
3509                                            mir_typ.scalar_type,
3510                                            reduce_result_datum,
3511                                            ctyp.scalar_type
3512                                        );
3513                                        // Let's check that the types also match.
3514                                        // (We are not checking nullability here,
3515                                        // because it's ok when we know a more
3516                                        // precise nullability after actually
3517                                        // evaluating a function than before.)
3518                                        assert_eq!(
3519                                            ctyp.scalar_type,
3520                                            mir_typ.scalar_type,
3521                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3522                                            name,
3523                                            args,
3524                                            mir,
3525                                            eval_result_datum,
3526                                            mir_typ.scalar_type,
3527                                            reduce_result_datum,
3528                                            ctyp.scalar_type
3529                                        );
3530                                    }
3531                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3532                                }
3533                            }
3534                            _ => unreachable!(
3535                                "all args are literals, so should have reduced to a literal"
3536                            ),
3537                        }
3538                    }
3539                }
3540            }
3541        }
3542    }
3543
3544    /// If the given MirScalarExpr
3545    ///  - is a function call, and
3546    ///  - all arguments are literals
3547    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3548    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3549        match mir_func_call {
3550            MirScalarExpr::CallUnary { func, expr } => {
3551                if expr.is_literal() {
3552                    Some((func.introduces_nulls(), func.propagates_nulls()))
3553                } else {
3554                    None
3555                }
3556            }
3557            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3558                if expr1.is_literal() && expr2.is_literal() {
3559                    Some((func.introduces_nulls(), func.propagates_nulls()))
3560                } else {
3561                    None
3562                }
3563            }
3564            MirScalarExpr::CallVariadic { func, exprs } => {
3565                if exprs.iter().all(|arg| arg.is_literal()) {
3566                    Some((func.introduces_nulls(), func.propagates_nulls()))
3567                } else {
3568                    None
3569                }
3570            }
3571            _ => None,
3572        }
3573    }
3574
3575    // Make sure pg views don't use types that only exist in Materialize.
3576    #[mz_ore::test(tokio::test)]
3577    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3578    async fn test_pg_views_forbidden_types() {
3579        Catalog::with_debug(|catalog| async move {
3580            let conn_catalog = catalog.for_system_session();
3581
3582            for view in BUILTINS::views().filter(|view| {
3583                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3584            }) {
3585                let item = conn_catalog
3586                    .resolve_item(&PartialItemName {
3587                        database: None,
3588                        schema: Some(view.schema.to_string()),
3589                        item: view.name.to_string(),
3590                    })
3591                    .expect("unable to resolve view")
3592                    // TODO(alter_table)
3593                    .at_version(RelationVersionSelector::Latest);
3594                let full_name = conn_catalog.resolve_full_name(item.name());
3595                let desc = item.relation_desc().expect("invalid item type");
3596                for col_type in desc.iter_types() {
3597                    match &col_type.scalar_type {
3598                        typ @ SqlScalarType::UInt16
3599                        | typ @ SqlScalarType::UInt32
3600                        | typ @ SqlScalarType::UInt64
3601                        | typ @ SqlScalarType::MzTimestamp
3602                        | typ @ SqlScalarType::List { .. }
3603                        | typ @ SqlScalarType::Map { .. }
3604                        | typ @ SqlScalarType::MzAclItem => {
3605                            panic!("{typ:?} type found in {full_name}");
3606                        }
3607                        SqlScalarType::AclItem
3608                        | SqlScalarType::Bool
3609                        | SqlScalarType::Int16
3610                        | SqlScalarType::Int32
3611                        | SqlScalarType::Int64
3612                        | SqlScalarType::Float32
3613                        | SqlScalarType::Float64
3614                        | SqlScalarType::Numeric { .. }
3615                        | SqlScalarType::Date
3616                        | SqlScalarType::Time
3617                        | SqlScalarType::Timestamp { .. }
3618                        | SqlScalarType::TimestampTz { .. }
3619                        | SqlScalarType::Interval
3620                        | SqlScalarType::PgLegacyChar
3621                        | SqlScalarType::Bytes
3622                        | SqlScalarType::String
3623                        | SqlScalarType::Char { .. }
3624                        | SqlScalarType::VarChar { .. }
3625                        | SqlScalarType::Jsonb
3626                        | SqlScalarType::Uuid
3627                        | SqlScalarType::Array(_)
3628                        | SqlScalarType::Record { .. }
3629                        | SqlScalarType::Oid
3630                        | SqlScalarType::RegProc
3631                        | SqlScalarType::RegType
3632                        | SqlScalarType::RegClass
3633                        | SqlScalarType::Int2Vector
3634                        | SqlScalarType::Range { .. }
3635                        | SqlScalarType::PgLegacyName => {}
3636                    }
3637                }
3638            }
3639            catalog.expire().await;
3640        })
3641        .await
3642    }
3643
3644    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3645    // introspection relations.
3646    #[mz_ore::test(tokio::test)]
3647    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3648    async fn test_mz_introspection_builtins() {
3649        Catalog::with_debug(|catalog| async move {
3650            let conn_catalog = catalog.for_system_session();
3651
3652            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3653            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3654
3655            for entry in catalog.entries() {
3656                let schema_spec = entry.name().qualifiers.schema_spec;
3657                let introspection_deps = catalog.introspection_dependencies(entry.id);
3658                if introspection_deps.is_empty() {
3659                    assert!(
3660                        schema_spec != introspection_schema_spec,
3661                        "entry does not depend on introspection sources but is in \
3662                         `mz_introspection`: {}",
3663                        conn_catalog.resolve_full_name(entry.name()),
3664                    );
3665                } else {
3666                    assert!(
3667                        schema_spec == introspection_schema_spec,
3668                        "entry depends on introspection sources but is not in \
3669                         `mz_introspection`: {}",
3670                        conn_catalog.resolve_full_name(entry.name()),
3671                    );
3672                }
3673            }
3674        })
3675        .await
3676    }
3677
3678    #[mz_ore::test(tokio::test)]
3679    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3680    async fn test_multi_subscriber_catalog() {
3681        let persist_client = PersistClient::new_for_tests().await;
3682        let bootstrap_args = test_bootstrap_args();
3683        let organization_id = Uuid::new_v4();
3684        let db_name = "DB";
3685
3686        let mut writer_catalog = Catalog::open_debug_catalog(
3687            persist_client.clone(),
3688            organization_id.clone(),
3689            &bootstrap_args,
3690        )
3691        .await
3692        .expect("open_debug_catalog");
3693        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3694            persist_client.clone(),
3695            organization_id.clone(),
3696            &bootstrap_args,
3697        )
3698        .await
3699        .expect("open_debug_read_only_catalog");
3700        assert_err!(writer_catalog.resolve_database(db_name));
3701        assert_err!(read_only_catalog.resolve_database(db_name));
3702
3703        let commit_ts = writer_catalog.current_upper().await;
3704        writer_catalog
3705            .transact(
3706                None,
3707                commit_ts,
3708                None,
3709                vec![Op::CreateDatabase {
3710                    name: db_name.to_string(),
3711                    owner_id: MZ_SYSTEM_ROLE_ID,
3712                }],
3713            )
3714            .await
3715            .expect("failed to transact");
3716
3717        let write_db = writer_catalog
3718            .resolve_database(db_name)
3719            .expect("resolve_database");
3720        read_only_catalog
3721            .sync_to_current_updates()
3722            .await
3723            .expect("sync_to_current_updates");
3724        let read_db = read_only_catalog
3725            .resolve_database(db_name)
3726            .expect("resolve_database");
3727
3728        assert_eq!(write_db, read_db);
3729
3730        let writer_catalog_fencer =
3731            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3732                .await
3733                .expect("open_debug_catalog for fencer");
3734        let fencer_db = writer_catalog_fencer
3735            .resolve_database(db_name)
3736            .expect("resolve_database for fencer");
3737        assert_eq!(fencer_db, read_db);
3738
3739        let write_fence_err = writer_catalog
3740            .sync_to_current_updates()
3741            .await
3742            .expect_err("sync_to_current_updates for fencer");
3743        assert!(matches!(
3744            write_fence_err,
3745            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3746        ));
3747        let read_fence_err = read_only_catalog
3748            .sync_to_current_updates()
3749            .await
3750            .expect_err("sync_to_current_updates after fencer");
3751        assert!(matches!(
3752            read_fence_err,
3753            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3754        ));
3755
3756        writer_catalog.expire().await;
3757        read_only_catalog.expire().await;
3758        writer_catalog_fencer.expire().await;
3759    }
3760}