mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_ore::collections::HashSet;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, ScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
66    CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
67    DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_storage_types::read_policy::ReadPolicy;
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91// DO NOT add any more imports from `crate` outside of `crate::catalog`.
92pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96    DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100use crate::session::{PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod transact;
112
/// A `Catalog` keeps track of the SQL objects known to the planner.
///
/// For each object, it keeps track of both forward and reverse dependencies:
/// i.e., which objects are depended upon by the object, and which objects
/// depend upon the object. It enforces the SQL rules around dropping: an object
/// cannot be dropped until all of the objects that depend upon it are dropped.
/// It also enforces uniqueness of names.
///
/// SQL mandates a hierarchy of exactly three layers. A catalog contains
/// databases, databases contain schemas, and schemas contain catalog items,
/// like sources, sinks, views, and indexes.
///
/// To the outside world, databases, schemas, and items are all identified by
/// name. Items can be referred to by their [`FullItemName`], which fully and
/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
/// database name and/or the schema name. Partial names can be converted into
/// full names via a complicated resolution process documented by the
/// [`CatalogState::resolve`] method.
///
/// The catalog also maintains special "ambient schemas": virtual schemas,
/// implicitly present in all databases, that house various system views.
/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
#[derive(Debug)]
pub struct Catalog {
    /// The in-memory catalog state. Deep-cloned when the `Catalog` is cloned.
    state: CatalogState,
    /// Optimizer artifacts (plans, dataflow metainfo, notices) keyed by
    /// [`GlobalId`].
    plans: CatalogPlans,
    /// Handle to the expression cache, if one is present.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// The durable catalog storage. Kept behind an `Arc` so that clones of the
    /// `Catalog` share the same storage handle, and behind an async mutex so
    /// that access is serialized.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// The revision reported by [`Catalog::transient_revision`].
    transient_revision: u64,
}
143
144// Implement our own Clone because derive can't unless S is Clone, which it's
145// not (hence the Arc).
146impl Clone for Catalog {
147    fn clone(&self) -> Self {
148        Self {
149            state: self.state.clone(),
150            plans: self.plans.clone(),
151            expr_cache_handle: self.expr_cache_handle.clone(),
152            storage: Arc::clone(&self.storage),
153            transient_revision: self.transient_revision,
154        }
155    }
156}
157
/// Optimizer artifacts cached by the [`Catalog`], keyed by [`GlobalId`].
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized plans, as stored by [`Catalog::set_optimized_plan`].
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical plans, as stored by [`Catalog::set_physical_plan`].
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// Dataflow metainfos (including their optimizer notices), as stored by
    /// [`Catalog::set_dataflow_metainfo`].
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Reverse lookup from a dependency id to the notices that list that id in
    /// their `dependencies`. Maintained by [`Catalog::set_dataflow_metainfo`]
    /// and [`Catalog::drop_plans_and_metainfos`].
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
165
166impl Catalog {
167    /// Set the optimized plan for the item identified by `id`.
168    #[mz_ore::instrument(level = "trace")]
169    pub fn set_optimized_plan(
170        &mut self,
171        id: GlobalId,
172        plan: DataflowDescription<OptimizedMirRelationExpr>,
173    ) {
174        self.plans.optimized_plan_by_id.insert(id, plan.into());
175    }
176
177    /// Set the physical plan for the item identified by `id`.
178    #[mz_ore::instrument(level = "trace")]
179    pub fn set_physical_plan(
180        &mut self,
181        id: GlobalId,
182        plan: DataflowDescription<mz_compute_types::plan::Plan>,
183    ) {
184        self.plans.physical_plan_by_id.insert(id, plan.into());
185    }
186
187    /// Try to get the optimized plan for the item identified by `id`.
188    #[mz_ore::instrument(level = "trace")]
189    pub fn try_get_optimized_plan(
190        &self,
191        id: &GlobalId,
192    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
193        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
194    }
195
196    /// Try to get the physical plan for the item identified by `id`.
197    #[mz_ore::instrument(level = "trace")]
198    pub fn try_get_physical_plan(
199        &self,
200        id: &GlobalId,
201    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
203    }
204
205    /// Set the `DataflowMetainfo` for the item identified by `id`.
206    #[mz_ore::instrument(level = "trace")]
207    pub fn set_dataflow_metainfo(
208        &mut self,
209        id: GlobalId,
210        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
211    ) {
212        // Add entries to the `notices_by_dep_id` lookup map.
213        for notice in metainfo.optimizer_notices.iter() {
214            for dep_id in notice.dependencies.iter() {
215                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
216                entry.push(Arc::clone(notice))
217            }
218            if let Some(item_id) = notice.item_id {
219                soft_assert_eq_or_log!(
220                    item_id,
221                    id,
222                    "notice.item_id should match the id for whom we are saving the notice"
223                );
224            }
225        }
226        // Add the dataflow with the scoped entries.
227        self.plans.dataflow_metainfos.insert(id, metainfo);
228    }
229
230    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
231    #[mz_ore::instrument(level = "trace")]
232    pub fn try_get_dataflow_metainfo(
233        &self,
234        id: &GlobalId,
235    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
236        self.plans.dataflow_metainfos.get(id)
237    }
238
    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
    /// item identified by `id`.
    ///
    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
    ///
    /// Return a set containing all dropped notices. Note that if for some
    /// reason we end up with two identical notices being dropped by the same
    /// call, the result will contain only one instance of that notice.
    ///
    /// The removal proceeds in three passes:
    ///
    /// 1. For every id in `drop_ids`, remove its plans and its metainfo, and
    ///    remove the metainfo's notices from `notices_by_dep_id`.
    /// 2. For every id in `drop_ids`, remove its `notices_by_dep_id` entry and
    ///    remove those notices from the metainfo of the item they belong to.
    /// 3. Remove every dropped notice from the `notices_by_dep_id` entries of
    ///    its dependencies that are not themselves in `drop_ids`.
    ///
    /// Afterwards, an error is logged (no panic) for each dropped notice that
    /// is still referenced from somewhere else.
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Collect dropped notices in this set.
        let mut dropped_notices = BTreeSet::new();

        // Remove plans and metainfo.optimizer_notices entries.
        for id in drop_ids {
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Remove the corresponding notices_by_dep_id entries.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Remove notices_by_dep_id entries.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    // Remove the corresponding metainfo.optimizer_notices entries.
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            // Sanity check only: every notice stored under
                            // `item_id` should name `item_id` (if it names
                            // any item at all).
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Collect dependency ids not in drop_ids with at least one dropped
        // notice.
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
        // entries in `todo_dep_ids`.
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        // At this point `dropped_notices` is expected to hold the only
        // remaining reference to each dropped notice. If that is not the case,
        // log where the stray references were found instead of failing.
        if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
            use mz_ore::str::{bracketed, separated};
            let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
            let bad_notices = bad_notices.map(|n| {
                // Try to find where the bad reference is.
                // Maybe in `dataflow_metainfos`?
                let mut dataflow_metainfo_occurrences = Vec::new();
                for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
                    if meta_info.optimizer_notices.contains(n) {
                        dataflow_metainfo_occurrences.push(id);
                    }
                }
                // Or `notices_by_dep_id`?
                let mut notices_by_dep_id_occurrences = Vec::new();
                for (id, notices) in self.plans.notices_by_dep_id.iter() {
                    if notices.iter().contains(n) {
                        notices_by_dep_id_occurrences.push(id);
                    }
                }
                format!(
                    "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
                    dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
                    n.id,
                    n.kind,
                    n.dependencies,
                    Arc::strong_count(n),
                    dataflow_metainfo_occurrences,
                    notices_by_dep_id_occurrences
                )
            });
            let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
            error!(
                "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
                 bad_notices = {bad_notices}; \
                 drop_ids = {drop_ids:?}"
            );
        }

        dropped_notices
    }
