// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// TODO(jkosh44) Move to mz_catalog crate.

//! Persistent metadata storage for the coordinator.

14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68    SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use uuid::Uuid;
89
90// DO NOT add any more imports from `crate` outside of `crate::catalog`.
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
92pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
93pub use crate::catalog::state::CatalogState;
94pub use crate::catalog::transact::{
95    DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
96};
97use crate::command::CatalogDump;
98use crate::coord::TargetCluster;
99#[cfg(test)]
100use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
101use crate::session::{Portal, PreparedStatement, Session};
102use crate::util::ResultExt;
103use crate::{AdapterError, AdapterNotice, ExecuteResponse};
104
105mod builtin_table_updates;
106pub(crate) mod consistency;
107mod migrate;
108
109mod apply;
110mod open;
111mod state;
112mod timeline;
113mod transact;
114
115/// A `Catalog` keeps track of the SQL objects known to the planner.
116///
117/// For each object, it keeps track of both forward and reverse dependencies:
118/// i.e., which objects are depended upon by the object, and which objects
119/// depend upon the object. It enforces the SQL rules around dropping: an object
120/// cannot be dropped until all of the objects that depend upon it are dropped.
121/// It also enforces uniqueness of names.
122///
123/// SQL mandates a hierarchy of exactly three layers. A catalog contains
124/// databases, databases contain schemas, and schemas contain catalog items,
125/// like sources, sinks, view, and indexes.
126///
127/// To the outside world, databases, schemas, and items are all identified by
128/// name. Items can be referred to by their [`FullItemName`], which fully and
129/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
130/// database name and/or the schema name. Partial names can be converted into
131/// full names via a complicated resolution process documented by the
132/// [`CatalogState::resolve`] method.
133///
134/// The catalog also maintains special "ambient schemas": virtual schemas,
135/// implicitly present in all databases, that house various system views.
136/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
#[derive(Debug)]
pub struct Catalog {
    /// The in-memory catalog state (objects, schemas, clusters, ...).
    state: CatalogState,
    /// Cached optimized/physical plans and optimizer notices, keyed by
    /// [`GlobalId`]. Not persisted; maintained by the `set_*`/`drop_*`
    /// methods below.
    plans: CatalogPlans,
    /// Handle to the expression cache, if one was configured at open time.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// The durable (persisted) catalog state. Shared behind an `Arc<Mutex>`
    /// so that clones of the `Catalog` reference the same durable state.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// In-memory revision counter; starts at 1, incremented on every change,
    /// and resets on restart (see [`Catalog::transient_revision`]).
    transient_revision: u64,
}
145
146// Implement our own Clone because derive can't unless S is Clone, which it's
147// not (hence the Arc).
148impl Clone for Catalog {
149    fn clone(&self) -> Self {
150        Self {
151            state: self.state.clone(),
152            plans: self.plans.clone(),
153            expr_cache_handle: self.expr_cache_handle.clone(),
154            storage: Arc::clone(&self.storage),
155            transient_revision: self.transient_revision,
156        }
157    }
158}
159
/// Cached optimizer output for catalog items, kept alongside the in-memory
/// catalog state but never persisted.
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized (MIR) dataflow plans, keyed by the item's [`GlobalId`].
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) dataflow plans, keyed by the item's [`GlobalId`].
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// Optimizer metadata (including notices) per dataflow [`GlobalId`].
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Reverse index: for each dependency id, the notices that reference it.
    /// Maintained by `set_dataflow_metainfo` / `drop_plans_and_metainfos`.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
167
impl Catalog {
    /// Set the optimized plan for the item identified by `id`.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_optimized_plan(
        &mut self,
        id: GlobalId,
        plan: DataflowDescription<OptimizedMirRelationExpr>,
    ) {
        // `.into()` wraps the plan in an `Arc` (see the map's value type).
        self.plans.optimized_plan_by_id.insert(id, plan.into());
    }

    /// Set the physical plan for the item identified by `id`.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_physical_plan(
        &mut self,
        id: GlobalId,
        plan: DataflowDescription<mz_compute_types::plan::Plan>,
    ) {
        // `.into()` wraps the plan in an `Arc` (see the map's value type).
        self.plans.physical_plan_by_id.insert(id, plan.into());
    }

    /// Try to get the optimized plan for the item identified by `id`.
    ///
    /// Returns `None` if no optimized plan was set for `id`.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_optimized_plan(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
    }

    /// Try to get the physical plan for the item identified by `id`.
    ///
    /// Returns `None` if no physical plan was set for `id`.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_physical_plan(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
    }

    /// Set the `DataflowMetainfo` for the item identified by `id`.
    ///
    /// Also updates the `notices_by_dep_id` reverse index so that notices can
    /// later be found (and removed) via any of their dependency ids.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_dataflow_metainfo(
        &mut self,
        id: GlobalId,
        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
    ) {
        // Add entries to the `notices_by_dep_id` lookup map.
        for notice in metainfo.optimizer_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
                entry.push(Arc::clone(notice))
            }
            if let Some(item_id) = notice.item_id {
                soft_assert_eq_or_log!(
                    item_id,
                    id,
                    "notice.item_id should match the id for whom we are saving the notice"
                );
            }
        }
        // Add the dataflow with the scoped entries.
        self.plans.dataflow_metainfos.insert(id, metainfo);
    }

    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
    ///
    /// Returns `None` if no metainfo was set for `id`.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_dataflow_metainfo(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
        self.plans.dataflow_metainfos.get(id)
    }

    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
    /// item identified by `id`.
    ///
    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
    ///
    /// Return a set containing all dropped notices. Note that if for some
    /// reason we end up with two identical notices being dropped by the same
    /// call, the result will contain only one instance of that notice.
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Collect dropped notices in this set.
        let mut dropped_notices = BTreeSet::new();

        // Pass 1: remove plans and metainfo.optimizer_notices entries for the
        // dropped ids, and scrub each dropped notice out of the reverse index.
        for id in drop_ids {
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Remove the corresponding notices_by_dep_id entries.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 2: remove reverse-index entries keyed by the dropped ids, and
        // scrub those notices out of the metainfos of their (surviving) owners.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    // Remove the corresponding metainfo.optimizer_notices entries.
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Collect dependency ids not in drop_ids with at least one dropped
        // notice.
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
        // entries in `todo_dep_ids`.
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        // (We used to have a sanity check here that
        // `dropped_notices.iter().any(|n| Arc::strong_count(n) != 1)`
        // but this is not a correct assertion: There might be other clones of the catalog (e.g. if
        // a peek is running concurrently to an index drop), in which case those clones also hold
        // references to the notices.)

        dropped_notices
    }
}
329
/// A connection-scoped view of the catalog, used during planning.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The catalog state, either borrowed from the owning catalog or an owned
    /// snapshot (hence the [`Cow`]).
    state: Cow<'a, CatalogState>,
    /// Because we don't have any way of removing items from the catalog
    /// temporarily, we allow the ConnCatalog to pretend that a set of items
    /// don't exist during resolution.
    ///
    /// This feature is necessary to allow re-planning of statements, which is
    /// either incredibly useful or required when altering item definitions.
    ///
    /// Note that uses of this field should be used by short-lived
    /// catalogs.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// The ID of the connection this catalog is scoped to.
    conn_id: ConnectionId,
    /// The name of the connection's active cluster.
    cluster: String,
    /// The connection's active database, if any.
    database: Option<DatabaseId>,
    /// The schema search path used for name resolution (see
    /// [`ConnCatalog::effective_search_path`]).
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// The role the connection is executing as.
    role_id: RoleId,
    /// The session's prepared statements, if a session is attached.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// The session's portals, if a session is attached.
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Channel on which notices are sent to the session.
    notices_tx: UnboundedSender<AdapterNotice>,
}
352
353impl ConnCatalog<'_> {
354    pub fn conn_id(&self) -> &ConnectionId {
355        &self.conn_id
356    }
357
358    pub fn state(&self) -> &CatalogState {
359        &*self.state
360    }
361
362    /// Prevent planning from resolving item with the provided ID. Instead,
363    /// return an error as if the item did not exist.
364    ///
365    /// This feature is meant exclusively to permit re-planning statements
366    /// during update operations and should not be used otherwise given its
367    /// extremely "powerful" semantics.
368    ///
369    /// # Panics
370    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
371    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
372        assert_eq!(
373            self.role_id, MZ_SYSTEM_ROLE_ID,
374            "only the system role can mark IDs unresolvable",
375        );
376        self.unresolvable_ids.insert(id);
377    }
378
379    /// Returns the schemas:
380    /// - mz_catalog
381    /// - pg_catalog
382    /// - temp (if requested)
383    /// - all schemas from the session's search_path var that exist
384    pub fn effective_search_path(
385        &self,
386        include_temp_schema: bool,
387    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
388        self.state
389            .effective_search_path(&self.search_path, include_temp_schema)
390    }
391}
392
393impl ConnectionResolver for ConnCatalog<'_> {
394    fn resolve_connection(
395        &self,
396        id: CatalogItemId,
397    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
398        self.state().resolve_connection(id)
399    }
400}
401
402impl Catalog {
    /// Returns the catalog's transient revision, which starts at 1 and is
    /// incremented on every change. This is not persisted to disk, and will
    /// restart on every load.
    pub fn transient_revision(&self) -> u64 {
        self.transient_revision
    }
409
410    /// Creates a debug catalog from the current
411    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
412    /// like in tests.
413    ///
414    /// WARNING! This function can arbitrarily fail because it does not make any
415    /// effort to adjust the catalog's contents' structure or semantics to the
416    /// currently running version, i.e. it does not apply any migrations.
417    ///
418    /// This function must not be called in production contexts. Use
419    /// [`Catalog::open`] with appropriately set configuration parameters
420    /// instead.
421    pub async fn with_debug<F, Fut, T>(f: F) -> T
422    where
423        F: FnOnce(Catalog) -> Fut,
424        Fut: Future<Output = T>,
425    {
426        let persist_client = PersistClient::new_for_tests().await;
427        let organization_id = Uuid::new_v4();
428        let bootstrap_args = test_bootstrap_args();
429        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
430            .await
431            .expect("can open debug catalog");
432        f(catalog).await
433    }
434
    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
    /// in progress.
    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
    where
        F: FnOnce(Catalog) -> Fut,
        Fut: Future<Output = T>,
    {
        let persist_client = PersistClient::new_for_tests().await;
        let organization_id = Uuid::new_v4();
        let bootstrap_args = test_bootstrap_args();
        // First open a fully bootstrapped catalog the normal way...
        let mut catalog =
            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
                .await
                .expect("can open debug catalog");

        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
        let now = SYSTEM_TIME.clone();
        let openable_storage = TestCatalogStateBuilder::new(persist_client)
            .with_organization_id(organization_id)
            .with_default_deploy_generation()
            .build()
            .await
            .expect("can create durable catalog");
        let mut storage = openable_storage
            .open(now().into(), &bootstrap_args)
            .await
            .expect("can open durable catalog")
            .0;
        // Drain updates so the in-memory state and the freshly opened durable
        // state don't double-apply the same updates.
        let _ = storage
            .sync_to_current_updates()
            .await
            .expect("can sync to current updates");
        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));

        f(catalog).await
    }
