mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67    CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68    DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91// DO NOT add any more imports from `crate` outside of `crate::catalog`.
92pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96    DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100#[cfg(test)]
101use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
102use crate::session::{Portal, PreparedStatement, Session};
103use crate::util::ResultExt;
104use crate::{AdapterError, AdapterNotice, ExecuteResponse};
105
106mod builtin_table_updates;
107pub(crate) mod consistency;
108mod migrate;
109
110mod apply;
111mod open;
112mod state;
113mod timeline;
114mod transact;
115
/// A `Catalog` keeps track of the SQL objects known to the planner.
///
/// For each object, it keeps track of both forward and reverse dependencies:
/// i.e., which objects are depended upon by the object, and which objects
/// depend upon the object. It enforces the SQL rules around dropping: an object
/// cannot be dropped until all of the objects that depend upon it are dropped.
/// It also enforces uniqueness of names.
///
/// SQL mandates a hierarchy of exactly three layers. A catalog contains
/// databases, databases contain schemas, and schemas contain catalog items,
/// like sources, sinks, views, and indexes.
///
/// To the outside world, databases, schemas, and items are all identified by
/// name. Items can be referred to by their [`FullItemName`], which fully and
/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
/// database name and/or the schema name. Partial names can be converted into
/// full names via a complicated resolution process documented by the
/// [`CatalogState::resolve`] method.
///
/// The catalog also maintains special "ambient schemas": virtual schemas,
/// implicitly present in all databases, that house various system views.
/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
#[derive(Debug)]
pub struct Catalog {
    /// In-memory catalog state. The `for_*` constructors on `Catalog` derive
    /// their [`ConnCatalog`]s from it.
    state: CatalogState,
    /// Optimized/physical plans and dataflow metainfo stored per [`GlobalId`].
    plans: CatalogPlans,
    /// Optional handle to the expression cache.
    // NOTE(review): presumably `None` when the expression cache is disabled --
    // confirm against the code that opens the catalog.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Durable catalog state behind an async mutex. The `Arc` is shared (not
    /// deep-copied) between clones of a `Catalog`.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// In-memory revision counter; see [`Catalog::transient_revision`].
    transient_revision: u64,
}
146
147// Implement our own Clone because derive can't unless S is Clone, which it's
148// not (hence the Arc).
149impl Clone for Catalog {
150    fn clone(&self) -> Self {
151        Self {
152            state: self.state.clone(),
153            plans: self.plans.clone(),
154            expr_cache_handle: self.expr_cache_handle.clone(),
155            storage: Arc::clone(&self.storage),
156            transient_revision: self.transient_revision,
157        }
158    }
159}
160
/// Plans and optimizer metadata that a [`Catalog`] keeps per [`GlobalId`].
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized plans, keyed by the id they were stored under.
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical plans, keyed by the id they were stored under.
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// `DataflowMetainfo` (including optimizer notices), keyed by the id it
    /// was stored under.
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Reverse lookup: for each dependency id, the notices that list that id
    /// among their `dependencies`.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
168
169impl Catalog {
170    /// Set the optimized plan for the item identified by `id`.
171    #[mz_ore::instrument(level = "trace")]
172    pub fn set_optimized_plan(
173        &mut self,
174        id: GlobalId,
175        plan: DataflowDescription<OptimizedMirRelationExpr>,
176    ) {
177        self.plans.optimized_plan_by_id.insert(id, plan.into());
178    }
179
180    /// Set the physical plan for the item identified by `id`.
181    #[mz_ore::instrument(level = "trace")]
182    pub fn set_physical_plan(
183        &mut self,
184        id: GlobalId,
185        plan: DataflowDescription<mz_compute_types::plan::Plan>,
186    ) {
187        self.plans.physical_plan_by_id.insert(id, plan.into());
188    }
189
190    /// Try to get the optimized plan for the item identified by `id`.
191    #[mz_ore::instrument(level = "trace")]
192    pub fn try_get_optimized_plan(
193        &self,
194        id: &GlobalId,
195    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
196        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
197    }
198
199    /// Try to get the physical plan for the item identified by `id`.
200    #[mz_ore::instrument(level = "trace")]
201    pub fn try_get_physical_plan(
202        &self,
203        id: &GlobalId,
204    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
205        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
206    }
207
208    /// Set the `DataflowMetainfo` for the item identified by `id`.
209    #[mz_ore::instrument(level = "trace")]
210    pub fn set_dataflow_metainfo(
211        &mut self,
212        id: GlobalId,
213        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
214    ) {
215        // Add entries to the `notices_by_dep_id` lookup map.
216        for notice in metainfo.optimizer_notices.iter() {
217            for dep_id in notice.dependencies.iter() {
218                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
219                entry.push(Arc::clone(notice))
220            }
221            if let Some(item_id) = notice.item_id {
222                soft_assert_eq_or_log!(
223                    item_id,
224                    id,
225                    "notice.item_id should match the id for whom we are saving the notice"
226                );
227            }
228        }
229        // Add the dataflow with the scoped entries.
230        self.plans.dataflow_metainfos.insert(id, metainfo);
231    }
232
233    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
234    #[mz_ore::instrument(level = "trace")]
235    pub fn try_get_dataflow_metainfo(
236        &self,
237        id: &GlobalId,
238    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
239        self.plans.dataflow_metainfos.get(id)
240    }
241
    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
    /// items identified by `drop_ids`.
    ///
    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
    ///
    /// Return a set containing all dropped notices. Note that if for some
    /// reason we end up with two identical notices being dropped by the same
    /// call, the result will contain only one instance of that notice.
    ///
    /// The work happens in four passes:
    ///
    /// 1. Remove the plans and metainfo stored under each id in `drop_ids`,
    ///    unregistering the metainfo's notices from `notices_by_dep_id`.
    /// 2. Remove the `notices_by_dep_id` entries keyed by each id in
    ///    `drop_ids`, unregistering those notices from the metainfo of the
    ///    item they belong to.
    /// 3. Remove the dropped notices from the `notices_by_dep_id` entries of
    ///    their remaining dependencies (those not in `drop_ids`).
    /// 4. Log an error for every dropped notice that is still referenced
    ///    from somewhere else.
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Collect dropped notices in this set.
        let mut dropped_notices = BTreeSet::new();

        // Remove plans and metainfo.optimizer_notices entries.
        for id in drop_ids {
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Remove the corresponding notices_by_dep_id entries.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Remove notices_by_dep_id entries.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    // Remove the corresponding metainfo.optimizer_notices entries.
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            // Sanity check: every notice stored with this
                            // metainfo that names an item names `item_id`.
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Collect dependency ids not in drop_ids with at least one dropped
        // notice.
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
        // entries in `todo_dep_ids`.
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        // At this point `dropped_notices` should hold the only remaining
        // reference to each notice. If not, report where the extra references
        // were found; this only logs and does not change the result.
        if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
            use mz_ore::str::{bracketed, separated};
            let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
            let bad_notices = bad_notices.map(|n| {
                // Try to find where the bad reference is.
                // Maybe in `dataflow_metainfos`?
                let mut dataflow_metainfo_occurrences = Vec::new();
                for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
                    if meta_info.optimizer_notices.contains(n) {
                        dataflow_metainfo_occurrences.push(id);
                    }
                }
                // Or `notices_by_dep_id`?
                let mut notices_by_dep_id_occurrences = Vec::new();
                for (id, notices) in self.plans.notices_by_dep_id.iter() {
                    if notices.iter().contains(n) {
                        notices_by_dep_id_occurrences.push(id);
                    }
                }
                format!(
                    "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
                    dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
                    n.id,
                    n.kind,
                    n.dependencies,
                    Arc::strong_count(n),
                    dataflow_metainfo_occurrences,
                    notices_by_dep_id_occurrences
                )
            });
            let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
            error!(
                "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
                 bad_notices = {bad_notices}; \
                 drop_ids = {drop_ids:?}"
            );
        }

        dropped_notices
    }
361
362    /// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
363    /// as a result of creating new indexes on the items in `ons`.
364    ///
365    /// When creating and inserting a new index, we need to invalidate some entries that may
366    /// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
367    /// the following objects:
368    ///
369    ///   - `o`.
370    ///   - All compute objects that depend directly on `o`.
371    ///   - All compute objects that would directly depend on `o`, if all views were inlined.
372    pub(crate) fn invalidate_for_index(
373        &self,
374        ons: impl Iterator<Item = GlobalId>,
375    ) -> BTreeSet<GlobalId> {
376        let mut dependencies = BTreeSet::new();
377        let mut queue = VecDeque::new();
378        let mut seen = HashSet::new();
379        for on in ons {
380            let entry = self.get_entry_by_global_id(&on);
381            dependencies.insert(on);
382            seen.insert(entry.id);
383            let uses = entry.uses();
384            queue.extend(uses.clone());
385        }
386
387        while let Some(cur) = queue.pop_front() {
388            let entry = self.get_entry(&cur);
389            if seen.insert(cur) {
390                let global_ids = entry.global_ids();
391                match entry.item_type() {
392                    CatalogItemType::Table
393                    | CatalogItemType::Source
394                    | CatalogItemType::MaterializedView
395                    | CatalogItemType::Sink
396                    | CatalogItemType::Index
397                    | CatalogItemType::Type
398                    | CatalogItemType::Func
399                    | CatalogItemType::Secret
400                    | CatalogItemType::Connection
401                    | CatalogItemType::ContinualTask => {
402                        dependencies.extend(global_ids);
403                    }
404                    CatalogItemType::View => {
405                        dependencies.extend(global_ids);
406                        queue.extend(entry.uses());
407                    }
408                }
409            }
410        }
411        dependencies
412    }
413}
414
/// A view of the catalog on behalf of a single connection and role.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The catalog state this view reads from, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Because we don't have any way of removing items from the catalog
    /// temporarily, we allow the ConnCatalog to pretend that a set of items
    /// don't exist during resolution.
    ///
    /// This feature is necessary to allow re-planning of statements, which is
    /// either incredibly useful or required when altering item definitions.
    ///
    /// Note that this field should only be used by short-lived catalogs.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// ID of the connection; returned by `ConnCatalog::conn_id`.
    conn_id: ConnectionId,
    // NOTE(review): presumably the name of the connection's active cluster --
    // confirm against the code that constructs `ConnCatalog`.
    cluster: String,
    // NOTE(review): presumably the connection's current database, if it
    // resolves to one -- confirm against the code that constructs
    // `ConnCatalog`.
    database: Option<DatabaseId>,
    /// Search path handed to `CatalogState::effective_search_path` by
    /// `ConnCatalog::effective_search_path`.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// Role this view acts as. Must be [`MZ_SYSTEM_ROLE_ID`] in order to mark
    /// IDs as unresolvable.
    role_id: RoleId,
    /// Borrowed prepared statements keyed by string, if any.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Borrowed portals keyed by string, if any.
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Sending half of an unbounded channel of [`AdapterNotice`]s.
    notices_tx: UnboundedSender<AdapterNotice>,
}
437
438impl ConnCatalog<'_> {
439    pub fn conn_id(&self) -> &ConnectionId {
440        &self.conn_id
441    }
442
443    pub fn state(&self) -> &CatalogState {
444        &*self.state
445    }
446
447    /// Prevent planning from resolving item with the provided ID. Instead,
448    /// return an error as if the item did not exist.
449    ///
450    /// This feature is meant exclusively to permit re-planning statements
451    /// during update operations and should not be used otherwise given its
452    /// extremely "powerful" semantics.
453    ///
454    /// # Panics
455    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
456    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
457        assert_eq!(
458            self.role_id, MZ_SYSTEM_ROLE_ID,
459            "only the system role can mark IDs unresolvable",
460        );
461        self.unresolvable_ids.insert(id);
462    }
463
464    /// Returns the schemas:
465    /// - mz_catalog
466    /// - pg_catalog
467    /// - temp (if requested)
468    /// - all schemas from the session's search_path var that exist
469    pub fn effective_search_path(
470        &self,
471        include_temp_schema: bool,
472    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
473        self.state
474            .effective_search_path(&self.search_path, include_temp_schema)
475    }
476}
477
478impl ConnectionResolver for ConnCatalog<'_> {
479    fn resolve_connection(
480        &self,
481        id: CatalogItemId,
482    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
483        self.state().resolve_connection(id)
484    }
485}
486
487impl Catalog {
488    /// Returns the catalog's transient revision, which starts at 1 and is
489    /// incremented on every change. This is not persisted to disk, and will
490    /// restart on every load.
491    pub fn transient_revision(&self) -> u64 {
492        self.transient_revision
493    }
494
495    /// Creates a debug catalog from the current
496    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
497    /// like in tests.
498    ///
499    /// WARNING! This function can arbitrarily fail because it does not make any
500    /// effort to adjust the catalog's contents' structure or semantics to the
501    /// currently running version, i.e. it does not apply any migrations.
502    ///
503    /// This function must not be called in production contexts. Use
504    /// [`Catalog::open`] with appropriately set configuration parameters
505    /// instead.
506    pub async fn with_debug<F, Fut, T>(f: F) -> T
507    where
508        F: FnOnce(Catalog) -> Fut,
509        Fut: Future<Output = T>,
510    {
511        let persist_client = PersistClient::new_for_tests().await;
512        let organization_id = Uuid::new_v4();
513        let bootstrap_args = test_bootstrap_args();
514        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
515            .await
516            .expect("can open debug catalog");
517        f(catalog).await
518    }
519
520    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
521    /// in progress.
522    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
523    where
524        F: FnOnce(Catalog) -> Fut,
525        Fut: Future<Output = T>,
526    {
527        let persist_client = PersistClient::new_for_tests().await;
528        let organization_id = Uuid::new_v4();
529        let bootstrap_args = test_bootstrap_args();
530        let mut catalog =
531            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
532                .await
533                .expect("can open debug catalog");
534
535        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
536        let now = SYSTEM_TIME.clone();
537        let openable_storage = TestCatalogStateBuilder::new(persist_client)
538            .with_organization_id(organization_id)
539            .with_default_deploy_generation()
540            .build()
541            .await
542            .expect("can create durable catalog");
543        let mut storage = openable_storage
544            .open(now().into(), &bootstrap_args)
545            .await
546            .expect("can open durable catalog")
547            .0;
548        // Drain updates.
549        let _ = storage
550            .sync_to_current_updates()
551            .await
552            .expect("can sync to current updates");
553        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
554
555        f(catalog).await
556    }
557
558    /// Opens a debug catalog.
559    ///
560    /// See [`Catalog::with_debug`].
561    pub async fn open_debug_catalog(
562        persist_client: PersistClient,
563        organization_id: Uuid,
564        bootstrap_args: &BootstrapArgs,
565    ) -> Result<Catalog, anyhow::Error> {
566        let now = SYSTEM_TIME.clone();
567        let environment_id = None;
568        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
569            .with_organization_id(organization_id)
570            .with_default_deploy_generation()
571            .build()
572            .await?;
573        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
574        let system_parameter_defaults = BTreeMap::default();
575        Self::open_debug_catalog_inner(
576            persist_client,
577            storage,
578            now,
579            environment_id,
580            &DUMMY_BUILD_INFO,
581            system_parameter_defaults,
582            bootstrap_args,
583            None,
584        )
585        .await
586    }
587
588    /// Opens a read only debug persist backed catalog defined by `persist_client` and
589    /// `organization_id`.
590    ///
591    /// See [`Catalog::with_debug`].
592    pub async fn open_debug_read_only_catalog(
593        persist_client: PersistClient,
594        organization_id: Uuid,
595        bootstrap_args: &BootstrapArgs,
596    ) -> Result<Catalog, anyhow::Error> {
597        let now = SYSTEM_TIME.clone();
598        let environment_id = None;
599        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
600            .with_organization_id(organization_id)
601            .build()
602            .await?;
603        let storage = openable_storage
604            .open_read_only(&test_bootstrap_args())
605            .await?;
606        let system_parameter_defaults = BTreeMap::default();
607        Self::open_debug_catalog_inner(
608            persist_client,
609            storage,
610            now,
611            environment_id,
612            &DUMMY_BUILD_INFO,
613            system_parameter_defaults,
614            bootstrap_args,
615            None,
616        )
617        .await
618    }
619
620    /// Opens a read only debug persist backed catalog defined by `persist_client` and
621    /// `organization_id`.
622    ///
623    /// See [`Catalog::with_debug`].
624    pub async fn open_debug_read_only_persist_catalog_config(
625        persist_client: PersistClient,
626        now: NowFn,
627        environment_id: EnvironmentId,
628        system_parameter_defaults: BTreeMap<String, String>,
629        build_info: &'static BuildInfo,
630        bootstrap_args: &BootstrapArgs,
631        enable_expression_cache_override: Option<bool>,
632    ) -> Result<Catalog, anyhow::Error> {
633        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
634            .with_organization_id(environment_id.organization_id())
635            .with_version(
636                build_info
637                    .version
638                    .parse()
639                    .expect("build version is parseable"),
640            )
641            .build()
642            .await?;
643        let storage = openable_storage.open_read_only(bootstrap_args).await?;
644        Self::open_debug_catalog_inner(
645            persist_client,
646            storage,
647            now,
648            Some(environment_id),
649            build_info,
650            system_parameter_defaults,
651            bootstrap_args,
652            enable_expression_cache_override,
653        )
654        .await
655    }
656
657    async fn open_debug_catalog_inner(
658        persist_client: PersistClient,
659        storage: Box<dyn DurableCatalogState>,
660        now: NowFn,
661        environment_id: Option<EnvironmentId>,
662        build_info: &'static BuildInfo,
663        system_parameter_defaults: BTreeMap<String, String>,
664        bootstrap_args: &BootstrapArgs,
665        enable_expression_cache_override: Option<bool>,
666    ) -> Result<Catalog, anyhow::Error> {
667        let metrics_registry = &MetricsRegistry::new();
668        let secrets_reader = Arc::new(InMemorySecretsController::new());
669        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
670        // debugging/testing.
671        let previous_ts = now().into();
672        let replica_size = &bootstrap_args.default_cluster_replica_size;
673        let read_only = false;
674
675        let OpenCatalogResult {
676            catalog,
677            migrated_storage_collections_0dt: _,
678            new_builtin_collections: _,
679            builtin_table_updates: _,
680            cached_global_exprs: _,
681            uncached_local_exprs: _,
682        } = Catalog::open(Config {
683            storage,
684            metrics_registry,
685            state: StateConfig {
686                unsafe_mode: true,
687                all_features: false,
688                build_info,
689                deploy_generation: 0,
690                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
691                read_only,
692                now,
693                boot_ts: previous_ts,
694                skip_migrations: true,
695                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
696                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
697                    size: replica_size.clone(),
698                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
699                },
700                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
701                    size: replica_size.clone(),
702                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
703                },
704                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
705                    size: replica_size.clone(),
706                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
707                },
708                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
709                    size: replica_size.clone(),
710                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711                },
712                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
713                    size: replica_size.clone(),
714                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
715                },
716                system_parameter_defaults,
717                remote_system_parameters: None,
718                availability_zones: vec![],
719                egress_addresses: vec![],
720                aws_principal_context: None,
721                aws_privatelink_availability_zones: None,
722                http_host_name: None,
723                connection_context: ConnectionContext::for_tests(secrets_reader),
724                builtin_item_migration_config: BuiltinItemMigrationConfig {
725                    persist_client: persist_client.clone(),
726                    read_only,
727                    force_migration: None,
728                },
729                persist_client,
730                enable_expression_cache_override,
731                helm_chart_version: None,
732                external_login_password_mz_system: None,
733                license_key: ValidatedLicenseKey::for_tests(),
734            },
735        })
736        .await?;
737        Ok(catalog)
738    }
739
740    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
741        self.state.for_session(session)
742    }
743
    /// Returns a [`ConnCatalog`] view of this catalog for the role `role_id`
    /// without requiring a [`Session`].
    ///
    /// Delegates to `CatalogState::for_sessionless_user`.
    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        self.state.for_sessionless_user(role_id)
    }