358
359    /// For the Sources ids in `ids`, return the read policies for all `ids` and additional ids that
360    /// propagate from them. Specifically, if `ids` contains a source, it and all of its source exports
361    /// will be added to the result.
362    pub fn source_read_policies(
363        &self,
364        id: CatalogItemId,
365    ) -> Vec<(CatalogItemId, ReadPolicy<mz_repr::Timestamp>)> {
366        let mut policies = Vec::new();
367        let cws = self.state.source_compaction_windows([id]);
368        for (cw, items) in cws {
369            for id in items {
370                policies.push((id, cw.into()));
371            }
372        }
373        policies
374    }
375
376    /// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
377    /// as a result of creating new indexes on the items in `ons`.
378    ///
379    /// When creating and inserting a new index, we need to invalidate some entries that may
380    /// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
381    /// the following objects:
382    ///
383    ///   - `o`.
384    ///   - All compute objects that depend directly on `o`.
385    ///   - All compute objects that would directly depend on `o`, if all views were inlined.
386    pub(crate) fn invalidate_for_index(
387        &self,
388        ons: impl Iterator<Item = GlobalId>,
389    ) -> BTreeSet<GlobalId> {
390        let mut dependencies = BTreeSet::new();
391        let mut queue = VecDeque::new();
392        let mut seen = HashSet::new();
393        for on in ons {
394            let entry = self.get_entry_by_global_id(&on);
395            dependencies.insert(on);
396            seen.insert(entry.id);
397            let uses = entry.uses();
398            queue.extend(uses.clone());
399        }
400
401        while let Some(cur) = queue.pop_front() {
402            let entry = self.get_entry(&cur);
403            if seen.insert(cur) {
404                let global_ids = entry.global_ids();
405                match entry.item_type() {
406                    CatalogItemType::Table
407                    | CatalogItemType::Source
408                    | CatalogItemType::MaterializedView
409                    | CatalogItemType::Sink
410                    | CatalogItemType::Index
411                    | CatalogItemType::Type
412                    | CatalogItemType::Func
413                    | CatalogItemType::Secret
414                    | CatalogItemType::Connection
415                    | CatalogItemType::ContinualTask => {
416                        dependencies.extend(global_ids);
417                    }
418                    CatalogItemType::View => {
419                        dependencies.extend(global_ids);
420                        queue.extend(entry.uses());
421                    }
422                }
423            }
424        }
425        dependencies
426    }
427}
428
/// Catalog state bundled with per-connection context (connection id, role,
/// active database, search path, ...).
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The catalog state, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Because we don't have any way of removing items from the catalog
    /// temporarily, we allow the ConnCatalog to pretend that a set of items
    /// don't exist during resolution.
    ///
    /// This feature is necessary to allow re-planning of statements, which is
    /// either incredibly useful or required when altering item definitions.
    ///
    /// Note that this field should only be used by short-lived catalogs.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// The id of the connection, as returned by [`ConnCatalog::conn_id`].
    conn_id: ConnectionId,
    // NOTE(review): presumably the name of the connection's active cluster —
    // confirm against the code that constructs `ConnCatalog`.
    cluster: String,
    /// The connection's database, if any.
    database: Option<DatabaseId>,
    /// The search path handed to `CatalogState::effective_search_path` by
    /// [`ConnCatalog::effective_search_path`].
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// The role this catalog acts as. Must be [`MZ_SYSTEM_ROLE_ID`] to mark
    /// ids as unresolvable.
    role_id: RoleId,
    /// Prepared statements, keyed by `String`, if available.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Channel over which [`AdapterNotice`]s are sent.
    notices_tx: UnboundedSender<AdapterNotice>,
}
450
451impl ConnCatalog<'_> {
452    pub fn conn_id(&self) -> &ConnectionId {
453        &self.conn_id
454    }
455
456    pub fn state(&self) -> &CatalogState {
457        &*self.state
458    }
459
460    /// Prevent planning from resolving item with the provided ID. Instead,
461    /// return an error as if the item did not exist.
462    ///
463    /// This feature is meant exclusively to permit re-planning statements
464    /// during update operations and should not be used otherwise given its
465    /// extremely "powerful" semantics.
466    ///
467    /// # Panics
468    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
469    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
470        assert_eq!(
471            self.role_id, MZ_SYSTEM_ROLE_ID,
472            "only the system role can mark IDs unresolvable",
473        );
474        self.unresolvable_ids.insert(id);
475    }
476
477    /// Returns the schemas:
478    /// - mz_catalog
479    /// - pg_catalog
480    /// - temp (if requested)
481    /// - all schemas from the session's search_path var that exist
482    pub fn effective_search_path(
483        &self,
484        include_temp_schema: bool,
485    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
486        self.state
487            .effective_search_path(&self.search_path, include_temp_schema)
488    }
489}
490
491impl ConnectionResolver for ConnCatalog<'_> {
492    fn resolve_connection(
493        &self,
494        id: CatalogItemId,
495    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
496        self.state().resolve_connection(id)
497    }
498}
499
500impl Catalog {
501    /// Returns the catalog's transient revision, which starts at 1 and is
502    /// incremented on every change. This is not persisted to disk, and will
503    /// restart on every load.
504    pub fn transient_revision(&self) -> u64 {
505        self.transient_revision
506    }
507
508    /// Creates a debug catalog from the current
509    /// `COCKROACH_URL` with parameters set appropriately for debug contexts,
510    /// like in tests.
511    ///
512    /// WARNING! This function can arbitrarily fail because it does not make any
513    /// effort to adjust the catalog's contents' structure or semantics to the
514    /// currently running version, i.e. it does not apply any migrations.
515    ///
516    /// This function must not be called in production contexts. Use
517    /// [`Catalog::open`] with appropriately set configuration parameters
518    /// instead.
519    pub async fn with_debug<F, Fut, T>(f: F) -> T
520    where
521        F: FnOnce(Catalog) -> Fut,
522        Fut: Future<Output = T>,
523    {
524        let persist_client = PersistClient::new_for_tests().await;
525        let organization_id = Uuid::new_v4();
526        let bootstrap_args = test_bootstrap_args();
527        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
528            .await
529            .expect("can open debug catalog");
530        f(catalog).await
531    }
532
533    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
534    /// in progress.
535    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
536    where
537        F: FnOnce(Catalog) -> Fut,
538        Fut: Future<Output = T>,
539    {
540        let persist_client = PersistClient::new_for_tests().await;
541        let organization_id = Uuid::new_v4();
542        let bootstrap_args = test_bootstrap_args();
543        let mut catalog =
544            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
545                .await
546                .expect("can open debug catalog");
547
548        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
549        let now = SYSTEM_TIME.clone();
550        let openable_storage = TestCatalogStateBuilder::new(persist_client)
551            .with_organization_id(organization_id)
552            .with_default_deploy_generation()
553            .build()
554            .await
555            .expect("can create durable catalog");
556        let mut storage = openable_storage
557            .open(now().into(), &bootstrap_args)
558            .await
559            .expect("can open durable catalog")
560            .0;
561        // Drain updates.
562        let _ = storage
563            .sync_to_current_updates()
564            .await
565            .expect("can sync to current updates");
566        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
567
568        f(catalog).await
569    }
570
571    /// Opens a debug catalog.
572    ///
573    /// See [`Catalog::with_debug`].
574    pub async fn open_debug_catalog(
575        persist_client: PersistClient,
576        organization_id: Uuid,
577        bootstrap_args: &BootstrapArgs,
578    ) -> Result<Catalog, anyhow::Error> {
579        let now = SYSTEM_TIME.clone();
580        let environment_id = None;
581        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
582            .with_organization_id(organization_id)
583            .with_default_deploy_generation()
584            .build()
585            .await?;
586        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
587        let system_parameter_defaults = BTreeMap::default();
588        Self::open_debug_catalog_inner(
589            persist_client,
590            storage,
591            now,
592            environment_id,
593            &DUMMY_BUILD_INFO,
594            system_parameter_defaults,
595            bootstrap_args,
596            None,
597        )
598        .await
599    }
600
601    /// Opens a read only debug persist backed catalog defined by `persist_client` and
602    /// `organization_id`.
603    ///
604    /// See [`Catalog::with_debug`].
605    pub async fn open_debug_read_only_catalog(
606        persist_client: PersistClient,
607        organization_id: Uuid,
608        bootstrap_args: &BootstrapArgs,
609    ) -> Result<Catalog, anyhow::Error> {
610        let now = SYSTEM_TIME.clone();
611        let environment_id = None;
612        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
613            .with_organization_id(organization_id)
614            .build()
615            .await?;
616        let storage = openable_storage
617            .open_read_only(&test_bootstrap_args())
618            .await?;
619        let system_parameter_defaults = BTreeMap::default();
620        Self::open_debug_catalog_inner(
621            persist_client,
622            storage,
623            now,
624            environment_id,
625            &DUMMY_BUILD_INFO,
626            system_parameter_defaults,
627            bootstrap_args,
628            None,
629        )
630        .await
631    }
632
633    /// Opens a read only debug persist backed catalog defined by `persist_client` and
634    /// `organization_id`.
635    ///
636    /// See [`Catalog::with_debug`].
637    pub async fn open_debug_read_only_persist_catalog_config(
638        persist_client: PersistClient,
639        now: NowFn,
640        environment_id: EnvironmentId,
641        system_parameter_defaults: BTreeMap<String, String>,
642        build_info: &'static BuildInfo,
643        bootstrap_args: &BootstrapArgs,
644        enable_expression_cache_override: Option<bool>,
645    ) -> Result<Catalog, anyhow::Error> {
646        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
647            .with_organization_id(environment_id.organization_id())
648            .with_version(
649                build_info
650                    .version
651                    .parse()
652                    .expect("build version is parseable"),
653            )
654            .build()
655            .await?;
656        let storage = openable_storage.open_read_only(bootstrap_args).await?;
657        Self::open_debug_catalog_inner(
658            persist_client,
659            storage,
660            now,
661            Some(environment_id),
662            build_info,
663            system_parameter_defaults,
664            bootstrap_args,
665            enable_expression_cache_override,
666        )
667        .await
668    }
669
670    async fn open_debug_catalog_inner(
671        persist_client: PersistClient,
672        storage: Box<dyn DurableCatalogState>,
673        now: NowFn,
674        environment_id: Option<EnvironmentId>,
675        build_info: &'static BuildInfo,
676        system_parameter_defaults: BTreeMap<String, String>,
677        bootstrap_args: &BootstrapArgs,
678        enable_expression_cache_override: Option<bool>,
679    ) -> Result<Catalog, anyhow::Error> {
680        let metrics_registry = &MetricsRegistry::new();
681        let secrets_reader = Arc::new(InMemorySecretsController::new());
682        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
683        // debugging/testing.
684        let previous_ts = now().into();
685        let replica_size = &bootstrap_args.default_cluster_replica_size;
686        let read_only = false;
687
688        let OpenCatalogResult {
689            catalog,
690            migrated_storage_collections_0dt: _,
691            new_builtin_collections: _,
692            builtin_table_updates: _,
693            cached_global_exprs: _,
694            uncached_local_exprs: _,
695        } = Catalog::open(Config {
696            storage,
697            metrics_registry,
698            state: StateConfig {
699                unsafe_mode: true,
700                all_features: false,
701                build_info,
702                environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()),
703                read_only,
704                now,
705                boot_ts: previous_ts,
706                skip_migrations: true,
707                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
708                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
709                    size: replica_size.clone(),
710                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711                },
712                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
713                    size: replica_size.clone(),
714                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
715                },
716                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
717                    size: replica_size.clone(),
718                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
719                },
720                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
721                    size: replica_size.clone(),
722                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
723                },
724                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
725                    size: replica_size.clone(),
726                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
727                },
728                system_parameter_defaults,
729                remote_system_parameters: None,
730                availability_zones: vec![],
731                egress_addresses: vec![],
732                aws_principal_context: None,
733                aws_privatelink_availability_zones: None,
734                http_host_name: None,
735                connection_context: ConnectionContext::for_tests(secrets_reader),
736                builtin_item_migration_config: BuiltinItemMigrationConfig {
737                    persist_client: persist_client.clone(),
738                    read_only,
739                },
740                persist_client,
741                enable_expression_cache_override,
742                enable_0dt_deployment: true,
743                helm_chart_version: None,
744            },
745        })
746        .await?;
747        Ok(catalog)
748    }
749
750    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
751        self.state.for_session(session)
752    }
753
754    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog {
755        self.state.for_sessionless_user(role_id)
756    }
757
758    pub fn for_system_session(&self) -> ConnCatalog {
759        self.state.for_system_session()
760    }
761
762    async fn storage<'a>(
763        &'a self,
764    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
765        self.storage.lock().await
766    }
767
768    pub async fn current_upper(&self) -> mz_repr::Timestamp {
769        self.storage().await.current_upper().await
770    }
771
772    pub async fn allocate_user_id(
773        &self,
774        commit_ts: mz_repr::Timestamp,
775    ) -> Result<(CatalogItemId, GlobalId), Error> {
776        self.storage()
777            .await
778            .allocate_user_id(commit_ts)
779            .await
780            .maybe_terminate("allocating user ids")
781            .err_into()
782    }
783
784    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
785    pub async fn allocate_user_ids(
786        &self,
787        amount: u64,
788        commit_ts: mz_repr::Timestamp,
789    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
790        self.storage()
791            .await
792            .allocate_user_ids(amount, commit_ts)
793            .await
794            .maybe_terminate("allocating user ids")
795            .err_into()
796    }
797
798    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
799        let commit_ts = self.storage().await.current_upper().await;
800        self.allocate_user_id(commit_ts).await
801    }
802
803    /// Get the next user item ID without allocating it.
804    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
805        self.storage()
806            .await
807            .get_next_user_item_id()
808            .await
809            .err_into()
810    }
811
812    #[cfg(test)]
813    pub async fn allocate_system_id(
814        &self,
815        commit_ts: mz_repr::Timestamp,
816    ) -> Result<(CatalogItemId, GlobalId), Error> {
817        use mz_ore::collections::CollectionExt;
818
819        let mut storage = self.storage().await;
820        let mut txn = storage.transaction().await?;
821        let id = txn
822            .allocate_system_item_ids(1)
823            .maybe_terminate("allocating system ids")?
824            .into_element();
825        // Drain transaction.
826        let _ = txn.get_and_commit_op_updates();
827        txn.commit(commit_ts).await?;
828        Ok(id)
829    }
830
831    /// Get the next system item ID without allocating it.
832    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
833        self.storage()
834            .await
835            .get_next_system_item_id()
836            .await
837            .err_into()
838    }
839
840    pub async fn allocate_user_cluster_id(
841        &self,
842        commit_ts: mz_repr::Timestamp,
843    ) -> Result<ClusterId, Error> {
844        self.storage()
845            .await
846            .allocate_user_cluster_id(commit_ts)
847            .await
848            .maybe_terminate("allocating user cluster ids")
849            .err_into()
850    }
851
852    /// Get the next system replica id without allocating it.
853    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
854        self.storage()
855            .await
856            .get_next_system_replica_id()
857            .await
858            .err_into()
859    }
860
861    /// Get the next user replica id without allocating it.
862    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
863        self.storage()
864            .await
865            .get_next_user_replica_id()
866            .await
867            .err_into()
868    }
869
870    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
871        self.state.resolve_database(database_name)
872    }
873
874    pub fn resolve_schema(
875        &self,
876        current_database: Option<&DatabaseId>,
877        database_name: Option<&str>,
878        schema_name: &str,
879        conn_id: &ConnectionId,
880    ) -> Result<&Schema, SqlCatalogError> {
881        self.state
882            .resolve_schema(current_database, database_name, schema_name, conn_id)
883    }
884
885    pub fn resolve_schema_in_database(
886        &self,
887        database_spec: &ResolvedDatabaseSpecifier,
888        schema_name: &str,
889        conn_id: &ConnectionId,
890    ) -> Result<&Schema, SqlCatalogError> {
891        self.state
892            .resolve_schema_in_database(database_spec, schema_name, conn_id)
893    }
894
895    pub fn resolve_replica_in_cluster(
896        &self,
897        cluster_id: &ClusterId,
898        replica_name: &str,
899    ) -> Result<&ClusterReplica, SqlCatalogError> {
900        self.state
901            .resolve_replica_in_cluster(cluster_id, replica_name)
902    }
903
904    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
905        self.state.resolve_system_schema(name)
906    }
907
908    pub fn resolve_search_path(
909        &self,
910        session: &Session,
911    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
912        self.state.resolve_search_path(session)
913    }
914
915    /// Resolves `name` to a non-function [`CatalogEntry`].
916    pub fn resolve_entry(
917        &self,
918        current_database: Option<&DatabaseId>,
919        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
920        name: &PartialItemName,
921        conn_id: &ConnectionId,
922    ) -> Result<&CatalogEntry, SqlCatalogError> {
923        self.state
924            .resolve_entry(current_database, search_path, name, conn_id)
925    }
926
927    /// Resolves a `BuiltinTable`.
928    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
929        self.state.resolve_builtin_table(builtin)
930    }
931
932    /// Resolves a `BuiltinLog`.
933    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
934        self.state.resolve_builtin_log(builtin).0
935    }
936
937    /// Resolves a `BuiltinSource`.
938    pub fn resolve_builtin_storage_collection(
939        &self,
940        builtin: &'static BuiltinSource,
941    ) -> CatalogItemId {
942        self.state.resolve_builtin_source(builtin)
943    }
944
945    /// Resolves `name` to a function [`CatalogEntry`].
946    pub fn resolve_function(
947        &self,
948        current_database: Option<&DatabaseId>,
949        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
950        name: &PartialItemName,
951        conn_id: &ConnectionId,
952    ) -> Result<&CatalogEntry, SqlCatalogError> {
953        self.state
954            .resolve_function(current_database, search_path, name, conn_id)
955    }
956
957    /// Resolves `name` to a type [`CatalogEntry`].
958    pub fn resolve_type(
959        &self,
960        current_database: Option<&DatabaseId>,
961        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
962        name: &PartialItemName,
963        conn_id: &ConnectionId,
964    ) -> Result<&CatalogEntry, SqlCatalogError> {
965        self.state
966            .resolve_type(current_database, search_path, name, conn_id)
967    }
968
969    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
970        self.state.resolve_cluster(name)
971    }
972
973    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
974    ///
975    /// # Panics
976    /// * If the [`BuiltinCluster`] doesn't exist.
977    ///
978    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
979        self.state.resolve_builtin_cluster(cluster)
980    }
981
982    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
983        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
984    }
985
986    /// Resolves a [`Cluster`] for a TargetCluster.
987    pub fn resolve_target_cluster(
988        &self,
989        target_cluster: TargetCluster,
990        session: &Session,
991    ) -> Result<&Cluster, AdapterError> {
992        match target_cluster {
993            TargetCluster::CatalogServer => {
994                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
995            }
996            TargetCluster::Active => self.active_cluster(session),
997            TargetCluster::Transaction(cluster_id) => self
998                .try_get_cluster(cluster_id)
999                .ok_or(AdapterError::ConcurrentClusterDrop),
1000        }
1001    }
1002
1003    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
1004        // TODO(benesch): this check here is not sufficiently protective. It'd
1005        // be very easy for a code path to accidentally avoid this check by
1006        // calling `resolve_cluster(session.vars().cluster())`.
1007        if session.user().name != SYSTEM_USER.name
1008            && session.user().name != SUPPORT_USER.name
1009            && session.vars().cluster() == SYSTEM_USER.name
1010        {
1011            coord_bail!(
1012                "system cluster '{}' cannot execute user queries",
1013                SYSTEM_USER.name
1014            );
1015        }
1016        let cluster = self.resolve_cluster(session.vars().cluster())?;
1017        Ok(cluster)
1018    }
1019
1020    pub fn state(&self) -> &CatalogState {
1021        &self.state
1022    }
1023
1024    pub fn resolve_full_name(
1025        &self,
1026        name: &QualifiedItemName,
1027        conn_id: Option<&ConnectionId>,
1028    ) -> FullItemName {
1029        self.state.resolve_full_name(name, conn_id)
1030    }
1031
1032    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1033        self.state.try_get_entry(id)
1034    }
1035
1036    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1037        self.state.try_get_entry_by_global_id(id)
1038    }
1039
1040    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1041        self.state.get_entry(id)
1042    }
1043
1044    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1045        self.state.get_entry_by_global_id(id)
1046    }
1047
1048    pub fn get_global_ids<'a>(
1049        &'a self,
1050        id: &CatalogItemId,
1051    ) -> impl Iterator<Item = GlobalId> + use<'a> {
1052        self.get_entry(id).global_ids()
1053    }
1054
1055    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1056        self.get_entry_by_global_id(id).id()
1057    }
1058
1059    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1060        let item = self.try_get_entry_by_global_id(id)?;
1061        Some(item.id())
1062    }
1063
1064    pub fn get_schema(
1065        &self,
1066        database_spec: &ResolvedDatabaseSpecifier,
1067        schema_spec: &SchemaSpecifier,
1068        conn_id: &ConnectionId,
1069    ) -> &Schema {
1070        self.state.get_schema(database_spec, schema_spec, conn_id)
1071    }
1072
1073    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1074        self.state.get_mz_catalog_schema_id()
1075    }
1076
1077    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1078        self.state.get_pg_catalog_schema_id()
1079    }
1080
1081    pub fn get_information_schema_id(&self) -> SchemaId {
1082        self.state.get_information_schema_id()
1083    }
1084
1085    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1086        self.state.get_mz_internal_schema_id()
1087    }
1088
1089    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1090        self.state.get_mz_introspection_schema_id()
1091    }
1092
1093    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1094        self.state.get_mz_unsafe_schema_id()
1095    }
1096
1097    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1098        self.state.system_schema_ids()
1099    }
1100
1101    pub fn get_database(&self, id: &DatabaseId) -> &Database {
1102        self.state.get_database(id)
1103    }
1104
1105    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1106        self.state.try_get_role(id)
1107    }
1108
1109    pub fn get_role(&self, id: &RoleId) -> &Role {
1110        self.state.get_role(id)
1111    }
1112
1113    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1114        self.state.try_get_role_by_name(role_name)
1115    }
1116
1117    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1118        self.state.try_get_role_auth_by_id(id)
1119    }
1120
1121    /// Creates a new schema in the `Catalog` for temporary items
1122    /// indicated by the TEMPORARY or TEMP keywords.
1123    pub fn create_temporary_schema(
1124        &mut self,
1125        conn_id: &ConnectionId,
1126        owner_id: RoleId,
1127    ) -> Result<(), Error> {
1128        self.state.create_temporary_schema(conn_id, owner_id)
1129    }
1130
1131    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1132        self.state.temporary_schemas[conn_id]
1133            .items
1134            .contains_key(item_name)
1135    }
1136
1137    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1138    /// Returns Ok if conn_id's temp schema does not exist.
1139    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1140        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1141            return Ok(());
1142        };
1143        if !schema.items.is_empty() {
1144            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1145        }
1146        Ok(())
1147    }
1148
1149    pub(crate) fn object_dependents(
1150        &self,
1151        object_ids: &Vec<ObjectId>,
1152        conn_id: &ConnectionId,
1153    ) -> Vec<ObjectId> {
1154        let mut seen = BTreeSet::new();
1155        self.state.object_dependents(object_ids, conn_id, &mut seen)
1156    }
1157
1158    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1159        FullNameV1 {
1160            database: name.database.to_string(),
1161            schema: name.schema.clone(),
1162            item: name.item.clone(),
1163        }
1164    }
1165
1166    pub fn find_available_cluster_name(&self, name: &str) -> String {
1167        let mut i = 0;
1168        let mut candidate = name.to_string();
1169        while self.state.clusters_by_name.contains_key(&candidate) {
1170            i += 1;
1171            candidate = format!("{}{}", name, i);
1172        }
1173        candidate
1174    }
1175
1176    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1177        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1178            self.cluster_replica_sizes()
1179                .enabled_allocations()
1180                .map(|a| a.0.to_owned())
1181                .collect::<Vec<_>>()
1182        } else {
1183            self.system_config().allowed_cluster_replica_sizes()
1184        }
1185    }
1186
1187    pub fn concretize_replica_location(
1188        &self,
1189        location: mz_catalog::durable::ReplicaLocation,
1190        allowed_sizes: &Vec<String>,
1191        allowed_availability_zones: Option<&[String]>,
1192    ) -> Result<ReplicaLocation, Error> {
1193        self.state
1194            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1195    }
1196
1197    pub(crate) fn ensure_valid_replica_size(
1198        &self,
1199        allowed_sizes: &[String],
1200        size: &String,
1201    ) -> Result<(), Error> {
1202        self.state.ensure_valid_replica_size(allowed_sizes, size)
1203    }
1204
1205    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1206        &self.state.cluster_replica_sizes
1207    }
1208
1209    /// Returns the privileges of an object by its ID.
1210    pub fn get_privileges(
1211        &self,
1212        id: &SystemObjectId,
1213        conn_id: &ConnectionId,
1214    ) -> Option<&PrivilegeMap> {
1215        match id {
1216            SystemObjectId::Object(id) => match id {
1217                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1218                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1219                ObjectId::Schema((database_spec, schema_spec)) => Some(
1220                    self.get_schema(database_spec, schema_spec, conn_id)
1221                        .privileges(),
1222                ),
1223                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1224                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1225                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1226            },
1227            SystemObjectId::System => Some(&self.state.system_privileges),
1228        }
1229    }
1230
1231    #[mz_ore::instrument(level = "debug")]
1232    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1233        Ok(self.storage().await.confirm_leadership().await?)
1234    }
1235
1236    /// Return the ids of all log sources the given object depends on.
1237    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1238        self.state.introspection_dependencies(id)
1239    }
1240
1241    /// Serializes the catalog's in-memory state.
1242    ///
1243    /// There are no guarantees about the format of the serialized state, except
1244    /// that the serialized state for two identical catalogs will compare
1245    /// identically.
1246    pub fn dump(&self) -> Result<CatalogDump, Error> {
1247        Ok(CatalogDump::new(self.state.dump(None)?))
1248    }
1249
1250    /// Checks the [`Catalog`]s internal consistency.
1251    ///
1252    /// Returns a JSON object describing the inconsistencies, if there are any.
1253    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1254        self.state.check_consistency().map_err(|inconsistencies| {
1255            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1256                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1257            })
1258        })
1259    }
1260
1261    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1262        self.state.config()
1263    }
1264
1265    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1266        self.state.entry_by_id.values()
1267    }
1268
1269    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1270        self.entries()
1271            .filter(|entry| entry.is_connection() && entry.id().is_user())
1272    }
1273
1274    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1275        self.entries()
1276            .filter(|entry| entry.is_table() && entry.id().is_user())
1277    }
1278
1279    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1280        self.entries()
1281            .filter(|entry| entry.is_source() && entry.id().is_user())
1282    }
1283
1284    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1285        self.entries()
1286            .filter(|entry| entry.is_sink() && entry.id().is_user())
1287    }
1288
1289    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1290        self.entries()
1291            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1292    }
1293
1294    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1295        self.entries()
1296            .filter(|entry| entry.is_secret() && entry.id().is_user())
1297    }
1298
1299    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1300        self.state.get_network_policy(&network_policy_id)
1301    }
1302
1303    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1304        self.state.try_get_network_policy_by_name(name)
1305    }
1306
1307    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1308        self.state.clusters_by_id.values()
1309    }
1310
1311    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1312        self.state.get_cluster(cluster_id)
1313    }
1314
1315    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1316        self.state.try_get_cluster(cluster_id)
1317    }
1318
1319    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1320        self.clusters().filter(|cluster| cluster.id.is_user())
1321    }
1322
1323    pub fn get_cluster_replica(
1324        &self,
1325        cluster_id: ClusterId,
1326        replica_id: ReplicaId,
1327    ) -> &ClusterReplica {
1328        self.state.get_cluster_replica(cluster_id, replica_id)
1329    }
1330
1331    pub fn try_get_cluster_replica(
1332        &self,
1333        cluster_id: ClusterId,
1334        replica_id: ReplicaId,
1335    ) -> Option<&ClusterReplica> {
1336        self.state.try_get_cluster_replica(cluster_id, replica_id)
1337    }
1338
1339    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1340        self.user_clusters()
1341            .flat_map(|cluster| cluster.user_replicas())
1342    }
1343
1344    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1345        self.state.database_by_id.values()
1346    }
1347
1348    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1349        self.state
1350            .roles_by_id
1351            .values()
1352            .filter(|role| role.is_user())
1353    }
1354
1355    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1356        self.entries()
1357            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1358    }
1359
1360    pub fn system_privileges(&self) -> &PrivilegeMap {
1361        &self.state.system_privileges
1362    }
1363
1364    pub fn default_privileges(
1365        &self,
1366    ) -> impl Iterator<
1367        Item = (
1368            &DefaultPrivilegeObject,
1369            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1370        ),
1371    > {
1372        self.state.default_privileges.iter()
1373    }
1374
1375    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1376        self.state
1377            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1378    }
1379
1380    pub fn pack_storage_usage_update(
1381        &self,
1382        event: VersionedStorageUsage,
1383        diff: Diff,
1384    ) -> BuiltinTableUpdate {
1385        self.state
1386            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1387    }
1388
1389    pub fn system_config(&self) -> &SystemVars {
1390        self.state.system_config()
1391    }
1392
1393    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1394        self.state.system_config_mut()
1395    }
1396
1397    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1398        self.state.ensure_not_reserved_role(role_id)
1399    }
1400
1401    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1402        self.state.ensure_grantable_role(role_id)
1403    }
1404
1405    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1406        self.state.ensure_not_system_role(role_id)
1407    }
1408
1409    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1410        self.state.ensure_not_predefined_role(role_id)
1411    }
1412
1413    pub fn ensure_not_reserved_network_policy(
1414        &self,
1415        network_policy_id: &NetworkPolicyId,
1416    ) -> Result<(), Error> {
1417        self.state
1418            .ensure_not_reserved_network_policy(network_policy_id)
1419    }
1420
1421    pub fn ensure_not_reserved_object(
1422        &self,
1423        object_id: &ObjectId,
1424        conn_id: &ConnectionId,
1425    ) -> Result<(), Error> {
1426        match object_id {
1427            ObjectId::Cluster(cluster_id) => {
1428                if cluster_id.is_system() {
1429                    let cluster = self.get_cluster(*cluster_id);
1430                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1431                        cluster.name().to_string(),
1432                    )))
1433                } else {
1434                    Ok(())
1435                }
1436            }
1437            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1438                if replica_id.is_system() {
1439                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1440                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1441                        replica.name().to_string(),
1442                    )))
1443                } else {
1444                    Ok(())
1445                }
1446            }
1447            ObjectId::Database(database_id) => {
1448                if database_id.is_system() {
1449                    let database = self.get_database(database_id);
1450                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1451                        database.name().to_string(),
1452                    )))
1453                } else {
1454                    Ok(())
1455                }
1456            }
1457            ObjectId::Schema((database_spec, schema_spec)) => {
1458                if schema_spec.is_system() {
1459                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1460                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1461                        schema.name().schema.clone(),
1462                    )))
1463                } else {
1464                    Ok(())
1465                }
1466            }
1467            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1468            ObjectId::Item(item_id) => {
1469                if item_id.is_system() {
1470                    let item = self.get_entry(item_id);
1471                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1472                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1473                } else {
1474                    Ok(())
1475                }
1476            }
1477            ObjectId::NetworkPolicy(network_policy_id) => {
1478                self.ensure_not_reserved_network_policy(network_policy_id)
1479            }
1480        }
1481    }
1482
1483    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1484    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1485        &mut self,
1486        create_sql: &str,
1487        force_if_exists_skip: bool,
1488    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1489        self.state
1490            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1491    }
1492
1493    pub(crate) fn update_expression_cache<'a, 'b>(
1494        &'a self,
1495        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1496        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1497    ) -> BoxFuture<'b, ()> {
1498        if let Some(expr_cache) = &self.expr_cache_handle {
1499            let ons = new_local_expressions
1500                .iter()
1501                .map(|(id, _)| id)
1502                .chain(new_global_expressions.iter().map(|(id, _)| id))
1503                .map(|id| self.get_entry_by_global_id(id))
1504                .filter_map(|entry| entry.index().map(|index| index.on));
1505            let invalidate_ids = self.invalidate_for_index(ons);
1506            expr_cache
1507                .update(
1508                    new_local_expressions,
1509                    new_global_expressions,
1510                    invalidate_ids,
1511                )
1512                .boxed()
1513        } else {
1514            async {}.boxed()
1515        }
1516    }
1517
1518    /// Listen for and apply all unconsumed updates to the durable catalog state.
1519    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1520    // `#[cfg(test)]` annotation.
1521    #[cfg(test)]
1522    async fn sync_to_current_updates(
1523        &mut self,
1524    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
1525        let updates = self.storage().await.sync_to_current_updates().await?;
1526        let builtin_table_updates = self.state.apply_updates(updates)?;
1527        Ok(builtin_table_updates)
1528    }
1529}
1530
1531pub fn is_reserved_name(name: &str) -> bool {
1532    BUILTIN_PREFIXES
1533        .iter()
1534        .any(|prefix| name.starts_with(prefix))
1535}
1536
1537pub fn is_reserved_role_name(name: &str) -> bool {
1538    is_reserved_name(name) || is_public_role(name)
1539}
1540
1541pub fn is_public_role(name: &str) -> bool {
1542    name == &*PUBLIC_ROLE_NAME
1543}
1544
1545pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1546    object_type_to_audit_object_type(sql_type.into())
1547}
1548
1549pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1550    match id {
1551        CommentObjectId::Table(_) => ObjectType::Table,
1552        CommentObjectId::View(_) => ObjectType::View,
1553        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1554        CommentObjectId::Source(_) => ObjectType::Source,
1555        CommentObjectId::Sink(_) => ObjectType::Sink,
1556        CommentObjectId::Index(_) => ObjectType::Index,
1557        CommentObjectId::Func(_) => ObjectType::Func,
1558        CommentObjectId::Connection(_) => ObjectType::Connection,
1559        CommentObjectId::Type(_) => ObjectType::Type,
1560        CommentObjectId::Secret(_) => ObjectType::Secret,
1561        CommentObjectId::Role(_) => ObjectType::Role,
1562        CommentObjectId::Database(_) => ObjectType::Database,
1563        CommentObjectId::Schema(_) => ObjectType::Schema,
1564        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1565        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1566        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1567        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1568    }
1569}
1570
1571pub(crate) fn object_type_to_audit_object_type(
1572    object_type: mz_sql::catalog::ObjectType,
1573) -> ObjectType {
1574    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1575}
1576
1577pub(crate) fn system_object_type_to_audit_object_type(
1578    system_type: &SystemObjectType,
1579) -> ObjectType {
1580    match system_type {
1581        SystemObjectType::Object(object_type) => match object_type {
1582            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1583            mz_sql::catalog::ObjectType::View => ObjectType::View,
1584            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1585            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1586            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1587            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1588            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1589            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1590            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1591            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1592            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1593            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1594            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1595            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1596            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1597            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1598            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1599        },
1600        SystemObjectType::System => ObjectType::System,
1601    }
1602}
1603
/// Direction of a privilege update.
///
/// Converts into the matching [`ExecuteResponse`] and audit log [`EventType`] through the
/// `From` implementations in this file.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum UpdatePrivilegeVariant {
    /// The privilege is being granted.
    Grant,
    /// The privilege is being revoked.
    Revoke,
}
1609
1610impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1611    fn from(variant: UpdatePrivilegeVariant) -> Self {
1612        match variant {
1613            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1614            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1615        }
1616    }
1617}
1618
1619impl From<UpdatePrivilegeVariant> for EventType {
1620    fn from(variant: UpdatePrivilegeVariant) -> Self {
1621        match variant {
1622            UpdatePrivilegeVariant::Grant => EventType::Grant,
1623            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1624        }
1625    }
1626}
1627
1628impl ConnCatalog<'_> {
1629    fn resolve_item_name(
1630        &self,
1631        name: &PartialItemName,
1632    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1633        self.resolve_item(name).map(|entry| entry.name())
1634    }
1635
1636    fn resolve_function_name(
1637        &self,
1638        name: &PartialItemName,
1639    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1640        self.resolve_function(name).map(|entry| entry.name())
1641    }
1642
1643    fn resolve_type_name(
1644        &self,
1645        name: &PartialItemName,
1646    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1647        self.resolve_type(name).map(|entry| entry.name())
1648    }
1649}
1650
1651impl ExprHumanizer for ConnCatalog<'_> {
1652    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1653        let entry = self.state.try_get_entry_by_global_id(&id)?;
1654        Some(self.resolve_full_name(entry.name()).to_string())
1655    }
1656
1657    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1658        let entry = self.state.try_get_entry_by_global_id(&id)?;
1659        Some(entry.name().item.clone())
1660    }
1661
1662    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1663        let entry = self.state.try_get_entry_by_global_id(&id)?;
1664        Some(self.resolve_full_name(entry.name()).into_parts())
1665    }
1666
1667    fn humanize_scalar_type(&self, typ: &ScalarType, postgres_compat: bool) -> String {
1668        use ScalarType::*;
1669
1670        match typ {
1671            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1672            List {
1673                custom_id: Some(item_id),
1674                ..
1675            }
1676            | Map {
1677                custom_id: Some(item_id),
1678                ..
1679            } => {
1680                let item = self.get_item(item_id);
1681                self.minimal_qualification(item.name()).to_string()
1682            }
1683            List { element_type, .. } => {
1684                format!(
1685                    "{} list",
1686                    self.humanize_scalar_type(element_type, postgres_compat)
1687                )
1688            }
1689            Map { value_type, .. } => format!(
1690                "map[{}=>{}]",
1691                self.humanize_scalar_type(&ScalarType::String, postgres_compat),
1692                self.humanize_scalar_type(value_type, postgres_compat)
1693            ),
1694            Record {
1695                custom_id: Some(item_id),
1696                ..
1697            } => {
1698                let item = self.get_item(item_id);
1699                self.minimal_qualification(item.name()).to_string()
1700            }
1701            Record { fields, .. } => format!(
1702                "record({})",
1703                fields
1704                    .iter()
1705                    .map(|f| format!(
1706                        "{}: {}",
1707                        f.0,
1708                        self.humanize_column_type(&f.1, postgres_compat)
1709                    ))
1710                    .join(",")
1711            ),
1712            PgLegacyChar => "\"char\"".into(),
1713            Char { length } if !postgres_compat => match length {
1714                None => "char".into(),
1715                Some(length) => format!("char({})", length.into_u32()),
1716            },
1717            VarChar { max_length } if !postgres_compat => match max_length {
1718                None => "varchar".into(),
1719                Some(length) => format!("varchar({})", length.into_u32()),
1720            },
1721            UInt16 => "uint2".into(),
1722            UInt32 => "uint4".into(),
1723            UInt64 => "uint8".into(),
1724            ty => {
1725                let pgrepr_type = mz_pgrepr::Type::from(ty);
1726                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1727
1728                let res = if self
1729                    .effective_search_path(true)
1730                    .iter()
1731                    .any(|(_, schema)| schema == &pg_catalog_schema)
1732                {
1733                    pgrepr_type.name().to_string()
1734                } else {
1735                    // If PG_CATALOG_SCHEMA is not in search path, you need
1736                    // qualified object name to refer to type.
1737                    let name = QualifiedItemName {
1738                        qualifiers: ItemQualifiers {
1739                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1740                            schema_spec: pg_catalog_schema,
1741                        },
1742                        item: pgrepr_type.name().to_string(),
1743                    };
1744                    self.resolve_full_name(&name).to_string()
1745                };
1746                res
1747            }
1748        }
1749    }
1750
1751    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1752        let entry = self.state.try_get_entry_by_global_id(&id)?;
1753
1754        match entry.index() {
1755            Some(index) => {
1756                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1757                let mut on_names = on_desc
1758                    .iter_names()
1759                    .map(|col_name| col_name.to_string())
1760                    .collect::<Vec<_>>();
1761
1762                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1763
1764                // Init ix_names with unknown column names. Unknown columns are
1765                // represented as an empty String and rendered as `#c` by the
1766                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1767                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1768                let mut ix_names = vec![String::new(); ix_arity];
1769
1770                // Apply the permutation by swapping on_names with ix_names.
1771                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1772                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1773                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1774                    std::mem::swap(on_name, ix_name);
1775                }
1776
1777                Some(ix_names) // Return the updated ix_names vector.
1778            }
1779            None => {
1780                let desc = self.state.try_get_desc_by_global_id(&id)?;
1781                let column_names = desc
1782                    .iter_names()
1783                    .map(|col_name| col_name.to_string())
1784                    .collect();
1785
1786                Some(column_names)
1787            }
1788        }
1789    }
1790
1791    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1792        let desc = self.state.try_get_desc_by_global_id(&id)?;
1793        Some(desc.get_name(column).to_string())
1794    }
1795
1796    fn id_exists(&self, id: GlobalId) -> bool {
1797        self.state.entry_by_global_id.contains_key(&id)
1798    }
1799}
1800
1801impl SessionCatalog for ConnCatalog<'_> {
1802    fn active_role_id(&self) -> &RoleId {
1803        &self.role_id
1804    }
1805
1806    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1807        self.prepared_statements
1808            .as_ref()
1809            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1810            .flatten()
1811    }
1812
1813    fn active_database(&self) -> Option<&DatabaseId> {
1814        self.database.as_ref()
1815    }
1816
1817    fn active_cluster(&self) -> &str {
1818        &self.cluster
1819    }
1820
1821    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1822        &self.search_path
1823    }
1824
1825    fn resolve_database(
1826        &self,
1827        database_name: &str,
1828    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1829        Ok(self.state.resolve_database(database_name)?)
1830    }
1831
1832    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1833        self.state
1834            .database_by_id
1835            .get(id)
1836            .expect("database doesn't exist")
1837    }
1838
1839    // `as` is ok to use to cast to a trait object.
1840    #[allow(clippy::as_conversions)]
1841    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1842        self.state
1843            .database_by_id
1844            .values()
1845            .map(|database| database as &dyn CatalogDatabase)
1846            .collect()
1847    }
1848
1849    fn resolve_schema(
1850        &self,
1851        database_name: Option<&str>,
1852        schema_name: &str,
1853    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1854        Ok(self.state.resolve_schema(
1855            self.database.as_ref(),
1856            database_name,
1857            schema_name,
1858            &self.conn_id,
1859        )?)
1860    }
1861
1862    fn resolve_schema_in_database(
1863        &self,
1864        database_spec: &ResolvedDatabaseSpecifier,
1865        schema_name: &str,
1866    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1867        Ok(self
1868            .state
1869            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1870    }
1871
1872    fn get_schema(
1873        &self,
1874        database_spec: &ResolvedDatabaseSpecifier,
1875        schema_spec: &SchemaSpecifier,
1876    ) -> &dyn CatalogSchema {
1877        self.state
1878            .get_schema(database_spec, schema_spec, &self.conn_id)
1879    }
1880
1881    // `as` is ok to use to cast to a trait object.
1882    #[allow(clippy::as_conversions)]
1883    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1884        self.get_databases()
1885            .into_iter()
1886            .flat_map(|database| database.schemas().into_iter())
1887            .chain(
1888                self.state
1889                    .ambient_schemas_by_id
1890                    .values()
1891                    .chain(self.state.temporary_schemas.values())
1892                    .map(|schema| schema as &dyn CatalogSchema),
1893            )
1894            .collect()
1895    }
1896
1897    fn get_mz_internal_schema_id(&self) -> SchemaId {
1898        self.state().get_mz_internal_schema_id()
1899    }
1900
1901    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1902        self.state().get_mz_unsafe_schema_id()
1903    }
1904
1905    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1906        self.state.is_system_schema_specifier(schema)
1907    }
1908
1909    fn resolve_role(
1910        &self,
1911        role_name: &str,
1912    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1913        match self.state.try_get_role_by_name(role_name) {
1914            Some(role) => Ok(role),
1915            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1916        }
1917    }
1918
1919    fn resolve_network_policy(
1920        &self,
1921        policy_name: &str,
1922    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1923        match self.state.try_get_network_policy_by_name(policy_name) {
1924            Some(policy) => Ok(policy),
1925            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1926        }
1927    }
1928
1929    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1930        Some(self.state.roles_by_id.get(id)?)
1931    }
1932
1933    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1934        self.state.get_role(id)
1935    }
1936
1937    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1938        // `as` is ok to use to cast to a trait object.
1939        #[allow(clippy::as_conversions)]
1940        self.state
1941            .roles_by_id
1942            .values()
1943            .map(|role| role as &dyn CatalogRole)
1944            .collect()
1945    }
1946
1947    fn mz_system_role_id(&self) -> RoleId {
1948        MZ_SYSTEM_ROLE_ID
1949    }
1950
1951    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1952        self.state.collect_role_membership(id)
1953    }
1954
1955    fn get_network_policy(
1956        &self,
1957        id: &NetworkPolicyId,
1958    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1959        self.state.get_network_policy(id)
1960    }
1961
1962    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1963        // `as` is ok to use to cast to a trait object.
1964        #[allow(clippy::as_conversions)]
1965        self.state
1966            .network_policies_by_id
1967            .values()
1968            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1969            .collect()
1970    }
1971
1972    fn resolve_cluster(
1973        &self,
1974        cluster_name: Option<&str>,
1975    ) -> Result<&dyn mz_sql::catalog::CatalogCluster, SqlCatalogError> {
1976        Ok(self
1977            .state
1978            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1979    }
1980
1981    fn resolve_cluster_replica(
1982        &self,
1983        cluster_replica_name: &QualifiedReplica,
1984    ) -> Result<&dyn CatalogClusterReplica, SqlCatalogError> {
1985        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1986    }
1987
1988    fn resolve_item(
1989        &self,
1990        name: &PartialItemName,
1991    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1992        let r = self.state.resolve_entry(
1993            self.database.as_ref(),
1994            &self.effective_search_path(true),
1995            name,
1996            &self.conn_id,
1997        )?;
1998        if self.unresolvable_ids.contains(&r.id()) {
1999            Err(SqlCatalogError::UnknownItem(name.to_string()))
2000        } else {
2001            Ok(r)
2002        }
2003    }
2004
2005    fn resolve_function(
2006        &self,
2007        name: &PartialItemName,
2008    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2009        let r = self.state.resolve_function(
2010            self.database.as_ref(),
2011            &self.effective_search_path(false),
2012            name,
2013            &self.conn_id,
2014        )?;
2015
2016        if self.unresolvable_ids.contains(&r.id()) {
2017            Err(SqlCatalogError::UnknownFunction {
2018                name: name.to_string(),
2019                alternative: None,
2020            })
2021        } else {
2022            Ok(r)
2023        }
2024    }
2025
2026    fn resolve_type(
2027        &self,
2028        name: &PartialItemName,
2029    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2030        let r = self.state.resolve_type(
2031            self.database.as_ref(),
2032            &self.effective_search_path(false),
2033            name,
2034            &self.conn_id,
2035        )?;
2036
2037        if self.unresolvable_ids.contains(&r.id()) {
2038            Err(SqlCatalogError::UnknownType {
2039                name: name.to_string(),
2040            })
2041        } else {
2042            Ok(r)
2043        }
2044    }
2045
2046    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2047        self.state.get_system_type(name)
2048    }
2049
2050    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2051        Some(self.state.try_get_entry(id)?)
2052    }
2053
2054    fn try_get_item_by_global_id(
2055        &self,
2056        id: &GlobalId,
2057    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2058        let entry = self.state.try_get_entry_by_global_id(id)?;
2059        let entry = match &entry.item {
2060            CatalogItem::Table(table) => {
2061                let (version, _gid) = table
2062                    .collections
2063                    .iter()
2064                    .find(|(_version, gid)| *gid == id)
2065                    .expect("catalog out of sync, mismatched GlobalId");
2066                entry.at_version(RelationVersionSelector::Specific(*version))
2067            }
2068            _ => entry.at_version(RelationVersionSelector::Latest),
2069        };
2070        Some(entry)
2071    }
2072
2073    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2074        self.state.get_entry(id)
2075    }
2076
2077    fn get_item_by_global_id(
2078        &self,
2079        id: &GlobalId,
2080    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2081        let entry = self.state.get_entry_by_global_id(id);
2082        let entry = match &entry.item {
2083            CatalogItem::Table(table) => {
2084                let (version, _gid) = table
2085                    .collections
2086                    .iter()
2087                    .find(|(_version, gid)| *gid == id)
2088                    .expect("catalog out of sync, mismatched GlobalId");
2089                entry.at_version(RelationVersionSelector::Specific(*version))
2090            }
2091            _ => entry.at_version(RelationVersionSelector::Latest),
2092        };
2093        entry
2094    }
2095
2096    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2097        self.get_schemas()
2098            .into_iter()
2099            .flat_map(|schema| schema.item_ids())
2100            .map(|id| self.get_item(&id))
2101            .collect()
2102    }
2103
2104    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2105        self.state
2106            .get_item_by_name(name, &self.conn_id)
2107            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2108    }
2109
2110    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2111        self.state
2112            .get_type_by_name(name, &self.conn_id)
2113            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2114    }
2115
2116    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster {
2117        &self.state.clusters_by_id[&id]
2118    }
2119
2120    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster> {
2121        self.state
2122            .clusters_by_id
2123            .values()
2124            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2125            .collect()
2126    }
2127
2128    fn get_cluster_replica(
2129        &self,
2130        cluster_id: ClusterId,
2131        replica_id: ReplicaId,
2132    ) -> &dyn mz_sql::catalog::CatalogClusterReplica {
2133        let cluster = self.get_cluster(cluster_id);
2134        cluster.replica(replica_id)
2135    }
2136
2137    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica> {
2138        self.get_clusters()
2139            .into_iter()
2140            .flat_map(|cluster| cluster.replicas().into_iter())
2141            .collect()
2142    }
2143
2144    fn get_system_privileges(&self) -> &PrivilegeMap {
2145        &self.state.system_privileges
2146    }
2147
2148    fn get_default_privileges(
2149        &self,
2150    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2151        self.state
2152            .default_privileges
2153            .iter()
2154            .map(|(object, acl_items)| (object, acl_items.collect()))
2155            .collect()
2156    }
2157
2158    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2159        self.state.find_available_name(name, &self.conn_id)
2160    }
2161
2162    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2163        self.state.resolve_full_name(name, Some(&self.conn_id))
2164    }
2165
2166    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2167        self.state.resolve_full_schema_name(name)
2168    }
2169
2170    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2171        self.state.get_entry_by_global_id(global_id).id()
2172    }
2173
2174    fn resolve_global_id(
2175        &self,
2176        item_id: &CatalogItemId,
2177        version: RelationVersionSelector,
2178    ) -> GlobalId {
2179        self.state
2180            .get_entry(item_id)
2181            .at_version(version)
2182            .global_id()
2183    }
2184
2185    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2186        self.state.config()
2187    }
2188
2189    fn now(&self) -> EpochMillis {
2190        (self.state.config().now)()
2191    }
2192
2193    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2194        self.state.aws_privatelink_availability_zones.clone()
2195    }
2196
2197    fn system_vars(&self) -> &SystemVars {
2198        &self.state.system_configuration
2199    }
2200
2201    fn system_vars_mut(&mut self) -> &mut SystemVars {
2202        &mut self.state.to_mut().system_configuration
2203    }
2204
2205    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2206        self.state().get_owner_id(id, self.conn_id())
2207    }
2208
2209    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2210        match id {
2211            SystemObjectId::System => Some(&self.state.system_privileges),
2212            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2213                Some(self.get_cluster(*id).privileges())
2214            }
2215            SystemObjectId::Object(ObjectId::Database(id)) => {
2216                Some(self.get_database(id).privileges())
2217            }
2218            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2219                Some(self.get_schema(database_spec, schema_spec).privileges())
2220            }
2221            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2222            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2223                Some(self.get_network_policy(id).privileges())
2224            }
2225            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2226            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2227        }
2228    }
2229
2230    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2231        let mut seen = BTreeSet::new();
2232        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2233    }
2234
2235    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2236        let mut seen = BTreeSet::new();
2237        self.state.item_dependents(id, &mut seen)
2238    }
2239
2240    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2241        rbac::all_object_privileges(object_type)
2242    }
2243
2244    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2245        self.state.get_object_type(object_id)
2246    }
2247
2248    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2249        self.state.get_system_object_type(id)
2250    }
2251
2252    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2253    /// the object.
2254    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2255        let database_id = match &qualified_name.qualifiers.database_spec {
2256            ResolvedDatabaseSpecifier::Ambient => None,
2257            ResolvedDatabaseSpecifier::Id(id)
2258                if self.database.is_some() && self.database == Some(*id) =>
2259            {
2260                None
2261            }
2262            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2263        };
2264
2265        let schema_spec = if database_id.is_none()
2266            && self.resolve_item_name(&PartialItemName {
2267                database: None,
2268                schema: None,
2269                item: qualified_name.item.clone(),
2270            }) == Ok(qualified_name)
2271            || self.resolve_function_name(&PartialItemName {
2272                database: None,
2273                schema: None,
2274                item: qualified_name.item.clone(),
2275            }) == Ok(qualified_name)
2276            || self.resolve_type_name(&PartialItemName {
2277                database: None,
2278                schema: None,
2279                item: qualified_name.item.clone(),
2280            }) == Ok(qualified_name)
2281        {
2282            None
2283        } else {
2284            // If `search_path` does not contain `full_name.schema`, the
2285            // `PartialName` must contain it.
2286            Some(qualified_name.qualifiers.schema_spec.clone())
2287        };
2288
2289        let res = PartialItemName {
2290            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2291            schema: schema_spec.map(|spec| {
2292                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2293                    .name()
2294                    .schema
2295                    .clone()
2296            }),
2297            item: qualified_name.item.clone(),
2298        };
2299        assert!(
2300            self.resolve_item_name(&res) == Ok(qualified_name)
2301                || self.resolve_function_name(&res) == Ok(qualified_name)
2302                || self.resolve_type_name(&res) == Ok(qualified_name)
2303        );
2304        res
2305    }
2306
2307    fn add_notice(&self, notice: PlanNotice) {
2308        let _ = self.notices_tx.send(notice.into());
2309    }
2310
2311    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2312        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2313        self.state.comments.get_object_comments(comment_id)
2314    }
2315
2316    fn is_cluster_size_cc(&self, size: &str) -> bool {
2317        self.state
2318            .cluster_replica_sizes
2319            .0
2320            .get(size)
2321            .map_or(false, |a| a.is_cc)
2322    }
2323}
2324
2325#[cfg(test)]
2326mod tests {
2327    use std::collections::{BTreeMap, BTreeSet};
2328    use std::sync::Arc;
2329    use std::{env, iter};
2330
2331    use itertools::Itertools;
2332    use mz_catalog::memory::objects::CatalogItem;
2333    use tokio_postgres::NoTls;
2334    use tokio_postgres::types::Type;
2335    use uuid::Uuid;
2336
2337    use mz_catalog::SYSTEM_CONN_ID;
2338    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2339    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2340    use mz_controller_types::{ClusterId, ReplicaId};
2341    use mz_expr::MirScalarExpr;
2342    use mz_ore::now::to_datetime;
2343    use mz_ore::{assert_err, assert_ok, task};
2344    use mz_persist_client::PersistClient;
2345    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2346    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2347    use mz_repr::role_id::RoleId;
2348    use mz_repr::{
2349        CatalogItemId, Datum, GlobalId, RelationType, RelationVersionSelector, RowArena,
2350        ScalarType, Timestamp,
2351    };
2352    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2353    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2354    use mz_sql::names::{
2355        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2356        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2357    };
2358    use mz_sql::plan::{
2359        CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2360        Scope, StatementContext,
2361    };
2362    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2363    use mz_sql::session::vars::{SystemVars, VarInput};
2364
2365    use crate::catalog::state::LocalExpressionCache;
2366    use crate::catalog::{Catalog, Op};
2367    use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2368    use crate::session::Session;
2369
2370    /// System sessions have an empty `search_path` so it's necessary to
2371    /// schema-qualify all referenced items.
2372    ///
2373    /// Dummy (and ostensibly client) sessions contain system schemas in their
2374    /// search paths, so do not require schema qualification on system objects such
2375    /// as types.
2376    #[mz_ore::test(tokio::test)]
2377    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2378    async fn test_minimal_qualification() {
2379        Catalog::with_debug(|catalog| async move {
2380            struct TestCase {
2381                input: QualifiedItemName,
2382                system_output: PartialItemName,
2383                normal_output: PartialItemName,
2384            }
2385
2386            let test_cases = vec![
2387                TestCase {
2388                    input: QualifiedItemName {
2389                        qualifiers: ItemQualifiers {
2390                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2391                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2392                        },
2393                        item: "numeric".to_string(),
2394                    },
2395                    system_output: PartialItemName {
2396                        database: None,
2397                        schema: None,
2398                        item: "numeric".to_string(),
2399                    },
2400                    normal_output: PartialItemName {
2401                        database: None,
2402                        schema: None,
2403                        item: "numeric".to_string(),
2404                    },
2405                },
2406                TestCase {
2407                    input: QualifiedItemName {
2408                        qualifiers: ItemQualifiers {
2409                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2410                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2411                        },
2412                        item: "mz_array_types".to_string(),
2413                    },
2414                    system_output: PartialItemName {
2415                        database: None,
2416                        schema: None,
2417                        item: "mz_array_types".to_string(),
2418                    },
2419                    normal_output: PartialItemName {
2420                        database: None,
2421                        schema: None,
2422                        item: "mz_array_types".to_string(),
2423                    },
2424                },
2425            ];
2426
2427            for tc in test_cases {
2428                assert_eq!(
2429                    catalog
2430                        .for_system_session()
2431                        .minimal_qualification(&tc.input),
2432                    tc.system_output
2433                );
2434                assert_eq!(
2435                    catalog
2436                        .for_session(&Session::dummy())
2437                        .minimal_qualification(&tc.input),
2438                    tc.normal_output
2439                );
2440            }
2441            catalog.expire().await;
2442        })
2443        .await
2444    }
2445
2446    #[mz_ore::test(tokio::test)]
2447    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2448    async fn test_catalog_revision() {
2449        let persist_client = PersistClient::new_for_tests().await;
2450        let organization_id = Uuid::new_v4();
2451        let bootstrap_args = test_bootstrap_args();
2452        {
2453            let mut catalog = Catalog::open_debug_catalog(
2454                persist_client.clone(),
2455                organization_id.clone(),
2456                &bootstrap_args,
2457            )
2458            .await
2459            .expect("unable to open debug catalog");
2460            assert_eq!(catalog.transient_revision(), 1);
2461            let commit_ts = catalog.current_upper().await;
2462            catalog
2463                .transact(
2464                    None,
2465                    commit_ts,
2466                    None,
2467                    vec![Op::CreateDatabase {
2468                        name: "test".to_string(),
2469                        owner_id: MZ_SYSTEM_ROLE_ID,
2470                    }],
2471                )
2472                .await
2473                .expect("failed to transact");
2474            assert_eq!(catalog.transient_revision(), 2);
2475            catalog.expire().await;
2476        }
2477        {
2478            let catalog =
2479                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2480                    .await
2481                    .expect("unable to open debug catalog");
2482            // Re-opening the same catalog resets the transient_revision to 1.
2483            assert_eq!(catalog.transient_revision(), 1);
2484            catalog.expire().await;
2485        }
2486    }
2487
2488    #[mz_ore::test(tokio::test)]
2489    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2490    async fn test_effective_search_path() {
2491        Catalog::with_debug(|catalog| async move {
2492            let mz_catalog_schema = (
2493                ResolvedDatabaseSpecifier::Ambient,
2494                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2495            );
2496            let pg_catalog_schema = (
2497                ResolvedDatabaseSpecifier::Ambient,
2498                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2499            );
2500            let mz_temp_schema = (
2501                ResolvedDatabaseSpecifier::Ambient,
2502                SchemaSpecifier::Temporary,
2503            );
2504
2505            // Behavior with the default search_schema (public)
2506            let session = Session::dummy();
2507            let conn_catalog = catalog.for_session(&session);
2508            assert_ne!(
2509                conn_catalog.effective_search_path(false),
2510                conn_catalog.search_path
2511            );
2512            assert_ne!(
2513                conn_catalog.effective_search_path(true),
2514                conn_catalog.search_path
2515            );
2516            assert_eq!(
2517                conn_catalog.effective_search_path(false),
2518                vec![
2519                    mz_catalog_schema.clone(),
2520                    pg_catalog_schema.clone(),
2521                    conn_catalog.search_path[0].clone()
2522                ]
2523            );
2524            assert_eq!(
2525                conn_catalog.effective_search_path(true),
2526                vec![
2527                    mz_temp_schema.clone(),
2528                    mz_catalog_schema.clone(),
2529                    pg_catalog_schema.clone(),
2530                    conn_catalog.search_path[0].clone()
2531                ]
2532            );
2533
2534            // missing schemas are added when missing
2535            let mut session = Session::dummy();
2536            session
2537                .vars_mut()
2538                .set(
2539                    &SystemVars::new(),
2540                    "search_path",
2541                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2542                    false,
2543                )
2544                .expect("failed to set search_path");
2545            let conn_catalog = catalog.for_session(&session);
2546            assert_ne!(
2547                conn_catalog.effective_search_path(false),
2548                conn_catalog.search_path
2549            );
2550            assert_ne!(
2551                conn_catalog.effective_search_path(true),
2552                conn_catalog.search_path
2553            );
2554            assert_eq!(
2555                conn_catalog.effective_search_path(false),
2556                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2557            );
2558            assert_eq!(
2559                conn_catalog.effective_search_path(true),
2560                vec![
2561                    mz_temp_schema.clone(),
2562                    mz_catalog_schema.clone(),
2563                    pg_catalog_schema.clone()
2564                ]
2565            );
2566
2567            let mut session = Session::dummy();
2568            session
2569                .vars_mut()
2570                .set(
2571                    &SystemVars::new(),
2572                    "search_path",
2573                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2574                    false,
2575                )
2576                .expect("failed to set search_path");
2577            let conn_catalog = catalog.for_session(&session);
2578            assert_ne!(
2579                conn_catalog.effective_search_path(false),
2580                conn_catalog.search_path
2581            );
2582            assert_ne!(
2583                conn_catalog.effective_search_path(true),
2584                conn_catalog.search_path
2585            );
2586            assert_eq!(
2587                conn_catalog.effective_search_path(false),
2588                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2589            );
2590            assert_eq!(
2591                conn_catalog.effective_search_path(true),
2592                vec![
2593                    mz_temp_schema.clone(),
2594                    pg_catalog_schema.clone(),
2595                    mz_catalog_schema.clone()
2596                ]
2597            );
2598
2599            let mut session = Session::dummy();
2600            session
2601                .vars_mut()
2602                .set(
2603                    &SystemVars::new(),
2604                    "search_path",
2605                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2606                    false,
2607                )
2608                .expect("failed to set search_path");
2609            let conn_catalog = catalog.for_session(&session);
2610            assert_ne!(
2611                conn_catalog.effective_search_path(false),
2612                conn_catalog.search_path
2613            );
2614            assert_ne!(
2615                conn_catalog.effective_search_path(true),
2616                conn_catalog.search_path
2617            );
2618            assert_eq!(
2619                conn_catalog.effective_search_path(false),
2620                vec![
2621                    mz_catalog_schema.clone(),
2622                    pg_catalog_schema.clone(),
2623                    mz_temp_schema.clone()
2624                ]
2625            );
2626            assert_eq!(
2627                conn_catalog.effective_search_path(true),
2628                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2629            );
2630            catalog.expire().await;
2631        })
2632        .await
2633    }
2634
2635    #[mz_ore::test(tokio::test)]
2636    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2637    async fn test_normalized_create() {
2638        use mz_ore::collections::CollectionExt;
2639        Catalog::with_debug(|catalog| async move {
2640            let conn_catalog = catalog.for_system_session();
2641            let scx = &mut StatementContext::new(None, &conn_catalog);
2642
2643            let parsed = mz_sql_parser::parser::parse_statements(
2644                "create view public.foo as select 1 as bar",
2645            )
2646            .expect("")
2647            .into_element()
2648            .ast;
2649
2650            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2651
2652            // Ensure that all identifiers are quoted.
2653            assert_eq!(
2654                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2655                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2656            );
2657            catalog.expire().await;
2658        })
2659        .await;
2660    }
2661
2662    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2663    #[mz_ore::test(tokio::test)]
2664    #[cfg_attr(miri, ignore)] // slow
2665    async fn test_large_catalog_item() {
2666        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2667        let column = ", 1";
2668        let view_def_size = view_def.bytes().count();
2669        let column_size = column.bytes().count();
2670        let column_count =
2671            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2672        let columns = iter::repeat(column).take(column_count).join("");
2673        let create_sql = format!("{view_def}{columns})");
2674        let create_sql_check = create_sql.clone();
2675        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2676        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2677            &create_sql
2678        ));
2679
2680        let persist_client = PersistClient::new_for_tests().await;
2681        let organization_id = Uuid::new_v4();
2682        let id = CatalogItemId::User(1);
2683        let gid = GlobalId::User(1);
2684        let bootstrap_args = test_bootstrap_args();
2685        {
2686            let mut catalog = Catalog::open_debug_catalog(
2687                persist_client.clone(),
2688                organization_id.clone(),
2689                &bootstrap_args,
2690            )
2691            .await
2692            .expect("unable to open debug catalog");
2693            let item = catalog
2694                .state()
2695                .deserialize_item(
2696                    gid,
2697                    &create_sql,
2698                    &BTreeMap::new(),
2699                    &mut LocalExpressionCache::Closed,
2700                    None,
2701                )
2702                .expect("unable to parse view");
2703            let commit_ts = catalog.current_upper().await;
2704            catalog
2705                .transact(
2706                    None,
2707                    commit_ts,
2708                    None,
2709                    vec![Op::CreateItem {
2710                        item,
2711                        name: QualifiedItemName {
2712                            qualifiers: ItemQualifiers {
2713                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2714                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2715                            },
2716                            item: "v".to_string(),
2717                        },
2718                        id,
2719                        owner_id: MZ_SYSTEM_ROLE_ID,
2720                    }],
2721                )
2722                .await
2723                .expect("failed to transact");
2724            catalog.expire().await;
2725        }
2726        {
2727            let catalog =
2728                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2729                    .await
2730                    .expect("unable to open debug catalog");
2731            let view = catalog.get_entry(&id);
2732            assert_eq!("v", view.name.item);
2733            match &view.item {
2734                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2735                item => panic!("expected view, got {}", item.typ()),
2736            }
2737            catalog.expire().await;
2738        }
2739    }
2740
2741    #[mz_ore::test(tokio::test)]
2742    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2743    async fn test_object_type() {
2744        Catalog::with_debug(|catalog| async move {
2745            let conn_catalog = catalog.for_system_session();
2746
2747            assert_eq!(
2748                mz_sql::catalog::ObjectType::ClusterReplica,
2749                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2750                    ClusterId::user(1).expect("1 is a valid ID"),
2751                    ReplicaId::User(1)
2752                )))
2753            );
2754            assert_eq!(
2755                mz_sql::catalog::ObjectType::Role,
2756                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2757            );
2758            catalog.expire().await;
2759        })
2760        .await;
2761    }
2762
2763    #[mz_ore::test(tokio::test)]
2764    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2765    async fn test_get_privileges() {
2766        Catalog::with_debug(|catalog| async move {
2767            let conn_catalog = catalog.for_system_session();
2768
2769            assert_eq!(
2770                None,
2771                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2772                    ClusterId::user(1).expect("1 is a valid ID"),
2773                    ReplicaId::User(1),
2774                ))))
2775            );
2776            assert_eq!(
2777                None,
2778                conn_catalog
2779                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2780            );
2781            catalog.expire().await;
2782        })
2783        .await;
2784    }
2785
2786    #[mz_ore::test(tokio::test)]
2787    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2788    async fn verify_builtin_descs() {
2789        Catalog::with_debug(|catalog| async move {
2790            let conn_catalog = catalog.for_system_session();
2791
2792            let builtins_cfg = BuiltinsConfig {
2793                include_continual_tasks: true,
2794            };
2795            for builtin in BUILTINS::iter(&builtins_cfg) {
2796                let (schema, name, expected_desc) = match builtin {
2797                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2798                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2799                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2800                    Builtin::Log(_)
2801                    | Builtin::Type(_)
2802                    | Builtin::Func(_)
2803                    | Builtin::ContinualTask(_)
2804                    | Builtin::Index(_)
2805                    | Builtin::Connection(_) => continue,
2806                };
2807                let item = conn_catalog
2808                    .resolve_item(&PartialItemName {
2809                        database: None,
2810                        schema: Some(schema.to_string()),
2811                        item: name.to_string(),
2812                    })
2813                    .expect("unable to resolve item")
2814                    .at_version(RelationVersionSelector::Latest);
2815
2816                let full_name = conn_catalog.resolve_full_name(item.name());
2817                let actual_desc = item.desc(&full_name).expect("invalid item type");
2818                assert_eq!(
2819                    &*actual_desc, expected_desc,
2820                    "item {schema}.{name} did not match its expected RelationDesc"
2821                );
2822            }
2823            catalog.expire().await;
2824        })
2825        .await
2826    }
2827
2828    // Connect to a running Postgres server and verify that our builtin
2829    // types and functions match it, in addition to some other things.
2830    #[mz_ore::test(tokio::test)]
2831    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2832    async fn test_compare_builtins_postgres() {
2833        async fn inner(catalog: Catalog) {
2834            // Verify that all builtin functions:
2835            // - have a unique OID
2836            // - if they have a postgres counterpart (same oid) then they have matching name
2837            let (client, connection) = tokio_postgres::connect(
2838                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2839                NoTls,
2840            )
2841            .await
2842            .expect("failed to connect to Postgres");
2843
2844            task::spawn(|| "compare_builtin_postgres", async move {
2845                if let Err(e) = connection.await {
2846                    panic!("connection error: {}", e);
2847                }
2848            });
2849
2850            struct PgProc {
2851                name: String,
2852                arg_oids: Vec<u32>,
2853                ret_oid: Option<u32>,
2854                ret_set: bool,
2855            }
2856
2857            struct PgType {
2858                name: String,
2859                ty: String,
2860                elem: u32,
2861                array: u32,
2862                input: u32,
2863                receive: u32,
2864            }
2865
2866            struct PgOper {
2867                oprresult: u32,
2868                name: String,
2869            }
2870
2871            let pg_proc: BTreeMap<_, _> = client
2872                .query(
2873                    "SELECT
2874                    p.oid,
2875                    proname,
2876                    proargtypes,
2877                    prorettype,
2878                    proretset
2879                FROM pg_proc p
2880                JOIN pg_namespace n ON p.pronamespace = n.oid",
2881                    &[],
2882                )
2883                .await
2884                .expect("pg query failed")
2885                .into_iter()
2886                .map(|row| {
2887                    let oid: u32 = row.get("oid");
2888                    let pg_proc = PgProc {
2889                        name: row.get("proname"),
2890                        arg_oids: row.get("proargtypes"),
2891                        ret_oid: row.get("prorettype"),
2892                        ret_set: row.get("proretset"),
2893                    };
2894                    (oid, pg_proc)
2895                })
2896                .collect();
2897
2898            let pg_type: BTreeMap<_, _> = client
2899                .query(
2900                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2901                    &[],
2902                )
2903                .await
2904                .expect("pg query failed")
2905                .into_iter()
2906                .map(|row| {
2907                    let oid: u32 = row.get("oid");
2908                    let pg_type = PgType {
2909                        name: row.get("typname"),
2910                        ty: row.get("typtype"),
2911                        elem: row.get("typelem"),
2912                        array: row.get("typarray"),
2913                        input: row.get("typinput"),
2914                        receive: row.get("typreceive"),
2915                    };
2916                    (oid, pg_type)
2917                })
2918                .collect();
2919
2920            let pg_oper: BTreeMap<_, _> = client
2921                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2922                .await
2923                .expect("pg query failed")
2924                .into_iter()
2925                .map(|row| {
2926                    let oid: u32 = row.get("oid");
2927                    let pg_oper = PgOper {
2928                        name: row.get("oprname"),
2929                        oprresult: row.get("oprresult"),
2930                    };
2931                    (oid, pg_oper)
2932                })
2933                .collect();
2934
2935            let conn_catalog = catalog.for_system_session();
2936            let resolve_type_oid = |item: &str| {
2937                conn_catalog
2938                    .resolve_type(&PartialItemName {
2939                        database: None,
2940                        // All functions we check exist in PG, so the types must, as
2941                        // well
2942                        schema: Some(PG_CATALOG_SCHEMA.into()),
2943                        item: item.to_string(),
2944                    })
2945                    .expect("unable to resolve type")
2946                    .oid()
2947            };
2948
2949            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2950                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2951                .collect();
2952
2953            let mut all_oids = BTreeSet::new();
2954
2955            // A function to determine if two oids are equivalent enough for these tests. We don't
2956            // support some types, so map exceptions here.
2957            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2958                [
2959                    // We don't support NAME.
2960                    (Type::NAME, Type::TEXT),
2961                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2962                    // We don't support time with time zone.
2963                    (Type::TIME, Type::TIMETZ),
2964                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2965                ]
2966                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2967            );
2968            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2969                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2970            ]);
2971            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2972                if ignore_return_types.contains(&fn_oid) {
2973                    return true;
2974                }
2975                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2976                    return true;
2977                }
2978                a == b
2979            };
2980
2981            let builtins_cfg = BuiltinsConfig {
2982                include_continual_tasks: true,
2983            };
2984            for builtin in BUILTINS::iter(&builtins_cfg) {
2985                match builtin {
2986                    Builtin::Type(ty) => {
2987                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2988
2989                        if ty.oid >= FIRST_MATERIALIZE_OID {
2990                            // High OIDs are reserved in Materialize and don't have
2991                            // PostgreSQL counterparts.
2992                            continue;
2993                        }
2994
2995                        // For types that have a PostgreSQL counterpart, verify that
2996                        // the name and oid match.
2997                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2998                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2999                        });
3000                        assert_eq!(
3001                            ty.name, pg_ty.name,
3002                            "oid {} has name {} in postgres; expected {}",
3003                            ty.oid, pg_ty.name, ty.name,
3004                        );
3005
3006                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3007                            None => (0, 0),
3008                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3009                        };
3010                        assert_eq!(
3011                            typinput_oid, pg_ty.input,
3012                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3013                            ty.name, typinput_oid, pg_ty.input,
3014                        );
3015                        assert_eq!(
3016                            typreceive_oid, pg_ty.receive,
3017                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3018                            ty.name, typreceive_oid, pg_ty.receive,
3019                        );
3020                        if typinput_oid != 0 {
3021                            assert!(
3022                                func_oids.contains(&typinput_oid),
3023                                "type {} has typinput OID {} that does not exist in pg_proc",
3024                                ty.name,
3025                                typinput_oid,
3026                            );
3027                        }
3028                        if typreceive_oid != 0 {
3029                            assert!(
3030                                func_oids.contains(&typreceive_oid),
3031                                "type {} has typreceive OID {} that does not exist in pg_proc",
3032                                ty.name,
3033                                typreceive_oid,
3034                            );
3035                        }
3036
3037                        // Ensure the type matches.
3038                        match &ty.details.typ {
3039                            CatalogType::Array { element_reference } => {
3040                                let elem_ty = BUILTINS::iter(&builtins_cfg)
3041                                    .filter_map(|builtin| match builtin {
3042                                        Builtin::Type(ty @ BuiltinType { name, .. })
3043                                            if element_reference == name =>
3044                                        {
3045                                            Some(ty)
3046                                        }
3047                                        _ => None,
3048                                    })
3049                                    .next();
3050                                let elem_ty = match elem_ty {
3051                                    Some(ty) => ty,
3052                                    None => {
3053                                        panic!("{} is unexpectedly not a type", element_reference)
3054                                    }
3055                                };
3056                                assert_eq!(
3057                                    pg_ty.elem, elem_ty.oid,
3058                                    "type {} has mismatched element OIDs",
3059                                    ty.name
3060                                )
3061                            }
3062                            CatalogType::Pseudo => {
3063                                assert_eq!(
3064                                    pg_ty.ty, "p",
3065                                    "type {} is not a pseudo type as expected",
3066                                    ty.name
3067                                )
3068                            }
3069                            CatalogType::Range { .. } => {
3070                                assert_eq!(
3071                                    pg_ty.ty, "r",
3072                                    "type {} is not a range type as expected",
3073                                    ty.name
3074                                );
3075                            }
3076                            _ => {
3077                                assert_eq!(
3078                                    pg_ty.ty, "b",
3079                                    "type {} is not a base type as expected",
3080                                    ty.name
3081                                )
3082                            }
3083                        }
3084
3085                        // Ensure the array type reference is correct.
3086                        let schema = catalog
3087                            .resolve_schema_in_database(
3088                                &ResolvedDatabaseSpecifier::Ambient,
3089                                ty.schema,
3090                                &SYSTEM_CONN_ID,
3091                            )
3092                            .expect("unable to resolve schema");
3093                        let allocated_type = catalog
3094                            .resolve_type(
3095                                None,
3096                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3097                                &PartialItemName {
3098                                    database: None,
3099                                    schema: Some(schema.name().schema.clone()),
3100                                    item: ty.name.to_string(),
3101                                },
3102                                &SYSTEM_CONN_ID,
3103                            )
3104                            .expect("unable to resolve type");
3105                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3106                            ty
3107                        } else {
3108                            panic!("unexpectedly not a type")
3109                        };
3110                        match ty.details.array_id {
3111                            Some(array_id) => {
3112                                let array_ty = catalog.get_entry(&array_id);
3113                                assert_eq!(
3114                                    pg_ty.array, array_ty.oid,
3115                                    "type {} has mismatched array OIDs",
3116                                    allocated_type.name.item,
3117                                );
3118                            }
3119                            None => assert_eq!(
3120                                pg_ty.array, 0,
3121                                "type {} does not have an array type in mz but does in pg",
3122                                allocated_type.name.item,
3123                            ),
3124                        }
3125                    }
3126                    Builtin::Func(func) => {
3127                        for imp in func.inner.func_impls() {
3128                            assert!(
3129                                all_oids.insert(imp.oid),
3130                                "{} reused oid {}",
3131                                func.name,
3132                                imp.oid
3133                            );
3134
3135                            assert!(
3136                                imp.oid < FIRST_USER_OID,
3137                                "built-in function {} erroneously has OID in user space ({})",
3138                                func.name,
3139                                imp.oid,
3140                            );
3141
3142                            // For functions that have a postgres counterpart, verify that the name and
3143                            // oid match.
3144                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3145                                continue;
3146                            } else {
3147                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3148                                    panic!(
3149                                        "pg_proc missing function {}: oid {}",
3150                                        func.name, imp.oid
3151                                    )
3152                                })
3153                            };
3154                            assert_eq!(
3155                                func.name, pg_fn.name,
3156                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3157                                imp.oid, func.name, pg_fn.name
3158                            );
3159
3160                            // Complain, but don't fail, if argument oids don't match.
3161                            // TODO: make these match.
3162                            let imp_arg_oids = imp
3163                                .arg_typs
3164                                .iter()
3165                                .map(|item| resolve_type_oid(item))
3166                                .collect::<Vec<_>>();
3167
3168                            if imp_arg_oids != pg_fn.arg_oids {
3169                                println!(
3170                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3171                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3172                                );
3173                            }
3174
3175                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3176
3177                            assert!(
3178                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3179                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3180                                imp.oid,
3181                                func.name,
3182                                imp_return_oid,
3183                                pg_fn.ret_oid
3184                            );
3185
3186                            assert_eq!(
3187                                imp.return_is_set, pg_fn.ret_set,
3188                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3189                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3190                            );
3191                        }
3192                    }
3193                    _ => (),
3194                }
3195            }
3196
3197            for (op, func) in OP_IMPLS.iter() {
3198                for imp in func.func_impls() {
3199                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3200
3201                    // For operators that have a postgres counterpart, verify that the name and oid match.
3202                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3203                        continue;
3204                    } else {
3205                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3206                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3207                        })
3208                    };
3209
3210                    assert_eq!(*op, pg_op.name);
3211
3212                    let imp_return_oid =
3213                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3214                    if imp_return_oid != pg_op.oprresult {
3215                        panic!(
3216                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3217                            imp.oid, op, imp_return_oid, pg_op.oprresult
3218                        );
3219                    }
3220                }
3221            }
3222            catalog.expire().await;
3223        }
3224
3225        Catalog::with_debug(inner).await
3226    }
3227
3228    // Execute all builtin functions with all combinations of arguments from interesting datums.
3229    #[mz_ore::test(tokio::test)]
3230    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3231    async fn test_smoketest_all_builtins() {
3232        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3233            let catalog = Arc::new(catalog);
3234            let conn_catalog = catalog.for_system_session();
3235
3236            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3237            let mut handles = Vec::new();
3238
3239            // Extracted during planning; always panics when executed.
3240            let ignore_names = BTreeSet::from([
3241                "avg",
3242                "avg_internal_v1",
3243                "bool_and",
3244                "bool_or",
3245                "mod",
3246                "mz_panic",
3247                "mz_sleep",
3248                "pow",
3249                "stddev_pop",
3250                "stddev_samp",
3251                "stddev",
3252                "var_pop",
3253                "var_samp",
3254                "variance",
3255            ]);
3256
3257            let fns = BUILTINS::funcs()
3258                .map(|func| (&func.name, func.inner))
3259                .chain(OP_IMPLS.iter());
3260
3261            for (name, func) in fns {
3262                if ignore_names.contains(name) {
3263                    continue;
3264                }
3265                let Func::Scalar(impls) = func else {
3266                    continue;
3267                };
3268
3269                'outer: for imp in impls {
3270                    let details = imp.details();
3271                    let mut styps = Vec::new();
3272                    for item in details.arg_typs.iter() {
3273                        let oid = resolve_type_oid(item);
3274                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3275                            continue 'outer;
3276                        };
3277                        styps.push(ScalarType::try_from(&pgtyp).expect("must exist"));
3278                    }
3279                    let datums = styps
3280                        .iter()
3281                        .map(|styp| {
3282                            let mut datums = vec![Datum::Null];
3283                            datums.extend(styp.interesting_datums());
3284                            datums
3285                        })
3286                        .collect::<Vec<_>>();
3287                    // Skip nullary fns.
3288                    if datums.is_empty() {
3289                        continue;
3290                    }
3291
3292                    let return_oid = details
3293                        .return_typ
3294                        .map(resolve_type_oid)
3295                        .expect("must exist");
3296                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3297                        .ok()
3298                        .map(|typ| ScalarType::try_from(&typ).expect("must exist"));
3299
3300                    let mut idxs = vec![0; datums.len()];
3301                    while idxs[0] < datums[0].len() {
3302                        let mut args = Vec::with_capacity(idxs.len());
3303                        for i in 0..(datums.len()) {
3304                            args.push(datums[i][idxs[i]]);
3305                        }
3306
3307                        let op = &imp.op;
3308                        let scalars = args
3309                            .iter()
3310                            .enumerate()
3311                            .map(|(i, datum)| {
3312                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3313                                    datum.clone(),
3314                                    styps[i].clone(),
3315                                ))
3316                            })
3317                            .collect();
3318
3319                        let call_name = format!(
3320                            "{name}({}) (oid: {})",
3321                            args.iter()
3322                                .map(|d| d.to_string())
3323                                .collect::<Vec<_>>()
3324                                .join(", "),
3325                            imp.oid
3326                        );
3327                        let catalog = Arc::clone(&catalog);
3328                        let call_name_fn = call_name.clone();
3329                        let return_styp = return_styp.clone();
3330                        let handle = task::spawn_blocking(
3331                            || call_name,
3332                            move || {
3333                                smoketest_fn(
3334                                    name,
3335                                    call_name_fn,
3336                                    op,
3337                                    imp,
3338                                    args,
3339                                    catalog,
3340                                    scalars,
3341                                    return_styp,
3342                                )
3343                            },
3344                        );
3345                        handles.push(handle);
3346
3347                        // Advance to the next datum combination.
3348                        for i in (0..datums.len()).rev() {
3349                            idxs[i] += 1;
3350                            if idxs[i] >= datums[i].len() {
3351                                if i == 0 {
3352                                    break;
3353                                }
3354                                idxs[i] = 0;
3355                                continue;
3356                            } else {
3357                                break;
3358                            }
3359                        }
3360                    }
3361                }
3362            }
3363            handles
3364        }
3365
3366        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3367        for handle in handles {
3368            handle.await.expect("must succeed");
3369        }
3370    }
3371
3372    fn smoketest_fn(
3373        name: &&str,
3374        call_name: String,
3375        op: &Operation<HirScalarExpr>,
3376        imp: &FuncImpl<HirScalarExpr>,
3377        args: Vec<Datum<'_>>,
3378        catalog: Arc<Catalog>,
3379        scalars: Vec<CoercibleScalarExpr>,
3380        return_styp: Option<ScalarType>,
3381    ) {
3382        let conn_catalog = catalog.for_system_session();
3383        let pcx = PlanContext::zero();
3384        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3385        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3386        let ecx = ExprContext {
3387            qcx: &qcx,
3388            name: "smoketest",
3389            scope: &Scope::empty(),
3390            relation_type: &RelationType::empty(),
3391            allow_aggregates: false,
3392            allow_subqueries: false,
3393            allow_parameters: false,
3394            allow_windows: false,
3395        };
3396        let arena = RowArena::new();
3397        let mut session = Session::<Timestamp>::dummy();
3398        session
3399            .start_transaction(to_datetime(0), None, None)
3400            .expect("must succeed");
3401        let prep_style = ExprPrepStyle::OneShot {
3402            logical_time: EvalTime::Time(Timestamp::MIN),
3403            session: &session,
3404            catalog_state: &catalog.state,
3405        };
3406
3407        // Execute the function as much as possible, ensuring no panics occur, but
3408        // otherwise ignoring eval errors. We also do various other checks.
3409        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3410        if let Ok(hir) = res {
3411            if let Ok(mut mir) = hir.lower_uncorrelated() {
3412                // Populate unmaterialized functions.
3413                prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3414
3415                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3416                    if let Some(return_styp) = return_styp {
3417                        let mir_typ = mir.typ(&[]);
3418                        // MIR type inference should be consistent with the type
3419                        // we get from the catalog.
3420                        assert_eq!(mir_typ.scalar_type, return_styp);
3421                        // The following will check not just that the scalar type
3422                        // is ok, but also catches if the function returned a null
3423                        // but the MIR type inference said "non-nullable".
3424                        if !eval_result_datum.is_instance_of(&mir_typ) {
3425                            panic!(
3426                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3427                            );
3428                        }
3429                        // Check the consistency of `introduces_nulls` and
3430                        // `propagates_nulls` with `MirScalarExpr::typ`.
3431                        if let Some((introduces_nulls, propagates_nulls)) =
3432                            call_introduces_propagates_nulls(&mir)
3433                        {
3434                            if introduces_nulls {
3435                                // If the function introduces_nulls, then the return
3436                                // type should always be nullable, regardless of
3437                                // the nullability of the input types.
3438                                assert!(
3439                                    mir_typ.nullable,
3440                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3441                                    name, args, mir, mir_typ.nullable
3442                                );
3443                            } else {
3444                                let any_input_null = args.iter().any(|arg| arg.is_null());
3445                                if !any_input_null {
3446                                    assert!(
3447                                        !mir_typ.nullable,
3448                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3449                                        name, args, mir, mir_typ.nullable
3450                                    );
3451                                } else {
3452                                    assert_eq!(
3453                                        mir_typ.nullable, propagates_nulls,
3454                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3455                                        name, args, mir, mir_typ.nullable
3456                                    );
3457                                }
3458                            }
3459                        }
3460                        // Check that `MirScalarExpr::reduce` yields the same result
3461                        // as the real evaluation.
3462                        let mut reduced = mir.clone();
3463                        reduced.reduce(&[]);
3464                        match reduced {
3465                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3466                                match reduce_result {
3467                                    Ok(reduce_result_row) => {
3468                                        let reduce_result_datum = reduce_result_row.unpack_first();
3469                                        assert_eq!(
3470                                            reduce_result_datum,
3471                                            eval_result_datum,
3472                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3473                                            name,
3474                                            args,
3475                                            mir,
3476                                            eval_result_datum,
3477                                            mir_typ.scalar_type,
3478                                            reduce_result_datum,
3479                                            ctyp.scalar_type
3480                                        );
3481                                        // Let's check that the types also match.
3482                                        // (We are not checking nullability here,
3483                                        // because it's ok when we know a more
3484                                        // precise nullability after actually
3485                                        // evaluating a function than before.)
3486                                        assert_eq!(
3487                                            ctyp.scalar_type,
3488                                            mir_typ.scalar_type,
3489                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3490                                            name,
3491                                            args,
3492                                            mir,
3493                                            eval_result_datum,
3494                                            mir_typ.scalar_type,
3495                                            reduce_result_datum,
3496                                            ctyp.scalar_type
3497                                        );
3498                                    }
3499                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3500                                }
3501                            }
3502                            _ => unreachable!(
3503                                "all args are literals, so should have reduced to a literal"
3504                            ),
3505                        }
3506                    }
3507                }
3508            }
3509        }
3510    }
3511
3512    /// If the given MirScalarExpr
3513    ///  - is a function call, and
3514    ///  - all arguments are literals
3515    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3516    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3517        match mir_func_call {
3518            MirScalarExpr::CallUnary { func, expr } => {
3519                if expr.is_literal() {
3520                    Some((func.introduces_nulls(), func.propagates_nulls()))
3521                } else {
3522                    None
3523                }
3524            }
3525            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3526                if expr1.is_literal() && expr2.is_literal() {
3527                    Some((func.introduces_nulls(), func.propagates_nulls()))
3528                } else {
3529                    None
3530                }
3531            }
3532            MirScalarExpr::CallVariadic { func, exprs } => {
3533                if exprs.iter().all(|arg| arg.is_literal()) {
3534                    Some((func.introduces_nulls(), func.propagates_nulls()))
3535                } else {
3536                    None
3537                }
3538            }
3539            _ => None,
3540        }
3541    }
3542
3543    // Make sure pg views don't use types that only exist in Materialize.
3544    #[mz_ore::test(tokio::test)]
3545    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3546    async fn test_pg_views_forbidden_types() {
3547        Catalog::with_debug(|catalog| async move {
3548            let conn_catalog = catalog.for_system_session();
3549
3550            for view in BUILTINS::views().filter(|view| {
3551                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3552            }) {
3553                let item = conn_catalog
3554                    .resolve_item(&PartialItemName {
3555                        database: None,
3556                        schema: Some(view.schema.to_string()),
3557                        item: view.name.to_string(),
3558                    })
3559                    .expect("unable to resolve view")
3560                    // TODO(alter_table)
3561                    .at_version(RelationVersionSelector::Latest);
3562                let full_name = conn_catalog.resolve_full_name(item.name());
3563                for col_type in item
3564                    .desc(&full_name)
3565                    .expect("invalid item type")
3566                    .iter_types()
3567                {
3568                    match &col_type.scalar_type {
3569                        typ @ ScalarType::UInt16
3570                        | typ @ ScalarType::UInt32
3571                        | typ @ ScalarType::UInt64
3572                        | typ @ ScalarType::MzTimestamp
3573                        | typ @ ScalarType::List { .. }
3574                        | typ @ ScalarType::Map { .. }
3575                        | typ @ ScalarType::MzAclItem => {
3576                            panic!("{typ:?} type found in {full_name}");
3577                        }
3578                        ScalarType::AclItem
3579                        | ScalarType::Bool
3580                        | ScalarType::Int16
3581                        | ScalarType::Int32
3582                        | ScalarType::Int64
3583                        | ScalarType::Float32
3584                        | ScalarType::Float64
3585                        | ScalarType::Numeric { .. }
3586                        | ScalarType::Date
3587                        | ScalarType::Time
3588                        | ScalarType::Timestamp { .. }
3589                        | ScalarType::TimestampTz { .. }
3590                        | ScalarType::Interval
3591                        | ScalarType::PgLegacyChar
3592                        | ScalarType::Bytes
3593                        | ScalarType::String
3594                        | ScalarType::Char { .. }
3595                        | ScalarType::VarChar { .. }
3596                        | ScalarType::Jsonb
3597                        | ScalarType::Uuid
3598                        | ScalarType::Array(_)
3599                        | ScalarType::Record { .. }
3600                        | ScalarType::Oid
3601                        | ScalarType::RegProc
3602                        | ScalarType::RegType
3603                        | ScalarType::RegClass
3604                        | ScalarType::Int2Vector
3605                        | ScalarType::Range { .. }
3606                        | ScalarType::PgLegacyName => {}
3607                    }
3608                }
3609            }
3610            catalog.expire().await;
3611        })
3612        .await
3613    }
3614
3615    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3616    // introspection relations.
3617    #[mz_ore::test(tokio::test)]
3618    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3619    async fn test_mz_introspection_builtins() {
3620        Catalog::with_debug(|catalog| async move {
3621            let conn_catalog = catalog.for_system_session();
3622
3623            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3624            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3625
3626            for entry in catalog.entries() {
3627                let schema_spec = entry.name().qualifiers.schema_spec;
3628                let introspection_deps = catalog.introspection_dependencies(entry.id);
3629                if introspection_deps.is_empty() {
3630                    assert!(
3631                        schema_spec != introspection_schema_spec,
3632                        "entry does not depend on introspection sources but is in \
3633                         `mz_introspection`: {}",
3634                        conn_catalog.resolve_full_name(entry.name()),
3635                    );
3636                } else {
3637                    assert!(
3638                        schema_spec == introspection_schema_spec,
3639                        "entry depends on introspection sources but is not in \
3640                         `mz_introspection`: {}",
3641                        conn_catalog.resolve_full_name(entry.name()),
3642                    );
3643                }
3644            }
3645        })
3646        .await
3647    }
3648
3649    #[mz_ore::test(tokio::test)]
3650    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3651    async fn test_multi_subscriber_catalog() {
3652        let persist_client = PersistClient::new_for_tests().await;
3653        let bootstrap_args = test_bootstrap_args();
3654        let organization_id = Uuid::new_v4();
3655        let db_name = "DB";
3656
3657        let mut writer_catalog = Catalog::open_debug_catalog(
3658            persist_client.clone(),
3659            organization_id.clone(),
3660            &bootstrap_args,
3661        )
3662        .await
3663        .expect("open_debug_catalog");
3664        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3665            persist_client.clone(),
3666            organization_id.clone(),
3667            &bootstrap_args,
3668        )
3669        .await
3670        .expect("open_debug_read_only_catalog");
3671        assert_err!(writer_catalog.resolve_database(db_name));
3672        assert_err!(read_only_catalog.resolve_database(db_name));
3673
3674        let commit_ts = writer_catalog.current_upper().await;
3675        writer_catalog
3676            .transact(
3677                None,
3678                commit_ts,
3679                None,
3680                vec![Op::CreateDatabase {
3681                    name: db_name.to_string(),
3682                    owner_id: MZ_SYSTEM_ROLE_ID,
3683                }],
3684            )
3685            .await
3686            .expect("failed to transact");
3687
3688        let write_db = writer_catalog
3689            .resolve_database(db_name)
3690            .expect("resolve_database");
3691        read_only_catalog
3692            .sync_to_current_updates()
3693            .await
3694            .expect("sync_to_current_updates");
3695        let read_db = read_only_catalog
3696            .resolve_database(db_name)
3697            .expect("resolve_database");
3698
3699        assert_eq!(write_db, read_db);
3700
3701        let writer_catalog_fencer =
3702            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3703                .await
3704                .expect("open_debug_catalog for fencer");
3705        let fencer_db = writer_catalog_fencer
3706            .resolve_database(db_name)
3707            .expect("resolve_database for fencer");
3708        assert_eq!(fencer_db, read_db);
3709
3710        let write_fence_err = writer_catalog
3711            .sync_to_current_updates()
3712            .await
3713            .expect_err("sync_to_current_updates for fencer");
3714        assert!(matches!(
3715            write_fence_err,
3716            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3717        ));
3718        let read_fence_err = read_only_catalog
3719            .sync_to_current_updates()
3720            .await
3721            .expect_err("sync_to_current_updates after fencer");
3722        assert!(matches!(
3723            read_fence_err,
3724            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3725        ));
3726
3727        writer_catalog.expire().await;
3728        read_only_catalog.expire().await;
3729        writer_catalog_fencer.expire().await;
3730    }
3731}