472
473    /// Opens a debug catalog.
474    ///
475    /// See [`Catalog::with_debug`].
476    pub async fn open_debug_catalog(
477        persist_client: PersistClient,
478        organization_id: Uuid,
479        bootstrap_args: &BootstrapArgs,
480    ) -> Result<Catalog, anyhow::Error> {
481        let now = SYSTEM_TIME.clone();
482        let environment_id = None;
483        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
484            .with_organization_id(organization_id)
485            .with_default_deploy_generation()
486            .build()
487            .await?;
488        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
489        let system_parameter_defaults = BTreeMap::default();
490        Self::open_debug_catalog_inner(
491            persist_client,
492            storage,
493            now,
494            environment_id,
495            &DUMMY_BUILD_INFO,
496            system_parameter_defaults,
497            bootstrap_args,
498            None,
499        )
500        .await
501    }
502
503    /// Opens a read only debug persist backed catalog defined by `persist_client` and
504    /// `organization_id`.
505    ///
506    /// See [`Catalog::with_debug`].
507    pub async fn open_debug_read_only_catalog(
508        persist_client: PersistClient,
509        organization_id: Uuid,
510        bootstrap_args: &BootstrapArgs,
511    ) -> Result<Catalog, anyhow::Error> {
512        let now = SYSTEM_TIME.clone();
513        let environment_id = None;
514        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
515            .with_organization_id(organization_id)
516            .build()
517            .await?;
518        let storage = openable_storage
519            .open_read_only(&test_bootstrap_args())
520            .await?;
521        let system_parameter_defaults = BTreeMap::default();
522        Self::open_debug_catalog_inner(
523            persist_client,
524            storage,
525            now,
526            environment_id,
527            &DUMMY_BUILD_INFO,
528            system_parameter_defaults,
529            bootstrap_args,
530            None,
531        )
532        .await
533    }
534
535    /// Opens a read only debug persist backed catalog defined by `persist_client` and
536    /// `organization_id`.
537    ///
538    /// See [`Catalog::with_debug`].
539    pub async fn open_debug_read_only_persist_catalog_config(
540        persist_client: PersistClient,
541        now: NowFn,
542        environment_id: EnvironmentId,
543        system_parameter_defaults: BTreeMap<String, String>,
544        build_info: &'static BuildInfo,
545        bootstrap_args: &BootstrapArgs,
546        enable_expression_cache_override: Option<bool>,
547    ) -> Result<Catalog, anyhow::Error> {
548        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
549            .with_organization_id(environment_id.organization_id())
550            .with_version(
551                build_info
552                    .version
553                    .parse()
554                    .expect("build version is parseable"),
555            )
556            .build()
557            .await?;
558        let storage = openable_storage.open_read_only(bootstrap_args).await?;
559        Self::open_debug_catalog_inner(
560            persist_client,
561            storage,
562            now,
563            Some(environment_id),
564            build_info,
565            system_parameter_defaults,
566            bootstrap_args,
567            enable_expression_cache_override,
568        )
569        .await
570    }
571
    /// Shared implementation behind the `open_debug_*` constructors: opens a
    /// catalog over the given durable `storage` with a debug/test-oriented
    /// configuration (unsafe mode on, migrations skipped, in-memory secrets).
    async fn open_debug_catalog_inner(
        persist_client: PersistClient,
        storage: Box<dyn DurableCatalogState>,
        now: NowFn,
        environment_id: Option<EnvironmentId>,
        build_info: &'static BuildInfo,
        system_parameter_defaults: BTreeMap<String, String>,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
    ) -> Result<Catalog, anyhow::Error> {
        let metrics_registry = &MetricsRegistry::new();
        let secrets_reader = Arc::new(InMemorySecretsController::new());
        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
        // debugging/testing.
        let previous_ts = now().into();
        // Every builtin cluster below uses the same default replica size.
        let replica_size = &bootstrap_args.default_cluster_replica_size;
        let read_only = false;

        // Only the catalog itself is needed here; the other open results
        // (migrations, builtin table updates, cached expressions) are dropped.
        let OpenCatalogResult {
            catalog,
            migrated_storage_collections_0dt: _,
            new_builtin_collections: _,
            builtin_table_updates: _,
            cached_global_exprs: _,
            uncached_local_exprs: _,
        } = Catalog::open(Config {
            storage,
            metrics_registry,
            state: StateConfig {
                unsafe_mode: true,
                all_features: false,
                build_info,
                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
                read_only,
                now,
                boot_ts: previous_ts,
                skip_migrations: true,
                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                system_parameter_defaults,
                remote_system_parameters: None,
                availability_zones: vec![],
                egress_addresses: vec![],
                aws_principal_context: None,
                aws_privatelink_availability_zones: None,
                http_host_name: None,
                connection_context: ConnectionContext::for_tests(secrets_reader),
                builtin_item_migration_config: BuiltinItemMigrationConfig {
                    persist_client: persist_client.clone(),
                    read_only,
                    force_migration: None,
                },
                persist_client,
                enable_expression_cache_override,
                helm_chart_version: None,
                external_login_password_mz_system: None,
                license_key: ValidatedLicenseKey::for_tests(),
            },
        })
        .await?;
        Ok(catalog)
    }
653
    /// Returns a [`ConnCatalog`] scoped to the given session.
    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        self.state.for_session(session)
    }

    /// Returns a [`ConnCatalog`] for the given role without an attached session.
    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        self.state.for_sessionless_user(role_id)
    }

    /// Returns a [`ConnCatalog`] for the system session.
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.state.for_system_session()
    }

    /// Acquires the lock on the durable catalog state and returns the guard.
    async fn storage<'a>(
        &'a self,
    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
        self.storage.lock().await
    }

    /// Returns the current upper of the durable catalog state.
    pub async fn current_upper(&self) -> mz_repr::Timestamp {
        self.storage().await.current_upper().await
    }
675
    /// Allocates a new user item ID pair, committing the allocation to the
    /// durable catalog at `commit_ts`.
    pub async fn allocate_user_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        self.storage()
            .await
            .allocate_user_id(commit_ts)
            .await
            // NOTE(review): `maybe_terminate` presumably halts on fatal
            // durable-catalog errors before converting — confirm semantics.
            .maybe_terminate("allocating user ids")
            .err_into()
    }

    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
    pub async fn allocate_user_ids(
        &self,
        amount: u64,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
        self.storage()
            .await
            .allocate_user_ids(amount, commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }

    /// Test helper: allocates a user ID using the durable state's current
    /// upper as the commit timestamp.
    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
        let commit_ts = self.storage().await.current_upper().await;
        self.allocate_user_id(commit_ts).await
    }

    /// Get the next user item ID without allocating it.
    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_item_id()
            .await
            .err_into()
    }
715
    /// Test helper: allocates a single system item ID via a transaction
    /// committed at `commit_ts`.
    #[cfg(test)]
    pub async fn allocate_system_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        use mz_ore::collections::CollectionExt;

        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;
        let id = txn
            .allocate_system_item_ids(1)
            .maybe_terminate("allocating system ids")?
            // Exactly one ID was requested, so exactly one is returned.
            .into_element();
        // Drain transaction.
        let _ = txn.get_and_commit_op_updates();
        txn.commit(commit_ts).await?;
        Ok(id)
    }

    /// Get the next system item ID without allocating it.
    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_item_id()
            .await
            .err_into()
    }
743
    /// Allocates a new user cluster ID, committing the allocation to the
    /// durable catalog at `commit_ts`.
    pub async fn allocate_user_cluster_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<ClusterId, Error> {
        self.storage()
            .await
            .allocate_user_cluster_id(commit_ts)
            .await
            .maybe_terminate("allocating user cluster ids")
            .err_into()
    }

    /// Get the next system replica id without allocating it.
    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_replica_id()
            .await
            .err_into()
    }

    /// Get the next user replica id without allocating it.
    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_replica_id()
            .await
            .err_into()
    }
773
    /// Resolves `database_name` to a [`Database`] in the in-memory state.
    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        self.state.resolve_database(database_name)
    }
777
    /// Resolves a [`Schema`] by name, using `current_database` when
    /// `database_name` is not supplied.
    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema(current_database, database_name, schema_name, conn_id)
    }
788
    /// Resolves `schema_name` within the given database specifier.
    pub fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema_in_database(database_spec, schema_name, conn_id)
    }
798
    /// Resolves `replica_name` to a [`ClusterReplica`] within `cluster_id`.
    pub fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        self.state
            .resolve_replica_in_cluster(cluster_id, replica_name)
    }
807
    /// Resolves a system schema `name` to its [`SchemaId`].
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.state.resolve_system_schema(name)
    }
811
    /// Resolves the session's search path to concrete database/schema pairs.
    pub fn resolve_search_path(
        &self,
        session: &Session,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state.resolve_search_path(session)
    }
818
    /// Resolves `name` to a non-function [`CatalogEntry`], searching
    /// `search_path` in order.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_entry(current_database, search_path, name, conn_id)
    }
830
    /// Resolves a `BuiltinTable` to its [`CatalogItemId`].
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.state.resolve_builtin_table(builtin)
    }
835
    /// Resolves a `BuiltinLog` to its [`CatalogItemId`].
    // The state method returns a tuple; only the item ID is exposed here.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
        self.state.resolve_builtin_log(builtin).0
    }
840
    /// Resolves a `BuiltinSource` to its [`CatalogItemId`].
    pub fn resolve_builtin_storage_collection(
        &self,
        builtin: &'static BuiltinSource,
    ) -> CatalogItemId {
        self.state.resolve_builtin_source(builtin)
    }
848
    /// Resolves `name` to a function [`CatalogEntry`], searching
    /// `search_path` in order.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_function(current_database, search_path, name, conn_id)
    }
860
    /// Resolves `name` to a type [`CatalogEntry`], searching `search_path`
    /// in order.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_type(current_database, search_path, name, conn_id)
    }
872
    /// Resolves `name` to a [`Cluster`].
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        self.state.resolve_cluster(name)
    }
876
    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
    ///
    /// # Panics
    /// * If the [`BuiltinCluster`] doesn't exist.
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        self.state.resolve_builtin_cluster(cluster)
    }
885
    /// Returns the ID of the built-in `mz_catalog_server` cluster.
    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
    }
889
    /// Resolves a [`Cluster`] for a TargetCluster.
    ///
    /// For `Transaction` targets this can fail with
    /// [`AdapterError::ConcurrentClusterDrop`] if the cluster was dropped
    /// after the transaction pinned it.
    pub fn resolve_target_cluster(
        &self,
        target_cluster: TargetCluster,
        session: &Session,
    ) -> Result<&Cluster, AdapterError> {
        match target_cluster {
            TargetCluster::CatalogServer => {
                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
            }
            TargetCluster::Active => self.active_cluster(session),
            TargetCluster::Transaction(cluster_id) => self
                .try_get_cluster(cluster_id)
                .ok_or(AdapterError::ConcurrentClusterDrop),
        }
    }
906
    /// Returns the cluster the session's queries run on, per the session's
    /// `cluster` variable.
    ///
    /// Rejects non-system, non-support users whose `cluster` variable points
    /// at the system cluster.
    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
        // TODO(benesch): this check here is not sufficiently protective. It'd
        // be very easy for a code path to accidentally avoid this check by
        // calling `resolve_cluster(session.vars().cluster())`.
        if session.user().name != SYSTEM_USER.name
            && session.user().name != SUPPORT_USER.name
            && session.vars().cluster() == SYSTEM_USER.name
        {
            coord_bail!(
                "system cluster '{}' cannot execute user queries",
                SYSTEM_USER.name
            );
        }
        let cluster = self.resolve_cluster(session.vars().cluster())?;
        Ok(cluster)
    }
923
    /// Returns a reference to the in-memory catalog state.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }
927
    /// Expands a qualified item name into a fully qualified [`FullItemName`].
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }
935
    /// Returns the entry for `id`, or `None` if it does not exist.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }
939
    /// Returns the entry owning the collection `id`, or `None` if absent.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }
943
    /// Returns the entry for `id`; panics in `state` if it does not exist.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }
947
    /// Returns the collection entry owning the collection `id`.
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }
951
    /// Returns all [`GlobalId`]s associated with the item `id`.
    pub fn get_global_ids<'a>(
        &'a self,
        id: &CatalogItemId,
    ) -> impl Iterator<Item = GlobalId> + use<'a> {
        self.get_entry(id).global_ids()
    }
958
    /// Maps a collection `id` to the [`CatalogItemId`] of its owning item.
    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
        self.get_entry_by_global_id(id).id()
    }
962
963    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
964        let item = self.try_get_entry_by_global_id(id)?;
965        Some(item.id())
966    }
967
    /// Returns the schema identified by the given specifiers.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }
976
    /// Returns the schema identified by the given specifiers, or `None`.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }
986
    /// Returns the ID of the `mz_catalog` system schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }
990
    /// Returns the ID of the `pg_catalog` system schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }
994
    /// Returns the ID of the `information_schema` system schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }
998
    /// Returns the ID of the `mz_internal` system schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }
1002
    /// Returns the ID of the `mz_introspection` system schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }
1006
    /// Returns the ID of the `mz_unsafe` system schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }
1010
    /// Iterates over the IDs of all system schemas.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }
1014
    /// Returns the database with the given `id`.
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }
1018
    /// Returns the role with the given `id`, or `None` if it does not exist.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }
1022
    /// Returns the role with the given `id`.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }
1026
    /// Looks up a role by name, returning `None` if it does not exist.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }
1030
    /// Returns the authentication info for the role `id`, if any.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }
1034
    /// Creates a new schema in the `Catalog` for temporary items
    /// indicated by the TEMPORARY or TEMP keywords.
    ///
    /// The schema is scoped to `conn_id` and owned by `owner_id`.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
1044
1045    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1046        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
1047        self.state
1048            .temporary_schemas
1049            .get(conn_id)
1050            .map(|schema| schema.items.contains_key(item_name))
1051            .unwrap_or(false)
1052    }
1053
1054    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1055    /// Returns Ok if conn_id's temp schema does not exist.
1056    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1057        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1058            return Ok(());
1059        };
1060        if !schema.items.is_empty() {
1061            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1062        }
1063        Ok(())
1064    }
1065
    /// Returns the transitive dependents of `object_ids`.
    pub(crate) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
    ) -> Vec<ObjectId> {
        // Fresh visited-set per call; `state` threads it through the walk.
        let mut seen = BTreeSet::new();
        self.state.object_dependents(object_ids, conn_id, &mut seen)
    }
1074
    /// Converts a [`FullItemName`] into the audit-log name representation.
    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
        FullNameV1 {
            database: name.database.to_string(),
            schema: name.schema.clone(),
            item: name.item.clone(),
        }
    }
1082
1083    pub fn find_available_cluster_name(&self, name: &str) -> String {
1084        let mut i = 0;
1085        let mut candidate = name.to_string();
1086        while self.state.clusters_by_name.contains_key(&candidate) {
1087            i += 1;
1088            candidate = format!("{}{}", name, i);
1089        }
1090        candidate
1091    }
1092
1093    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1094        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1095            self.cluster_replica_sizes()
1096                .enabled_allocations()
1097                .map(|a| a.0.to_owned())
1098                .collect::<Vec<_>>()
1099        } else {
1100            self.system_config().allowed_cluster_replica_sizes()
1101        }
1102    }
1103
    /// Converts a durable replica location into a concrete
    /// [`ReplicaLocation`], validating against the allowed sizes and
    /// availability zones.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }
1113
    /// Returns an error if `size` is not one of `allowed_sizes`.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }
1121
    /// Returns the configured map of cluster replica size allocations.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1125
    /// Returns the privileges of an object by its ID.
    ///
    /// Returns `None` for cluster replicas and roles, which carry no
    /// privilege map of their own.
    pub fn get_privileges(
        &self,
        id: &SystemObjectId,
        conn_id: &ConnectionId,
    ) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::Object(id) => match id {
                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
                ObjectId::Schema((database_spec, schema_spec)) => Some(
                    self.get_schema(database_spec, schema_spec, conn_id)
                        .privileges(),
                ),
                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
            },
            SystemObjectId::System => Some(&self.state.system_privileges),
        }
    }
1147
    /// Advances the durable catalog's upper frontier to `new_upper`.
    #[mz_ore::instrument(level = "debug")]
    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
        Ok(self.storage().await.advance_upper(new_upper).await?)
    }
1152
    /// Return the ids of all log sources the given object depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }
1157
    /// Serializes the catalog's in-memory state.
    ///
    /// There are no guarantees about the format of the serialized state, except
    /// that the serialized state for two identical catalogs will compare
    /// identically.
    pub fn dump(&self) -> Result<CatalogDump, Error> {
        Ok(CatalogDump::new(self.state.dump(None)?))
    }
1166
    /// Checks the [`Catalog`]s internal consistency.
    ///
    /// Returns a JSON object describing the inconsistencies, if there are any.
    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
        self.state.check_consistency().map_err(|inconsistencies| {
            // Fall back to a plain string if the report itself fails to
            // serialize, so the caller always gets a JSON value.
            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
                serde_json::Value::String("failed to serialize inconsistencies".to_string())
            })
        })
    }
1177
    /// Returns the catalog's configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
1181
    /// Iterates over every catalog entry, system and user alike.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }
1185
    /// Iterates over all user-created connections.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }
1190
    /// Iterates over all user-created tables.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }
1195
    /// Iterates over all user-created sources.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }
1200
    /// Iterates over all user-created sinks.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }
1205
    /// Iterates over all user-created materialized views.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }
1210
    /// Iterates over all user-created secrets.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1215
    /// Returns the network policy with the given ID.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }
1219
    /// Looks up a network policy by name, returning `None` if absent.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }
1223
    /// Iterates over every cluster, system and user alike.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }
1227
    /// Returns the cluster with the given `cluster_id`.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }
1231
    /// Returns the cluster with the given `cluster_id`, or `None`.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }
1235
    /// Iterates over all user-created clusters.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }
1239
    /// Returns the replica `replica_id` of cluster `cluster_id`.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }
1247
    /// Returns the replica `replica_id` of cluster `cluster_id`, or `None`.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }
1255
    /// Iterates over all user replicas across all user clusters.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }
1260
    /// Iterates over every database in the catalog.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1264
    /// Iterates over all user-created roles.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }
1271
    /// Iterates over all user-created continual tasks.
    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
    }
1276
1277    pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1278        self.state
1279            .network_policies_by_id
1280            .iter()
1281            .filter(|(id, _)| id.is_user())
1282            .map(|(_, policy)| policy)
1283    }
1284
    /// Returns the system-wide privilege map.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1288
    /// Iterates over all default-privilege entries, grouped by the object
    /// they apply to.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1299
    /// Builds the builtin-table updates reflecting item `id` with the given
    /// `diff` (insert/retract).
    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
        self.state
            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
    }