747
    /// Returns a [`ConnCatalog`] view of this catalog for a system session.
    ///
    /// Delegates to `CatalogState::for_system_session`.
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.state.for_system_session()
    }
751
    /// Acquires the lock around the durable catalog storage and returns the
    /// guard.
    ///
    /// The future resolves once the lock has been obtained; the storage stays
    /// locked for as long as the returned guard is alive.
    async fn storage<'a>(
        &'a self,
    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
        self.storage.lock().await
    }
757
    /// Returns the current upper timestamp reported by the durable catalog
    /// storage.
    ///
    /// Takes the storage lock for the duration of the call.
    pub async fn current_upper(&self) -> mz_repr::Timestamp {
        self.storage().await.current_upper().await
    }
761
    /// Allocates a single user ID pair (`CatalogItemId`, `GlobalId`) through
    /// the durable catalog storage, using `commit_ts` as the commit timestamp.
    ///
    /// # Errors
    ///
    /// Storage errors are passed through `maybe_terminate` with the context
    /// "allocating user ids" and then converted into [`Error`].
    // NOTE(review): `maybe_terminate` presumably halts the process for fatal
    // storage errors instead of returning them -- confirm against its definition.
    pub async fn allocate_user_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        self.storage()
            .await
            .allocate_user_id(commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }
773
    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
    ///
    /// `commit_ts` is forwarded to the durable storage as the commit timestamp.
    ///
    /// # Errors
    ///
    /// Storage errors are passed through `maybe_terminate` with the context
    /// "allocating user ids" and then converted into [`Error`].
    pub async fn allocate_user_ids(
        &self,
        amount: u64,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
        self.storage()
            .await
            .allocate_user_ids(amount, commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }
787
788    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
789        let commit_ts = self.storage().await.current_upper().await;
790        self.allocate_user_id(commit_ts).await
791    }
792
    /// Get the next user item ID without allocating it.
    ///
    /// Reads the value from the durable catalog storage; storage errors are
    /// converted into [`Error`].
    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_item_id()
            .await
            .err_into()
    }
801
    /// Test-only helper that allocates a single system ID pair and commits the
    /// allocation at `commit_ts`.
    ///
    /// Opens a transaction on the durable storage, allocates exactly one system
    /// item ID in it, and commits that transaction.
    ///
    /// # Errors
    ///
    /// Returns an error if opening the transaction, allocating the ID, or
    /// committing fails.
    #[cfg(test)]
    pub async fn allocate_system_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        use mz_ore::collections::CollectionExt;

        // The storage guard must outlive the transaction, which borrows from it.
        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;
        // Exactly one ID was requested, so extract the sole element.
        let id = txn
            .allocate_system_item_ids(1)
            .maybe_terminate("allocating system ids")?
            .into_element();
        // Drain transaction.
        let _ = txn.get_and_commit_op_updates();
        txn.commit(commit_ts).await?;
        Ok(id)
    }
820
    /// Get the next system item ID without allocating it.
    ///
    /// Reads the value from the durable catalog storage; storage errors are
    /// converted into [`Error`].
    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_item_id()
            .await
            .err_into()
    }
829
    /// Allocates a user [`ClusterId`] through the durable catalog storage,
    /// using `commit_ts` as the commit timestamp.
    ///
    /// # Errors
    ///
    /// Storage errors are passed through `maybe_terminate` with the context
    /// "allocating user cluster ids" and then converted into [`Error`].
    pub async fn allocate_user_cluster_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<ClusterId, Error> {
        self.storage()
            .await
            .allocate_user_cluster_id(commit_ts)
            .await
            .maybe_terminate("allocating user cluster ids")
            .err_into()
    }
841
    /// Get the next system replica id without allocating it.
    ///
    /// Reads the value from the durable catalog storage; storage errors are
    /// converted into [`Error`].
    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_replica_id()
            .await
            .err_into()
    }
850
    /// Get the next user replica id without allocating it.
    ///
    /// Reads the value from the durable catalog storage; storage errors are
    /// converted into [`Error`].
    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_replica_id()
            .await
            .err_into()
    }
859
    /// Resolves `database_name` to a [`Database`].
    ///
    /// Delegates to `CatalogState::resolve_database` and returns its
    /// [`SqlCatalogError`] unchanged on failure.
    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        self.state.resolve_database(database_name)
    }
863
    /// Resolves `schema_name` to a [`Schema`].
    ///
    /// `current_database`, `database_name` and `conn_id` are forwarded
    /// unchanged to `CatalogState::resolve_schema`, which performs the lookup.
    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema(current_database, database_name, schema_name, conn_id)
    }
874
    /// Resolves `schema_name` to a [`Schema`] within the database described by
    /// `database_spec`.
    ///
    /// Delegates to `CatalogState::resolve_schema_in_database`.
    pub fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema_in_database(database_spec, schema_name, conn_id)
    }
884
    /// Resolves `replica_name` to a [`ClusterReplica`] of the cluster
    /// `cluster_id`.
    ///
    /// Delegates to `CatalogState::resolve_replica_in_cluster`.
    pub fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        self.state
            .resolve_replica_in_cluster(cluster_id, replica_name)
    }
893
    /// Resolves the system schema called `name` to its [`SchemaId`].
    ///
    /// Delegates to `CatalogState::resolve_system_schema`.
    // NOTE(review): the infallible return type suggests this panics for an
    // unknown name -- confirm against `CatalogState::resolve_system_schema`.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.state.resolve_system_schema(name)
    }
897
    /// Resolves the search path of `session` into a list of
    /// (database specifier, schema specifier) pairs.
    ///
    /// Delegates to `CatalogState::resolve_search_path`.
    pub fn resolve_search_path(
        &self,
        session: &Session,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state.resolve_search_path(session)
    }
904
    /// Resolves `name` to a non-function [`CatalogEntry`].
    ///
    /// `current_database`, `search_path` and `conn_id` are forwarded unchanged
    /// to `CatalogState::resolve_entry`, which performs the lookup.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_entry(current_database, search_path, name, conn_id)
    }
916
    /// Resolves a [`BuiltinTable`] to its [`CatalogItemId`].
    ///
    /// Delegates to `CatalogState::resolve_builtin_table`.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.state.resolve_builtin_table(builtin)
    }
921
    /// Resolves a [`BuiltinLog`] to its [`CatalogItemId`].
    ///
    /// `CatalogState::resolve_builtin_log` returns a compound value; only its
    /// first component (the item ID) is exposed here.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
        self.state.resolve_builtin_log(builtin).0
    }
926
    /// Resolves a [`BuiltinSource`] to its [`CatalogItemId`].
    ///
    /// Delegates to `CatalogState::resolve_builtin_source`.
    pub fn resolve_builtin_storage_collection(
        &self,
        builtin: &'static BuiltinSource,
    ) -> CatalogItemId {
        self.state.resolve_builtin_source(builtin)
    }
934
    /// Resolves `name` to a function [`CatalogEntry`].
    ///
    /// `current_database`, `search_path` and `conn_id` are forwarded unchanged
    /// to `CatalogState::resolve_function`, which performs the lookup.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_function(current_database, search_path, name, conn_id)
    }
946
    /// Resolves `name` to a type [`CatalogEntry`].
    ///
    /// `current_database`, `search_path` and `conn_id` are forwarded unchanged
    /// to `CatalogState::resolve_type`, which performs the lookup.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_type(current_database, search_path, name, conn_id)
    }
958
    /// Resolves the cluster called `name` to a [`Cluster`].
    ///
    /// Delegates to `CatalogState::resolve_cluster` and returns its
    /// [`SqlCatalogError`] unchanged on failure.
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        self.state.resolve_cluster(name)
    }
962
    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
    ///
    /// Delegates to `CatalogState::resolve_builtin_cluster`.
    ///
    /// # Panics
    ///
    /// * If the [`BuiltinCluster`] doesn't exist.
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        self.state.resolve_builtin_cluster(cluster)
    }
971
    /// Returns the [`ClusterId`] of the builtin `MZ_CATALOG_SERVER_CLUSTER`.
    ///
    /// Resolved through [`Catalog::resolve_builtin_cluster`], so the panic
    /// condition documented there applies here as well.
    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
    }