1304
    /// Builds the builtin-table update reflecting a storage usage `event`
    /// with the given `diff` (insert/retract).
    pub fn pack_storage_usage_update(
        &self,
        event: VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate {
        self.state
            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
    }
1313
    /// Returns the system configuration variables.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1317
    /// Returns a mutable reference to the system configuration variables.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1321
    /// Returns an error if `role_id` refers to a reserved role.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1325
    /// Returns an error if `role_id` refers to a role that cannot be granted.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1329
    /// Returns an error if `role_id` refers to a system role.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1333
    /// Returns an error if `role_id` refers to a predefined role.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1337
    /// Returns an error if `network_policy_id` refers to a reserved policy.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1345
    /// Returns a read-only error if `object_id` refers to a system-reserved
    /// object (system cluster, replica, database, schema, role, item, or
    /// network policy); `Ok(())` otherwise.
    pub fn ensure_not_reserved_object(
        &self,
        object_id: &ObjectId,
        conn_id: &ConnectionId,
    ) -> Result<(), Error> {
        match object_id {
            ObjectId::Cluster(cluster_id) => {
                if cluster_id.is_system() {
                    let cluster = self.get_cluster(*cluster_id);
                    Err(Error::new(ErrorKind::ReadOnlyCluster(
                        cluster.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                if replica_id.is_system() {
                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
                        replica.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::Database(database_id) => {
                if database_id.is_system() {
                    let database = self.get_database(database_id);
                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
                        database.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::Schema((database_spec, schema_spec)) => {
                if schema_spec.is_system() {
                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
                        schema.name().schema.clone(),
                    )))
                } else {
                    Ok(())
                }
            }
            // Roles have their own reservation rules.
            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
            ObjectId::Item(item_id) => {
                if item_id.is_system() {
                    let item = self.get_entry(item_id);
                    let name = self.resolve_full_name(item.name(), Some(conn_id));
                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
                } else {
                    Ok(())
                }
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                self.ensure_not_reserved_network_policy(network_policy_id)
            }
        }
    }
1407
    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
    ///
    /// Deserializes `create_sql` into a plan and its resolved dependencies.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1417
    /// Cache global and, optionally, local expressions for the given `GlobalId`.
    ///
    /// This takes the required plans and metainfo from the catalog and expects that they were
    /// previously stored via [`Catalog::set_optimized_plan`], [`Catalog::set_physical_plan`], and
    /// [`Catalog::set_dataflow_metainfo`].
    pub(crate) fn cache_expressions(
        &self,
        id: GlobalId,
        local_mir: Option<OptimizedMirRelationExpr>,
        optimizer_features: OptimizerFeatures,
    ) {
        // Each of the three prerequisites may be missing if the setters above
        // were never called; soft-panic (log in production) and bail.
        let Some(mut global_mir) = self.try_get_optimized_plan(&id).cloned() else {
            soft_panic_or_log!("optimized plan missing for ID {id}");
            return;
        };
        let Some(mut physical_plan) = self.try_get_physical_plan(&id).cloned() else {
            soft_panic_or_log!("physical plan missing for ID {id}");
            return;
        };
        let Some(dataflow_metainfos) = self.try_get_dataflow_metainfo(&id).cloned() else {
            soft_panic_or_log!("dataflow metainfo missing for ID {id}");
            return;
        };

        // Make sure we're not caching the result of timestamp selection, as it will almost
        // certainly be wrong if we re-install the dataflow at a later time.
        global_mir.as_of = None;
        global_mir.until = Default::default();
        physical_plan.as_of = None;
        physical_plan.until = Default::default();

        // Local expressions are only cached when the caller supplied one.
        let mut local_exprs = Vec::new();
        if let Some(local_mir) = local_mir {
            local_exprs.push((
                id,
                LocalExpressions {
                    local_mir,
                    optimizer_features: optimizer_features.clone(),
                },
            ));
        }
        let global_exprs = vec![(
            id,
            GlobalExpressions {
                global_mir,
                physical_plan,
                dataflow_metainfos,
                optimizer_features,
            },
        )];
        // Fire-and-forget: the returned future is intentionally dropped.
        let _fut = self.update_expression_cache(local_exprs, global_exprs, Default::default());
    }
1470
1471    pub(crate) fn update_expression_cache<'a, 'b>(
1472        &'a self,
1473        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1474        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1475        invalidate_ids: BTreeSet<GlobalId>,
1476    ) -> BoxFuture<'b, ()> {
1477        if let Some(expr_cache) = &self.expr_cache_handle {
1478            expr_cache
1479                .update(
1480                    new_local_expressions,
1481                    new_global_expressions,
1482                    invalidate_ids,
1483                )
1484                .boxed()
1485        } else {
1486            async {}.boxed()
1487        }
1488    }
1489
    /// Listen for and apply all unconsumed updates to the durable catalog state.
    ///
    /// Returns the builtin-table updates and parsed state updates produced by
    /// applying those durable updates to the in-memory state.
    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
    // `#[cfg(test)]` annotation.
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        let updates = self.storage().await.sync_to_current_updates().await?;
        let (builtin_table_updates, catalog_updates) = self
            .state
            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
            .await;
        Ok((builtin_table_updates, catalog_updates))
    }
1510}
1511
/// Reports whether `name` begins with one of the reserved builtin prefixes.
pub fn is_reserved_name(name: &str) -> bool {
    BUILTIN_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(prefix))
}
1517
/// Reports whether `name` is unavailable for user roles: either it carries a
/// reserved builtin prefix or it is the PUBLIC pseudo-role.
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1521
/// Reports whether `name` is the PUBLIC pseudo-role's name.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1525
/// Maps a SQL catalog item type to the object type recorded in the audit log.
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1529
/// Maps a comment's target object to the object type recorded in the audit log.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1551
/// Maps a SQL object type to the object type recorded in the audit log.
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1557
/// Maps a (possibly system-wide) object type to the audit-log object type.
pub(crate) fn system_object_type_to_audit_object_type(
    system_type: &SystemObjectType,
) -> ObjectType {
    match system_type {
        SystemObjectType::Object(object_type) => match object_type {
            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
            mz_sql::catalog::ObjectType::View => ObjectType::View,
            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
        },
        SystemObjectType::System => ObjectType::System,
    }
}
1584
/// Whether a privilege update grants or revokes a privilege.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    Grant,
    Revoke,
}
1590
// Maps a privilege-update variant to the corresponding execute response.
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}
1599
// Maps a privilege-update variant to the corresponding audit event type.
impl From<UpdatePrivilegeVariant> for EventType {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1608
1609impl ConnCatalog<'_> {
1610    fn resolve_item_name(
1611        &self,
1612        name: &PartialItemName,
1613    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1614        self.resolve_item(name).map(|entry| entry.name())
1615    }
1616
1617    fn resolve_function_name(
1618        &self,
1619        name: &PartialItemName,
1620    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1621        self.resolve_function(name).map(|entry| entry.name())
1622    }
1623
1624    fn resolve_type_name(
1625        &self,
1626        name: &PartialItemName,
1627    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1628        self.resolve_type(name).map(|entry| entry.name())
1629    }
1630}
1631
/// Humanization of IDs, types, and column names for error messages and plan
/// rendering, backed by the in-memory catalog state.
impl ExprHumanizer for ConnCatalog<'_> {
    /// Renders `id` as its fully qualified item name, if the entry exists.
    fn humanize_id(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).to_string())
    }

    /// Renders `id` as just the bare item name, without schema or database
    /// qualifiers.
    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(entry.name().item.clone())
    }

    /// Like [`Self::humanize_id`], but returns the full name split into its
    /// component parts.
    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).into_parts())
    }

    /// Renders a SQL scalar type as a human-readable string.
    ///
    /// Custom (user-defined) list/map/record types are rendered via their
    /// minimally qualified catalog names. Built-in types fall back to their
    /// `pgrepr` names, qualified with the `pg_catalog` schema when that schema
    /// is not on the effective search path. When `postgres_compat` is set,
    /// `char`/`varchar` skip the length-suffixed rendering and take the
    /// generic `pgrepr` path instead.
    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        use SqlScalarType::*;

        match typ {
            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
            // Lists and maps with a custom id are user-defined types; render
            // them by their catalog name rather than structurally.
            List {
                custom_id: Some(item_id),
                ..
            }
            | Map {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            List { element_type, .. } => {
                format!(
                    "{} list",
                    self.humanize_sql_scalar_type(element_type, postgres_compat)
                )
            }
            // Map keys are always strings, so only the value type varies.
            Map { value_type, .. } => format!(
                "map[{}=>{}]",
                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
                self.humanize_sql_scalar_type(value_type, postgres_compat)
            ),
            Record {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            Record { fields, .. } => format!(
                "record({})",
                fields
                    .iter()
                    .map(|f| format!(
                        "{}: {}",
                        f.0,
                        self.humanize_sql_column_type(&f.1, postgres_compat)
                    ))
                    .join(",")
            ),
            PgLegacyChar => "\"char\"".into(),
            Char { length } if !postgres_compat => match length {
                None => "char".into(),
                Some(length) => format!("char({})", length.into_u32()),
            },
            VarChar { max_length } if !postgres_compat => match max_length {
                None => "varchar".into(),
                Some(length) => format!("varchar({})", length.into_u32()),
            },
            UInt16 => "uint2".into(),
            UInt32 => "uint4".into(),
            UInt64 => "uint8".into(),
            ty => {
                let pgrepr_type = mz_pgrepr::Type::from(ty);
                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());

                let res = if self
                    .effective_search_path(true)
                    .iter()
                    .any(|(_, schema)| schema == &pg_catalog_schema)
                {
                    pgrepr_type.name().to_string()
                } else {
                    // If PG_CATALOG_SCHEMA is not in search path, you need
                    // qualified object name to refer to type.
                    let name = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: pg_catalog_schema,
                        },
                        item: pgrepr_type.name().to_string(),
                    };
                    self.resolve_full_name(&name).to_string()
                };
                res
            }
        }
    }

    /// Returns the column names of the collection identified by `id`, if any.
    ///
    /// For indexes, the names of the indexed collection's columns are permuted
    /// into the index's key order; positions with no corresponding source
    /// column are left as empty strings.
    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;

        match entry.index() {
            Some(index) => {
                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
                let mut on_names = on_desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect::<Vec<_>>();

                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());

                // Init ix_names with unknown column names. Unknown columns are
                // represented as an empty String and rendered as `#c` by the
                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
                let mut ix_names = vec![String::new(); ix_arity];

                // Apply the permutation by swapping on_names with ix_names.
                for (on_pos, ix_pos) in p.into_iter().enumerate() {
                    let on_name = on_names.get_mut(on_pos).expect("on_name");
                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
                    std::mem::swap(on_name, ix_name);
                }

                Some(ix_names) // Return the updated ix_names vector.
            }
            None => {
                // Not an index: read the column names straight off the
                // collection's own description.
                let desc = self.state.try_get_desc_by_global_id(&id)?;
                let column_names = desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect();

                Some(column_names)
            }
        }
    }

    /// Returns the name of column `column` of the collection `id`, if the
    /// collection exists.
    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
        let desc = self.state.try_get_desc_by_global_id(&id)?;
        Some(desc.get_name(column).to_string())
    }

    /// Reports whether the catalog knows about the collection `id`.
    fn id_exists(&self, id: GlobalId) -> bool {
        self.state.entry_by_global_id.contains_key(&id)
    }
}
1781
1782impl SessionCatalog for ConnCatalog<'_> {
1783    fn active_role_id(&self) -> &RoleId {
1784        &self.role_id
1785    }
1786
1787    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1788        self.prepared_statements
1789            .as_ref()
1790            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1791            .flatten()
1792    }
1793
1794    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1795        self.portals
1796            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1797    }
1798
1799    fn active_database(&self) -> Option<&DatabaseId> {
1800        self.database.as_ref()
1801    }
1802
1803    fn active_cluster(&self) -> &str {
1804        &self.cluster
1805    }
1806
1807    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1808        &self.search_path
1809    }
1810
1811    fn resolve_database(
1812        &self,
1813        database_name: &str,
1814    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1815        Ok(self.state.resolve_database(database_name)?)
1816    }
1817
1818    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1819        self.state
1820            .database_by_id
1821            .get(id)
1822            .expect("database doesn't exist")
1823    }
1824
1825    // `as` is ok to use to cast to a trait object.
1826    #[allow(clippy::as_conversions)]
1827    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1828        self.state
1829            .database_by_id
1830            .values()
1831            .map(|database| database as &dyn CatalogDatabase)
1832            .collect()
1833    }
1834
1835    fn resolve_schema(
1836        &self,
1837        database_name: Option<&str>,
1838        schema_name: &str,
1839    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1840        Ok(self.state.resolve_schema(
1841            self.database.as_ref(),
1842            database_name,
1843            schema_name,
1844            &self.conn_id,
1845        )?)
1846    }
1847
1848    fn resolve_schema_in_database(
1849        &self,
1850        database_spec: &ResolvedDatabaseSpecifier,
1851        schema_name: &str,
1852    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1853        Ok(self
1854            .state
1855            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1856    }
1857
1858    fn get_schema(
1859        &self,
1860        database_spec: &ResolvedDatabaseSpecifier,
1861        schema_spec: &SchemaSpecifier,
1862    ) -> &dyn CatalogSchema {
1863        self.state
1864            .get_schema(database_spec, schema_spec, &self.conn_id)
1865    }
1866
1867    // `as` is ok to use to cast to a trait object.
1868    #[allow(clippy::as_conversions)]
1869    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1870        self.get_databases()
1871            .into_iter()
1872            .flat_map(|database| database.schemas().into_iter())
1873            .chain(
1874                self.state
1875                    .ambient_schemas_by_id
1876                    .values()
1877                    .chain(self.state.temporary_schemas.values())
1878                    .map(|schema| schema as &dyn CatalogSchema),
1879            )
1880            .collect()
1881    }
1882
1883    fn get_mz_internal_schema_id(&self) -> SchemaId {
1884        self.state().get_mz_internal_schema_id()
1885    }
1886
1887    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1888        self.state().get_mz_unsafe_schema_id()
1889    }
1890
1891    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1892        self.state.is_system_schema_specifier(schema)
1893    }
1894
1895    fn resolve_role(
1896        &self,
1897        role_name: &str,
1898    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1899        match self.state.try_get_role_by_name(role_name) {
1900            Some(role) => Ok(role),
1901            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1902        }
1903    }
1904
1905    fn resolve_network_policy(
1906        &self,
1907        policy_name: &str,
1908    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1909        match self.state.try_get_network_policy_by_name(policy_name) {
1910            Some(policy) => Ok(policy),
1911            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1912        }
1913    }
1914
1915    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1916        Some(self.state.roles_by_id.get(id)?)
1917    }
1918
1919    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1920        self.state.get_role(id)
1921    }
1922
1923    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1924        // `as` is ok to use to cast to a trait object.
1925        #[allow(clippy::as_conversions)]
1926        self.state
1927            .roles_by_id
1928            .values()
1929            .map(|role| role as &dyn CatalogRole)
1930            .collect()
1931    }
1932
1933    fn mz_system_role_id(&self) -> RoleId {
1934        MZ_SYSTEM_ROLE_ID
1935    }
1936
1937    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1938        self.state.collect_role_membership(id)
1939    }
1940
1941    fn get_network_policy(
1942        &self,
1943        id: &NetworkPolicyId,
1944    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1945        self.state.get_network_policy(id)
1946    }
1947
1948    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1949        // `as` is ok to use to cast to a trait object.
1950        #[allow(clippy::as_conversions)]
1951        self.state
1952            .network_policies_by_id
1953            .values()
1954            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1955            .collect()
1956    }
1957
1958    fn resolve_cluster(
1959        &self,
1960        cluster_name: Option<&str>,
1961    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1962        Ok(self
1963            .state
1964            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1965    }
1966
1967    fn resolve_cluster_replica(
1968        &self,
1969        cluster_replica_name: &QualifiedReplica,
1970    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1971        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1972    }
1973
1974    fn resolve_item(
1975        &self,
1976        name: &PartialItemName,
1977    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1978        let r = self.state.resolve_entry(
1979            self.database.as_ref(),
1980            &self.effective_search_path(true),
1981            name,
1982            &self.conn_id,
1983        )?;
1984        if self.unresolvable_ids.contains(&r.id()) {
1985            Err(SqlCatalogError::UnknownItem(name.to_string()))
1986        } else {
1987            Ok(r)
1988        }
1989    }
1990
1991    fn resolve_function(
1992        &self,
1993        name: &PartialItemName,
1994    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1995        let r = self.state.resolve_function(
1996            self.database.as_ref(),
1997            &self.effective_search_path(false),
1998            name,
1999            &self.conn_id,
2000        )?;
2001
2002        if self.unresolvable_ids.contains(&r.id()) {
2003            Err(SqlCatalogError::UnknownFunction {
2004                name: name.to_string(),
2005                alternative: None,
2006            })
2007        } else {
2008            Ok(r)
2009        }
2010    }
2011
2012    fn resolve_type(
2013        &self,
2014        name: &PartialItemName,
2015    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2016        let r = self.state.resolve_type(
2017            self.database.as_ref(),
2018            &self.effective_search_path(false),
2019            name,
2020            &self.conn_id,
2021        )?;
2022
2023        if self.unresolvable_ids.contains(&r.id()) {
2024            Err(SqlCatalogError::UnknownType {
2025                name: name.to_string(),
2026            })
2027        } else {
2028            Ok(r)
2029        }
2030    }
2031
2032    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2033        self.state.get_system_type(name)
2034    }
2035
2036    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2037        Some(self.state.try_get_entry(id)?)
2038    }
2039
2040    fn try_get_item_by_global_id(
2041        &self,
2042        id: &GlobalId,
2043    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2044        let entry = self.state.try_get_entry_by_global_id(id)?;
2045        let entry = match &entry.item {
2046            CatalogItem::Table(table) => {
2047                let (version, _gid) = table
2048                    .collections
2049                    .iter()
2050                    .find(|(_version, gid)| *gid == id)
2051                    .expect("catalog out of sync, mismatched GlobalId");
2052                entry.at_version(RelationVersionSelector::Specific(*version))
2053            }
2054            _ => entry.at_version(RelationVersionSelector::Latest),
2055        };
2056        Some(entry)
2057    }
2058
2059    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2060        self.state.get_entry(id)
2061    }
2062
2063    fn get_item_by_global_id(
2064        &self,
2065        id: &GlobalId,
2066    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2067        let entry = self.state.get_entry_by_global_id(id);
2068        let entry = match &entry.item {
2069            CatalogItem::Table(table) => {
2070                let (version, _gid) = table
2071                    .collections
2072                    .iter()
2073                    .find(|(_version, gid)| *gid == id)
2074                    .expect("catalog out of sync, mismatched GlobalId");
2075                entry.at_version(RelationVersionSelector::Specific(*version))
2076            }
2077            _ => entry.at_version(RelationVersionSelector::Latest),
2078        };
2079        entry
2080    }
2081
2082    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2083        self.get_schemas()
2084            .into_iter()
2085            .flat_map(|schema| schema.item_ids())
2086            .map(|id| self.get_item(&id))
2087            .collect()
2088    }
2089
2090    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2091        self.state
2092            .get_item_by_name(name, &self.conn_id)
2093            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2094    }
2095
2096    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2097        self.state
2098            .get_type_by_name(name, &self.conn_id)
2099            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2100    }
2101
2102    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2103        &self.state.clusters_by_id[&id]
2104    }
2105
2106    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2107        self.state
2108            .clusters_by_id
2109            .values()
2110            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2111            .collect()
2112    }
2113
2114    fn get_cluster_replica(
2115        &self,
2116        cluster_id: ClusterId,
2117        replica_id: ReplicaId,
2118    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2119        let cluster = self.get_cluster(cluster_id);
2120        cluster.replica(replica_id)
2121    }
2122
2123    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2124        self.get_clusters()
2125            .into_iter()
2126            .flat_map(|cluster| cluster.replicas().into_iter())
2127            .collect()
2128    }
2129
2130    fn get_system_privileges(&self) -> &PrivilegeMap {
2131        &self.state.system_privileges
2132    }
2133
2134    fn get_default_privileges(
2135        &self,
2136    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2137        self.state
2138            .default_privileges
2139            .iter()
2140            .map(|(object, acl_items)| (object, acl_items.collect()))
2141            .collect()
2142    }
2143
2144    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2145        self.state.find_available_name(name, &self.conn_id)
2146    }
2147
2148    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2149        self.state.resolve_full_name(name, Some(&self.conn_id))
2150    }
2151
2152    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2153        self.state.resolve_full_schema_name(name)
2154    }
2155
2156    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2157        self.state.get_entry_by_global_id(global_id).id()
2158    }
2159
2160    fn resolve_global_id(
2161        &self,
2162        item_id: &CatalogItemId,
2163        version: RelationVersionSelector,
2164    ) -> GlobalId {
2165        self.state
2166            .get_entry(item_id)
2167            .at_version(version)
2168            .global_id()
2169    }
2170
2171    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2172        self.state.config()
2173    }
2174
2175    fn now(&self) -> EpochMillis {
2176        (self.state.config().now)()
2177    }
2178
2179    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2180        self.state.aws_privatelink_availability_zones.clone()
2181    }
2182
2183    fn system_vars(&self) -> &SystemVars {
2184        &self.state.system_configuration
2185    }
2186
2187    fn system_vars_mut(&mut self) -> &mut SystemVars {
2188        Arc::make_mut(&mut self.state.to_mut().system_configuration)
2189    }
2190
2191    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2192        self.state().get_owner_id(id, self.conn_id())
2193    }
2194
2195    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2196        match id {
2197            SystemObjectId::System => Some(&self.state.system_privileges),
2198            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2199                Some(self.get_cluster(*id).privileges())
2200            }
2201            SystemObjectId::Object(ObjectId::Database(id)) => {
2202                Some(self.get_database(id).privileges())
2203            }
2204            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2205                // For temporary schemas that haven't been created yet (lazy creation),
2206                // we return None - the RBAC check will need to handle this case.
2207                self.state
2208                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2209                    .map(|schema| schema.privileges())
2210            }
2211            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2212            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2213                Some(self.get_network_policy(id).privileges())
2214            }
2215            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2216            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2217        }
2218    }
2219
2220    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2221        let mut seen = BTreeSet::new();
2222        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2223    }
2224
2225    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2226        let mut seen = BTreeSet::new();
2227        self.state.item_dependents(id, &mut seen)
2228    }
2229
2230    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2231        rbac::all_object_privileges(object_type)
2232    }
2233
2234    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2235        self.state.get_object_type(object_id)
2236    }
2237
2238    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2239        self.state.get_system_object_type(id)
2240    }
2241
2242    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2243    /// the object.
2244    ///
2245    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2246    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2247    /// only for error msgs and other humanizations.
2248    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2249        if qualified_name.qualifiers.schema_spec.is_temporary() {
2250            // All bets are off. Just give up and return the qualified name as is.
2251            // TODO: Figure out what's going on with temporary objects.
2252
2253            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2254            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2255            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2256            // which we don't have a simple repro.
2257            return qualified_name.item.clone().into();
2258        }
2259
2260        let database_id = match &qualified_name.qualifiers.database_spec {
2261            ResolvedDatabaseSpecifier::Ambient => None,
2262            ResolvedDatabaseSpecifier::Id(id)
2263                if self.database.is_some() && self.database == Some(*id) =>
2264            {
2265                None
2266            }
2267            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2268        };
2269
2270        let schema_spec = if database_id.is_none()
2271            && self.resolve_item_name(&PartialItemName {
2272                database: None,
2273                schema: None,
2274                item: qualified_name.item.clone(),
2275            }) == Ok(qualified_name)
2276            || self.resolve_function_name(&PartialItemName {
2277                database: None,
2278                schema: None,
2279                item: qualified_name.item.clone(),
2280            }) == Ok(qualified_name)
2281            || self.resolve_type_name(&PartialItemName {
2282                database: None,
2283                schema: None,
2284                item: qualified_name.item.clone(),
2285            }) == Ok(qualified_name)
2286        {
2287            None
2288        } else {
2289            // If `search_path` does not contain `full_name.schema`, the
2290            // `PartialName` must contain it.
2291            Some(qualified_name.qualifiers.schema_spec.clone())
2292        };
2293
2294        let res = PartialItemName {
2295            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2296            schema: schema_spec.map(|spec| {
2297                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2298                    .name()
2299                    .schema
2300                    .clone()
2301            }),
2302            item: qualified_name.item.clone(),
2303        };
2304        assert!(
2305            self.resolve_item_name(&res) == Ok(qualified_name)
2306                || self.resolve_function_name(&res) == Ok(qualified_name)
2307                || self.resolve_type_name(&res) == Ok(qualified_name)
2308        );
2309        res
2310    }
2311
2312    fn add_notice(&self, notice: PlanNotice) {
2313        let _ = self.notices_tx.send(notice.into());
2314    }
2315
2316    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2317        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2318        self.state.comments.get_object_comments(comment_id)
2319    }
2320
2321    fn is_cluster_size_cc(&self, size: &str) -> bool {
2322        self.state
2323            .cluster_replica_sizes
2324            .0
2325            .get(size)
2326            .map_or(false, |a| a.is_cc)
2327    }
2328}
2329
2330#[cfg(test)]
2331mod tests {
2332    use std::collections::{BTreeMap, BTreeSet};
2333    use std::sync::Arc;
2334    use std::{env, iter};
2335
2336    use itertools::Itertools;
2337    use mz_catalog::memory::objects::CatalogItem;
2338    use tokio_postgres::NoTls;
2339    use tokio_postgres::types::Type;
2340    use uuid::Uuid;
2341
2342    use mz_catalog::SYSTEM_CONN_ID;
2343    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2344    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2345    use mz_controller_types::{ClusterId, ReplicaId};
2346    use mz_expr::MirScalarExpr;
2347    use mz_ore::now::to_datetime;
2348    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2349    use mz_persist_client::PersistClient;
2350    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2351    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2352    use mz_repr::role_id::RoleId;
2353    use mz_repr::{
2354        CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2355        SqlScalarType, Timestamp,
2356    };
2357    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2358    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2359    use mz_sql::names::{
2360        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2361        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2362    };
2363    use mz_sql::plan::{
2364        CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2365        QueryLifetime, Scope, StatementContext,
2366    };
2367    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2368    use mz_sql::session::vars::{SystemVars, VarInput};
2369
2370    use crate::catalog::state::LocalExpressionCache;
2371    use crate::catalog::{Catalog, Op};
2372    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2373    use crate::session::Session;
2374
2375    /// System sessions have an empty `search_path` so it's necessary to
2376    /// schema-qualify all referenced items.
2377    ///
2378    /// Dummy (and ostensibly client) sessions contain system schemas in their
2379    /// search paths, so do not require schema qualification on system objects such
2380    /// as types.
2381    #[mz_ore::test(tokio::test)]
2382    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2383    async fn test_minimal_qualification() {
2384        Catalog::with_debug(|catalog| async move {
2385            struct TestCase {
2386                input: QualifiedItemName,
2387                system_output: PartialItemName,
2388                normal_output: PartialItemName,
2389            }
2390
2391            let test_cases = vec![
2392                TestCase {
2393                    input: QualifiedItemName {
2394                        qualifiers: ItemQualifiers {
2395                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2396                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2397                        },
2398                        item: "numeric".to_string(),
2399                    },
2400                    system_output: PartialItemName {
2401                        database: None,
2402                        schema: None,
2403                        item: "numeric".to_string(),
2404                    },
2405                    normal_output: PartialItemName {
2406                        database: None,
2407                        schema: None,
2408                        item: "numeric".to_string(),
2409                    },
2410                },
2411                TestCase {
2412                    input: QualifiedItemName {
2413                        qualifiers: ItemQualifiers {
2414                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2415                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2416                        },
2417                        item: "mz_array_types".to_string(),
2418                    },
2419                    system_output: PartialItemName {
2420                        database: None,
2421                        schema: None,
2422                        item: "mz_array_types".to_string(),
2423                    },
2424                    normal_output: PartialItemName {
2425                        database: None,
2426                        schema: None,
2427                        item: "mz_array_types".to_string(),
2428                    },
2429                },
2430            ];
2431
2432            for tc in test_cases {
2433                assert_eq!(
2434                    catalog
2435                        .for_system_session()
2436                        .minimal_qualification(&tc.input),
2437                    tc.system_output
2438                );
2439                assert_eq!(
2440                    catalog
2441                        .for_session(&Session::dummy())
2442                        .minimal_qualification(&tc.input),
2443                    tc.normal_output
2444                );
2445            }
2446            catalog.expire().await;
2447        })
2448        .await
2449    }
2450
2451    #[mz_ore::test(tokio::test)]
2452    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2453    async fn test_catalog_revision() {
2454        let persist_client = PersistClient::new_for_tests().await;
2455        let organization_id = Uuid::new_v4();
2456        let bootstrap_args = test_bootstrap_args();
2457        {
2458            let mut catalog = Catalog::open_debug_catalog(
2459                persist_client.clone(),
2460                organization_id.clone(),
2461                &bootstrap_args,
2462            )
2463            .await
2464            .expect("unable to open debug catalog");
2465            assert_eq!(catalog.transient_revision(), 1);
2466            let commit_ts = catalog.current_upper().await;
2467            catalog
2468                .transact(
2469                    None,
2470                    commit_ts,
2471                    None,
2472                    vec![Op::CreateDatabase {
2473                        name: "test".to_string(),
2474                        owner_id: MZ_SYSTEM_ROLE_ID,
2475                    }],
2476                )
2477                .await
2478                .expect("failed to transact");
2479            assert_eq!(catalog.transient_revision(), 2);
2480            catalog.expire().await;
2481        }
2482        {
2483            let catalog =
2484                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2485                    .await
2486                    .expect("unable to open debug catalog");
2487            // Re-opening the same catalog resets the transient_revision to 1.
2488            assert_eq!(catalog.transient_revision(), 1);
2489            catalog.expire().await;
2490        }
2491    }
2492
2493    #[mz_ore::test(tokio::test)]
2494    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2495    async fn test_effective_search_path() {
2496        Catalog::with_debug(|catalog| async move {
2497            let mz_catalog_schema = (
2498                ResolvedDatabaseSpecifier::Ambient,
2499                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2500            );
2501            let pg_catalog_schema = (
2502                ResolvedDatabaseSpecifier::Ambient,
2503                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2504            );
2505            let mz_temp_schema = (
2506                ResolvedDatabaseSpecifier::Ambient,
2507                SchemaSpecifier::Temporary,
2508            );
2509
2510            // Behavior with the default search_schema (public)
2511            let session = Session::dummy();
2512            let conn_catalog = catalog.for_session(&session);
2513            assert_ne!(
2514                conn_catalog.effective_search_path(false),
2515                conn_catalog.search_path
2516            );
2517            assert_ne!(
2518                conn_catalog.effective_search_path(true),
2519                conn_catalog.search_path
2520            );
2521            assert_eq!(
2522                conn_catalog.effective_search_path(false),
2523                vec![
2524                    mz_catalog_schema.clone(),
2525                    pg_catalog_schema.clone(),
2526                    conn_catalog.search_path[0].clone()
2527                ]
2528            );
2529            assert_eq!(
2530                conn_catalog.effective_search_path(true),
2531                vec![
2532                    mz_temp_schema.clone(),
2533                    mz_catalog_schema.clone(),
2534                    pg_catalog_schema.clone(),
2535                    conn_catalog.search_path[0].clone()
2536                ]
2537            );
2538
2539            // missing schemas are added when missing
2540            let mut session = Session::dummy();
2541            session
2542                .vars_mut()
2543                .set(
2544                    &SystemVars::new(),
2545                    "search_path",
2546                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2547                    false,
2548                )
2549                .expect("failed to set search_path");
2550            let conn_catalog = catalog.for_session(&session);
2551            assert_ne!(
2552                conn_catalog.effective_search_path(false),
2553                conn_catalog.search_path
2554            );
2555            assert_ne!(
2556                conn_catalog.effective_search_path(true),
2557                conn_catalog.search_path
2558            );
2559            assert_eq!(
2560                conn_catalog.effective_search_path(false),
2561                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2562            );
2563            assert_eq!(
2564                conn_catalog.effective_search_path(true),
2565                vec![
2566                    mz_temp_schema.clone(),
2567                    mz_catalog_schema.clone(),
2568                    pg_catalog_schema.clone()
2569                ]
2570            );
2571
2572            let mut session = Session::dummy();
2573            session
2574                .vars_mut()
2575                .set(
2576                    &SystemVars::new(),
2577                    "search_path",
2578                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2579                    false,
2580                )
2581                .expect("failed to set search_path");
2582            let conn_catalog = catalog.for_session(&session);
2583            assert_ne!(
2584                conn_catalog.effective_search_path(false),
2585                conn_catalog.search_path
2586            );
2587            assert_ne!(
2588                conn_catalog.effective_search_path(true),
2589                conn_catalog.search_path
2590            );
2591            assert_eq!(
2592                conn_catalog.effective_search_path(false),
2593                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2594            );
2595            assert_eq!(
2596                conn_catalog.effective_search_path(true),
2597                vec![
2598                    mz_temp_schema.clone(),
2599                    pg_catalog_schema.clone(),
2600                    mz_catalog_schema.clone()
2601                ]
2602            );
2603
2604            let mut session = Session::dummy();
2605            session
2606                .vars_mut()
2607                .set(
2608                    &SystemVars::new(),
2609                    "search_path",
2610                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2611                    false,
2612                )
2613                .expect("failed to set search_path");
2614            let conn_catalog = catalog.for_session(&session);
2615            assert_ne!(
2616                conn_catalog.effective_search_path(false),
2617                conn_catalog.search_path
2618            );
2619            assert_ne!(
2620                conn_catalog.effective_search_path(true),
2621                conn_catalog.search_path
2622            );
2623            assert_eq!(
2624                conn_catalog.effective_search_path(false),
2625                vec![
2626                    mz_catalog_schema.clone(),
2627                    pg_catalog_schema.clone(),
2628                    mz_temp_schema.clone()
2629                ]
2630            );
2631            assert_eq!(
2632                conn_catalog.effective_search_path(true),
2633                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2634            );
2635            catalog.expire().await;
2636        })
2637        .await
2638    }
2639
2640    #[mz_ore::test(tokio::test)]
2641    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2642    async fn test_normalized_create() {
2643        use mz_ore::collections::CollectionExt;
2644        Catalog::with_debug(|catalog| async move {
2645            let conn_catalog = catalog.for_system_session();
2646            let scx = &mut StatementContext::new(None, &conn_catalog);
2647
2648            let parsed = mz_sql_parser::parser::parse_statements(
2649                "create view public.foo as select 1 as bar",
2650            )
2651            .expect("")
2652            .into_element()
2653            .ast;
2654
2655            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2656
2657            // Ensure that all identifiers are quoted.
2658            assert_eq!(
2659                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2660                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2661            );
2662            catalog.expire().await;
2663        })
2664        .await;
2665    }
2666
2667    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2668    #[mz_ore::test(tokio::test)]
2669    #[cfg_attr(miri, ignore)] // slow
2670    async fn test_large_catalog_item() {
2671        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2672        let column = ", 1";
2673        let view_def_size = view_def.bytes().count();
2674        let column_size = column.bytes().count();
2675        let column_count =
2676            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2677        let columns = iter::repeat(column).take(column_count).join("");
2678        let create_sql = format!("{view_def}{columns})");
2679        let create_sql_check = create_sql.clone();
2680        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2681        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2682            &create_sql
2683        ));
2684
2685        let persist_client = PersistClient::new_for_tests().await;
2686        let organization_id = Uuid::new_v4();
2687        let id = CatalogItemId::User(1);
2688        let gid = GlobalId::User(1);
2689        let bootstrap_args = test_bootstrap_args();
2690        {
2691            let mut catalog = Catalog::open_debug_catalog(
2692                persist_client.clone(),
2693                organization_id.clone(),
2694                &bootstrap_args,
2695            )
2696            .await
2697            .expect("unable to open debug catalog");
2698            let item = catalog
2699                .state()
2700                .deserialize_item(
2701                    gid,
2702                    &create_sql,
2703                    &BTreeMap::new(),
2704                    &mut LocalExpressionCache::Closed,
2705                    None,
2706                )
2707                .expect("unable to parse view");
2708            let commit_ts = catalog.current_upper().await;
2709            catalog
2710                .transact(
2711                    None,
2712                    commit_ts,
2713                    None,
2714                    vec![Op::CreateItem {
2715                        item,
2716                        name: QualifiedItemName {
2717                            qualifiers: ItemQualifiers {
2718                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2719                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2720                            },
2721                            item: "v".to_string(),
2722                        },
2723                        id,
2724                        owner_id: MZ_SYSTEM_ROLE_ID,
2725                    }],
2726                )
2727                .await
2728                .expect("failed to transact");
2729            catalog.expire().await;
2730        }
2731        {
2732            let catalog =
2733                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2734                    .await
2735                    .expect("unable to open debug catalog");
2736            let view = catalog.get_entry(&id);
2737            assert_eq!("v", view.name.item);
2738            match &view.item {
2739                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2740                item => panic!("expected view, got {}", item.typ()),
2741            }
2742            catalog.expire().await;
2743        }
2744    }
2745
2746    #[mz_ore::test(tokio::test)]
2747    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2748    async fn test_object_type() {
2749        Catalog::with_debug(|catalog| async move {
2750            let conn_catalog = catalog.for_system_session();
2751
2752            assert_eq!(
2753                mz_sql::catalog::ObjectType::ClusterReplica,
2754                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2755                    ClusterId::user(1).expect("1 is a valid ID"),
2756                    ReplicaId::User(1)
2757                )))
2758            );
2759            assert_eq!(
2760                mz_sql::catalog::ObjectType::Role,
2761                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2762            );
2763            catalog.expire().await;
2764        })
2765        .await;
2766    }
2767
2768    #[mz_ore::test(tokio::test)]
2769    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2770    async fn test_get_privileges() {
2771        Catalog::with_debug(|catalog| async move {
2772            let conn_catalog = catalog.for_system_session();
2773
2774            assert_eq!(
2775                None,
2776                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2777                    ClusterId::user(1).expect("1 is a valid ID"),
2778                    ReplicaId::User(1),
2779                ))))
2780            );
2781            assert_eq!(
2782                None,
2783                conn_catalog
2784                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2785            );
2786            catalog.expire().await;
2787        })
2788        .await;
2789    }
2790
2791    #[mz_ore::test(tokio::test)]
2792    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2793    async fn verify_builtin_descs() {
2794        Catalog::with_debug(|catalog| async move {
2795            let conn_catalog = catalog.for_system_session();
2796
2797            let builtins_cfg = BuiltinsConfig {
2798                include_continual_tasks: true,
2799            };
2800            for builtin in BUILTINS::iter(&builtins_cfg) {
2801                let (schema, name, expected_desc) = match builtin {
2802                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2803                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2804                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2805                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2806                    Builtin::Log(_)
2807                    | Builtin::Type(_)
2808                    | Builtin::Func(_)
2809                    | Builtin::ContinualTask(_)
2810                    | Builtin::Index(_)
2811                    | Builtin::Connection(_) => continue,
2812                };
2813                let item = conn_catalog
2814                    .resolve_item(&PartialItemName {
2815                        database: None,
2816                        schema: Some(schema.to_string()),
2817                        item: name.to_string(),
2818                    })
2819                    .expect("unable to resolve item")
2820                    .at_version(RelationVersionSelector::Latest);
2821
2822                let actual_desc = item.relation_desc().expect("invalid item type");
2823                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2824                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2825                {
2826                    assert_eq!(
2827                        actual_name, expected_name,
2828                        "item {schema}.{name} column {index} name did not match its expected name"
2829                    );
2830                    assert_eq!(
2831                        actual_typ, expected_typ,
2832                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2833                    );
2834                }
2835                assert_eq!(
2836                    &*actual_desc, expected_desc,
2837                    "item {schema}.{name} did not match its expected RelationDesc"
2838                );
2839            }
2840            catalog.expire().await;
2841        })
2842        .await
2843    }
2844
2845    // Connect to a running Postgres server and verify that our builtin
2846    // types and functions match it, in addition to some other things.
2847    #[mz_ore::test(tokio::test)]
2848    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2849    async fn test_compare_builtins_postgres() {
2850        async fn inner(catalog: Catalog) {
2851            // Verify that all builtin functions:
2852            // - have a unique OID
2853            // - if they have a postgres counterpart (same oid) then they have matching name
2854            let (client, connection) = tokio_postgres::connect(
2855                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2856                NoTls,
2857            )
2858            .await
2859            .expect("failed to connect to Postgres");
2860
2861            task::spawn(|| "compare_builtin_postgres", async move {
2862                if let Err(e) = connection.await {
2863                    panic!("connection error: {}", e);
2864                }
2865            });
2866
2867            struct PgProc {
2868                name: String,
2869                arg_oids: Vec<u32>,
2870                ret_oid: Option<u32>,
2871                ret_set: bool,
2872            }
2873
2874            struct PgType {
2875                name: String,
2876                ty: String,
2877                elem: u32,
2878                array: u32,
2879                input: u32,
2880                receive: u32,
2881            }
2882
2883            struct PgOper {
2884                oprresult: u32,
2885                name: String,
2886            }
2887
2888            let pg_proc: BTreeMap<_, _> = client
2889                .query(
2890                    "SELECT
2891                    p.oid,
2892                    proname,
2893                    proargtypes,
2894                    prorettype,
2895                    proretset
2896                FROM pg_proc p
2897                JOIN pg_namespace n ON p.pronamespace = n.oid",
2898                    &[],
2899                )
2900                .await
2901                .expect("pg query failed")
2902                .into_iter()
2903                .map(|row| {
2904                    let oid: u32 = row.get("oid");
2905                    let pg_proc = PgProc {
2906                        name: row.get("proname"),
2907                        arg_oids: row.get("proargtypes"),
2908                        ret_oid: row.get("prorettype"),
2909                        ret_set: row.get("proretset"),
2910                    };
2911                    (oid, pg_proc)
2912                })
2913                .collect();
2914
2915            let pg_type: BTreeMap<_, _> = client
2916                .query(
2917                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2918                    &[],
2919                )
2920                .await
2921                .expect("pg query failed")
2922                .into_iter()
2923                .map(|row| {
2924                    let oid: u32 = row.get("oid");
2925                    let pg_type = PgType {
2926                        name: row.get("typname"),
2927                        ty: row.get("typtype"),
2928                        elem: row.get("typelem"),
2929                        array: row.get("typarray"),
2930                        input: row.get("typinput"),
2931                        receive: row.get("typreceive"),
2932                    };
2933                    (oid, pg_type)
2934                })
2935                .collect();
2936
2937            let pg_oper: BTreeMap<_, _> = client
2938                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2939                .await
2940                .expect("pg query failed")
2941                .into_iter()
2942                .map(|row| {
2943                    let oid: u32 = row.get("oid");
2944                    let pg_oper = PgOper {
2945                        name: row.get("oprname"),
2946                        oprresult: row.get("oprresult"),
2947                    };
2948                    (oid, pg_oper)
2949                })
2950                .collect();
2951
2952            let conn_catalog = catalog.for_system_session();
2953            let resolve_type_oid = |item: &str| {
2954                conn_catalog
2955                    .resolve_type(&PartialItemName {
2956                        database: None,
2957                        // All functions we check exist in PG, so the types must, as
2958                        // well
2959                        schema: Some(PG_CATALOG_SCHEMA.into()),
2960                        item: item.to_string(),
2961                    })
2962                    .expect("unable to resolve type")
2963                    .oid()
2964            };
2965
2966            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2967                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2968                .collect();
2969
2970            let mut all_oids = BTreeSet::new();
2971
2972            // A function to determine if two oids are equivalent enough for these tests. We don't
2973            // support some types, so map exceptions here.
2974            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2975                [
2976                    // We don't support NAME.
2977                    (Type::NAME, Type::TEXT),
2978                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2979                    // We don't support time with time zone.
2980                    (Type::TIME, Type::TIMETZ),
2981                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2982                ]
2983                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2984            );
2985            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2986                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2987            ]);
2988            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2989                if ignore_return_types.contains(&fn_oid) {
2990                    return true;
2991                }
2992                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2993                    return true;
2994                }
2995                a == b
2996            };
2997
2998            let builtins_cfg = BuiltinsConfig {
2999                include_continual_tasks: true,
3000            };
3001            for builtin in BUILTINS::iter(&builtins_cfg) {
3002                match builtin {
3003                    Builtin::Type(ty) => {
3004                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3005
3006                        if ty.oid >= FIRST_MATERIALIZE_OID {
3007                            // High OIDs are reserved in Materialize and don't have
3008                            // PostgreSQL counterparts.
3009                            continue;
3010                        }
3011
3012                        // For types that have a PostgreSQL counterpart, verify that
3013                        // the name and oid match.
3014                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3015                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3016                        });
3017                        assert_eq!(
3018                            ty.name, pg_ty.name,
3019                            "oid {} has name {} in postgres; expected {}",
3020                            ty.oid, pg_ty.name, ty.name,
3021                        );
3022
3023                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3024                            None => (0, 0),
3025                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3026                        };
3027                        assert_eq!(
3028                            typinput_oid, pg_ty.input,
3029                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3030                            ty.name, typinput_oid, pg_ty.input,
3031                        );
3032                        assert_eq!(
3033                            typreceive_oid, pg_ty.receive,
3034                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3035                            ty.name, typreceive_oid, pg_ty.receive,
3036                        );
3037                        if typinput_oid != 0 {
3038                            assert!(
3039                                func_oids.contains(&typinput_oid),
3040                                "type {} has typinput OID {} that does not exist in pg_proc",
3041                                ty.name,
3042                                typinput_oid,
3043                            );
3044                        }
3045                        if typreceive_oid != 0 {
3046                            assert!(
3047                                func_oids.contains(&typreceive_oid),
3048                                "type {} has typreceive OID {} that does not exist in pg_proc",
3049                                ty.name,
3050                                typreceive_oid,
3051                            );
3052                        }
3053
3054                        // Ensure the type matches.
3055                        match &ty.details.typ {
3056                            CatalogType::Array { element_reference } => {
3057                                let elem_ty = BUILTINS::iter(&builtins_cfg)
3058                                    .filter_map(|builtin| match builtin {
3059                                        Builtin::Type(ty @ BuiltinType { name, .. })
3060                                            if element_reference == name =>
3061                                        {
3062                                            Some(ty)
3063                                        }
3064                                        _ => None,
3065                                    })
3066                                    .next();
3067                                let elem_ty = match elem_ty {
3068                                    Some(ty) => ty,
3069                                    None => {
3070                                        panic!("{} is unexpectedly not a type", element_reference)
3071                                    }
3072                                };
3073                                assert_eq!(
3074                                    pg_ty.elem, elem_ty.oid,
3075                                    "type {} has mismatched element OIDs",
3076                                    ty.name
3077                                )
3078                            }
3079                            CatalogType::Pseudo => {
3080                                assert_eq!(
3081                                    pg_ty.ty, "p",
3082                                    "type {} is not a pseudo type as expected",
3083                                    ty.name
3084                                )
3085                            }
3086                            CatalogType::Range { .. } => {
3087                                assert_eq!(
3088                                    pg_ty.ty, "r",
3089                                    "type {} is not a range type as expected",
3090                                    ty.name
3091                                );
3092                            }
3093                            _ => {
3094                                assert_eq!(
3095                                    pg_ty.ty, "b",
3096                                    "type {} is not a base type as expected",
3097                                    ty.name
3098                                )
3099                            }
3100                        }
3101
3102                        // Ensure the array type reference is correct.
3103                        let schema = catalog
3104                            .resolve_schema_in_database(
3105                                &ResolvedDatabaseSpecifier::Ambient,
3106                                ty.schema,
3107                                &SYSTEM_CONN_ID,
3108                            )
3109                            .expect("unable to resolve schema");
3110                        let allocated_type = catalog
3111                            .resolve_type(
3112                                None,
3113                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3114                                &PartialItemName {
3115                                    database: None,
3116                                    schema: Some(schema.name().schema.clone()),
3117                                    item: ty.name.to_string(),
3118                                },
3119                                &SYSTEM_CONN_ID,
3120                            )
3121                            .expect("unable to resolve type");
3122                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3123                            ty
3124                        } else {
3125                            panic!("unexpectedly not a type")
3126                        };
3127                        match ty.details.array_id {
3128                            Some(array_id) => {
3129                                let array_ty = catalog.get_entry(&array_id);
3130                                assert_eq!(
3131                                    pg_ty.array, array_ty.oid,
3132                                    "type {} has mismatched array OIDs",
3133                                    allocated_type.name.item,
3134                                );
3135                            }
3136                            None => assert_eq!(
3137                                pg_ty.array, 0,
3138                                "type {} does not have an array type in mz but does in pg",
3139                                allocated_type.name.item,
3140                            ),
3141                        }
3142                    }
3143                    Builtin::Func(func) => {
3144                        for imp in func.inner.func_impls() {
3145                            assert!(
3146                                all_oids.insert(imp.oid),
3147                                "{} reused oid {}",
3148                                func.name,
3149                                imp.oid
3150                            );
3151
3152                            assert!(
3153                                imp.oid < FIRST_USER_OID,
3154                                "built-in function {} erroneously has OID in user space ({})",
3155                                func.name,
3156                                imp.oid,
3157                            );
3158
3159                            // For functions that have a postgres counterpart, verify that the name and
3160                            // oid match.
3161                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3162                                continue;
3163                            } else {
3164                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3165                                    panic!(
3166                                        "pg_proc missing function {}: oid {}",
3167                                        func.name, imp.oid
3168                                    )
3169                                })
3170                            };
3171                            assert_eq!(
3172                                func.name, pg_fn.name,
3173                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3174                                imp.oid, func.name, pg_fn.name
3175                            );
3176
3177                            // Complain, but don't fail, if argument oids don't match.
3178                            // TODO: make these match.
3179                            let imp_arg_oids = imp
3180                                .arg_typs
3181                                .iter()
3182                                .map(|item| resolve_type_oid(item))
3183                                .collect::<Vec<_>>();
3184
3185                            if imp_arg_oids != pg_fn.arg_oids {
3186                                println!(
3187                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3188                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3189                                );
3190                            }
3191
3192                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3193
3194                            assert!(
3195                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3196                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3197                                imp.oid,
3198                                func.name,
3199                                imp_return_oid,
3200                                pg_fn.ret_oid
3201                            );
3202
3203                            assert_eq!(
3204                                imp.return_is_set, pg_fn.ret_set,
3205                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3206                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3207                            );
3208                        }
3209                    }
3210                    _ => (),
3211                }
3212            }
3213
3214            for (op, func) in OP_IMPLS.iter() {
3215                for imp in func.func_impls() {
3216                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3217
3218                    // For operators that have a postgres counterpart, verify that the name and oid match.
3219                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3220                        continue;
3221                    } else {
3222                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3223                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3224                        })
3225                    };
3226
3227                    assert_eq!(*op, pg_op.name);
3228
3229                    let imp_return_oid =
3230                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3231                    if imp_return_oid != pg_op.oprresult {
3232                        panic!(
3233                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3234                            imp.oid, op, imp_return_oid, pg_op.oprresult
3235                        );
3236                    }
3237                }
3238            }
3239            catalog.expire().await;
3240        }
3241
3242        Catalog::with_debug(inner).await
3243    }
3244
    // Execute all builtin functions with all combinations of arguments from interesting datums.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_smoketest_all_builtins() {
        // Enumerates every scalar builtin function/operator implementation and every
        // combination of "interesting" argument datums, spawning one blocking task
        // per call via `smoketest_fn`. Returns the join handles so the caller can
        // await them all outside the catalog closure.
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            // Resolve a builtin type name to its OID via the catalog.
            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Extracted during planning; always panics when executed.
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege", // > 3 s each
                "has_type_privilege",  // > 3 s each
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            // Test both named builtin functions and operators. The `map` produces
            // `(&&str, _)` pairs, which is why `smoketest_fn` takes `name: &&str`.
            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are smoke-tested here.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                // Skip any impl whose argument types have no pgrepr equivalent,
                // since we cannot construct SQL scalar types for them.
                'outer: for imp in impls {
                    let details = imp.details();
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // For each argument position, the candidate datums: NULL plus
                    // the type's "interesting" values (presumably boundary/edge
                    // values — see `interesting_datums`).
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    // Skip nullary fns.
                    if datums.is_empty() {
                        continue;
                    }

                    // The declared return type, if it maps onto a pgrepr type;
                    // `smoketest_fn` only runs its type checks when this is Some.
                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // `idxs` is a mixed-radix counter ("odometer") over the datum
                    // lists; the loop ends once the most-significant digit rolls
                    // past its radix.
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        // Wrap each datum as an already-coerced literal of the
                        // corresponding argument type.
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable task/call label, e.g. `foo(1, NULL) (oid: 123)`.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance to the next datum combination.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                // Digit overflowed: leave idxs[0] past its radix so
                                // the while condition terminates, otherwise reset
                                // this digit and carry into the next one.
                                if i == 0 {
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        // Await every spawned smoke test; any panic in a task propagates here.
        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        for handle in handles {
            handle.await;
        }
    }
3390
    /// Plans and evaluates a single builtin scalar function call with concrete
    /// literal arguments, checking several internal consistency properties.
    /// Planning and evaluation errors are tolerated (the generated arguments may
    /// be invalid for the function); only panics and consistency violations fail.
    ///
    /// When planning, lowering, and evaluation all succeed and `return_styp` is
    /// known, this checks that:
    /// - MIR type inference agrees with the catalog's declared return type,
    /// - the evaluated datum is an instance of the inferred MIR type,
    /// - an eliminable cast yields the same datum whether or not it is eliminated,
    /// - `introduces_nulls`/`propagates_nulls` are consistent with the inferred
    ///   nullability,
    /// - `MirScalarExpr::reduce` produces the same datum and scalar type as
    ///   direct evaluation.
    ///
    /// `name` is `&&str` because callers pass it straight out of an iterator of
    /// `(&&str, _)` pairs (see `test_smoketest_all_builtins`).
    fn smoketest_fn(
        name: &&str,
        call_name: String,
        op: &Operation<HirScalarExpr>,
        imp: &FuncImpl<HirScalarExpr>,
        args: Vec<Datum<'_>>,
        catalog: Arc<Catalog>,
        scalars: Vec<CoercibleScalarExpr>,
        return_styp: Option<SqlScalarType>,
    ) {
        // Build a minimal planning context: no aggregates, subqueries,
        // parameters, or window functions are allowed in these calls.
        let conn_catalog = catalog.for_system_session();
        let pcx = PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
        let ecx = ExprContext {
            qcx: &qcx,
            name: "smoketest",
            scope: &Scope::empty(),
            relation_type: &SqlRelationType::empty(),
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let arena = RowArena::new();
        // A dummy session inside a transaction, pinned to logical time MIN, so
        // expressions that reference session/time state can be prepared.
        let mut session = Session::<Timestamp>::dummy();
        session
            .start_transaction(to_datetime(0), None, None)
            .expect("must succeed");
        let prep_style = ExprPrepOneShot {
            logical_time: EvalTime::Time(Timestamp::MIN),
            session: &session,
            catalog_state: &catalog.state,
        };

        // Execute the function as much as possible, ensuring no panics occur, but
        // otherwise ignoring eval errors. We also do various other checks.
        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
        if let Ok(hir) = res {
            // If the call planned to an eliminable cast, also evaluate it with
            // cast elimination disabled so we can compare results below.
            let uneliminated_result_row = {
                if let HirScalarExpr::CallUnary { func, .. } = &hir
                    && func.is_eliminable_cast()
                {
                    let mut uneliminated_mir = hir
                        .clone()
                        .lower_uncorrelated(HirToMirConfig {
                            enable_cast_elimination: false,
                            ..catalog.system_config().into()
                        })
                        .expect("lowering eliminable cast should always succeed");
                    prep_style
                        .prep_scalar_expr(&mut uneliminated_mir)
                        .expect("must succeed");

                    // Pack the row, to avoid lifetime issues with the MIR we lowered here
                    uneliminated_mir
                        .eval(&[], &arena)
                        .ok()
                        .map(|datum| Row::pack([datum]))
                } else {
                    None
                }
            };

            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
                // Populate unmaterialized functions.
                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");

                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
                    if let Some(return_styp) = return_styp {
                        let mir_typ = mir.typ(&[]);
                        // MIR type inference should be consistent with the type
                        // we get from the catalog.
                        soft_assert_eq_or_log!(
                            mir_typ.scalar_type,
                            (&return_styp).into(),
                            "MIR type did not match the catalog type (cast elimination/repr type error)"
                        );
                        // The following will check not just that the scalar type
                        // is ok, but also catches if the function returned a null
                        // but the MIR type inference said "non-nullable".
                        if !eval_result_datum.is_instance_of(&mir_typ) {
                            panic!(
                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
                            );
                        }
                        // Check the consistency of `is_eliminable_cast`---we should get the same datum either way.
                        if let Some(row) = uneliminated_result_row {
                            let uneliminated_result_datum = row.unpack_first();
                            assert_eq!(
                                uneliminated_result_datum, eval_result_datum,
                                "datums should not change if cast is eliminable"
                            );
                        }
                        // Check the consistency of `introduces_nulls` and
                        // `propagates_nulls` with `MirScalarExpr::typ`.
                        if let Some((introduces_nulls, propagates_nulls)) =
                            call_introduces_propagates_nulls(&mir)
                        {
                            if introduces_nulls {
                                // If the function introduces_nulls, then the return
                                // type should always be nullable, regardless of
                                // the nullability of the input types.
                                assert!(
                                    mir_typ.nullable,
                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                    name, args, mir, mir_typ.nullable
                                );
                            } else {
                                let any_input_null = args.iter().any(|arg| arg.is_null());
                                if !any_input_null {
                                    assert!(
                                        !mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                } else if propagates_nulls {
                                    // propagates_nulls means the optimizer short-circuits
                                    // all-null inputs, so the output must be nullable.
                                    assert!(
                                        mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                }
                                // When propagates_nulls is false, the output may still
                                // be nullable if a non-nullable parameter received a null
                                // input (per-position null rejection). The is_instance_of
                                // check above ensures type consistency.
                            }
                        }
                        // Check that `MirScalarExpr::reduce` yields the same result
                        // as the real evaluation.
                        let mut reduced = mir.clone();
                        reduced.reduce(&[]);
                        match reduced {
                            MirScalarExpr::Literal(reduce_result, ctyp) => {
                                match reduce_result {
                                    Ok(reduce_result_row) => {
                                        let reduce_result_datum = reduce_result_row.unpack_first();
                                        assert_eq!(
                                            reduce_result_datum,
                                            eval_result_datum,
                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                        // Let's check that the types also match.
                                        // (We are not checking nullability here,
                                        // because it's ok when we know a more
                                        // precise nullability after actually
                                        // evaluating a function than before.)
                                        assert_eq!(
                                            ctyp.scalar_type,
                                            mir_typ.scalar_type,
                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                    }
                                    Err(..) => {} // It's ok, we might have given invalid args to the function
                                }
                            }
                            _ => unreachable!(
                                "all args are literals, so should have reduced to a literal"
                            ),
                        }
                    }
                }
            }
        }
    }
3573
3574    /// If the given MirScalarExpr
3575    ///  - is a function call, and
3576    ///  - all arguments are literals
3577    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3578    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3579        match mir_func_call {
3580            MirScalarExpr::CallUnary { func, expr } => {
3581                if expr.is_literal() {
3582                    Some((func.introduces_nulls(), func.propagates_nulls()))
3583                } else {
3584                    None
3585                }
3586            }
3587            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3588                if expr1.is_literal() && expr2.is_literal() {
3589                    Some((func.introduces_nulls(), func.propagates_nulls()))
3590                } else {
3591                    None
3592                }
3593            }
3594            MirScalarExpr::CallVariadic { func, exprs } => {
3595                if exprs.iter().all(|arg| arg.is_literal()) {
3596                    Some((func.introduces_nulls(), func.propagates_nulls()))
3597                } else {
3598                    None
3599                }
3600            }
3601            _ => None,
3602        }
3603    }
3604
    // Make sure pg views don't use types that only exist in Materialize.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_pg_views_forbidden_types() {
        Catalog::with_debug(|catalog| async move {
            let conn_catalog = catalog.for_system_session();

            // Only views in the PostgreSQL-compatibility schemas are constrained;
            // Materialize's own schemas may freely use MZ-specific types.
            for view in BUILTINS::views().filter(|view| {
                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
            }) {
                let item = conn_catalog
                    .resolve_item(&PartialItemName {
                        database: None,
                        schema: Some(view.schema.to_string()),
                        item: view.name.to_string(),
                    })
                    .expect("unable to resolve view")
                    // TODO(alter_table)
                    .at_version(RelationVersionSelector::Latest);
                let full_name = conn_catalog.resolve_full_name(item.name());
                let desc = item.relation_desc().expect("invalid item type");
                // Check every column's scalar type against the allow/deny lists
                // below. The match is deliberately exhaustive (no `_` arm) so that
                // adding a new scalar type forces a decision here.
                for col_type in desc.iter_types() {
                    match &col_type.scalar_type {
                        // Types with no PostgreSQL counterpart: forbidden.
                        typ @ SqlScalarType::UInt16
                        | typ @ SqlScalarType::UInt32
                        | typ @ SqlScalarType::UInt64
                        | typ @ SqlScalarType::MzTimestamp
                        | typ @ SqlScalarType::List { .. }
                        | typ @ SqlScalarType::Map { .. }
                        | typ @ SqlScalarType::MzAclItem => {
                            panic!("{typ:?} type found in {full_name}");
                        }
                        // Types that exist in PostgreSQL: allowed.
                        SqlScalarType::AclItem
                        | SqlScalarType::Bool
                        | SqlScalarType::Int16
                        | SqlScalarType::Int32
                        | SqlScalarType::Int64
                        | SqlScalarType::Float32
                        | SqlScalarType::Float64
                        | SqlScalarType::Numeric { .. }
                        | SqlScalarType::Date
                        | SqlScalarType::Time
                        | SqlScalarType::Timestamp { .. }
                        | SqlScalarType::TimestampTz { .. }
                        | SqlScalarType::Interval
                        | SqlScalarType::PgLegacyChar
                        | SqlScalarType::Bytes
                        | SqlScalarType::String
                        | SqlScalarType::Char { .. }
                        | SqlScalarType::VarChar { .. }
                        | SqlScalarType::Jsonb
                        | SqlScalarType::Uuid
                        | SqlScalarType::Array(_)
                        | SqlScalarType::Record { .. }
                        | SqlScalarType::Oid
                        | SqlScalarType::RegProc
                        | SqlScalarType::RegType
                        | SqlScalarType::RegClass
                        | SqlScalarType::Int2Vector
                        | SqlScalarType::Range { .. }
                        | SqlScalarType::PgLegacyName => {}
                    }
                }
            }
            catalog.expire().await;
        })
        .await
    }