975
    /// Resolves a [`Cluster`] for a TargetCluster.
    ///
    /// * `TargetCluster::CatalogServer` resolves to the builtin
    ///   `MZ_CATALOG_SERVER_CLUSTER`.
    /// * `TargetCluster::Active` resolves to the session's active cluster, see
    ///   [`Catalog::active_cluster`].
    /// * `TargetCluster::Transaction` resolves the contained cluster ID.
    ///
    /// # Errors
    ///
    /// Returns [`AdapterError::ConcurrentClusterDrop`] if the cluster named by
    /// `TargetCluster::Transaction` is no longer in the catalog, and whatever
    /// [`Catalog::active_cluster`] returns for `TargetCluster::Active`.
    pub fn resolve_target_cluster(
        &self,
        target_cluster: TargetCluster,
        session: &Session,
    ) -> Result<&Cluster, AdapterError> {
        match target_cluster {
            TargetCluster::CatalogServer => {
                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
            }
            TargetCluster::Active => self.active_cluster(session),
            // A missing cluster here is reported as a concurrent drop.
            TargetCluster::Transaction(cluster_id) => self
                .try_get_cluster(cluster_id)
                .ok_or(AdapterError::ConcurrentClusterDrop),
        }
    }
992
993    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
994        // TODO(benesch): this check here is not sufficiently protective. It'd
995        // be very easy for a code path to accidentally avoid this check by
996        // calling `resolve_cluster(session.vars().cluster())`.
997        if session.user().name != SYSTEM_USER.name
998            && session.user().name != SUPPORT_USER.name
999            && session.vars().cluster() == SYSTEM_USER.name
1000        {
1001            coord_bail!(
1002                "system cluster '{}' cannot execute user queries",
1003                SYSTEM_USER.name
1004            );
1005        }
1006        let cluster = self.resolve_cluster(session.vars().cluster())?;
1007        Ok(cluster)
1008    }
1009
    /// Returns a shared reference to the in-memory [`CatalogState`].
    pub fn state(&self) -> &CatalogState {
        &self.state
    }
1013
    /// Converts the qualified item name `name` into a [`FullItemName`].
    ///
    /// Delegates to `CatalogState::resolve_full_name`; `conn_id` is optional
    /// and forwarded unchanged.
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }
1021
    /// Looks up the [`CatalogEntry`] for the item `id`, returning the `Option`
    /// produced by `CatalogState::try_get_entry`.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }
1025
    /// Looks up the [`CatalogEntry`] associated with the [`GlobalId`] `id`,
    /// returning the `Option` produced by
    /// `CatalogState::try_get_entry_by_global_id`.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }
1029
    /// Returns the [`CatalogEntry`] for the item `id`.
    ///
    /// Delegates to `CatalogState::get_entry`.
    // NOTE(review): presumably panics when `id` is unknown (the fallible
    // variant is `try_get_entry`) -- confirm against `CatalogState::get_entry`.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }
1033
    /// Returns the [`CatalogCollectionEntry`] associated with the
    /// [`GlobalId`] `id`.
    ///
    /// Delegates to `CatalogState::get_entry_by_global_id`; unlike
    /// [`Catalog::get_entry`], the result is returned by value.
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }
1037
    /// Returns an iterator over the [`GlobalId`]s of the item `id`.
    ///
    /// The entry is obtained through [`Catalog::get_entry`]; the iterator
    /// borrows from the catalog (`'a`) but not from `id`.
    pub fn get_global_ids<'a>(
        &'a self,
        id: &CatalogItemId,
    ) -> impl Iterator<Item = GlobalId> + use<'a> {
        self.get_entry(id).global_ids()
    }
1044
    /// Maps the [`GlobalId`] `id` to the [`CatalogItemId`] of the entry it
    /// belongs to.
    ///
    /// Uses [`Catalog::get_entry_by_global_id`]; see
    /// [`Catalog::try_resolve_item_id`] for the `Option`-returning variant.
    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
        self.get_entry_by_global_id(id).id()
    }
1048
1049    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1050        let item = self.try_get_entry_by_global_id(id)?;
1051        Some(item.id())
1052    }
1053
    /// Returns the [`Schema`] identified by `database_spec` and `schema_spec`.
    ///
    /// Delegates to `CatalogState::get_schema`; `conn_id` is forwarded
    /// unchanged. See [`Catalog::try_get_schema`] for the `Option`-returning
    /// variant.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }
1062
    /// Looks up the [`Schema`] identified by `database_spec` and `schema_spec`,
    /// returning the `Option` produced by `CatalogState::try_get_schema`.
    ///
    /// `conn_id` is forwarded unchanged.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }
1072
    /// Returns the [`SchemaId`] that `CatalogState::get_mz_catalog_schema_id`
    /// reports for the `mz_catalog` schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }
1076
    /// Returns the [`SchemaId`] that `CatalogState::get_pg_catalog_schema_id`
    /// reports for the `pg_catalog` schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }
1080
    /// Returns the [`SchemaId`] that `CatalogState::get_information_schema_id`
    /// reports for the `information_schema` schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }
1084
    /// Returns the [`SchemaId`] that `CatalogState::get_mz_internal_schema_id`
    /// reports for the `mz_internal` schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }
1088
    /// Returns the [`SchemaId`] that
    /// `CatalogState::get_mz_introspection_schema_id` reports for the
    /// `mz_introspection` schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }
1092
    /// Returns the [`SchemaId`] that `CatalogState::get_mz_unsafe_schema_id`
    /// reports for the `mz_unsafe` schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }
1096
    /// Returns an iterator over the [`SchemaId`]s of the system schemas, as
    /// reported by `CatalogState::system_schema_ids`.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }
1100
    /// Returns the [`Database`] with the ID `id`.
    ///
    /// Delegates to `CatalogState::get_database`.
    // NOTE(review): presumably panics for an unknown ID -- confirm against
    // `CatalogState::get_database`.
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }
1104
    /// Looks up the [`Role`] with the ID `id`, returning the `Option` produced
    /// by `CatalogState::try_get_role`.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }
1108
    /// Returns the [`Role`] with the ID `id`.
    ///
    /// Delegates to `CatalogState::get_role`; see [`Catalog::try_get_role`] for
    /// the `Option`-returning variant.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }
1112
    /// Looks up the [`Role`] called `role_name`, returning the `Option`
    /// produced by `CatalogState::try_get_role_by_name`.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }
1116
    /// Looks up the [`RoleAuth`] for the role `id`, returning the `Option`
    /// produced by `CatalogState::try_get_role_auth_by_id`.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }
1120
    /// Creates a new schema in the `Catalog` for temporary items
    /// indicated by the TEMPORARY or TEMP keywords.
    ///
    /// The schema is associated with the connection `conn_id` and owned by
    /// `owner_id`. Delegates to `CatalogState::create_temporary_schema` and
    /// returns its error unchanged.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
1130
1131    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1132        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
1133        self.state
1134            .temporary_schemas
1135            .get(conn_id)
1136            .map(|schema| schema.items.contains_key(item_name))
1137            .unwrap_or(false)
1138    }
1139
1140    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1141    /// Returns Ok if conn_id's temp schema does not exist.
1142    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1143        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1144            return Ok(());
1145        };
1146        if !schema.items.is_empty() {
1147            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1148        }
1149        Ok(())
1150    }
1151
    /// Returns the objects that depend on the given `object_ids`, as computed
    /// by `CatalogState::object_dependents`.
    ///
    /// A fresh, empty `seen` set is supplied for each call.
    // NOTE(review): whether the result also contains the input objects and in
    // which order is decided by `CatalogState::object_dependents` -- confirm there.
    pub(crate) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
    ) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.object_dependents(object_ids, conn_id, &mut seen)
    }
1160
1161    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1162        FullNameV1 {
1163            database: name.database.to_string(),
1164            schema: name.schema.clone(),
1165            item: name.item.clone(),
1166        }
1167    }
1168
1169    pub fn find_available_cluster_name(&self, name: &str) -> String {
1170        let mut i = 0;
1171        let mut candidate = name.to_string();
1172        while self.state.clusters_by_name.contains_key(&candidate) {
1173            i += 1;
1174            candidate = format!("{}{}", name, i);
1175        }
1176        candidate
1177    }
1178
1179    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1180        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1181            self.cluster_replica_sizes()
1182                .enabled_allocations()
1183                .map(|a| a.0.to_owned())
1184                .collect::<Vec<_>>()
1185        } else {
1186            self.system_config().allowed_cluster_replica_sizes()
1187        }
1188    }
1189
    /// Converts a durable replica location into an in-memory
    /// [`ReplicaLocation`].
    ///
    /// `allowed_sizes` and `allowed_availability_zones` are forwarded unchanged
    /// to `CatalogState::concretize_replica_location`, whose error is returned
    /// as is.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }
1199
    /// Checks `size` against `allowed_sizes` via
    /// `CatalogState::ensure_valid_replica_size`.
    ///
    /// Returns `Ok(())` if that check passes and its error otherwise.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }
1207
    /// Returns the [`ClusterReplicaSizeMap`] held by the catalog state.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1211
1212    /// Returns the privileges of an object by its ID.
1213    pub fn get_privileges(
1214        &self,
1215        id: &SystemObjectId,
1216        conn_id: &ConnectionId,
1217    ) -> Option<&PrivilegeMap> {
1218        match id {
1219            SystemObjectId::Object(id) => match id {
1220                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1221                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1222                ObjectId::Schema((database_spec, schema_spec)) => Some(
1223                    self.get_schema(database_spec, schema_spec, conn_id)
1224                        .privileges(),
1225                ),
1226                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1227                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1228                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1229            },
1230            SystemObjectId::System => Some(&self.state.system_privileges),
1231        }
1232    }
1233
    /// Asks the durable catalog storage to confirm leadership.
    ///
    /// Takes the storage lock for the duration of the call.
    ///
    /// # Errors
    ///
    /// Any error from the storage layer is converted into [`AdapterError`].
    #[mz_ore::instrument(level = "debug")]
    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
        Ok(self.storage().await.confirm_leadership().await?)
    }
1238
    /// Return the ids of all log sources the given object depends on.
    ///
    /// Delegates to `CatalogState::introspection_dependencies`.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }
1243
1244    /// Serializes the catalog's in-memory state.
1245    ///
1246    /// There are no guarantees about the format of the serialized state, except
1247    /// that the serialized state for two identical catalogs will compare
1248    /// identically.
1249    pub fn dump(&self) -> Result<CatalogDump, Error> {
1250        Ok(CatalogDump::new(self.state.dump(None)?))
1251    }
1252
1253    /// Checks the [`Catalog`]s internal consistency.
1254    ///
1255    /// Returns a JSON object describing the inconsistencies, if there are any.
1256    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1257        self.state.check_consistency().map_err(|inconsistencies| {
1258            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1259                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1260            })
1261        })
1262    }
1263
    /// Returns the catalog configuration held by the catalog state.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
1267
    /// Returns an iterator over every [`CatalogEntry`] in the catalog state's
    /// `entry_by_id` map.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }
1271
    /// Returns an iterator over the catalog entries that are connections and
    /// have a user ID.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }
1276
    /// Returns an iterator over the catalog entries that are tables and have a
    /// user ID.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }
1281
    /// Returns an iterator over the catalog entries that are sources and have
    /// a user ID.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }
1286
    /// Returns an iterator over the catalog entries that are sinks and have a
    /// user ID.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }
1291
    /// Returns an iterator over the catalog entries that are materialized
    /// views and have a user ID.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }
1296
    /// Returns an iterator over the catalog entries that are secrets and have
    /// a user ID.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1301
    /// Returns the [`NetworkPolicy`] with the ID `network_policy_id`.
    ///
    /// Delegates to `CatalogState::get_network_policy`.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }
1305
    /// Looks up the [`NetworkPolicy`] called `name`, returning the `Option`
    /// produced by `CatalogState::try_get_network_policy_by_name`.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }
1309
    /// Returns an iterator over every [`Cluster`] in the catalog state's
    /// `clusters_by_id` map.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }
1313
    /// Returns the [`Cluster`] with the ID `cluster_id`.
    ///
    /// Delegates to `CatalogState::get_cluster`; see
    /// [`Catalog::try_get_cluster`] for the `Option`-returning variant.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }
1317
    /// Looks up the [`Cluster`] with the ID `cluster_id`, returning the
    /// `Option` produced by `CatalogState::try_get_cluster`.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }
1321
    /// Returns an iterator over the clusters whose ID is a user ID.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }
1325
    /// Returns the [`ClusterReplica`] `replica_id` of the cluster `cluster_id`.
    ///
    /// Delegates to `CatalogState::get_cluster_replica`; see
    /// [`Catalog::try_get_cluster_replica`] for the `Option`-returning variant.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }
1333
    /// Looks up the [`ClusterReplica`] `replica_id` of the cluster
    /// `cluster_id`, returning the `Option` produced by
    /// `CatalogState::try_get_cluster_replica`.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }
1341
    /// Returns an iterator over the user replicas (`Cluster::user_replicas`)
    /// of every user cluster.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }
1346
    /// Returns an iterator over every [`Database`] in the catalog state's
    /// `database_by_id` map.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1350
    /// Returns an iterator over the roles in the catalog state's `roles_by_id`
    /// map for which `Role::is_user` holds.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }
1357
    /// Returns an iterator over the catalog entries that are continual tasks
    /// and have a user ID.
    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
    }
1362
    /// Returns the system-wide [`PrivilegeMap`] held by the catalog state.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1366
    /// Returns an iterator over the default privileges held by the catalog
    /// state.
    ///
    /// Each element pairs a [`DefaultPrivilegeObject`] with an iterator over
    /// the [`DefaultPrivilegeAclItem`]s recorded for it.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1377
1378    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1379        self.state
1380            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1381    }
1382
1383    pub fn pack_storage_usage_update(
1384        &self,
1385        event: VersionedStorageUsage,
1386        diff: Diff,
1387    ) -> BuiltinTableUpdate {
1388        self.state
1389            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1390    }
1391
    /// Returns a shared reference to the system configuration
    /// ([`SystemVars`]) held by the catalog state.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1395
    /// Returns a mutable reference to the system configuration
    /// ([`SystemVars`]) held by the catalog state.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1399
    /// Runs `CatalogState::ensure_not_reserved_role` for `role_id`.
    ///
    /// Returns `Ok(())` if that check passes and its error otherwise.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1403
    /// Runs `CatalogState::ensure_grantable_role` for `role_id`.
    ///
    /// Returns `Ok(())` if that check passes and its error otherwise.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1407
    /// Runs `CatalogState::ensure_not_system_role` for `role_id`.
    ///
    /// Returns `Ok(())` if that check passes and its error otherwise.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1411
    /// Runs `CatalogState::ensure_not_predefined_role` for `role_id`.
    ///
    /// Returns `Ok(())` if that check passes and its error otherwise.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1415
    /// Runs `CatalogState::ensure_not_reserved_network_policy` for
    /// `network_policy_id`.
    ///
    /// Returns `Ok(())` if that check passes and its error otherwise.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1423