3673
3674    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3675    // introspection relations.
3676    #[mz_ore::test(tokio::test)]
3677    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3678    async fn test_mz_introspection_builtins() {
3679        Catalog::with_debug(|catalog| async move {
3680            let conn_catalog = catalog.for_system_session();
3681
3682            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3683            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3684
3685            for entry in catalog.entries() {
3686                let schema_spec = entry.name().qualifiers.schema_spec;
3687                let introspection_deps = catalog.introspection_dependencies(entry.id);
3688                if introspection_deps.is_empty() {
3689                    assert!(
3690                        schema_spec != introspection_schema_spec,
3691                        "entry does not depend on introspection sources but is in \
3692                         `mz_introspection`: {}",
3693                        conn_catalog.resolve_full_name(entry.name()),
3694                    );
3695                } else {
3696                    assert!(
3697                        schema_spec == introspection_schema_spec,
3698                        "entry depends on introspection sources but is not in \
3699                         `mz_introspection`: {}",
3700                        conn_catalog.resolve_full_name(entry.name()),
3701                    );
3702                }
3703            }
3704        })
3705        .await
3706    }
3707
    // Exercises two catalogs (one writer, one read-only) subscribed to the same
    // durable state: the reader observes the writer's commits after syncing, and
    // opening a second writer fences out both earlier subscribers.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_multi_subscriber_catalog() {
        let persist_client = PersistClient::new_for_tests().await;
        let bootstrap_args = test_bootstrap_args();
        let organization_id = Uuid::new_v4();
        let db_name = "DB";

        // Open a writable and a read-only catalog over the same persist state
        // (same client and organization id).
        let mut writer_catalog = Catalog::open_debug_catalog(
            persist_client.clone(),
            organization_id.clone(),
            &bootstrap_args,
        )
        .await
        .expect("open_debug_catalog");
        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
            persist_client.clone(),
            organization_id.clone(),
            &bootstrap_args,
        )
        .await
        .expect("open_debug_read_only_catalog");
        // Neither catalog knows the database before it is created.
        assert_err!(writer_catalog.resolve_database(db_name));
        assert_err!(read_only_catalog.resolve_database(db_name));

        // Create the database through the writer.
        let commit_ts = writer_catalog.current_upper().await;
        writer_catalog
            .transact(
                None,
                commit_ts,
                None,
                vec![Op::CreateDatabase {
                    name: db_name.to_string(),
                    owner_id: MZ_SYSTEM_ROLE_ID,
                }],
            )
            .await
            .expect("failed to transact");

        // After the reader syncs, both sides resolve the same database.
        let write_db = writer_catalog
            .resolve_database(db_name)
            .expect("resolve_database");
        read_only_catalog
            .sync_to_current_updates()
            .await
            .expect("sync_to_current_updates");
        let read_db = read_only_catalog
            .resolve_database(db_name)
            .expect("resolve_database");

        assert_eq!(write_db, read_db);

        // Opening a second writer fences the first writer. The new writer sees
        // the previously committed database.
        let writer_catalog_fencer =
            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
                .await
                .expect("open_debug_catalog for fencer");
        let fencer_db = writer_catalog_fencer
            .resolve_database(db_name)
            .expect("resolve_database for fencer");
        assert_eq!(fencer_db, read_db);

        // Both the original writer and the reader now fail to sync with an
        // epoch fence error.
        let write_fence_err = writer_catalog
            .sync_to_current_updates()
            .await
            .expect_err("sync_to_current_updates for fencer");
        assert!(matches!(
            write_fence_err,
            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
        ));
        let read_fence_err = read_only_catalog
            .sync_to_current_updates()
            .await
            .expect_err("sync_to_current_updates after fencer");
        assert!(matches!(
            read_fence_err,
            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
        ));

        // Release all catalog handles.
        writer_catalog.expire().await;
        read_only_catalog.expire().await;
        writer_catalog_fencer.expire().await;
    }
3790}