1424    pub fn ensure_not_reserved_object(
1425        &self,
1426        object_id: &ObjectId,
1427        conn_id: &ConnectionId,
1428    ) -> Result<(), Error> {
1429        match object_id {
1430            ObjectId::Cluster(cluster_id) => {
1431                if cluster_id.is_system() {
1432                    let cluster = self.get_cluster(*cluster_id);
1433                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1434                        cluster.name().to_string(),
1435                    )))
1436                } else {
1437                    Ok(())
1438                }
1439            }
1440            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1441                if replica_id.is_system() {
1442                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1443                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1444                        replica.name().to_string(),
1445                    )))
1446                } else {
1447                    Ok(())
1448                }
1449            }
1450            ObjectId::Database(database_id) => {
1451                if database_id.is_system() {
1452                    let database = self.get_database(database_id);
1453                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1454                        database.name().to_string(),
1455                    )))
1456                } else {
1457                    Ok(())
1458                }
1459            }
1460            ObjectId::Schema((database_spec, schema_spec)) => {
1461                if schema_spec.is_system() {
1462                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1463                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1464                        schema.name().schema.clone(),
1465                    )))
1466                } else {
1467                    Ok(())
1468                }
1469            }
1470            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1471            ObjectId::Item(item_id) => {
1472                if item_id.is_system() {
1473                    let item = self.get_entry(item_id);
1474                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1475                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1476                } else {
1477                    Ok(())
1478                }
1479            }
1480            ObjectId::NetworkPolicy(network_policy_id) => {
1481                self.ensure_not_reserved_network_policy(network_policy_id)
1482            }
1483        }
1484    }
1485
    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
    ///
    /// `create_sql` and `force_if_exists_skip` are forwarded unchanged; on
    /// success the resulting [`Plan`] and its [`ResolvedIds`] are returned.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1495
1496    pub(crate) fn update_expression_cache<'a, 'b>(
1497        &'a self,
1498        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1499        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1500    ) -> BoxFuture<'b, ()> {
1501        if let Some(expr_cache) = &self.expr_cache_handle {
1502            let ons = new_local_expressions
1503                .iter()
1504                .map(|(id, _)| id)
1505                .chain(new_global_expressions.iter().map(|(id, _)| id))
1506                .map(|id| self.get_entry_by_global_id(id))
1507                .filter_map(|entry| entry.index().map(|index| index.on));
1508            let invalidate_ids = self.invalidate_for_index(ons);
1509            expr_cache
1510                .update(
1511                    new_local_expressions,
1512                    new_global_expressions,
1513                    invalidate_ids,
1514                )
1515                .boxed()
1516        } else {
1517            async {}.boxed()
1518        }
1519    }
1520
1521    /// Listen for and apply all unconsumed updates to the durable catalog state.
1522    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1523    // `#[cfg(test)]` annotation.
1524    #[cfg(test)]
1525    async fn sync_to_current_updates(
1526        &mut self,
1527    ) -> Result<
1528        (
1529            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1530            Vec<ParsedStateUpdate>,
1531        ),
1532        CatalogError,
1533    > {
1534        let updates = self.storage().await.sync_to_current_updates().await?;
1535        let (builtin_table_updates, catalog_updates) = self
1536            .state
1537            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1538            .await;
1539        Ok((builtin_table_updates, catalog_updates))
1540    }
1541}
1542
1543pub fn is_reserved_name(name: &str) -> bool {
1544    BUILTIN_PREFIXES
1545        .iter()
1546        .any(|prefix| name.starts_with(prefix))
1547}
1548
/// Reports whether `name` may not be used as a role name: either it is a
/// reserved name (see [`is_reserved_name`]) or it names the public role (see
/// [`is_public_role`]).
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1552
/// Reports whether `name` equals `PUBLIC_ROLE_NAME`.
// NOTE(review): the comparison semantics (e.g. case sensitivity) depend on the
// type of `PUBLIC_ROLE_NAME` -- confirm against its definition.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1556
/// Maps a SQL catalog item type to the audit-log [`ObjectType`].
///
/// The item type is first converted into a SQL object type via `Into` and then
/// mapped by [`object_type_to_audit_object_type`].
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1560
/// Maps the kind of object a comment is attached to onto the audit-log
/// [`ObjectType`] of the same name.
///
/// Only the variant of `id` matters; the wrapped ID is ignored.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    // Kept exhaustive on purpose so that a new `CommentObjectId` variant fails
    // to compile until it is mapped here.
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1582
/// Maps a SQL object type to the audit-log [`ObjectType`].
///
/// Wraps `object_type` in `SystemObjectType::Object` and defers to
/// [`system_object_type_to_audit_object_type`].
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1588
1589pub(crate) fn system_object_type_to_audit_object_type(
1590    system_type: &SystemObjectType,
1591) -> ObjectType {
1592    match system_type {
1593        SystemObjectType::Object(object_type) => match object_type {
1594            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1595            mz_sql::catalog::ObjectType::View => ObjectType::View,
1596            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1597            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1598            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1599            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1600            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1601            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1602            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1603            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1604            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1605            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1606            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1607            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1608            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1609            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1610            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1611        },
1612        SystemObjectType::System => ObjectType::System,
1613    }
1614}
1615
/// Selects the direction of a privilege update: granting or revoking.
#[derive(Clone, Copy, Debug)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1621
1622impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1623    fn from(variant: UpdatePrivilegeVariant) -> Self {
1624        match variant {
1625            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1626            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1627        }
1628    }
1629}
1630
1631impl From<UpdatePrivilegeVariant> for EventType {
1632    fn from(variant: UpdatePrivilegeVariant) -> Self {
1633        match variant {
1634            UpdatePrivilegeVariant::Grant => EventType::Grant,
1635            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1636        }
1637    }
1638}
1639
1640impl ConnCatalog<'_> {
1641    fn resolve_item_name(
1642        &self,
1643        name: &PartialItemName,
1644    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1645        self.resolve_item(name).map(|entry| entry.name())
1646    }
1647
1648    fn resolve_function_name(
1649        &self,
1650        name: &PartialItemName,
1651    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1652        self.resolve_function(name).map(|entry| entry.name())
1653    }
1654
1655    fn resolve_type_name(
1656        &self,
1657        name: &PartialItemName,
1658    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1659        self.resolve_type(name).map(|entry| entry.name())
1660    }
1661}
1662
1663impl ExprHumanizer for ConnCatalog<'_> {
1664    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1665        let entry = self.state.try_get_entry_by_global_id(&id)?;
1666        Some(self.resolve_full_name(entry.name()).to_string())
1667    }
1668
1669    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1670        let entry = self.state.try_get_entry_by_global_id(&id)?;
1671        Some(entry.name().item.clone())
1672    }
1673
1674    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1675        let entry = self.state.try_get_entry_by_global_id(&id)?;
1676        Some(self.resolve_full_name(entry.name()).into_parts())
1677    }
1678
1679    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1680        use SqlScalarType::*;
1681
1682        match typ {
1683            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1684            List {
1685                custom_id: Some(item_id),
1686                ..
1687            }
1688            | Map {
1689                custom_id: Some(item_id),
1690                ..
1691            } => {
1692                let item = self.get_item(item_id);
1693                self.minimal_qualification(item.name()).to_string()
1694            }
1695            List { element_type, .. } => {
1696                format!(
1697                    "{} list",
1698                    self.humanize_scalar_type(element_type, postgres_compat)
1699                )
1700            }
1701            Map { value_type, .. } => format!(
1702                "map[{}=>{}]",
1703                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1704                self.humanize_scalar_type(value_type, postgres_compat)
1705            ),
1706            Record {
1707                custom_id: Some(item_id),
1708                ..
1709            } => {
1710                let item = self.get_item(item_id);
1711                self.minimal_qualification(item.name()).to_string()
1712            }
1713            Record { fields, .. } => format!(
1714                "record({})",
1715                fields
1716                    .iter()
1717                    .map(|f| format!(
1718                        "{}: {}",
1719                        f.0,
1720                        self.humanize_column_type(&f.1, postgres_compat)
1721                    ))
1722                    .join(",")
1723            ),
1724            PgLegacyChar => "\"char\"".into(),
1725            Char { length } if !postgres_compat => match length {
1726                None => "char".into(),
1727                Some(length) => format!("char({})", length.into_u32()),
1728            },
1729            VarChar { max_length } if !postgres_compat => match max_length {
1730                None => "varchar".into(),
1731                Some(length) => format!("varchar({})", length.into_u32()),
1732            },
1733            UInt16 => "uint2".into(),
1734            UInt32 => "uint4".into(),
1735            UInt64 => "uint8".into(),
1736            ty => {
1737                let pgrepr_type = mz_pgrepr::Type::from(ty);
1738                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1739
1740                let res = if self
1741                    .effective_search_path(true)
1742                    .iter()
1743                    .any(|(_, schema)| schema == &pg_catalog_schema)
1744                {
1745                    pgrepr_type.name().to_string()
1746                } else {
1747                    // If PG_CATALOG_SCHEMA is not in search path, you need
1748                    // qualified object name to refer to type.
1749                    let name = QualifiedItemName {
1750                        qualifiers: ItemQualifiers {
1751                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1752                            schema_spec: pg_catalog_schema,
1753                        },
1754                        item: pgrepr_type.name().to_string(),
1755                    };
1756                    self.resolve_full_name(&name).to_string()
1757                };
1758                res
1759            }
1760        }
1761    }
1762
1763    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1764        let entry = self.state.try_get_entry_by_global_id(&id)?;
1765
1766        match entry.index() {
1767            Some(index) => {
1768                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1769                let mut on_names = on_desc
1770                    .iter_names()
1771                    .map(|col_name| col_name.to_string())
1772                    .collect::<Vec<_>>();
1773
1774                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1775
1776                // Init ix_names with unknown column names. Unknown columns are
1777                // represented as an empty String and rendered as `#c` by the
1778                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1779                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1780                let mut ix_names = vec![String::new(); ix_arity];
1781
1782                // Apply the permutation by swapping on_names with ix_names.
1783                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1784                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1785                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1786                    std::mem::swap(on_name, ix_name);
1787                }
1788
1789                Some(ix_names) // Return the updated ix_names vector.
1790            }
1791            None => {
1792                let desc = self.state.try_get_desc_by_global_id(&id)?;
1793                let column_names = desc
1794                    .iter_names()
1795                    .map(|col_name| col_name.to_string())
1796                    .collect();
1797
1798                Some(column_names)
1799            }
1800        }
1801    }
1802
1803    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1804        let desc = self.state.try_get_desc_by_global_id(&id)?;
1805        Some(desc.get_name(column).to_string())
1806    }
1807
    /// Reports whether the catalog state has an entry keyed by `id`.
    fn id_exists(&self, id: GlobalId) -> bool {
        self.state.entry_by_global_id.contains_key(&id)
    }
1811}
1812
1813impl SessionCatalog for ConnCatalog<'_> {
    /// Returns the role ID stored on this connection catalog.
    fn active_role_id(&self) -> &RoleId {
        &self.role_id
    }
1817
1818    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1819        self.prepared_statements
1820            .as_ref()
1821            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1822            .flatten()
1823    }
1824
1825    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1826        self.portals
1827            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1828    }
1829
    /// Returns the database ID stored on this connection catalog, if any.
    fn active_database(&self) -> Option<&DatabaseId> {
        self.database.as_ref()
    }
1833
    /// Returns the cluster name stored on this connection catalog.
    fn active_cluster(&self) -> &str {
        &self.cluster
    }
1837
    /// Returns the search path stored on this connection catalog as
    /// `(database, schema)` specifier pairs.
    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
        &self.search_path
    }
1841
1842    fn resolve_database(
1843        &self,
1844        database_name: &str,
1845    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1846        Ok(self.state.resolve_database(database_name)?)
1847    }
1848
    /// Returns the database with the given `id`.
    ///
    /// # Panics
    ///
    /// Panics with "database doesn't exist" if the catalog state has no
    /// database under `id`.
    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
        self.state
            .database_by_id
            .get(id)
            .expect("database doesn't exist")
    }
1855
    /// Returns every database held in the catalog state as a trait object.
    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
        self.state
            .database_by_id
            .values()
            .map(|database| database as &dyn CatalogDatabase)
            .collect()
    }
1865
1866    fn resolve_schema(
1867        &self,
1868        database_name: Option<&str>,
1869        schema_name: &str,
1870    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1871        Ok(self.state.resolve_schema(
1872            self.database.as_ref(),
1873            database_name,
1874            schema_name,
1875            &self.conn_id,
1876        )?)
1877    }
1878
1879    fn resolve_schema_in_database(
1880        &self,
1881        database_spec: &ResolvedDatabaseSpecifier,
1882        schema_name: &str,
1883    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1884        Ok(self
1885            .state
1886            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1887    }
1888
    /// Returns the schema identified by `database_spec` and `schema_spec`,
    /// looked up in the catalog state on behalf of this connection.
    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema {
        self.state
            .get_schema(database_spec, schema_spec, &self.conn_id)
    }
1897
1898    // `as` is ok to use to cast to a trait object.
1899    #[allow(clippy::as_conversions)]
1900    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1901        self.get_databases()
1902            .into_iter()
1903            .flat_map(|database| database.schemas().into_iter())
1904            .chain(
1905                self.state
1906                    .ambient_schemas_by_id
1907                    .values()
1908                    .chain(self.state.temporary_schemas.values())
1909                    .map(|schema| schema as &dyn CatalogSchema),
1910            )
1911            .collect()
1912    }
1913
    /// Returns the schema ID the catalog state reports for `mz_internal`.
    fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state().get_mz_internal_schema_id()
    }
1917
    /// Returns the schema ID the catalog state reports for `mz_unsafe`.
    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state().get_mz_unsafe_schema_id()
    }
1921
    /// Reports whether the catalog state considers `schema` a system schema.
    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
        self.state.is_system_schema_specifier(schema)
    }
1925
1926    fn resolve_role(
1927        &self,
1928        role_name: &str,
1929    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1930        match self.state.try_get_role_by_name(role_name) {
1931            Some(role) => Ok(role),
1932            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1933        }
1934    }
1935
1936    fn resolve_network_policy(
1937        &self,
1938        policy_name: &str,
1939    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1940        match self.state.try_get_network_policy_by_name(policy_name) {
1941            Some(policy) => Ok(policy),
1942            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1943        }
1944    }
1945
1946    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1947        Some(self.state.roles_by_id.get(id)?)
1948    }
1949
    /// Returns the role with the given `id`, as looked up by the catalog state.
    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
        self.state.get_role(id)
    }
1953
    /// Returns every role held in the catalog state as a trait object.
    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
        // `as` is ok to use to cast to a trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .roles_by_id
            .values()
            .map(|role| role as &dyn CatalogRole)
            .collect()
    }
1963
    /// Returns the constant `MZ_SYSTEM_ROLE_ID`.
    fn mz_system_role_id(&self) -> RoleId {
        MZ_SYSTEM_ROLE_ID
    }
1967
    /// Returns the set of role IDs the catalog state computes as the
    /// membership of role `id`.
    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        self.state.collect_role_membership(id)
    }
1971
    /// Returns the network policy with the given `id`, as looked up by the
    /// catalog state.
    fn get_network_policy(
        &self,
        id: &NetworkPolicyId,
    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
        self.state.get_network_policy(id)
    }
1978
    /// Returns every network policy held in the catalog state as a trait
    /// object.
    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
        // `as` is ok to use to cast to a trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .network_policies_by_id
            .values()
            .map(|policy| policy as &dyn CatalogNetworkPolicy)
            .collect()
    }
1988
1989    fn resolve_cluster(
1990        &self,
1991        cluster_name: Option<&str>,
1992    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1993        Ok(self
1994            .state
1995            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1996    }
1997
1998    fn resolve_cluster_replica(
1999        &self,
2000        cluster_replica_name: &QualifiedReplica,
2001    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
2002        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
2003    }
2004
2005    fn resolve_item(
2006        &self,
2007        name: &PartialItemName,
2008    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2009        let r = self.state.resolve_entry(
2010            self.database.as_ref(),
2011            &self.effective_search_path(true),
2012            name,
2013            &self.conn_id,
2014        )?;
2015        if self.unresolvable_ids.contains(&r.id()) {
2016            Err(SqlCatalogError::UnknownItem(name.to_string()))
2017        } else {
2018            Ok(r)
2019        }
2020    }
2021
2022    fn resolve_function(
2023        &self,
2024        name: &PartialItemName,
2025    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2026        let r = self.state.resolve_function(
2027            self.database.as_ref(),
2028            &self.effective_search_path(false),
2029            name,
2030            &self.conn_id,
2031        )?;
2032
2033        if self.unresolvable_ids.contains(&r.id()) {
2034            Err(SqlCatalogError::UnknownFunction {
2035                name: name.to_string(),
2036                alternative: None,
2037            })
2038        } else {
2039            Ok(r)
2040        }
2041    }
2042
2043    fn resolve_type(
2044        &self,
2045        name: &PartialItemName,
2046    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2047        let r = self.state.resolve_type(
2048            self.database.as_ref(),
2049            &self.effective_search_path(false),
2050            name,
2051            &self.conn_id,
2052        )?;
2053
2054        if self.unresolvable_ids.contains(&r.id()) {
2055            Err(SqlCatalogError::UnknownType {
2056                name: name.to_string(),
2057            })
2058        } else {
2059            Ok(r)
2060        }
2061    }
2062
    /// Returns the system type called `name`, as looked up by the catalog
    /// state.
    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_system_type(name)
    }
2066
2067    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2068        Some(self.state.try_get_entry(id)?)
2069    }
2070
2071    fn try_get_item_by_global_id(
2072        &self,
2073        id: &GlobalId,
2074    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2075        let entry = self.state.try_get_entry_by_global_id(id)?;
2076        let entry = match &entry.item {
2077            CatalogItem::Table(table) => {
2078                let (version, _gid) = table
2079                    .collections
2080                    .iter()
2081                    .find(|(_version, gid)| *gid == id)
2082                    .expect("catalog out of sync, mismatched GlobalId");
2083                entry.at_version(RelationVersionSelector::Specific(*version))
2084            }
2085            _ => entry.at_version(RelationVersionSelector::Latest),
2086        };
2087        Some(entry)
2088    }
2089
    /// Returns the catalog item with the given `id`, as looked up by the
    /// catalog state.
    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_entry(id)
    }
2093
2094    fn get_item_by_global_id(
2095        &self,
2096        id: &GlobalId,
2097    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2098        let entry = self.state.get_entry_by_global_id(id);
2099        let entry = match &entry.item {
2100            CatalogItem::Table(table) => {
2101                let (version, _gid) = table
2102                    .collections
2103                    .iter()
2104                    .find(|(_version, gid)| *gid == id)
2105                    .expect("catalog out of sync, mismatched GlobalId");
2106                entry.at_version(RelationVersionSelector::Specific(*version))
2107            }
2108            _ => entry.at_version(RelationVersionSelector::Latest),
2109        };
2110        entry
2111    }
2112
    /// Returns every catalog item reachable through [`Self::get_schemas`].
    ///
    /// Each schema's item IDs are looked up with [`Self::get_item`], so the
    /// result follows the schema order of `get_schemas`.
    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
        self.get_schemas()
            .into_iter()
            .flat_map(|schema| schema.item_ids())
            .map(|id| self.get_item(&id))
            .collect()
    }
2120
2121    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2122        self.state
2123            .get_item_by_name(name, &self.conn_id)
2124            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2125    }
2126
2127    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2128        self.state
2129            .get_type_by_name(name, &self.conn_id)
2130            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2131    }
2132
    /// Returns the cluster with the given `id`.
    ///
    /// # Panics
    ///
    /// The lookup uses map indexing, which panics when `clusters_by_id` has no
    /// entry for `id`.
    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
        &self.state.clusters_by_id[&id]
    }
2136
    /// Returns every cluster held in the catalog state as a trait object.
    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
        self.state
            .clusters_by_id
            .values()
            // `convert::identity` with an explicit type performs the coercion
            // to a trait object without an `as` cast.
            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
            .collect()
    }
2144
2145    fn get_cluster_replica(
2146        &self,
2147        cluster_id: ClusterId,
2148        replica_id: ReplicaId,
2149    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2150        let cluster = self.get_cluster(cluster_id);
2151        cluster.replica(replica_id)
2152    }
2153
2154    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2155        self.get_clusters()
2156            .into_iter()
2157            .flat_map(|cluster| cluster.replicas().into_iter())
2158            .collect()
2159    }
2160
    /// Returns the system privileges stored in the catalog state.
    fn get_system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
2164
    /// Returns each default-privilege object from the catalog state, paired
    /// with its ACL items collected into a `Vec`.
    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
        self.state
            .default_privileges
            .iter()
            .map(|(object, acl_items)| (object, acl_items.collect()))
            .collect()
    }
2174
    /// Asks the catalog state for an available name derived from `name`, on
    /// behalf of this connection.
    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
        self.state.find_available_name(name, &self.conn_id)
    }
2178
    /// Converts the qualified `name` into a [`FullItemName`], passing this
    /// connection's ID to the catalog state as context.
    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
        self.state.resolve_full_name(name, Some(&self.conn_id))
    }
2182
    /// Converts the qualified schema `name` into a [`FullSchemaName`] via the
    /// catalog state.
    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        self.state.resolve_full_schema_name(name)
    }
2186
    /// Returns the [`CatalogItemId`] of the entry the catalog state holds for
    /// `global_id`.
    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
        self.state.get_entry_by_global_id(global_id).id()
    }
2190
    /// Returns the [`GlobalId`] of item `item_id` when viewed at the relation
    /// version chosen by `version`.
    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId {
        self.state
            .get_entry(item_id)
            .at_version(version)
            .global_id()
    }
2201
    /// Returns the catalog configuration held by the catalog state.
    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
2205
    /// Returns the current time by invoking the `now` function stored in the
    /// catalog configuration.
    fn now(&self) -> EpochMillis {
        (self.state.config().now)()
    }
2209
    /// Returns a clone of the AWS PrivateLink availability zones stored in the
    /// catalog state, or `None` if none are stored.
    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
        self.state.aws_privatelink_availability_zones.clone()
    }
2213
    /// Returns the system configuration variables stored in the catalog state.
    fn system_vars(&self) -> &SystemVars {
        &self.state.system_configuration
    }
2217
    /// Returns mutable access to the system configuration variables.
    ///
    /// Mutable access to the state is obtained through `to_mut()`.
    // NOTE(review): `to_mut()` suggests the state is held in a `Cow`, in which
    // case the first call clones a borrowed state and changes stay local to
    // this `ConnCatalog` — confirm against the field's declaration.
    fn system_vars_mut(&mut self) -> &mut SystemVars {
        &mut self.state.to_mut().system_configuration
    }
2221
    /// Returns the owner the catalog state reports for object `id`, looked up
    /// on behalf of this connection, or `None` if it reports no owner.
    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
        self.state().get_owner_id(id, self.conn_id())
    }
2225
2226    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2227        match id {
2228            SystemObjectId::System => Some(&self.state.system_privileges),
2229            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2230                Some(self.get_cluster(*id).privileges())
2231            }
2232            SystemObjectId::Object(ObjectId::Database(id)) => {
2233                Some(self.get_database(id).privileges())
2234            }
2235            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2236                // For temporary schemas that haven't been created yet (lazy creation),
2237                // we return None - the RBAC check will need to handle this case.
2238                self.state
2239                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2240                    .map(|schema| schema.privileges())
2241            }
2242            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2243            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2244                Some(self.get_network_policy(id).privileges())
2245            }
2246            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2247            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2248        }
2249    }
2250
    /// Returns the dependents the catalog state computes for the objects in
    /// `ids`, on behalf of this connection.
    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
        // The state needs a scratch set to track what it has already visited;
        // each call starts with an empty one.
        let mut seen = BTreeSet::new();
        self.state.object_dependents(ids, &self.conn_id, &mut seen)
    }
2255
    /// Returns the dependents the catalog state computes for item `id`.
    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
        // The state needs a scratch set to track what it has already visited;
        // each call starts with an empty one.
        let mut seen = BTreeSet::new();
        self.state.item_dependents(id, &mut seen)
    }
2260
    /// Returns the result of `rbac::all_object_privileges` for `object_type`.
    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
        rbac::all_object_privileges(object_type)
    }
2264
    /// Returns the object type the catalog state reports for `object_id`.
    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        self.state.get_object_type(object_id)
    }
2268
    /// Returns the system object type the catalog state reports for `id`.
    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
        self.state.get_system_object_type(id)
    }
2272
2273    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2274    /// the object.
2275    ///
2276    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2277    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2278    /// only for error msgs and other humanizations.
2279    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2280        if qualified_name.qualifiers.schema_spec.is_temporary() {
2281            // All bets are off. Just give up and return the qualified name as is.
2282            // TODO: Figure out what's going on with temporary objects.
2283
2284            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2285            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2286            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2287            // which we don't have a simple repro.
2288            return qualified_name.item.clone().into();
2289        }
2290
2291        let database_id = match &qualified_name.qualifiers.database_spec {
2292            ResolvedDatabaseSpecifier::Ambient => None,
2293            ResolvedDatabaseSpecifier::Id(id)
2294                if self.database.is_some() && self.database == Some(*id) =>
2295            {
2296                None
2297            }
2298            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2299        };
2300
2301        let schema_spec = if database_id.is_none()
2302            && self.resolve_item_name(&PartialItemName {
2303                database: None,
2304                schema: None,
2305                item: qualified_name.item.clone(),
2306            }) == Ok(qualified_name)
2307            || self.resolve_function_name(&PartialItemName {
2308                database: None,
2309                schema: None,
2310                item: qualified_name.item.clone(),
2311            }) == Ok(qualified_name)
2312            || self.resolve_type_name(&PartialItemName {
2313                database: None,
2314                schema: None,
2315                item: qualified_name.item.clone(),
2316            }) == Ok(qualified_name)
2317        {
2318            None
2319        } else {
2320            // If `search_path` does not contain `full_name.schema`, the
2321            // `PartialName` must contain it.
2322            Some(qualified_name.qualifiers.schema_spec.clone())
2323        };
2324
2325        let res = PartialItemName {
2326            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2327            schema: schema_spec.map(|spec| {
2328                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2329                    .name()
2330                    .schema
2331                    .clone()
2332            }),
2333            item: qualified_name.item.clone(),
2334        };
2335        assert!(
2336            self.resolve_item_name(&res) == Ok(qualified_name)
2337                || self.resolve_function_name(&res) == Ok(qualified_name)
2338                || self.resolve_type_name(&res) == Ok(qualified_name)
2339        );
2340        res
2341    }
2342
    /// Converts `notice` with `Into` and sends it through `notices_tx`.
    ///
    /// The result of the send is deliberately discarded, so a failed send is
    /// silently ignored.
    fn add_notice(&self, notice: PlanNotice) {
        let _ = self.notices_tx.send(notice.into());
    }
2346
    /// Returns the comments stored for item `id`, or `None` if the catalog
    /// state has none for it.
    // NOTE(review): the `Option<usize>` map key presumably distinguishes a
    // comment on the item itself (`None`) from a comment on one of its columns
    // (`Some(position)`) — confirm against the comments storage.
    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
        self.state.comments.get_object_comments(comment_id)
    }
2351
2352    fn is_cluster_size_cc(&self, size: &str) -> bool {
2353        self.state
2354            .cluster_replica_sizes
2355            .0
2356            .get(size)
2357            .map_or(false, |a| a.is_cc)
2358    }
2359}
2360
2361#[cfg(test)]
2362mod tests {
2363    use std::collections::{BTreeMap, BTreeSet};
2364    use std::sync::Arc;
2365    use std::{env, iter};
2366
2367    use itertools::Itertools;
2368    use mz_catalog::memory::objects::CatalogItem;
2369    use tokio_postgres::NoTls;
2370    use tokio_postgres::types::Type;
2371    use uuid::Uuid;
2372
2373    use mz_catalog::SYSTEM_CONN_ID;
2374    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2375    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2376    use mz_controller_types::{ClusterId, ReplicaId};
2377    use mz_expr::MirScalarExpr;
2378    use mz_ore::now::to_datetime;
2379    use mz_ore::{assert_err, assert_ok, task};
2380    use mz_persist_client::PersistClient;
2381    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2382    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2383    use mz_repr::role_id::RoleId;
2384    use mz_repr::{
2385        CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2386        SqlScalarType, Timestamp,
2387    };
2388    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2389    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2390    use mz_sql::names::{
2391        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2392        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2393    };
2394    use mz_sql::plan::{
2395        CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2396        Scope, StatementContext,
2397    };
2398    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2399    use mz_sql::session::vars::{SystemVars, VarInput};
2400
2401    use crate::catalog::state::LocalExpressionCache;
2402    use crate::catalog::{Catalog, Op};
2403    use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2404    use crate::session::Session;
2405
2406    /// System sessions have an empty `search_path` so it's necessary to
2407    /// schema-qualify all referenced items.
2408    ///
2409    /// Dummy (and ostensibly client) sessions contain system schemas in their
2410    /// search paths, so do not require schema qualification on system objects such
2411    /// as types.
2412    #[mz_ore::test(tokio::test)]
2413    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2414    async fn test_minimal_qualification() {
2415        Catalog::with_debug(|catalog| async move {
2416            struct TestCase {
2417                input: QualifiedItemName,
2418                system_output: PartialItemName,
2419                normal_output: PartialItemName,
2420            }
2421
2422            let test_cases = vec![
2423                TestCase {
2424                    input: QualifiedItemName {
2425                        qualifiers: ItemQualifiers {
2426                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2427                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2428                        },
2429                        item: "numeric".to_string(),
2430                    },
2431                    system_output: PartialItemName {
2432                        database: None,
2433                        schema: None,
2434                        item: "numeric".to_string(),
2435                    },
2436                    normal_output: PartialItemName {
2437                        database: None,
2438                        schema: None,
2439                        item: "numeric".to_string(),
2440                    },
2441                },
2442                TestCase {
2443                    input: QualifiedItemName {
2444                        qualifiers: ItemQualifiers {
2445                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2446                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2447                        },
2448                        item: "mz_array_types".to_string(),
2449                    },
2450                    system_output: PartialItemName {
2451                        database: None,
2452                        schema: None,
2453                        item: "mz_array_types".to_string(),
2454                    },
2455                    normal_output: PartialItemName {
2456                        database: None,
2457                        schema: None,
2458                        item: "mz_array_types".to_string(),
2459                    },
2460                },
2461            ];
2462
2463            for tc in test_cases {
2464                assert_eq!(
2465                    catalog
2466                        .for_system_session()
2467                        .minimal_qualification(&tc.input),
2468                    tc.system_output
2469                );
2470                assert_eq!(
2471                    catalog
2472                        .for_session(&Session::dummy())
2473                        .minimal_qualification(&tc.input),
2474                    tc.normal_output
2475                );
2476            }
2477            catalog.expire().await;
2478        })
2479        .await
2480    }
2481
2482    #[mz_ore::test(tokio::test)]
2483    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2484    async fn test_catalog_revision() {
2485        let persist_client = PersistClient::new_for_tests().await;
2486        let organization_id = Uuid::new_v4();
2487        let bootstrap_args = test_bootstrap_args();
2488        {
2489            let mut catalog = Catalog::open_debug_catalog(
2490                persist_client.clone(),
2491                organization_id.clone(),
2492                &bootstrap_args,
2493            )
2494            .await
2495            .expect("unable to open debug catalog");
2496            assert_eq!(catalog.transient_revision(), 1);
2497            let commit_ts = catalog.current_upper().await;
2498            catalog
2499                .transact(
2500                    None,
2501                    commit_ts,
2502                    None,
2503                    vec![Op::CreateDatabase {
2504                        name: "test".to_string(),
2505                        owner_id: MZ_SYSTEM_ROLE_ID,
2506                    }],
2507                )
2508                .await
2509                .expect("failed to transact");
2510            assert_eq!(catalog.transient_revision(), 2);
2511            catalog.expire().await;
2512        }
2513        {
2514            let catalog =
2515                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2516                    .await
2517                    .expect("unable to open debug catalog");
2518            // Re-opening the same catalog resets the transient_revision to 1.
2519            assert_eq!(catalog.transient_revision(), 1);
2520            catalog.expire().await;
2521        }
2522    }
2523
2524    #[mz_ore::test(tokio::test)]
2525    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2526    async fn test_effective_search_path() {
2527        Catalog::with_debug(|catalog| async move {
2528            let mz_catalog_schema = (
2529                ResolvedDatabaseSpecifier::Ambient,
2530                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2531            );
2532            let pg_catalog_schema = (
2533                ResolvedDatabaseSpecifier::Ambient,
2534                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2535            );
2536            let mz_temp_schema = (
2537                ResolvedDatabaseSpecifier::Ambient,
2538                SchemaSpecifier::Temporary,
2539            );
2540
2541            // Behavior with the default search_schema (public)
2542            let session = Session::dummy();
2543            let conn_catalog = catalog.for_session(&session);
2544            assert_ne!(
2545                conn_catalog.effective_search_path(false),
2546                conn_catalog.search_path
2547            );
2548            assert_ne!(
2549                conn_catalog.effective_search_path(true),
2550                conn_catalog.search_path
2551            );
2552            assert_eq!(
2553                conn_catalog.effective_search_path(false),
2554                vec![
2555                    mz_catalog_schema.clone(),
2556                    pg_catalog_schema.clone(),
2557                    conn_catalog.search_path[0].clone()
2558                ]
2559            );
2560            assert_eq!(
2561                conn_catalog.effective_search_path(true),
2562                vec![
2563                    mz_temp_schema.clone(),
2564                    mz_catalog_schema.clone(),
2565                    pg_catalog_schema.clone(),
2566                    conn_catalog.search_path[0].clone()
2567                ]
2568            );
2569
2570            // missing schemas are added when missing
2571            let mut session = Session::dummy();
2572            session
2573                .vars_mut()
2574                .set(
2575                    &SystemVars::new(),
2576                    "search_path",
2577                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2578                    false,
2579                )
2580                .expect("failed to set search_path");
2581            let conn_catalog = catalog.for_session(&session);
2582            assert_ne!(
2583                conn_catalog.effective_search_path(false),
2584                conn_catalog.search_path
2585            );
2586            assert_ne!(
2587                conn_catalog.effective_search_path(true),
2588                conn_catalog.search_path
2589            );
2590            assert_eq!(
2591                conn_catalog.effective_search_path(false),
2592                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2593            );
2594            assert_eq!(
2595                conn_catalog.effective_search_path(true),
2596                vec![
2597                    mz_temp_schema.clone(),
2598                    mz_catalog_schema.clone(),
2599                    pg_catalog_schema.clone()
2600                ]
2601            );
2602
2603            let mut session = Session::dummy();
2604            session
2605                .vars_mut()
2606                .set(
2607                    &SystemVars::new(),
2608                    "search_path",
2609                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2610                    false,
2611                )
2612                .expect("failed to set search_path");
2613            let conn_catalog = catalog.for_session(&session);
2614            assert_ne!(
2615                conn_catalog.effective_search_path(false),
2616                conn_catalog.search_path
2617            );
2618            assert_ne!(
2619                conn_catalog.effective_search_path(true),
2620                conn_catalog.search_path
2621            );
2622            assert_eq!(
2623                conn_catalog.effective_search_path(false),
2624                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2625            );
2626            assert_eq!(
2627                conn_catalog.effective_search_path(true),
2628                vec![
2629                    mz_temp_schema.clone(),
2630                    pg_catalog_schema.clone(),
2631                    mz_catalog_schema.clone()
2632                ]
2633            );
2634
2635            let mut session = Session::dummy();
2636            session
2637                .vars_mut()
2638                .set(
2639                    &SystemVars::new(),
2640                    "search_path",
2641                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2642                    false,
2643                )
2644                .expect("failed to set search_path");
2645            let conn_catalog = catalog.for_session(&session);
2646            assert_ne!(
2647                conn_catalog.effective_search_path(false),
2648                conn_catalog.search_path
2649            );
2650            assert_ne!(
2651                conn_catalog.effective_search_path(true),
2652                conn_catalog.search_path
2653            );
2654            assert_eq!(
2655                conn_catalog.effective_search_path(false),
2656                vec![
2657                    mz_catalog_schema.clone(),
2658                    pg_catalog_schema.clone(),
2659                    mz_temp_schema.clone()
2660                ]
2661            );
2662            assert_eq!(
2663                conn_catalog.effective_search_path(true),
2664                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2665            );
2666            catalog.expire().await;
2667        })
2668        .await
2669    }
2670
2671    #[mz_ore::test(tokio::test)]
2672    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2673    async fn test_normalized_create() {
2674        use mz_ore::collections::CollectionExt;
2675        Catalog::with_debug(|catalog| async move {
2676            let conn_catalog = catalog.for_system_session();
2677            let scx = &mut StatementContext::new(None, &conn_catalog);
2678
2679            let parsed = mz_sql_parser::parser::parse_statements(
2680                "create view public.foo as select 1 as bar",
2681            )
2682            .expect("")
2683            .into_element()
2684            .ast;
2685
2686            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2687
2688            // Ensure that all identifiers are quoted.
2689            assert_eq!(
2690                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2691                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2692            );
2693            catalog.expire().await;
2694        })
2695        .await;
2696    }
2697
2698    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2699    #[mz_ore::test(tokio::test)]
2700    #[cfg_attr(miri, ignore)] // slow
2701    async fn test_large_catalog_item() {
2702        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2703        let column = ", 1";
2704        let view_def_size = view_def.bytes().count();
2705        let column_size = column.bytes().count();
2706        let column_count =
2707            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2708        let columns = iter::repeat(column).take(column_count).join("");
2709        let create_sql = format!("{view_def}{columns})");
2710        let create_sql_check = create_sql.clone();
2711        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2712        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2713            &create_sql
2714        ));
2715
2716        let persist_client = PersistClient::new_for_tests().await;
2717        let organization_id = Uuid::new_v4();
2718        let id = CatalogItemId::User(1);
2719        let gid = GlobalId::User(1);
2720        let bootstrap_args = test_bootstrap_args();
2721        {
2722            let mut catalog = Catalog::open_debug_catalog(
2723                persist_client.clone(),
2724                organization_id.clone(),
2725                &bootstrap_args,
2726            )
2727            .await
2728            .expect("unable to open debug catalog");
2729            let item = catalog
2730                .state()
2731                .deserialize_item(
2732                    gid,
2733                    &create_sql,
2734                    &BTreeMap::new(),
2735                    &mut LocalExpressionCache::Closed,
2736                    None,
2737                )
2738                .expect("unable to parse view");
2739            let commit_ts = catalog.current_upper().await;
2740            catalog
2741                .transact(
2742                    None,
2743                    commit_ts,
2744                    None,
2745                    vec![Op::CreateItem {
2746                        item,
2747                        name: QualifiedItemName {
2748                            qualifiers: ItemQualifiers {
2749                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2750                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2751                            },
2752                            item: "v".to_string(),
2753                        },
2754                        id,
2755                        owner_id: MZ_SYSTEM_ROLE_ID,
2756                    }],
2757                )
2758                .await
2759                .expect("failed to transact");
2760            catalog.expire().await;
2761        }
2762        {
2763            let catalog =
2764                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2765                    .await
2766                    .expect("unable to open debug catalog");
2767            let view = catalog.get_entry(&id);
2768            assert_eq!("v", view.name.item);
2769            match &view.item {
2770                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2771                item => panic!("expected view, got {}", item.typ()),
2772            }
2773            catalog.expire().await;
2774        }
2775    }
2776
2777    #[mz_ore::test(tokio::test)]
2778    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2779    async fn test_object_type() {
2780        Catalog::with_debug(|catalog| async move {
2781            let conn_catalog = catalog.for_system_session();
2782
2783            assert_eq!(
2784                mz_sql::catalog::ObjectType::ClusterReplica,
2785                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2786                    ClusterId::user(1).expect("1 is a valid ID"),
2787                    ReplicaId::User(1)
2788                )))
2789            );
2790            assert_eq!(
2791                mz_sql::catalog::ObjectType::Role,
2792                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2793            );
2794            catalog.expire().await;
2795        })
2796        .await;
2797    }
2798
2799    #[mz_ore::test(tokio::test)]
2800    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2801    async fn test_get_privileges() {
2802        Catalog::with_debug(|catalog| async move {
2803            let conn_catalog = catalog.for_system_session();
2804
2805            assert_eq!(
2806                None,
2807                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2808                    ClusterId::user(1).expect("1 is a valid ID"),
2809                    ReplicaId::User(1),
2810                ))))
2811            );
2812            assert_eq!(
2813                None,
2814                conn_catalog
2815                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2816            );
2817            catalog.expire().await;
2818        })
2819        .await;
2820    }
2821
2822    #[mz_ore::test(tokio::test)]
2823    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2824    async fn verify_builtin_descs() {
2825        Catalog::with_debug(|catalog| async move {
2826            let conn_catalog = catalog.for_system_session();
2827
2828            let builtins_cfg = BuiltinsConfig {
2829                include_continual_tasks: true,
2830            };
2831            for builtin in BUILTINS::iter(&builtins_cfg) {
2832                let (schema, name, expected_desc) = match builtin {
2833                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2834                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2835                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2836                    Builtin::Log(_)
2837                    | Builtin::Type(_)
2838                    | Builtin::Func(_)
2839                    | Builtin::ContinualTask(_)
2840                    | Builtin::Index(_)
2841                    | Builtin::Connection(_) => continue,
2842                };
2843                let item = conn_catalog
2844                    .resolve_item(&PartialItemName {
2845                        database: None,
2846                        schema: Some(schema.to_string()),
2847                        item: name.to_string(),
2848                    })
2849                    .expect("unable to resolve item")
2850                    .at_version(RelationVersionSelector::Latest);
2851
2852                let actual_desc = item.relation_desc().expect("invalid item type");
2853                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2854                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2855                {
2856                    assert_eq!(
2857                        actual_name, expected_name,
2858                        "item {schema}.{name} column {index} name did not match its expected name"
2859                    );
2860                    assert_eq!(
2861                        actual_typ, expected_typ,
2862                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2863                    );
2864                }
2865                assert_eq!(
2866                    &*actual_desc, expected_desc,
2867                    "item {schema}.{name} did not match its expected RelationDesc"
2868                );
2869            }
2870            catalog.expire().await;
2871        })
2872        .await
2873    }
2874
2875    // Connect to a running Postgres server and verify that our builtin
2876    // types and functions match it, in addition to some other things.
2877    #[mz_ore::test(tokio::test)]
2878    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2879    async fn test_compare_builtins_postgres() {
2880        async fn inner(catalog: Catalog) {
2881            // Verify that all builtin functions:
2882            // - have a unique OID
2883            // - if they have a postgres counterpart (same oid) then they have matching name
2884            let (client, connection) = tokio_postgres::connect(
2885                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2886                NoTls,
2887            )
2888            .await
2889            .expect("failed to connect to Postgres");
2890
2891            task::spawn(|| "compare_builtin_postgres", async move {
2892                if let Err(e) = connection.await {
2893                    panic!("connection error: {}", e);
2894                }
2895            });
2896
2897            struct PgProc {
2898                name: String,
2899                arg_oids: Vec<u32>,
2900                ret_oid: Option<u32>,
2901                ret_set: bool,
2902            }
2903
2904            struct PgType {
2905                name: String,
2906                ty: String,
2907                elem: u32,
2908                array: u32,
2909                input: u32,
2910                receive: u32,
2911            }
2912
2913            struct PgOper {
2914                oprresult: u32,
2915                name: String,
2916            }
2917
2918            let pg_proc: BTreeMap<_, _> = client
2919                .query(
2920                    "SELECT
2921                    p.oid,
2922                    proname,
2923                    proargtypes,
2924                    prorettype,
2925                    proretset
2926                FROM pg_proc p
2927                JOIN pg_namespace n ON p.pronamespace = n.oid",
2928                    &[],
2929                )
2930                .await
2931                .expect("pg query failed")
2932                .into_iter()
2933                .map(|row| {
2934                    let oid: u32 = row.get("oid");
2935                    let pg_proc = PgProc {
2936                        name: row.get("proname"),
2937                        arg_oids: row.get("proargtypes"),
2938                        ret_oid: row.get("prorettype"),
2939                        ret_set: row.get("proretset"),
2940                    };
2941                    (oid, pg_proc)
2942                })
2943                .collect();
2944
2945            let pg_type: BTreeMap<_, _> = client
2946                .query(
2947                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2948                    &[],
2949                )
2950                .await
2951                .expect("pg query failed")
2952                .into_iter()
2953                .map(|row| {
2954                    let oid: u32 = row.get("oid");
2955                    let pg_type = PgType {
2956                        name: row.get("typname"),
2957                        ty: row.get("typtype"),
2958                        elem: row.get("typelem"),
2959                        array: row.get("typarray"),
2960                        input: row.get("typinput"),
2961                        receive: row.get("typreceive"),
2962                    };
2963                    (oid, pg_type)
2964                })
2965                .collect();
2966
2967            let pg_oper: BTreeMap<_, _> = client
2968                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2969                .await
2970                .expect("pg query failed")
2971                .into_iter()
2972                .map(|row| {
2973                    let oid: u32 = row.get("oid");
2974                    let pg_oper = PgOper {
2975                        name: row.get("oprname"),
2976                        oprresult: row.get("oprresult"),
2977                    };
2978                    (oid, pg_oper)
2979                })
2980                .collect();
2981
2982            let conn_catalog = catalog.for_system_session();
2983            let resolve_type_oid = |item: &str| {
2984                conn_catalog
2985                    .resolve_type(&PartialItemName {
2986                        database: None,
2987                        // All functions we check exist in PG, so the types must, as
2988                        // well
2989                        schema: Some(PG_CATALOG_SCHEMA.into()),
2990                        item: item.to_string(),
2991                    })
2992                    .expect("unable to resolve type")
2993                    .oid()
2994            };
2995
2996            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2997                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2998                .collect();
2999
3000            let mut all_oids = BTreeSet::new();
3001
3002            // A function to determine if two oids are equivalent enough for these tests. We don't
3003            // support some types, so map exceptions here.
3004            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
3005                [
3006                    // We don't support NAME.
3007                    (Type::NAME, Type::TEXT),
3008                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
3009                    // We don't support time with time zone.
3010                    (Type::TIME, Type::TIMETZ),
3011                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
3012                ]
3013                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
3014            );
3015            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
3016                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
3017            ]);
3018            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
3019                if ignore_return_types.contains(&fn_oid) {
3020                    return true;
3021                }
3022                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
3023                    return true;
3024                }
3025                a == b
3026            };
3027
3028            let builtins_cfg = BuiltinsConfig {
3029                include_continual_tasks: true,
3030            };
3031            for builtin in BUILTINS::iter(&builtins_cfg) {
3032                match builtin {
3033                    Builtin::Type(ty) => {
3034                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3035
3036                        if ty.oid >= FIRST_MATERIALIZE_OID {
3037                            // High OIDs are reserved in Materialize and don't have
3038                            // PostgreSQL counterparts.
3039                            continue;
3040                        }
3041
3042                        // For types that have a PostgreSQL counterpart, verify that
3043                        // the name and oid match.
3044                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3045                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3046                        });
3047                        assert_eq!(
3048                            ty.name, pg_ty.name,
3049                            "oid {} has name {} in postgres; expected {}",
3050                            ty.oid, pg_ty.name, ty.name,
3051                        );
3052
3053                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3054                            None => (0, 0),
3055                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3056                        };
3057                        assert_eq!(
3058                            typinput_oid, pg_ty.input,
3059                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3060                            ty.name, typinput_oid, pg_ty.input,
3061                        );
3062                        assert_eq!(
3063                            typreceive_oid, pg_ty.receive,
3064                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3065                            ty.name, typreceive_oid, pg_ty.receive,
3066                        );
3067                        if typinput_oid != 0 {
3068                            assert!(
3069                                func_oids.contains(&typinput_oid),
3070                                "type {} has typinput OID {} that does not exist in pg_proc",
3071                                ty.name,
3072                                typinput_oid,
3073                            );
3074                        }
3075                        if typreceive_oid != 0 {
3076                            assert!(
3077                                func_oids.contains(&typreceive_oid),
3078                                "type {} has typreceive OID {} that does not exist in pg_proc",
3079                                ty.name,
3080                                typreceive_oid,
3081                            );
3082                        }
3083
3084                        // Ensure the type matches.
3085                        match &ty.details.typ {
3086                            CatalogType::Array { element_reference } => {
3087                                let elem_ty = BUILTINS::iter(&builtins_cfg)
3088                                    .filter_map(|builtin| match builtin {
3089                                        Builtin::Type(ty @ BuiltinType { name, .. })
3090                                            if element_reference == name =>
3091                                        {
3092                                            Some(ty)
3093                                        }
3094                                        _ => None,
3095                                    })
3096                                    .next();
3097                                let elem_ty = match elem_ty {
3098                                    Some(ty) => ty,
3099                                    None => {
3100                                        panic!("{} is unexpectedly not a type", element_reference)
3101                                    }
3102                                };
3103                                assert_eq!(
3104                                    pg_ty.elem, elem_ty.oid,
3105                                    "type {} has mismatched element OIDs",
3106                                    ty.name
3107                                )
3108                            }
3109                            CatalogType::Pseudo => {
3110                                assert_eq!(
3111                                    pg_ty.ty, "p",
3112                                    "type {} is not a pseudo type as expected",
3113                                    ty.name
3114                                )
3115                            }
3116                            CatalogType::Range { .. } => {
3117                                assert_eq!(
3118                                    pg_ty.ty, "r",
3119                                    "type {} is not a range type as expected",
3120                                    ty.name
3121                                );
3122                            }
3123                            _ => {
3124                                assert_eq!(
3125                                    pg_ty.ty, "b",
3126                                    "type {} is not a base type as expected",
3127                                    ty.name
3128                                )
3129                            }
3130                        }
3131
3132                        // Ensure the array type reference is correct.
3133                        let schema = catalog
3134                            .resolve_schema_in_database(
3135                                &ResolvedDatabaseSpecifier::Ambient,
3136                                ty.schema,
3137                                &SYSTEM_CONN_ID,
3138                            )
3139                            .expect("unable to resolve schema");
3140                        let allocated_type = catalog
3141                            .resolve_type(
3142                                None,
3143                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3144                                &PartialItemName {
3145                                    database: None,
3146                                    schema: Some(schema.name().schema.clone()),
3147                                    item: ty.name.to_string(),
3148                                },
3149                                &SYSTEM_CONN_ID,
3150                            )
3151                            .expect("unable to resolve type");
3152                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3153                            ty
3154                        } else {
3155                            panic!("unexpectedly not a type")
3156                        };
3157                        match ty.details.array_id {
3158                            Some(array_id) => {
3159                                let array_ty = catalog.get_entry(&array_id);
3160                                assert_eq!(
3161                                    pg_ty.array, array_ty.oid,
3162                                    "type {} has mismatched array OIDs",
3163                                    allocated_type.name.item,
3164                                );
3165                            }
3166                            None => assert_eq!(
3167                                pg_ty.array, 0,
3168                                "type {} does not have an array type in mz but does in pg",
3169                                allocated_type.name.item,
3170                            ),
3171                        }
3172                    }
3173                    Builtin::Func(func) => {
3174                        for imp in func.inner.func_impls() {
3175                            assert!(
3176                                all_oids.insert(imp.oid),
3177                                "{} reused oid {}",
3178                                func.name,
3179                                imp.oid
3180                            );
3181
3182                            assert!(
3183                                imp.oid < FIRST_USER_OID,
3184                                "built-in function {} erroneously has OID in user space ({})",
3185                                func.name,
3186                                imp.oid,
3187                            );
3188
3189                            // For functions that have a postgres counterpart, verify that the name and
3190                            // oid match.
3191                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3192                                continue;
3193                            } else {
3194                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3195                                    panic!(
3196                                        "pg_proc missing function {}: oid {}",
3197                                        func.name, imp.oid
3198                                    )
3199                                })
3200                            };
3201                            assert_eq!(
3202                                func.name, pg_fn.name,
3203                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3204                                imp.oid, func.name, pg_fn.name
3205                            );
3206
3207                            // Complain, but don't fail, if argument oids don't match.
3208                            // TODO: make these match.
3209                            let imp_arg_oids = imp
3210                                .arg_typs
3211                                .iter()
3212                                .map(|item| resolve_type_oid(item))
3213                                .collect::<Vec<_>>();
3214
3215                            if imp_arg_oids != pg_fn.arg_oids {
3216                                println!(
3217                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3218                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3219                                );
3220                            }
3221
3222                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3223
3224                            assert!(
3225                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3226                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3227                                imp.oid,
3228                                func.name,
3229                                imp_return_oid,
3230                                pg_fn.ret_oid
3231                            );
3232
3233                            assert_eq!(
3234                                imp.return_is_set, pg_fn.ret_set,
3235                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3236                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3237                            );
3238                        }
3239                    }
3240                    _ => (),
3241                }
3242            }
3243
3244            for (op, func) in OP_IMPLS.iter() {
3245                for imp in func.func_impls() {
3246                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3247
3248                    // For operators that have a postgres counterpart, verify that the name and oid match.
3249                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3250                        continue;
3251                    } else {
3252                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3253                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3254                        })
3255                    };
3256
3257                    assert_eq!(*op, pg_op.name);
3258
3259                    let imp_return_oid =
3260                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3261                    if imp_return_oid != pg_op.oprresult {
3262                        panic!(
3263                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3264                            imp.oid, op, imp_return_oid, pg_op.oprresult
3265                        );
3266                    }
3267                }
3268            }
3269            catalog.expire().await;
3270        }
3271
3272        Catalog::with_debug(inner).await
3273    }
3274
3275    // Execute all builtin functions with all combinations of arguments from interesting datums.
3276    #[mz_ore::test(tokio::test)]
3277    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3278    async fn test_smoketest_all_builtins() {
3279        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3280            let catalog = Arc::new(catalog);
3281            let conn_catalog = catalog.for_system_session();
3282
3283            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3284            let mut handles = Vec::new();
3285
3286            // Extracted during planning; always panics when executed.
3287            let ignore_names = BTreeSet::from([
3288                "avg",
3289                "avg_internal_v1",
3290                "bool_and",
3291                "bool_or",
3292                "has_table_privilege", // > 3 s each
3293                "has_type_privilege",  // > 3 s each
3294                "mod",
3295                "mz_panic",
3296                "mz_sleep",
3297                "pow",
3298                "stddev_pop",
3299                "stddev_samp",
3300                "stddev",
3301                "var_pop",
3302                "var_samp",
3303                "variance",
3304            ]);
3305
3306            let fns = BUILTINS::funcs()
3307                .map(|func| (&func.name, func.inner))
3308                .chain(OP_IMPLS.iter());
3309
3310            for (name, func) in fns {
3311                if ignore_names.contains(name) {
3312                    continue;
3313                }
3314                let Func::Scalar(impls) = func else {
3315                    continue;
3316                };
3317
3318                'outer: for imp in impls {
3319                    let details = imp.details();
3320                    let mut styps = Vec::new();
3321                    for item in details.arg_typs.iter() {
3322                        let oid = resolve_type_oid(item);
3323                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3324                            continue 'outer;
3325                        };
3326                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3327                    }
3328                    let datums = styps
3329                        .iter()
3330                        .map(|styp| {
3331                            let mut datums = vec![Datum::Null];
3332                            datums.extend(styp.interesting_datums());
3333                            datums
3334                        })
3335                        .collect::<Vec<_>>();
3336                    // Skip nullary fns.
3337                    if datums.is_empty() {
3338                        continue;
3339                    }
3340
3341                    let return_oid = details
3342                        .return_typ
3343                        .map(resolve_type_oid)
3344                        .expect("must exist");
3345                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3346                        .ok()
3347                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3348
3349                    let mut idxs = vec![0; datums.len()];
3350                    while idxs[0] < datums[0].len() {
3351                        let mut args = Vec::with_capacity(idxs.len());
3352                        for i in 0..(datums.len()) {
3353                            args.push(datums[i][idxs[i]]);
3354                        }
3355
3356                        let op = &imp.op;
3357                        let scalars = args
3358                            .iter()
3359                            .enumerate()
3360                            .map(|(i, datum)| {
3361                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3362                                    datum.clone(),
3363                                    styps[i].clone(),
3364                                ))
3365                            })
3366                            .collect();
3367
3368                        let call_name = format!(
3369                            "{name}({}) (oid: {})",
3370                            args.iter()
3371                                .map(|d| d.to_string())
3372                                .collect::<Vec<_>>()
3373                                .join(", "),
3374                            imp.oid
3375                        );
3376                        let catalog = Arc::clone(&catalog);
3377                        let call_name_fn = call_name.clone();
3378                        let return_styp = return_styp.clone();
3379                        let handle = task::spawn_blocking(
3380                            || call_name,
3381                            move || {
3382                                smoketest_fn(
3383                                    name,
3384                                    call_name_fn,
3385                                    op,
3386                                    imp,
3387                                    args,
3388                                    catalog,
3389                                    scalars,
3390                                    return_styp,
3391                                )
3392                            },
3393                        );
3394                        handles.push(handle);
3395
3396                        // Advance to the next datum combination.
3397                        for i in (0..datums.len()).rev() {
3398                            idxs[i] += 1;
3399                            if idxs[i] >= datums[i].len() {
3400                                if i == 0 {
3401                                    break;
3402                                }
3403                                idxs[i] = 0;
3404                                continue;
3405                            } else {
3406                                break;
3407                            }
3408                        }
3409                    }
3410                }
3411            }
3412            handles
3413        }
3414
3415        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3416        for handle in handles {
3417            handle.await;
3418        }
3419    }
3420
3421    fn smoketest_fn(
3422        name: &&str,
3423        call_name: String,
3424        op: &Operation<HirScalarExpr>,
3425        imp: &FuncImpl<HirScalarExpr>,
3426        args: Vec<Datum<'_>>,
3427        catalog: Arc<Catalog>,
3428        scalars: Vec<CoercibleScalarExpr>,
3429        return_styp: Option<SqlScalarType>,
3430    ) {
3431        let conn_catalog = catalog.for_system_session();
3432        let pcx = PlanContext::zero();
3433        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3434        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3435        let ecx = ExprContext {
3436            qcx: &qcx,
3437            name: "smoketest",
3438            scope: &Scope::empty(),
3439            relation_type: &SqlRelationType::empty(),
3440            allow_aggregates: false,
3441            allow_subqueries: false,
3442            allow_parameters: false,
3443            allow_windows: false,
3444        };
3445        let arena = RowArena::new();
3446        let mut session = Session::<Timestamp>::dummy();
3447        session
3448            .start_transaction(to_datetime(0), None, None)
3449            .expect("must succeed");
3450        let prep_style = ExprPrepStyle::OneShot {
3451            logical_time: EvalTime::Time(Timestamp::MIN),
3452            session: &session,
3453            catalog_state: &catalog.state,
3454        };
3455
3456        // Execute the function as much as possible, ensuring no panics occur, but
3457        // otherwise ignoring eval errors. We also do various other checks.
3458        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3459        if let Ok(hir) = res {
3460            if let Ok(mut mir) = hir.lower_uncorrelated() {
3461                // Populate unmaterialized functions.
3462                prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3463
3464                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3465                    if let Some(return_styp) = return_styp {
3466                        let mir_typ = mir.typ(&[]);
3467                        // MIR type inference should be consistent with the type
3468                        // we get from the catalog.
3469                        assert_eq!(mir_typ.scalar_type, return_styp);
3470                        // The following will check not just that the scalar type
3471                        // is ok, but also catches if the function returned a null
3472                        // but the MIR type inference said "non-nullable".
3473                        if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3474                            panic!(
3475                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3476                            );
3477                        }
3478                        // Check the consistency of `introduces_nulls` and
3479                        // `propagates_nulls` with `MirScalarExpr::typ`.
3480                        if let Some((introduces_nulls, propagates_nulls)) =
3481                            call_introduces_propagates_nulls(&mir)
3482                        {
3483                            if introduces_nulls {
3484                                // If the function introduces_nulls, then the return
3485                                // type should always be nullable, regardless of
3486                                // the nullability of the input types.
3487                                assert!(
3488                                    mir_typ.nullable,
3489                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3490                                    name, args, mir, mir_typ.nullable
3491                                );
3492                            } else {
3493                                let any_input_null = args.iter().any(|arg| arg.is_null());
3494                                if !any_input_null {
3495                                    assert!(
3496                                        !mir_typ.nullable,
3497                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3498                                        name, args, mir, mir_typ.nullable
3499                                    );
3500                                } else {
3501                                    assert_eq!(
3502                                        mir_typ.nullable, propagates_nulls,
3503                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3504                                        name, args, mir, mir_typ.nullable
3505                                    );
3506                                }
3507                            }
3508                        }
3509                        // Check that `MirScalarExpr::reduce` yields the same result
3510                        // as the real evaluation.
3511                        let mut reduced = mir.clone();
3512                        reduced.reduce(&[]);
3513                        match reduced {
3514                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3515                                match reduce_result {
3516                                    Ok(reduce_result_row) => {
3517                                        let reduce_result_datum = reduce_result_row.unpack_first();
3518                                        assert_eq!(
3519                                            reduce_result_datum,
3520                                            eval_result_datum,
3521                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3522                                            name,
3523                                            args,
3524                                            mir,
3525                                            eval_result_datum,
3526                                            mir_typ.scalar_type,
3527                                            reduce_result_datum,
3528                                            ctyp.scalar_type
3529                                        );
3530                                        // Let's check that the types also match.
3531                                        // (We are not checking nullability here,
3532                                        // because it's ok when we know a more
3533                                        // precise nullability after actually
3534                                        // evaluating a function than before.)
3535                                        assert_eq!(
3536                                            ctyp.scalar_type,
3537                                            mir_typ.scalar_type,
3538                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3539                                            name,
3540                                            args,
3541                                            mir,
3542                                            eval_result_datum,
3543                                            mir_typ.scalar_type,
3544                                            reduce_result_datum,
3545                                            ctyp.scalar_type
3546                                        );
3547                                    }
3548                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3549                                }
3550                            }
3551                            _ => unreachable!(
3552                                "all args are literals, so should have reduced to a literal"
3553                            ),
3554                        }
3555                    }
3556                }
3557            }
3558        }
3559    }
3560
3561    /// If the given MirScalarExpr
3562    ///  - is a function call, and
3563    ///  - all arguments are literals
3564    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3565    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3566        match mir_func_call {
3567            MirScalarExpr::CallUnary { func, expr } => {
3568                if expr.is_literal() {
3569                    Some((func.introduces_nulls(), func.propagates_nulls()))
3570                } else {
3571                    None
3572                }
3573            }
3574            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3575                if expr1.is_literal() && expr2.is_literal() {
3576                    Some((func.introduces_nulls(), func.propagates_nulls()))
3577                } else {
3578                    None
3579                }
3580            }
3581            MirScalarExpr::CallVariadic { func, exprs } => {
3582                if exprs.iter().all(|arg| arg.is_literal()) {
3583                    Some((func.introduces_nulls(), func.propagates_nulls()))
3584                } else {
3585                    None
3586                }
3587            }
3588            _ => None,
3589        }
3590    }
3591
3592    // Make sure pg views don't use types that only exist in Materialize.
3593    #[mz_ore::test(tokio::test)]
3594    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3595    async fn test_pg_views_forbidden_types() {
3596        Catalog::with_debug(|catalog| async move {
3597            let conn_catalog = catalog.for_system_session();
3598
3599            for view in BUILTINS::views().filter(|view| {
3600                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3601            }) {
3602                let item = conn_catalog
3603                    .resolve_item(&PartialItemName {
3604                        database: None,
3605                        schema: Some(view.schema.to_string()),
3606                        item: view.name.to_string(),
3607                    })
3608                    .expect("unable to resolve view")
3609                    // TODO(alter_table)
3610                    .at_version(RelationVersionSelector::Latest);
3611                let full_name = conn_catalog.resolve_full_name(item.name());
3612                let desc = item.relation_desc().expect("invalid item type");
3613                for col_type in desc.iter_types() {
3614                    match &col_type.scalar_type {
3615                        typ @ SqlScalarType::UInt16
3616                        | typ @ SqlScalarType::UInt32
3617                        | typ @ SqlScalarType::UInt64
3618                        | typ @ SqlScalarType::MzTimestamp
3619                        | typ @ SqlScalarType::List { .. }
3620                        | typ @ SqlScalarType::Map { .. }
3621                        | typ @ SqlScalarType::MzAclItem => {
3622                            panic!("{typ:?} type found in {full_name}");
3623                        }
3624                        SqlScalarType::AclItem
3625                        | SqlScalarType::Bool
3626                        | SqlScalarType::Int16
3627                        | SqlScalarType::Int32
3628                        | SqlScalarType::Int64
3629                        | SqlScalarType::Float32
3630                        | SqlScalarType::Float64
3631                        | SqlScalarType::Numeric { .. }
3632                        | SqlScalarType::Date
3633                        | SqlScalarType::Time
3634                        | SqlScalarType::Timestamp { .. }
3635                        | SqlScalarType::TimestampTz { .. }
3636                        | SqlScalarType::Interval
3637                        | SqlScalarType::PgLegacyChar
3638                        | SqlScalarType::Bytes
3639                        | SqlScalarType::String
3640                        | SqlScalarType::Char { .. }
3641                        | SqlScalarType::VarChar { .. }
3642                        | SqlScalarType::Jsonb
3643                        | SqlScalarType::Uuid
3644                        | SqlScalarType::Array(_)
3645                        | SqlScalarType::Record { .. }
3646                        | SqlScalarType::Oid
3647                        | SqlScalarType::RegProc
3648                        | SqlScalarType::RegType
3649                        | SqlScalarType::RegClass
3650                        | SqlScalarType::Int2Vector
3651                        | SqlScalarType::Range { .. }
3652                        | SqlScalarType::PgLegacyName => {}
3653                    }
3654                }
3655            }
3656            catalog.expire().await;
3657        })
3658        .await
3659    }
3660
3661    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3662    // introspection relations.
3663    #[mz_ore::test(tokio::test)]
3664    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3665    async fn test_mz_introspection_builtins() {
3666        Catalog::with_debug(|catalog| async move {
3667            let conn_catalog = catalog.for_system_session();
3668
3669            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3670            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3671
3672            for entry in catalog.entries() {
3673                let schema_spec = entry.name().qualifiers.schema_spec;
3674                let introspection_deps = catalog.introspection_dependencies(entry.id);
3675                if introspection_deps.is_empty() {
3676                    assert!(
3677                        schema_spec != introspection_schema_spec,
3678                        "entry does not depend on introspection sources but is in \
3679                         `mz_introspection`: {}",
3680                        conn_catalog.resolve_full_name(entry.name()),
3681                    );
3682                } else {
3683                    assert!(
3684                        schema_spec == introspection_schema_spec,
3685                        "entry depends on introspection sources but is not in \
3686                         `mz_introspection`: {}",
3687                        conn_catalog.resolve_full_name(entry.name()),
3688                    );
3689                }
3690            }
3691        })
3692        .await
3693    }
3694
3695    #[mz_ore::test(tokio::test)]
3696    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3697    async fn test_multi_subscriber_catalog() {
3698        let persist_client = PersistClient::new_for_tests().await;
3699        let bootstrap_args = test_bootstrap_args();
3700        let organization_id = Uuid::new_v4();
3701        let db_name = "DB";
3702
3703        let mut writer_catalog = Catalog::open_debug_catalog(
3704            persist_client.clone(),
3705            organization_id.clone(),
3706            &bootstrap_args,
3707        )
3708        .await
3709        .expect("open_debug_catalog");
3710        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3711            persist_client.clone(),
3712            organization_id.clone(),
3713            &bootstrap_args,
3714        )
3715        .await
3716        .expect("open_debug_read_only_catalog");
3717        assert_err!(writer_catalog.resolve_database(db_name));
3718        assert_err!(read_only_catalog.resolve_database(db_name));
3719
3720        let commit_ts = writer_catalog.current_upper().await;
3721        writer_catalog
3722            .transact(
3723                None,
3724                commit_ts,
3725                None,
3726                vec![Op::CreateDatabase {
3727                    name: db_name.to_string(),
3728                    owner_id: MZ_SYSTEM_ROLE_ID,
3729                }],
3730            )
3731            .await
3732            .expect("failed to transact");
3733
3734        let write_db = writer_catalog
3735            .resolve_database(db_name)
3736            .expect("resolve_database");
3737        read_only_catalog
3738            .sync_to_current_updates()
3739            .await
3740            .expect("sync_to_current_updates");
3741        let read_db = read_only_catalog
3742            .resolve_database(db_name)
3743            .expect("resolve_database");
3744
3745        assert_eq!(write_db, read_db);
3746
3747        let writer_catalog_fencer =
3748            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3749                .await
3750                .expect("open_debug_catalog for fencer");
3751        let fencer_db = writer_catalog_fencer
3752            .resolve_database(db_name)
3753            .expect("resolve_database for fencer");
3754        assert_eq!(fencer_db, read_db);
3755
3756        let write_fence_err = writer_catalog
3757            .sync_to_current_updates()
3758            .await
3759            .expect_err("sync_to_current_updates for fencer");
3760        assert!(matches!(
3761            write_fence_err,
3762            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3763        ));
3764        let read_fence_err = read_only_catalog
3765            .sync_to_current_updates()
3766            .await
3767            .expect_err("sync_to_current_updates after fencer");
3768        assert!(matches!(
3769            read_fence_err,
3770            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3771        ));
3772
3773        writer_catalog.expire().await;
3774        read_only_catalog.expire().await;
3775        writer_catalog_fencer.expire().await;
3776    }
3777}