mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67    CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68    DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91// DO NOT add any more imports from `crate` outside of `crate::catalog`.
92pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96    DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100#[cfg(test)]
101use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
102use crate::session::{Portal, PreparedStatement, Session};
103use crate::util::ResultExt;
104use crate::{AdapterError, AdapterNotice, ExecuteResponse};
105
106mod builtin_table_updates;
107pub(crate) mod consistency;
108mod migrate;
109
110mod apply;
111mod open;
112mod state;
113mod timeline;
114mod transact;
115
116/// A `Catalog` keeps track of the SQL objects known to the planner.
117///
118/// For each object, it keeps track of both forward and reverse dependencies:
119/// i.e., which objects are depended upon by the object, and which objects
120/// depend upon the object. It enforces the SQL rules around dropping: an object
121/// cannot be dropped until all of the objects that depend upon it are dropped.
122/// It also enforces uniqueness of names.
123///
124/// SQL mandates a hierarchy of exactly three layers. A catalog contains
125/// databases, databases contain schemas, and schemas contain catalog items,
126/// like sources, sinks, view, and indexes.
127///
128/// To the outside world, databases, schemas, and items are all identified by
129/// name. Items can be referred to by their [`FullItemName`], which fully and
130/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
131/// database name and/or the schema name. Partial names can be converted into
132/// full names via a complicated resolution process documented by the
133/// [`CatalogState::resolve`] method.
134///
135/// The catalog also maintains special "ambient schemas": virtual schemas,
136/// implicitly present in all databases, that house various system views.
137/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
138#[derive(Debug)]
139pub struct Catalog {
140    state: CatalogState,
141    plans: CatalogPlans,
142    expr_cache_handle: Option<ExpressionCacheHandle>,
143    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
144    transient_revision: u64,
145}
146
147// Implement our own Clone because derive can't unless S is Clone, which it's
148// not (hence the Arc).
149impl Clone for Catalog {
150    fn clone(&self) -> Self {
151        Self {
152            state: self.state.clone(),
153            plans: self.plans.clone(),
154            expr_cache_handle: self.expr_cache_handle.clone(),
155            storage: Arc::clone(&self.storage),
156            transient_revision: self.transient_revision,
157        }
158    }
159}
160
161#[derive(Default, Debug, Clone)]
162pub struct CatalogPlans {
163    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
164    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
165    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
166    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
167}
168
169impl Catalog {
170    /// Set the optimized plan for the item identified by `id`.
171    #[mz_ore::instrument(level = "trace")]
172    pub fn set_optimized_plan(
173        &mut self,
174        id: GlobalId,
175        plan: DataflowDescription<OptimizedMirRelationExpr>,
176    ) {
177        self.plans.optimized_plan_by_id.insert(id, plan.into());
178    }
179
180    /// Set the physical plan for the item identified by `id`.
181    #[mz_ore::instrument(level = "trace")]
182    pub fn set_physical_plan(
183        &mut self,
184        id: GlobalId,
185        plan: DataflowDescription<mz_compute_types::plan::Plan>,
186    ) {
187        self.plans.physical_plan_by_id.insert(id, plan.into());
188    }
189
190    /// Try to get the optimized plan for the item identified by `id`.
191    #[mz_ore::instrument(level = "trace")]
192    pub fn try_get_optimized_plan(
193        &self,
194        id: &GlobalId,
195    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
196        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
197    }
198
199    /// Try to get the physical plan for the item identified by `id`.
200    #[mz_ore::instrument(level = "trace")]
201    pub fn try_get_physical_plan(
202        &self,
203        id: &GlobalId,
204    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
205        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
206    }
207
208    /// Set the `DataflowMetainfo` for the item identified by `id`.
209    #[mz_ore::instrument(level = "trace")]
210    pub fn set_dataflow_metainfo(
211        &mut self,
212        id: GlobalId,
213        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
214    ) {
215        // Add entries to the `notices_by_dep_id` lookup map.
216        for notice in metainfo.optimizer_notices.iter() {
217            for dep_id in notice.dependencies.iter() {
218                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
219                entry.push(Arc::clone(notice))
220            }
221            if let Some(item_id) = notice.item_id {
222                soft_assert_eq_or_log!(
223                    item_id,
224                    id,
225                    "notice.item_id should match the id for whom we are saving the notice"
226                );
227            }
228        }
229        // Add the dataflow with the scoped entries.
230        self.plans.dataflow_metainfos.insert(id, metainfo);
231    }
232
233    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
234    #[mz_ore::instrument(level = "trace")]
235    pub fn try_get_dataflow_metainfo(
236        &self,
237        id: &GlobalId,
238    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
239        self.plans.dataflow_metainfos.get(id)
240    }
241
242    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
243    /// item identified by `id`.
244    ///
245    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
246    ///
247    /// Return a set containing all dropped notices. Note that if for some
248    /// reason we end up with two identical notices being dropped by the same
249    /// call, the result will contain only one instance of that notice.
250    #[mz_ore::instrument(level = "trace")]
251    pub fn drop_plans_and_metainfos(
252        &mut self,
253        drop_ids: &BTreeSet<GlobalId>,
254    ) -> BTreeSet<Arc<OptimizerNotice>> {
255        // Collect dropped notices in this set.
256        let mut dropped_notices = BTreeSet::new();
257
258        // Remove plans and metainfo.optimizer_notices entries.
259        for id in drop_ids {
260            self.plans.optimized_plan_by_id.remove(id);
261            self.plans.physical_plan_by_id.remove(id);
262            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
263                soft_assert_or_log!(
264                    metainfo.optimizer_notices.iter().all_unique(),
265                    "should have been pushed there by `push_optimizer_notice_dedup`"
266                );
267                for n in metainfo.optimizer_notices.drain(..) {
268                    // Remove the corresponding notices_by_dep_id entries.
269                    for dep_id in n.dependencies.iter() {
270                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
271                            soft_assert_or_log!(
272                                notices.iter().any(|x| &n == x),
273                                "corrupt notices_by_dep_id"
274                            );
275                            notices.retain(|x| &n != x)
276                        }
277                    }
278                    dropped_notices.insert(n);
279                }
280            }
281        }
282
283        // Remove notices_by_dep_id entries.
284        for id in drop_ids {
285            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
286                for n in notices.drain(..) {
287                    // Remove the corresponding metainfo.optimizer_notices entries.
288                    if let Some(item_id) = n.item_id.as_ref() {
289                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
290                            metainfo.optimizer_notices.iter().for_each(|n2| {
291                                if let Some(item_id_2) = n2.item_id {
292                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
293                                }
294                            });
295                            metainfo.optimizer_notices.retain(|x| &n != x);
296                        }
297                    }
298                    dropped_notices.insert(n);
299                }
300            }
301        }
302
303        // Collect dependency ids not in drop_ids with at least one dropped
304        // notice.
305        let mut todo_dep_ids = BTreeSet::new();
306        for notice in dropped_notices.iter() {
307            for dep_id in notice.dependencies.iter() {
308                if !drop_ids.contains(dep_id) {
309                    todo_dep_ids.insert(*dep_id);
310                }
311            }
312        }
313        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
314        // entries in `todo_dep_ids`.
315        for id in todo_dep_ids {
316            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
317                notices.retain(|n| !dropped_notices.contains(n))
318            }
319        }
320
321        if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
322            use mz_ore::str::{bracketed, separated};
323            let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
324            let bad_notices = bad_notices.map(|n| {
325                // Try to find where the bad reference is.
326                // Maybe in `dataflow_metainfos`?
327                let mut dataflow_metainfo_occurrences = Vec::new();
328                for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
329                    if meta_info.optimizer_notices.contains(n) {
330                        dataflow_metainfo_occurrences.push(id);
331                    }
332                }
333                // Or `notices_by_dep_id`?
334                let mut notices_by_dep_id_occurrences = Vec::new();
335                for (id, notices) in self.plans.notices_by_dep_id.iter() {
336                    if notices.iter().contains(n) {
337                        notices_by_dep_id_occurrences.push(id);
338                    }
339                }
340                format!(
341                    "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
342                    dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
343                    n.id,
344                    n.kind,
345                    n.dependencies,
346                    Arc::strong_count(n),
347                    dataflow_metainfo_occurrences,
348                    notices_by_dep_id_occurrences
349                )
350            });
351            let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
352            error!(
353                "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
354                 bad_notices = {bad_notices}; \
355                 drop_ids = {drop_ids:?}"
356            );
357        }
358
359        dropped_notices
360    }
361
362    /// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
363    /// as a result of creating new indexes on the items in `ons`.
364    ///
365    /// When creating and inserting a new index, we need to invalidate some entries that may
366    /// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
367    /// the following objects:
368    ///
369    ///   - `o`.
370    ///   - All compute objects that depend directly on `o`.
371    ///   - All compute objects that would directly depend on `o`, if all views were inlined.
372    pub(crate) fn invalidate_for_index(
373        &self,
374        ons: impl Iterator<Item = GlobalId>,
375    ) -> BTreeSet<GlobalId> {
376        let mut dependencies = BTreeSet::new();
377        let mut queue = VecDeque::new();
378        let mut seen = HashSet::new();
379        for on in ons {
380            let entry = self.get_entry_by_global_id(&on);
381            dependencies.insert(on);
382            seen.insert(entry.id);
383            let uses = entry.uses();
384            queue.extend(uses.clone());
385        }
386
387        while let Some(cur) = queue.pop_front() {
388            let entry = self.get_entry(&cur);
389            if seen.insert(cur) {
390                let global_ids = entry.global_ids();
391                match entry.item_type() {
392                    CatalogItemType::Table
393                    | CatalogItemType::Source
394                    | CatalogItemType::MaterializedView
395                    | CatalogItemType::Sink
396                    | CatalogItemType::Index
397                    | CatalogItemType::Type
398                    | CatalogItemType::Func
399                    | CatalogItemType::Secret
400                    | CatalogItemType::Connection
401                    | CatalogItemType::ContinualTask => {
402                        dependencies.extend(global_ids);
403                    }
404                    CatalogItemType::View => {
405                        dependencies.extend(global_ids);
406                        queue.extend(entry.uses());
407                    }
408                }
409            }
410        }
411        dependencies
412    }
413}
414
415#[derive(Debug)]
416pub struct ConnCatalog<'a> {
417    state: Cow<'a, CatalogState>,
418    /// Because we don't have any way of removing items from the catalog
419    /// temporarily, we allow the ConnCatalog to pretend that a set of items
420    /// don't exist during resolution.
421    ///
422    /// This feature is necessary to allow re-planning of statements, which is
423    /// either incredibly useful or required when altering item definitions.
424    ///
425    /// Note that uses of this field should be used by short-lived
426    /// catalogs.
427    unresolvable_ids: BTreeSet<CatalogItemId>,
428    conn_id: ConnectionId,
429    cluster: String,
430    database: Option<DatabaseId>,
431    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
432    role_id: RoleId,
433    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
434    portals: Option<&'a BTreeMap<String, Portal>>,
435    notices_tx: UnboundedSender<AdapterNotice>,
436}
437
438impl ConnCatalog<'_> {
439    pub fn conn_id(&self) -> &ConnectionId {
440        &self.conn_id
441    }
442
443    pub fn state(&self) -> &CatalogState {
444        &*self.state
445    }
446
447    /// Prevent planning from resolving item with the provided ID. Instead,
448    /// return an error as if the item did not exist.
449    ///
450    /// This feature is meant exclusively to permit re-planning statements
451    /// during update operations and should not be used otherwise given its
452    /// extremely "powerful" semantics.
453    ///
454    /// # Panics
455    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
456    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
457        assert_eq!(
458            self.role_id, MZ_SYSTEM_ROLE_ID,
459            "only the system role can mark IDs unresolvable",
460        );
461        self.unresolvable_ids.insert(id);
462    }
463
464    /// Returns the schemas:
465    /// - mz_catalog
466    /// - pg_catalog
467    /// - temp (if requested)
468    /// - all schemas from the session's search_path var that exist
469    pub fn effective_search_path(
470        &self,
471        include_temp_schema: bool,
472    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
473        self.state
474            .effective_search_path(&self.search_path, include_temp_schema)
475    }
476}
477
478impl ConnectionResolver for ConnCatalog<'_> {
479    fn resolve_connection(
480        &self,
481        id: CatalogItemId,
482    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
483        self.state().resolve_connection(id)
484    }
485}
486
487impl Catalog {
488    /// Returns the catalog's transient revision, which starts at 1 and is
489    /// incremented on every change. This is not persisted to disk, and will
490    /// restart on every load.
491    pub fn transient_revision(&self) -> u64 {
492        self.transient_revision
493    }
494
495    /// Creates a debug catalog from the current
496    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
497    /// like in tests.
498    ///
499    /// WARNING! This function can arbitrarily fail because it does not make any
500    /// effort to adjust the catalog's contents' structure or semantics to the
501    /// currently running version, i.e. it does not apply any migrations.
502    ///
503    /// This function must not be called in production contexts. Use
504    /// [`Catalog::open`] with appropriately set configuration parameters
505    /// instead.
506    pub async fn with_debug<F, Fut, T>(f: F) -> T
507    where
508        F: FnOnce(Catalog) -> Fut,
509        Fut: Future<Output = T>,
510    {
511        let persist_client = PersistClient::new_for_tests().await;
512        let organization_id = Uuid::new_v4();
513        let bootstrap_args = test_bootstrap_args();
514        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
515            .await
516            .expect("can open debug catalog");
517        f(catalog).await
518    }
519
520    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
521    /// in progress.
522    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
523    where
524        F: FnOnce(Catalog) -> Fut,
525        Fut: Future<Output = T>,
526    {
527        let persist_client = PersistClient::new_for_tests().await;
528        let organization_id = Uuid::new_v4();
529        let bootstrap_args = test_bootstrap_args();
530        let mut catalog =
531            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
532                .await
533                .expect("can open debug catalog");
534
535        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
536        let now = SYSTEM_TIME.clone();
537        let openable_storage = TestCatalogStateBuilder::new(persist_client)
538            .with_organization_id(organization_id)
539            .with_default_deploy_generation()
540            .build()
541            .await
542            .expect("can create durable catalog");
543        let mut storage = openable_storage
544            .open(now().into(), &bootstrap_args)
545            .await
546            .expect("can open durable catalog")
547            .0;
548        // Drain updates.
549        let _ = storage
550            .sync_to_current_updates()
551            .await
552            .expect("can sync to current updates");
553        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
554
555        f(catalog).await
556    }
557
558    /// Opens a debug catalog.
559    ///
560    /// See [`Catalog::with_debug`].
561    pub async fn open_debug_catalog(
562        persist_client: PersistClient,
563        organization_id: Uuid,
564        bootstrap_args: &BootstrapArgs,
565    ) -> Result<Catalog, anyhow::Error> {
566        let now = SYSTEM_TIME.clone();
567        let environment_id = None;
568        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
569            .with_organization_id(organization_id)
570            .with_default_deploy_generation()
571            .build()
572            .await?;
573        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
574        let system_parameter_defaults = BTreeMap::default();
575        Self::open_debug_catalog_inner(
576            persist_client,
577            storage,
578            now,
579            environment_id,
580            &DUMMY_BUILD_INFO,
581            system_parameter_defaults,
582            bootstrap_args,
583            None,
584        )
585        .await
586    }
587
588    /// Opens a read only debug persist backed catalog defined by `persist_client` and
589    /// `organization_id`.
590    ///
591    /// See [`Catalog::with_debug`].
592    pub async fn open_debug_read_only_catalog(
593        persist_client: PersistClient,
594        organization_id: Uuid,
595        bootstrap_args: &BootstrapArgs,
596    ) -> Result<Catalog, anyhow::Error> {
597        let now = SYSTEM_TIME.clone();
598        let environment_id = None;
599        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
600            .with_organization_id(organization_id)
601            .build()
602            .await?;
603        let storage = openable_storage
604            .open_read_only(&test_bootstrap_args())
605            .await?;
606        let system_parameter_defaults = BTreeMap::default();
607        Self::open_debug_catalog_inner(
608            persist_client,
609            storage,
610            now,
611            environment_id,
612            &DUMMY_BUILD_INFO,
613            system_parameter_defaults,
614            bootstrap_args,
615            None,
616        )
617        .await
618    }
619
620    /// Opens a read only debug persist backed catalog defined by `persist_client` and
621    /// `organization_id`.
622    ///
623    /// See [`Catalog::with_debug`].
624    pub async fn open_debug_read_only_persist_catalog_config(
625        persist_client: PersistClient,
626        now: NowFn,
627        environment_id: EnvironmentId,
628        system_parameter_defaults: BTreeMap<String, String>,
629        build_info: &'static BuildInfo,
630        bootstrap_args: &BootstrapArgs,
631        enable_expression_cache_override: Option<bool>,
632    ) -> Result<Catalog, anyhow::Error> {
633        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
634            .with_organization_id(environment_id.organization_id())
635            .with_version(
636                build_info
637                    .version
638                    .parse()
639                    .expect("build version is parseable"),
640            )
641            .build()
642            .await?;
643        let storage = openable_storage.open_read_only(bootstrap_args).await?;
644        Self::open_debug_catalog_inner(
645            persist_client,
646            storage,
647            now,
648            Some(environment_id),
649            build_info,
650            system_parameter_defaults,
651            bootstrap_args,
652            enable_expression_cache_override,
653        )
654        .await
655    }
656
657    async fn open_debug_catalog_inner(
658        persist_client: PersistClient,
659        storage: Box<dyn DurableCatalogState>,
660        now: NowFn,
661        environment_id: Option<EnvironmentId>,
662        build_info: &'static BuildInfo,
663        system_parameter_defaults: BTreeMap<String, String>,
664        bootstrap_args: &BootstrapArgs,
665        enable_expression_cache_override: Option<bool>,
666    ) -> Result<Catalog, anyhow::Error> {
667        let metrics_registry = &MetricsRegistry::new();
668        let secrets_reader = Arc::new(InMemorySecretsController::new());
669        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
670        // debugging/testing.
671        let previous_ts = now().into();
672        let replica_size = &bootstrap_args.default_cluster_replica_size;
673        let read_only = false;
674
675        let OpenCatalogResult {
676            catalog,
677            migrated_storage_collections_0dt: _,
678            new_builtin_collections: _,
679            builtin_table_updates: _,
680            cached_global_exprs: _,
681            uncached_local_exprs: _,
682        } = Catalog::open(Config {
683            storage,
684            metrics_registry,
685            state: StateConfig {
686                unsafe_mode: true,
687                all_features: false,
688                build_info,
689                deploy_generation: 0,
690                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
691                read_only,
692                now,
693                boot_ts: previous_ts,
694                skip_migrations: true,
695                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
696                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
697                    size: replica_size.clone(),
698                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
699                },
700                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
701                    size: replica_size.clone(),
702                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
703                },
704                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
705                    size: replica_size.clone(),
706                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
707                },
708                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
709                    size: replica_size.clone(),
710                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711                },
712                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
713                    size: replica_size.clone(),
714                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
715                },
716                system_parameter_defaults,
717                remote_system_parameters: None,
718                availability_zones: vec![],
719                egress_addresses: vec![],
720                aws_principal_context: None,
721                aws_privatelink_availability_zones: None,
722                http_host_name: None,
723                connection_context: ConnectionContext::for_tests(secrets_reader),
724                builtin_item_migration_config: BuiltinItemMigrationConfig {
725                    persist_client: persist_client.clone(),
726                    read_only,
727                    force_migration: None,
728                },
729                persist_client,
730                enable_expression_cache_override,
731                helm_chart_version: None,
732                external_login_password_mz_system: None,
733                license_key: ValidatedLicenseKey::for_tests(),
734            },
735        })
736        .await?;
737        Ok(catalog)
738    }
739
740    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
741        self.state.for_session(session)
742    }
743
744    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
745        self.state.for_sessionless_user(role_id)
746    }
747
748    pub fn for_system_session(&self) -> ConnCatalog<'_> {
749        self.state.for_system_session()
750    }
751
752    async fn storage<'a>(
753        &'a self,
754    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
755        self.storage.lock().await
756    }
757
758    pub async fn current_upper(&self) -> mz_repr::Timestamp {
759        self.storage().await.current_upper().await
760    }
761
762    pub async fn allocate_user_id(
763        &self,
764        commit_ts: mz_repr::Timestamp,
765    ) -> Result<(CatalogItemId, GlobalId), Error> {
766        self.storage()
767            .await
768            .allocate_user_id(commit_ts)
769            .await
770            .maybe_terminate("allocating user ids")
771            .err_into()
772    }
773
774    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
775    pub async fn allocate_user_ids(
776        &self,
777        amount: u64,
778        commit_ts: mz_repr::Timestamp,
779    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
780        self.storage()
781            .await
782            .allocate_user_ids(amount, commit_ts)
783            .await
784            .maybe_terminate("allocating user ids")
785            .err_into()
786    }
787
788    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
789        let commit_ts = self.storage().await.current_upper().await;
790        self.allocate_user_id(commit_ts).await
791    }
792
793    /// Get the next user item ID without allocating it.
794    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
795        self.storage()
796            .await
797            .get_next_user_item_id()
798            .await
799            .err_into()
800    }
801
802    #[cfg(test)]
803    pub async fn allocate_system_id(
804        &self,
805        commit_ts: mz_repr::Timestamp,
806    ) -> Result<(CatalogItemId, GlobalId), Error> {
807        use mz_ore::collections::CollectionExt;
808
809        let mut storage = self.storage().await;
810        let mut txn = storage.transaction().await?;
811        let id = txn
812            .allocate_system_item_ids(1)
813            .maybe_terminate("allocating system ids")?
814            .into_element();
815        // Drain transaction.
816        let _ = txn.get_and_commit_op_updates();
817        txn.commit(commit_ts).await?;
818        Ok(id)
819    }
820
821    /// Get the next system item ID without allocating it.
822    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
823        self.storage()
824            .await
825            .get_next_system_item_id()
826            .await
827            .err_into()
828    }
829
830    pub async fn allocate_user_cluster_id(
831        &self,
832        commit_ts: mz_repr::Timestamp,
833    ) -> Result<ClusterId, Error> {
834        self.storage()
835            .await
836            .allocate_user_cluster_id(commit_ts)
837            .await
838            .maybe_terminate("allocating user cluster ids")
839            .err_into()
840    }
841
842    /// Get the next system replica id without allocating it.
843    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
844        self.storage()
845            .await
846            .get_next_system_replica_id()
847            .await
848            .err_into()
849    }
850
851    /// Get the next user replica id without allocating it.
852    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
853        self.storage()
854            .await
855            .get_next_user_replica_id()
856            .await
857            .err_into()
858    }
859
860    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
861        self.state.resolve_database(database_name)
862    }
863
864    pub fn resolve_schema(
865        &self,
866        current_database: Option<&DatabaseId>,
867        database_name: Option<&str>,
868        schema_name: &str,
869        conn_id: &ConnectionId,
870    ) -> Result<&Schema, SqlCatalogError> {
871        self.state
872            .resolve_schema(current_database, database_name, schema_name, conn_id)
873    }
874
875    pub fn resolve_schema_in_database(
876        &self,
877        database_spec: &ResolvedDatabaseSpecifier,
878        schema_name: &str,
879        conn_id: &ConnectionId,
880    ) -> Result<&Schema, SqlCatalogError> {
881        self.state
882            .resolve_schema_in_database(database_spec, schema_name, conn_id)
883    }
884
885    pub fn resolve_replica_in_cluster(
886        &self,
887        cluster_id: &ClusterId,
888        replica_name: &str,
889    ) -> Result<&ClusterReplica, SqlCatalogError> {
890        self.state
891            .resolve_replica_in_cluster(cluster_id, replica_name)
892    }
893
894    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
895        self.state.resolve_system_schema(name)
896    }
897
898    pub fn resolve_search_path(
899        &self,
900        session: &Session,
901    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
902        self.state.resolve_search_path(session)
903    }
904
905    /// Resolves `name` to a non-function [`CatalogEntry`].
906    pub fn resolve_entry(
907        &self,
908        current_database: Option<&DatabaseId>,
909        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
910        name: &PartialItemName,
911        conn_id: &ConnectionId,
912    ) -> Result<&CatalogEntry, SqlCatalogError> {
913        self.state
914            .resolve_entry(current_database, search_path, name, conn_id)
915    }
916
917    /// Resolves a `BuiltinTable`.
918    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
919        self.state.resolve_builtin_table(builtin)
920    }
921
922    /// Resolves a `BuiltinLog`.
923    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
924        self.state.resolve_builtin_log(builtin).0
925    }
926
927    /// Resolves a `BuiltinSource`.
928    pub fn resolve_builtin_storage_collection(
929        &self,
930        builtin: &'static BuiltinSource,
931    ) -> CatalogItemId {
932        self.state.resolve_builtin_source(builtin)
933    }
934
935    /// Resolves `name` to a function [`CatalogEntry`].
936    pub fn resolve_function(
937        &self,
938        current_database: Option<&DatabaseId>,
939        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
940        name: &PartialItemName,
941        conn_id: &ConnectionId,
942    ) -> Result<&CatalogEntry, SqlCatalogError> {
943        self.state
944            .resolve_function(current_database, search_path, name, conn_id)
945    }
946
947    /// Resolves `name` to a type [`CatalogEntry`].
948    pub fn resolve_type(
949        &self,
950        current_database: Option<&DatabaseId>,
951        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
952        name: &PartialItemName,
953        conn_id: &ConnectionId,
954    ) -> Result<&CatalogEntry, SqlCatalogError> {
955        self.state
956            .resolve_type(current_database, search_path, name, conn_id)
957    }
958
959    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
960        self.state.resolve_cluster(name)
961    }
962
963    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
964    ///
965    /// # Panics
966    /// * If the [`BuiltinCluster`] doesn't exist.
967    ///
968    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
969        self.state.resolve_builtin_cluster(cluster)
970    }
971
972    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
973        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
974    }
975
976    /// Resolves a [`Cluster`] for a TargetCluster.
977    pub fn resolve_target_cluster(
978        &self,
979        target_cluster: TargetCluster,
980        session: &Session,
981    ) -> Result<&Cluster, AdapterError> {
982        match target_cluster {
983            TargetCluster::CatalogServer => {
984                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
985            }
986            TargetCluster::Active => self.active_cluster(session),
987            TargetCluster::Transaction(cluster_id) => self
988                .try_get_cluster(cluster_id)
989                .ok_or(AdapterError::ConcurrentClusterDrop),
990        }
991    }
992
993    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
994        // TODO(benesch): this check here is not sufficiently protective. It'd
995        // be very easy for a code path to accidentally avoid this check by
996        // calling `resolve_cluster(session.vars().cluster())`.
997        if session.user().name != SYSTEM_USER.name
998            && session.user().name != SUPPORT_USER.name
999            && session.vars().cluster() == SYSTEM_USER.name
1000        {
1001            coord_bail!(
1002                "system cluster '{}' cannot execute user queries",
1003                SYSTEM_USER.name
1004            );
1005        }
1006        let cluster = self.resolve_cluster(session.vars().cluster())?;
1007        Ok(cluster)
1008    }
1009
1010    pub fn state(&self) -> &CatalogState {
1011        &self.state
1012    }
1013
1014    pub fn resolve_full_name(
1015        &self,
1016        name: &QualifiedItemName,
1017        conn_id: Option<&ConnectionId>,
1018    ) -> FullItemName {
1019        self.state.resolve_full_name(name, conn_id)
1020    }
1021
1022    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1023        self.state.try_get_entry(id)
1024    }
1025
1026    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1027        self.state.try_get_entry_by_global_id(id)
1028    }
1029
1030    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1031        self.state.get_entry(id)
1032    }
1033
1034    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1035        self.state.get_entry_by_global_id(id)
1036    }
1037
1038    pub fn get_global_ids<'a>(
1039        &'a self,
1040        id: &CatalogItemId,
1041    ) -> impl Iterator<Item = GlobalId> + use<'a> {
1042        self.get_entry(id).global_ids()
1043    }
1044
1045    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1046        self.get_entry_by_global_id(id).id()
1047    }
1048
1049    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1050        let item = self.try_get_entry_by_global_id(id)?;
1051        Some(item.id())
1052    }
1053
1054    pub fn get_schema(
1055        &self,
1056        database_spec: &ResolvedDatabaseSpecifier,
1057        schema_spec: &SchemaSpecifier,
1058        conn_id: &ConnectionId,
1059    ) -> &Schema {
1060        self.state.get_schema(database_spec, schema_spec, conn_id)
1061    }
1062
1063    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1064        self.state.get_mz_catalog_schema_id()
1065    }
1066
1067    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1068        self.state.get_pg_catalog_schema_id()
1069    }
1070
1071    pub fn get_information_schema_id(&self) -> SchemaId {
1072        self.state.get_information_schema_id()
1073    }
1074
1075    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1076        self.state.get_mz_internal_schema_id()
1077    }
1078
1079    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1080        self.state.get_mz_introspection_schema_id()
1081    }
1082
1083    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1084        self.state.get_mz_unsafe_schema_id()
1085    }
1086
1087    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1088        self.state.system_schema_ids()
1089    }
1090
1091    pub fn get_database(&self, id: &DatabaseId) -> &Database {
1092        self.state.get_database(id)
1093    }
1094
1095    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1096        self.state.try_get_role(id)
1097    }
1098
1099    pub fn get_role(&self, id: &RoleId) -> &Role {
1100        self.state.get_role(id)
1101    }
1102
1103    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1104        self.state.try_get_role_by_name(role_name)
1105    }
1106
1107    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1108        self.state.try_get_role_auth_by_id(id)
1109    }
1110
1111    /// Creates a new schema in the `Catalog` for temporary items
1112    /// indicated by the TEMPORARY or TEMP keywords.
1113    pub fn create_temporary_schema(
1114        &mut self,
1115        conn_id: &ConnectionId,
1116        owner_id: RoleId,
1117    ) -> Result<(), Error> {
1118        self.state.create_temporary_schema(conn_id, owner_id)
1119    }
1120
1121    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1122        self.state.temporary_schemas[conn_id]
1123            .items
1124            .contains_key(item_name)
1125    }
1126
1127    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1128    /// Returns Ok if conn_id's temp schema does not exist.
1129    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1130        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1131            return Ok(());
1132        };
1133        if !schema.items.is_empty() {
1134            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1135        }
1136        Ok(())
1137    }
1138
1139    pub(crate) fn object_dependents(
1140        &self,
1141        object_ids: &Vec<ObjectId>,
1142        conn_id: &ConnectionId,
1143    ) -> Vec<ObjectId> {
1144        let mut seen = BTreeSet::new();
1145        self.state.object_dependents(object_ids, conn_id, &mut seen)
1146    }
1147
1148    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1149        FullNameV1 {
1150            database: name.database.to_string(),
1151            schema: name.schema.clone(),
1152            item: name.item.clone(),
1153        }
1154    }
1155
1156    pub fn find_available_cluster_name(&self, name: &str) -> String {
1157        let mut i = 0;
1158        let mut candidate = name.to_string();
1159        while self.state.clusters_by_name.contains_key(&candidate) {
1160            i += 1;
1161            candidate = format!("{}{}", name, i);
1162        }
1163        candidate
1164    }
1165
1166    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1167        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1168            self.cluster_replica_sizes()
1169                .enabled_allocations()
1170                .map(|a| a.0.to_owned())
1171                .collect::<Vec<_>>()
1172        } else {
1173            self.system_config().allowed_cluster_replica_sizes()
1174        }
1175    }
1176
1177    pub fn concretize_replica_location(
1178        &self,
1179        location: mz_catalog::durable::ReplicaLocation,
1180        allowed_sizes: &Vec<String>,
1181        allowed_availability_zones: Option<&[String]>,
1182    ) -> Result<ReplicaLocation, Error> {
1183        self.state
1184            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1185    }
1186
1187    pub(crate) fn ensure_valid_replica_size(
1188        &self,
1189        allowed_sizes: &[String],
1190        size: &String,
1191    ) -> Result<(), Error> {
1192        self.state.ensure_valid_replica_size(allowed_sizes, size)
1193    }
1194
1195    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1196        &self.state.cluster_replica_sizes
1197    }
1198
1199    /// Returns the privileges of an object by its ID.
1200    pub fn get_privileges(
1201        &self,
1202        id: &SystemObjectId,
1203        conn_id: &ConnectionId,
1204    ) -> Option<&PrivilegeMap> {
1205        match id {
1206            SystemObjectId::Object(id) => match id {
1207                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1208                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1209                ObjectId::Schema((database_spec, schema_spec)) => Some(
1210                    self.get_schema(database_spec, schema_spec, conn_id)
1211                        .privileges(),
1212                ),
1213                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1214                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1215                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1216            },
1217            SystemObjectId::System => Some(&self.state.system_privileges),
1218        }
1219    }
1220
1221    #[mz_ore::instrument(level = "debug")]
1222    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1223        Ok(self.storage().await.confirm_leadership().await?)
1224    }
1225
1226    /// Return the ids of all log sources the given object depends on.
1227    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1228        self.state.introspection_dependencies(id)
1229    }
1230
1231    /// Serializes the catalog's in-memory state.
1232    ///
1233    /// There are no guarantees about the format of the serialized state, except
1234    /// that the serialized state for two identical catalogs will compare
1235    /// identically.
1236    pub fn dump(&self) -> Result<CatalogDump, Error> {
1237        Ok(CatalogDump::new(self.state.dump(None)?))
1238    }
1239
1240    /// Checks the [`Catalog`]s internal consistency.
1241    ///
1242    /// Returns a JSON object describing the inconsistencies, if there are any.
1243    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1244        self.state.check_consistency().map_err(|inconsistencies| {
1245            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1246                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1247            })
1248        })
1249    }
1250
1251    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1252        self.state.config()
1253    }
1254
1255    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1256        self.state.entry_by_id.values()
1257    }
1258
1259    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1260        self.entries()
1261            .filter(|entry| entry.is_connection() && entry.id().is_user())
1262    }
1263
1264    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1265        self.entries()
1266            .filter(|entry| entry.is_table() && entry.id().is_user())
1267    }
1268
1269    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1270        self.entries()
1271            .filter(|entry| entry.is_source() && entry.id().is_user())
1272    }
1273
1274    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1275        self.entries()
1276            .filter(|entry| entry.is_sink() && entry.id().is_user())
1277    }
1278
1279    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1280        self.entries()
1281            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1282    }
1283
1284    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1285        self.entries()
1286            .filter(|entry| entry.is_secret() && entry.id().is_user())
1287    }
1288
1289    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1290        self.state.get_network_policy(&network_policy_id)
1291    }
1292
1293    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1294        self.state.try_get_network_policy_by_name(name)
1295    }
1296
1297    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1298        self.state.clusters_by_id.values()
1299    }
1300
1301    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1302        self.state.get_cluster(cluster_id)
1303    }
1304
1305    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1306        self.state.try_get_cluster(cluster_id)
1307    }
1308
1309    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1310        self.clusters().filter(|cluster| cluster.id.is_user())
1311    }
1312
1313    pub fn get_cluster_replica(
1314        &self,
1315        cluster_id: ClusterId,
1316        replica_id: ReplicaId,
1317    ) -> &ClusterReplica {
1318        self.state.get_cluster_replica(cluster_id, replica_id)
1319    }
1320
1321    pub fn try_get_cluster_replica(
1322        &self,
1323        cluster_id: ClusterId,
1324        replica_id: ReplicaId,
1325    ) -> Option<&ClusterReplica> {
1326        self.state.try_get_cluster_replica(cluster_id, replica_id)
1327    }
1328
1329    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1330        self.user_clusters()
1331            .flat_map(|cluster| cluster.user_replicas())
1332    }
1333
1334    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1335        self.state.database_by_id.values()
1336    }
1337
1338    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1339        self.state
1340            .roles_by_id
1341            .values()
1342            .filter(|role| role.is_user())
1343    }
1344
1345    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1346        self.entries()
1347            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1348    }
1349
1350    pub fn system_privileges(&self) -> &PrivilegeMap {
1351        &self.state.system_privileges
1352    }
1353
1354    pub fn default_privileges(
1355        &self,
1356    ) -> impl Iterator<
1357        Item = (
1358            &DefaultPrivilegeObject,
1359            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1360        ),
1361    > {
1362        self.state.default_privileges.iter()
1363    }
1364
1365    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1366        self.state
1367            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1368    }
1369
1370    pub fn pack_storage_usage_update(
1371        &self,
1372        event: VersionedStorageUsage,
1373        diff: Diff,
1374    ) -> BuiltinTableUpdate {
1375        self.state
1376            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1377    }
1378
1379    pub fn system_config(&self) -> &SystemVars {
1380        self.state.system_config()
1381    }
1382
1383    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1384        self.state.system_config_mut()
1385    }
1386
1387    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1388        self.state.ensure_not_reserved_role(role_id)
1389    }
1390
1391    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1392        self.state.ensure_grantable_role(role_id)
1393    }
1394
1395    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1396        self.state.ensure_not_system_role(role_id)
1397    }
1398
1399    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1400        self.state.ensure_not_predefined_role(role_id)
1401    }
1402
1403    pub fn ensure_not_reserved_network_policy(
1404        &self,
1405        network_policy_id: &NetworkPolicyId,
1406    ) -> Result<(), Error> {
1407        self.state
1408            .ensure_not_reserved_network_policy(network_policy_id)
1409    }
1410
1411    pub fn ensure_not_reserved_object(
1412        &self,
1413        object_id: &ObjectId,
1414        conn_id: &ConnectionId,
1415    ) -> Result<(), Error> {
1416        match object_id {
1417            ObjectId::Cluster(cluster_id) => {
1418                if cluster_id.is_system() {
1419                    let cluster = self.get_cluster(*cluster_id);
1420                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1421                        cluster.name().to_string(),
1422                    )))
1423                } else {
1424                    Ok(())
1425                }
1426            }
1427            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1428                if replica_id.is_system() {
1429                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1430                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1431                        replica.name().to_string(),
1432                    )))
1433                } else {
1434                    Ok(())
1435                }
1436            }
1437            ObjectId::Database(database_id) => {
1438                if database_id.is_system() {
1439                    let database = self.get_database(database_id);
1440                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1441                        database.name().to_string(),
1442                    )))
1443                } else {
1444                    Ok(())
1445                }
1446            }
1447            ObjectId::Schema((database_spec, schema_spec)) => {
1448                if schema_spec.is_system() {
1449                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1450                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1451                        schema.name().schema.clone(),
1452                    )))
1453                } else {
1454                    Ok(())
1455                }
1456            }
1457            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1458            ObjectId::Item(item_id) => {
1459                if item_id.is_system() {
1460                    let item = self.get_entry(item_id);
1461                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1462                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1463                } else {
1464                    Ok(())
1465                }
1466            }
1467            ObjectId::NetworkPolicy(network_policy_id) => {
1468                self.ensure_not_reserved_network_policy(network_policy_id)
1469            }
1470        }
1471    }
1472
1473    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1474    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1475        &mut self,
1476        create_sql: &str,
1477        force_if_exists_skip: bool,
1478    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1479        self.state
1480            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1481    }
1482
1483    pub(crate) fn update_expression_cache<'a, 'b>(
1484        &'a self,
1485        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1486        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1487    ) -> BoxFuture<'b, ()> {
1488        if let Some(expr_cache) = &self.expr_cache_handle {
1489            let ons = new_local_expressions
1490                .iter()
1491                .map(|(id, _)| id)
1492                .chain(new_global_expressions.iter().map(|(id, _)| id))
1493                .map(|id| self.get_entry_by_global_id(id))
1494                .filter_map(|entry| entry.index().map(|index| index.on));
1495            let invalidate_ids = self.invalidate_for_index(ons);
1496            expr_cache
1497                .update(
1498                    new_local_expressions,
1499                    new_global_expressions,
1500                    invalidate_ids,
1501                )
1502                .boxed()
1503        } else {
1504            async {}.boxed()
1505        }
1506    }
1507
1508    /// Listen for and apply all unconsumed updates to the durable catalog state.
1509    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1510    // `#[cfg(test)]` annotation.
1511    #[cfg(test)]
1512    async fn sync_to_current_updates(
1513        &mut self,
1514    ) -> Result<
1515        (
1516            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1517            Vec<ParsedStateUpdate>,
1518        ),
1519        CatalogError,
1520    > {
1521        let updates = self.storage().await.sync_to_current_updates().await?;
1522        let (builtin_table_updates, catalog_updates) = self.state.apply_updates(updates)?;
1523        Ok((builtin_table_updates, catalog_updates))
1524    }
1525}
1526
1527pub fn is_reserved_name(name: &str) -> bool {
1528    BUILTIN_PREFIXES
1529        .iter()
1530        .any(|prefix| name.starts_with(prefix))
1531}
1532
1533pub fn is_reserved_role_name(name: &str) -> bool {
1534    is_reserved_name(name) || is_public_role(name)
1535}
1536
1537pub fn is_public_role(name: &str) -> bool {
1538    name == &*PUBLIC_ROLE_NAME
1539}
1540
1541pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1542    object_type_to_audit_object_type(sql_type.into())
1543}
1544
1545pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1546    match id {
1547        CommentObjectId::Table(_) => ObjectType::Table,
1548        CommentObjectId::View(_) => ObjectType::View,
1549        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1550        CommentObjectId::Source(_) => ObjectType::Source,
1551        CommentObjectId::Sink(_) => ObjectType::Sink,
1552        CommentObjectId::Index(_) => ObjectType::Index,
1553        CommentObjectId::Func(_) => ObjectType::Func,
1554        CommentObjectId::Connection(_) => ObjectType::Connection,
1555        CommentObjectId::Type(_) => ObjectType::Type,
1556        CommentObjectId::Secret(_) => ObjectType::Secret,
1557        CommentObjectId::Role(_) => ObjectType::Role,
1558        CommentObjectId::Database(_) => ObjectType::Database,
1559        CommentObjectId::Schema(_) => ObjectType::Schema,
1560        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1561        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1562        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1563        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1564    }
1565}
1566
1567pub(crate) fn object_type_to_audit_object_type(
1568    object_type: mz_sql::catalog::ObjectType,
1569) -> ObjectType {
1570    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1571}
1572
1573pub(crate) fn system_object_type_to_audit_object_type(
1574    system_type: &SystemObjectType,
1575) -> ObjectType {
1576    match system_type {
1577        SystemObjectType::Object(object_type) => match object_type {
1578            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1579            mz_sql::catalog::ObjectType::View => ObjectType::View,
1580            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1581            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1582            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1583            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1584            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1585            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1586            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1587            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1588            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1589            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1590            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1591            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1592            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1593            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1594            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1595        },
1596        SystemObjectType::System => ObjectType::System,
1597    }
1598}
1599
/// Distinguishes the two directions of a privilege update.
#[derive(Clone, Copy, Debug)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1605
1606impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1607    fn from(variant: UpdatePrivilegeVariant) -> Self {
1608        match variant {
1609            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1610            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1611        }
1612    }
1613}
1614
1615impl From<UpdatePrivilegeVariant> for EventType {
1616    fn from(variant: UpdatePrivilegeVariant) -> Self {
1617        match variant {
1618            UpdatePrivilegeVariant::Grant => EventType::Grant,
1619            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1620        }
1621    }
1622}
1623
1624impl ConnCatalog<'_> {
1625    fn resolve_item_name(
1626        &self,
1627        name: &PartialItemName,
1628    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1629        self.resolve_item(name).map(|entry| entry.name())
1630    }
1631
1632    fn resolve_function_name(
1633        &self,
1634        name: &PartialItemName,
1635    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1636        self.resolve_function(name).map(|entry| entry.name())
1637    }
1638
1639    fn resolve_type_name(
1640        &self,
1641        name: &PartialItemName,
1642    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1643        self.resolve_type(name).map(|entry| entry.name())
1644    }
1645}
1646
1647impl ExprHumanizer for ConnCatalog<'_> {
1648    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1649        let entry = self.state.try_get_entry_by_global_id(&id)?;
1650        Some(self.resolve_full_name(entry.name()).to_string())
1651    }
1652
1653    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1654        let entry = self.state.try_get_entry_by_global_id(&id)?;
1655        Some(entry.name().item.clone())
1656    }
1657
1658    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1659        let entry = self.state.try_get_entry_by_global_id(&id)?;
1660        Some(self.resolve_full_name(entry.name()).into_parts())
1661    }
1662
1663    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1664        use SqlScalarType::*;
1665
1666        match typ {
1667            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1668            List {
1669                custom_id: Some(item_id),
1670                ..
1671            }
1672            | Map {
1673                custom_id: Some(item_id),
1674                ..
1675            } => {
1676                let item = self.get_item(item_id);
1677                self.minimal_qualification(item.name()).to_string()
1678            }
1679            List { element_type, .. } => {
1680                format!(
1681                    "{} list",
1682                    self.humanize_scalar_type(element_type, postgres_compat)
1683                )
1684            }
1685            Map { value_type, .. } => format!(
1686                "map[{}=>{}]",
1687                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1688                self.humanize_scalar_type(value_type, postgres_compat)
1689            ),
1690            Record {
1691                custom_id: Some(item_id),
1692                ..
1693            } => {
1694                let item = self.get_item(item_id);
1695                self.minimal_qualification(item.name()).to_string()
1696            }
1697            Record { fields, .. } => format!(
1698                "record({})",
1699                fields
1700                    .iter()
1701                    .map(|f| format!(
1702                        "{}: {}",
1703                        f.0,
1704                        self.humanize_column_type(&f.1, postgres_compat)
1705                    ))
1706                    .join(",")
1707            ),
1708            PgLegacyChar => "\"char\"".into(),
1709            Char { length } if !postgres_compat => match length {
1710                None => "char".into(),
1711                Some(length) => format!("char({})", length.into_u32()),
1712            },
1713            VarChar { max_length } if !postgres_compat => match max_length {
1714                None => "varchar".into(),
1715                Some(length) => format!("varchar({})", length.into_u32()),
1716            },
1717            UInt16 => "uint2".into(),
1718            UInt32 => "uint4".into(),
1719            UInt64 => "uint8".into(),
1720            ty => {
1721                let pgrepr_type = mz_pgrepr::Type::from(ty);
1722                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1723
1724                let res = if self
1725                    .effective_search_path(true)
1726                    .iter()
1727                    .any(|(_, schema)| schema == &pg_catalog_schema)
1728                {
1729                    pgrepr_type.name().to_string()
1730                } else {
1731                    // If PG_CATALOG_SCHEMA is not in search path, you need
1732                    // qualified object name to refer to type.
1733                    let name = QualifiedItemName {
1734                        qualifiers: ItemQualifiers {
1735                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1736                            schema_spec: pg_catalog_schema,
1737                        },
1738                        item: pgrepr_type.name().to_string(),
1739                    };
1740                    self.resolve_full_name(&name).to_string()
1741                };
1742                res
1743            }
1744        }
1745    }
1746
1747    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1748        let entry = self.state.try_get_entry_by_global_id(&id)?;
1749
1750        match entry.index() {
1751            Some(index) => {
1752                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1753                let mut on_names = on_desc
1754                    .iter_names()
1755                    .map(|col_name| col_name.to_string())
1756                    .collect::<Vec<_>>();
1757
1758                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1759
1760                // Init ix_names with unknown column names. Unknown columns are
1761                // represented as an empty String and rendered as `#c` by the
1762                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1763                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1764                let mut ix_names = vec![String::new(); ix_arity];
1765
1766                // Apply the permutation by swapping on_names with ix_names.
1767                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1768                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1769                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1770                    std::mem::swap(on_name, ix_name);
1771                }
1772
1773                Some(ix_names) // Return the updated ix_names vector.
1774            }
1775            None => {
1776                let desc = self.state.try_get_desc_by_global_id(&id)?;
1777                let column_names = desc
1778                    .iter_names()
1779                    .map(|col_name| col_name.to_string())
1780                    .collect();
1781
1782                Some(column_names)
1783            }
1784        }
1785    }
1786
1787    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1788        let desc = self.state.try_get_desc_by_global_id(&id)?;
1789        Some(desc.get_name(column).to_string())
1790    }
1791
1792    fn id_exists(&self, id: GlobalId) -> bool {
1793        self.state.entry_by_global_id.contains_key(&id)
1794    }
1795}
1796
1797impl SessionCatalog for ConnCatalog<'_> {
1798    fn active_role_id(&self) -> &RoleId {
1799        &self.role_id
1800    }
1801
1802    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1803        self.prepared_statements
1804            .as_ref()
1805            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1806            .flatten()
1807    }
1808
1809    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1810        self.portals
1811            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1812    }
1813
1814    fn active_database(&self) -> Option<&DatabaseId> {
1815        self.database.as_ref()
1816    }
1817
1818    fn active_cluster(&self) -> &str {
1819        &self.cluster
1820    }
1821
1822    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1823        &self.search_path
1824    }
1825
1826    fn resolve_database(
1827        &self,
1828        database_name: &str,
1829    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1830        Ok(self.state.resolve_database(database_name)?)
1831    }
1832
1833    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1834        self.state
1835            .database_by_id
1836            .get(id)
1837            .expect("database doesn't exist")
1838    }
1839
1840    // `as` is ok to use to cast to a trait object.
1841    #[allow(clippy::as_conversions)]
1842    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1843        self.state
1844            .database_by_id
1845            .values()
1846            .map(|database| database as &dyn CatalogDatabase)
1847            .collect()
1848    }
1849
1850    fn resolve_schema(
1851        &self,
1852        database_name: Option<&str>,
1853        schema_name: &str,
1854    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1855        Ok(self.state.resolve_schema(
1856            self.database.as_ref(),
1857            database_name,
1858            schema_name,
1859            &self.conn_id,
1860        )?)
1861    }
1862
1863    fn resolve_schema_in_database(
1864        &self,
1865        database_spec: &ResolvedDatabaseSpecifier,
1866        schema_name: &str,
1867    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1868        Ok(self
1869            .state
1870            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1871    }
1872
1873    fn get_schema(
1874        &self,
1875        database_spec: &ResolvedDatabaseSpecifier,
1876        schema_spec: &SchemaSpecifier,
1877    ) -> &dyn CatalogSchema {
1878        self.state
1879            .get_schema(database_spec, schema_spec, &self.conn_id)
1880    }
1881
1882    // `as` is ok to use to cast to a trait object.
1883    #[allow(clippy::as_conversions)]
1884    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1885        self.get_databases()
1886            .into_iter()
1887            .flat_map(|database| database.schemas().into_iter())
1888            .chain(
1889                self.state
1890                    .ambient_schemas_by_id
1891                    .values()
1892                    .chain(self.state.temporary_schemas.values())
1893                    .map(|schema| schema as &dyn CatalogSchema),
1894            )
1895            .collect()
1896    }
1897
1898    fn get_mz_internal_schema_id(&self) -> SchemaId {
1899        self.state().get_mz_internal_schema_id()
1900    }
1901
1902    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1903        self.state().get_mz_unsafe_schema_id()
1904    }
1905
1906    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1907        self.state.is_system_schema_specifier(schema)
1908    }
1909
1910    fn resolve_role(
1911        &self,
1912        role_name: &str,
1913    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1914        match self.state.try_get_role_by_name(role_name) {
1915            Some(role) => Ok(role),
1916            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1917        }
1918    }
1919
1920    fn resolve_network_policy(
1921        &self,
1922        policy_name: &str,
1923    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1924        match self.state.try_get_network_policy_by_name(policy_name) {
1925            Some(policy) => Ok(policy),
1926            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1927        }
1928    }
1929
1930    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1931        Some(self.state.roles_by_id.get(id)?)
1932    }
1933
1934    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1935        self.state.get_role(id)
1936    }
1937
1938    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1939        // `as` is ok to use to cast to a trait object.
1940        #[allow(clippy::as_conversions)]
1941        self.state
1942            .roles_by_id
1943            .values()
1944            .map(|role| role as &dyn CatalogRole)
1945            .collect()
1946    }
1947
1948    fn mz_system_role_id(&self) -> RoleId {
1949        MZ_SYSTEM_ROLE_ID
1950    }
1951
1952    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1953        self.state.collect_role_membership(id)
1954    }
1955
1956    fn get_network_policy(
1957        &self,
1958        id: &NetworkPolicyId,
1959    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1960        self.state.get_network_policy(id)
1961    }
1962
1963    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1964        // `as` is ok to use to cast to a trait object.
1965        #[allow(clippy::as_conversions)]
1966        self.state
1967            .network_policies_by_id
1968            .values()
1969            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1970            .collect()
1971    }
1972
1973    fn resolve_cluster(
1974        &self,
1975        cluster_name: Option<&str>,
1976    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1977        Ok(self
1978            .state
1979            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1980    }
1981
1982    fn resolve_cluster_replica(
1983        &self,
1984        cluster_replica_name: &QualifiedReplica,
1985    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1986        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1987    }
1988
1989    fn resolve_item(
1990        &self,
1991        name: &PartialItemName,
1992    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1993        let r = self.state.resolve_entry(
1994            self.database.as_ref(),
1995            &self.effective_search_path(true),
1996            name,
1997            &self.conn_id,
1998        )?;
1999        if self.unresolvable_ids.contains(&r.id()) {
2000            Err(SqlCatalogError::UnknownItem(name.to_string()))
2001        } else {
2002            Ok(r)
2003        }
2004    }
2005
2006    fn resolve_function(
2007        &self,
2008        name: &PartialItemName,
2009    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2010        let r = self.state.resolve_function(
2011            self.database.as_ref(),
2012            &self.effective_search_path(false),
2013            name,
2014            &self.conn_id,
2015        )?;
2016
2017        if self.unresolvable_ids.contains(&r.id()) {
2018            Err(SqlCatalogError::UnknownFunction {
2019                name: name.to_string(),
2020                alternative: None,
2021            })
2022        } else {
2023            Ok(r)
2024        }
2025    }
2026
2027    fn resolve_type(
2028        &self,
2029        name: &PartialItemName,
2030    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2031        let r = self.state.resolve_type(
2032            self.database.as_ref(),
2033            &self.effective_search_path(false),
2034            name,
2035            &self.conn_id,
2036        )?;
2037
2038        if self.unresolvable_ids.contains(&r.id()) {
2039            Err(SqlCatalogError::UnknownType {
2040                name: name.to_string(),
2041            })
2042        } else {
2043            Ok(r)
2044        }
2045    }
2046
2047    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2048        self.state.get_system_type(name)
2049    }
2050
2051    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2052        Some(self.state.try_get_entry(id)?)
2053    }
2054
2055    fn try_get_item_by_global_id(
2056        &self,
2057        id: &GlobalId,
2058    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2059        let entry = self.state.try_get_entry_by_global_id(id)?;
2060        let entry = match &entry.item {
2061            CatalogItem::Table(table) => {
2062                let (version, _gid) = table
2063                    .collections
2064                    .iter()
2065                    .find(|(_version, gid)| *gid == id)
2066                    .expect("catalog out of sync, mismatched GlobalId");
2067                entry.at_version(RelationVersionSelector::Specific(*version))
2068            }
2069            _ => entry.at_version(RelationVersionSelector::Latest),
2070        };
2071        Some(entry)
2072    }
2073
2074    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2075        self.state.get_entry(id)
2076    }
2077
2078    fn get_item_by_global_id(
2079        &self,
2080        id: &GlobalId,
2081    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2082        let entry = self.state.get_entry_by_global_id(id);
2083        let entry = match &entry.item {
2084            CatalogItem::Table(table) => {
2085                let (version, _gid) = table
2086                    .collections
2087                    .iter()
2088                    .find(|(_version, gid)| *gid == id)
2089                    .expect("catalog out of sync, mismatched GlobalId");
2090                entry.at_version(RelationVersionSelector::Specific(*version))
2091            }
2092            _ => entry.at_version(RelationVersionSelector::Latest),
2093        };
2094        entry
2095    }
2096
2097    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2098        self.get_schemas()
2099            .into_iter()
2100            .flat_map(|schema| schema.item_ids())
2101            .map(|id| self.get_item(&id))
2102            .collect()
2103    }
2104
2105    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2106        self.state
2107            .get_item_by_name(name, &self.conn_id)
2108            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2109    }
2110
2111    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2112        self.state
2113            .get_type_by_name(name, &self.conn_id)
2114            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2115    }
2116
2117    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2118        &self.state.clusters_by_id[&id]
2119    }
2120
2121    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2122        self.state
2123            .clusters_by_id
2124            .values()
2125            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2126            .collect()
2127    }
2128
2129    fn get_cluster_replica(
2130        &self,
2131        cluster_id: ClusterId,
2132        replica_id: ReplicaId,
2133    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2134        let cluster = self.get_cluster(cluster_id);
2135        cluster.replica(replica_id)
2136    }
2137
2138    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2139        self.get_clusters()
2140            .into_iter()
2141            .flat_map(|cluster| cluster.replicas().into_iter())
2142            .collect()
2143    }
2144
2145    fn get_system_privileges(&self) -> &PrivilegeMap {
2146        &self.state.system_privileges
2147    }
2148
2149    fn get_default_privileges(
2150        &self,
2151    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2152        self.state
2153            .default_privileges
2154            .iter()
2155            .map(|(object, acl_items)| (object, acl_items.collect()))
2156            .collect()
2157    }
2158
2159    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2160        self.state.find_available_name(name, &self.conn_id)
2161    }
2162
2163    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2164        self.state.resolve_full_name(name, Some(&self.conn_id))
2165    }
2166
2167    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2168        self.state.resolve_full_schema_name(name)
2169    }
2170
2171    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2172        self.state.get_entry_by_global_id(global_id).id()
2173    }
2174
2175    fn resolve_global_id(
2176        &self,
2177        item_id: &CatalogItemId,
2178        version: RelationVersionSelector,
2179    ) -> GlobalId {
2180        self.state
2181            .get_entry(item_id)
2182            .at_version(version)
2183            .global_id()
2184    }
2185
2186    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2187        self.state.config()
2188    }
2189
2190    fn now(&self) -> EpochMillis {
2191        (self.state.config().now)()
2192    }
2193
2194    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2195        self.state.aws_privatelink_availability_zones.clone()
2196    }
2197
2198    fn system_vars(&self) -> &SystemVars {
2199        &self.state.system_configuration
2200    }
2201
2202    fn system_vars_mut(&mut self) -> &mut SystemVars {
2203        &mut self.state.to_mut().system_configuration
2204    }
2205
2206    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2207        self.state().get_owner_id(id, self.conn_id())
2208    }
2209
2210    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2211        match id {
2212            SystemObjectId::System => Some(&self.state.system_privileges),
2213            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2214                Some(self.get_cluster(*id).privileges())
2215            }
2216            SystemObjectId::Object(ObjectId::Database(id)) => {
2217                Some(self.get_database(id).privileges())
2218            }
2219            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2220                Some(self.get_schema(database_spec, schema_spec).privileges())
2221            }
2222            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2223            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2224                Some(self.get_network_policy(id).privileges())
2225            }
2226            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2227            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2228        }
2229    }
2230
2231    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2232        let mut seen = BTreeSet::new();
2233        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2234    }
2235
2236    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2237        let mut seen = BTreeSet::new();
2238        self.state.item_dependents(id, &mut seen)
2239    }
2240
2241    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2242        rbac::all_object_privileges(object_type)
2243    }
2244
2245    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2246        self.state.get_object_type(object_id)
2247    }
2248
2249    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2250        self.state.get_system_object_type(id)
2251    }
2252
2253    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2254    /// the object.
2255    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2256        let database_id = match &qualified_name.qualifiers.database_spec {
2257            ResolvedDatabaseSpecifier::Ambient => None,
2258            ResolvedDatabaseSpecifier::Id(id)
2259                if self.database.is_some() && self.database == Some(*id) =>
2260            {
2261                None
2262            }
2263            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2264        };
2265
2266        let schema_spec = if database_id.is_none()
2267            && self.resolve_item_name(&PartialItemName {
2268                database: None,
2269                schema: None,
2270                item: qualified_name.item.clone(),
2271            }) == Ok(qualified_name)
2272            || self.resolve_function_name(&PartialItemName {
2273                database: None,
2274                schema: None,
2275                item: qualified_name.item.clone(),
2276            }) == Ok(qualified_name)
2277            || self.resolve_type_name(&PartialItemName {
2278                database: None,
2279                schema: None,
2280                item: qualified_name.item.clone(),
2281            }) == Ok(qualified_name)
2282        {
2283            None
2284        } else {
2285            // If `search_path` does not contain `full_name.schema`, the
2286            // `PartialName` must contain it.
2287            Some(qualified_name.qualifiers.schema_spec.clone())
2288        };
2289
2290        let res = PartialItemName {
2291            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2292            schema: schema_spec.map(|spec| {
2293                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2294                    .name()
2295                    .schema
2296                    .clone()
2297            }),
2298            item: qualified_name.item.clone(),
2299        };
2300        assert!(
2301            self.resolve_item_name(&res) == Ok(qualified_name)
2302                || self.resolve_function_name(&res) == Ok(qualified_name)
2303                || self.resolve_type_name(&res) == Ok(qualified_name)
2304        );
2305        res
2306    }
2307
2308    fn add_notice(&self, notice: PlanNotice) {
2309        let _ = self.notices_tx.send(notice.into());
2310    }
2311
2312    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2313        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2314        self.state.comments.get_object_comments(comment_id)
2315    }
2316
2317    fn is_cluster_size_cc(&self, size: &str) -> bool {
2318        self.state
2319            .cluster_replica_sizes
2320            .0
2321            .get(size)
2322            .map_or(false, |a| a.is_cc)
2323    }
2324}
2325
2326#[cfg(test)]
2327mod tests {
2328    use std::collections::{BTreeMap, BTreeSet};
2329    use std::sync::Arc;
2330    use std::{env, iter};
2331
2332    use itertools::Itertools;
2333    use mz_catalog::memory::objects::CatalogItem;
2334    use tokio_postgres::NoTls;
2335    use tokio_postgres::types::Type;
2336    use uuid::Uuid;
2337
2338    use mz_catalog::SYSTEM_CONN_ID;
2339    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2340    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2341    use mz_controller_types::{ClusterId, ReplicaId};
2342    use mz_expr::MirScalarExpr;
2343    use mz_ore::now::to_datetime;
2344    use mz_ore::{assert_err, assert_ok, task};
2345    use mz_persist_client::PersistClient;
2346    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2347    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2348    use mz_repr::role_id::RoleId;
2349    use mz_repr::{
2350        CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2351        SqlScalarType, Timestamp,
2352    };
2353    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2354    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2355    use mz_sql::names::{
2356        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2357        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2358    };
2359    use mz_sql::plan::{
2360        CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2361        Scope, StatementContext,
2362    };
2363    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2364    use mz_sql::session::vars::{SystemVars, VarInput};
2365
2366    use crate::catalog::state::LocalExpressionCache;
2367    use crate::catalog::{Catalog, Op};
2368    use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2369    use crate::session::Session;
2370
2371    /// System sessions have an empty `search_path` so it's necessary to
2372    /// schema-qualify all referenced items.
2373    ///
2374    /// Dummy (and ostensibly client) sessions contain system schemas in their
2375    /// search paths, so do not require schema qualification on system objects such
2376    /// as types.
2377    #[mz_ore::test(tokio::test)]
2378    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2379    async fn test_minimal_qualification() {
2380        Catalog::with_debug(|catalog| async move {
2381            struct TestCase {
2382                input: QualifiedItemName,
2383                system_output: PartialItemName,
2384                normal_output: PartialItemName,
2385            }
2386
2387            let test_cases = vec![
2388                TestCase {
2389                    input: QualifiedItemName {
2390                        qualifiers: ItemQualifiers {
2391                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2392                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2393                        },
2394                        item: "numeric".to_string(),
2395                    },
2396                    system_output: PartialItemName {
2397                        database: None,
2398                        schema: None,
2399                        item: "numeric".to_string(),
2400                    },
2401                    normal_output: PartialItemName {
2402                        database: None,
2403                        schema: None,
2404                        item: "numeric".to_string(),
2405                    },
2406                },
2407                TestCase {
2408                    input: QualifiedItemName {
2409                        qualifiers: ItemQualifiers {
2410                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2411                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2412                        },
2413                        item: "mz_array_types".to_string(),
2414                    },
2415                    system_output: PartialItemName {
2416                        database: None,
2417                        schema: None,
2418                        item: "mz_array_types".to_string(),
2419                    },
2420                    normal_output: PartialItemName {
2421                        database: None,
2422                        schema: None,
2423                        item: "mz_array_types".to_string(),
2424                    },
2425                },
2426            ];
2427
2428            for tc in test_cases {
2429                assert_eq!(
2430                    catalog
2431                        .for_system_session()
2432                        .minimal_qualification(&tc.input),
2433                    tc.system_output
2434                );
2435                assert_eq!(
2436                    catalog
2437                        .for_session(&Session::dummy())
2438                        .minimal_qualification(&tc.input),
2439                    tc.normal_output
2440                );
2441            }
2442            catalog.expire().await;
2443        })
2444        .await
2445    }
2446
2447    #[mz_ore::test(tokio::test)]
2448    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2449    async fn test_catalog_revision() {
2450        let persist_client = PersistClient::new_for_tests().await;
2451        let organization_id = Uuid::new_v4();
2452        let bootstrap_args = test_bootstrap_args();
2453        {
2454            let mut catalog = Catalog::open_debug_catalog(
2455                persist_client.clone(),
2456                organization_id.clone(),
2457                &bootstrap_args,
2458            )
2459            .await
2460            .expect("unable to open debug catalog");
2461            assert_eq!(catalog.transient_revision(), 1);
2462            let commit_ts = catalog.current_upper().await;
2463            catalog
2464                .transact(
2465                    None,
2466                    commit_ts,
2467                    None,
2468                    vec![Op::CreateDatabase {
2469                        name: "test".to_string(),
2470                        owner_id: MZ_SYSTEM_ROLE_ID,
2471                    }],
2472                )
2473                .await
2474                .expect("failed to transact");
2475            assert_eq!(catalog.transient_revision(), 2);
2476            catalog.expire().await;
2477        }
2478        {
2479            let catalog =
2480                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2481                    .await
2482                    .expect("unable to open debug catalog");
2483            // Re-opening the same catalog resets the transient_revision to 1.
2484            assert_eq!(catalog.transient_revision(), 1);
2485            catalog.expire().await;
2486        }
2487    }
2488
2489    #[mz_ore::test(tokio::test)]
2490    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2491    async fn test_effective_search_path() {
2492        Catalog::with_debug(|catalog| async move {
2493            let mz_catalog_schema = (
2494                ResolvedDatabaseSpecifier::Ambient,
2495                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2496            );
2497            let pg_catalog_schema = (
2498                ResolvedDatabaseSpecifier::Ambient,
2499                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2500            );
2501            let mz_temp_schema = (
2502                ResolvedDatabaseSpecifier::Ambient,
2503                SchemaSpecifier::Temporary,
2504            );
2505
2506            // Behavior with the default search_schema (public)
2507            let session = Session::dummy();
2508            let conn_catalog = catalog.for_session(&session);
2509            assert_ne!(
2510                conn_catalog.effective_search_path(false),
2511                conn_catalog.search_path
2512            );
2513            assert_ne!(
2514                conn_catalog.effective_search_path(true),
2515                conn_catalog.search_path
2516            );
2517            assert_eq!(
2518                conn_catalog.effective_search_path(false),
2519                vec![
2520                    mz_catalog_schema.clone(),
2521                    pg_catalog_schema.clone(),
2522                    conn_catalog.search_path[0].clone()
2523                ]
2524            );
2525            assert_eq!(
2526                conn_catalog.effective_search_path(true),
2527                vec![
2528                    mz_temp_schema.clone(),
2529                    mz_catalog_schema.clone(),
2530                    pg_catalog_schema.clone(),
2531                    conn_catalog.search_path[0].clone()
2532                ]
2533            );
2534
2535            // missing schemas are added when missing
2536            let mut session = Session::dummy();
2537            session
2538                .vars_mut()
2539                .set(
2540                    &SystemVars::new(),
2541                    "search_path",
2542                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2543                    false,
2544                )
2545                .expect("failed to set search_path");
2546            let conn_catalog = catalog.for_session(&session);
2547            assert_ne!(
2548                conn_catalog.effective_search_path(false),
2549                conn_catalog.search_path
2550            );
2551            assert_ne!(
2552                conn_catalog.effective_search_path(true),
2553                conn_catalog.search_path
2554            );
2555            assert_eq!(
2556                conn_catalog.effective_search_path(false),
2557                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2558            );
2559            assert_eq!(
2560                conn_catalog.effective_search_path(true),
2561                vec![
2562                    mz_temp_schema.clone(),
2563                    mz_catalog_schema.clone(),
2564                    pg_catalog_schema.clone()
2565                ]
2566            );
2567
2568            let mut session = Session::dummy();
2569            session
2570                .vars_mut()
2571                .set(
2572                    &SystemVars::new(),
2573                    "search_path",
2574                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2575                    false,
2576                )
2577                .expect("failed to set search_path");
2578            let conn_catalog = catalog.for_session(&session);
2579            assert_ne!(
2580                conn_catalog.effective_search_path(false),
2581                conn_catalog.search_path
2582            );
2583            assert_ne!(
2584                conn_catalog.effective_search_path(true),
2585                conn_catalog.search_path
2586            );
2587            assert_eq!(
2588                conn_catalog.effective_search_path(false),
2589                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2590            );
2591            assert_eq!(
2592                conn_catalog.effective_search_path(true),
2593                vec![
2594                    mz_temp_schema.clone(),
2595                    pg_catalog_schema.clone(),
2596                    mz_catalog_schema.clone()
2597                ]
2598            );
2599
2600            let mut session = Session::dummy();
2601            session
2602                .vars_mut()
2603                .set(
2604                    &SystemVars::new(),
2605                    "search_path",
2606                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2607                    false,
2608                )
2609                .expect("failed to set search_path");
2610            let conn_catalog = catalog.for_session(&session);
2611            assert_ne!(
2612                conn_catalog.effective_search_path(false),
2613                conn_catalog.search_path
2614            );
2615            assert_ne!(
2616                conn_catalog.effective_search_path(true),
2617                conn_catalog.search_path
2618            );
2619            assert_eq!(
2620                conn_catalog.effective_search_path(false),
2621                vec![
2622                    mz_catalog_schema.clone(),
2623                    pg_catalog_schema.clone(),
2624                    mz_temp_schema.clone()
2625                ]
2626            );
2627            assert_eq!(
2628                conn_catalog.effective_search_path(true),
2629                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2630            );
2631            catalog.expire().await;
2632        })
2633        .await
2634    }
2635
2636    #[mz_ore::test(tokio::test)]
2637    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2638    async fn test_normalized_create() {
2639        use mz_ore::collections::CollectionExt;
2640        Catalog::with_debug(|catalog| async move {
2641            let conn_catalog = catalog.for_system_session();
2642            let scx = &mut StatementContext::new(None, &conn_catalog);
2643
2644            let parsed = mz_sql_parser::parser::parse_statements(
2645                "create view public.foo as select 1 as bar",
2646            )
2647            .expect("")
2648            .into_element()
2649            .ast;
2650
2651            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2652
2653            // Ensure that all identifiers are quoted.
2654            assert_eq!(
2655                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2656                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2657            );
2658            catalog.expire().await;
2659        })
2660        .await;
2661    }
2662
2663    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2664    #[mz_ore::test(tokio::test)]
2665    #[cfg_attr(miri, ignore)] // slow
2666    async fn test_large_catalog_item() {
2667        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2668        let column = ", 1";
2669        let view_def_size = view_def.bytes().count();
2670        let column_size = column.bytes().count();
2671        let column_count =
2672            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2673        let columns = iter::repeat(column).take(column_count).join("");
2674        let create_sql = format!("{view_def}{columns})");
2675        let create_sql_check = create_sql.clone();
2676        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2677        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2678            &create_sql
2679        ));
2680
2681        let persist_client = PersistClient::new_for_tests().await;
2682        let organization_id = Uuid::new_v4();
2683        let id = CatalogItemId::User(1);
2684        let gid = GlobalId::User(1);
2685        let bootstrap_args = test_bootstrap_args();
2686        {
2687            let mut catalog = Catalog::open_debug_catalog(
2688                persist_client.clone(),
2689                organization_id.clone(),
2690                &bootstrap_args,
2691            )
2692            .await
2693            .expect("unable to open debug catalog");
2694            let item = catalog
2695                .state()
2696                .deserialize_item(
2697                    gid,
2698                    &create_sql,
2699                    &BTreeMap::new(),
2700                    &mut LocalExpressionCache::Closed,
2701                    None,
2702                )
2703                .expect("unable to parse view");
2704            let commit_ts = catalog.current_upper().await;
2705            catalog
2706                .transact(
2707                    None,
2708                    commit_ts,
2709                    None,
2710                    vec![Op::CreateItem {
2711                        item,
2712                        name: QualifiedItemName {
2713                            qualifiers: ItemQualifiers {
2714                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2715                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2716                            },
2717                            item: "v".to_string(),
2718                        },
2719                        id,
2720                        owner_id: MZ_SYSTEM_ROLE_ID,
2721                    }],
2722                )
2723                .await
2724                .expect("failed to transact");
2725            catalog.expire().await;
2726        }
2727        {
2728            let catalog =
2729                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2730                    .await
2731                    .expect("unable to open debug catalog");
2732            let view = catalog.get_entry(&id);
2733            assert_eq!("v", view.name.item);
2734            match &view.item {
2735                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2736                item => panic!("expected view, got {}", item.typ()),
2737            }
2738            catalog.expire().await;
2739        }
2740    }
2741
2742    #[mz_ore::test(tokio::test)]
2743    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2744    async fn test_object_type() {
2745        Catalog::with_debug(|catalog| async move {
2746            let conn_catalog = catalog.for_system_session();
2747
2748            assert_eq!(
2749                mz_sql::catalog::ObjectType::ClusterReplica,
2750                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2751                    ClusterId::user(1).expect("1 is a valid ID"),
2752                    ReplicaId::User(1)
2753                )))
2754            );
2755            assert_eq!(
2756                mz_sql::catalog::ObjectType::Role,
2757                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2758            );
2759            catalog.expire().await;
2760        })
2761        .await;
2762    }
2763
2764    #[mz_ore::test(tokio::test)]
2765    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2766    async fn test_get_privileges() {
2767        Catalog::with_debug(|catalog| async move {
2768            let conn_catalog = catalog.for_system_session();
2769
2770            assert_eq!(
2771                None,
2772                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2773                    ClusterId::user(1).expect("1 is a valid ID"),
2774                    ReplicaId::User(1),
2775                ))))
2776            );
2777            assert_eq!(
2778                None,
2779                conn_catalog
2780                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2781            );
2782            catalog.expire().await;
2783        })
2784        .await;
2785    }
2786
2787    #[mz_ore::test(tokio::test)]
2788    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2789    async fn verify_builtin_descs() {
2790        Catalog::with_debug(|catalog| async move {
2791            let conn_catalog = catalog.for_system_session();
2792
2793            let builtins_cfg = BuiltinsConfig {
2794                include_continual_tasks: true,
2795            };
2796            for builtin in BUILTINS::iter(&builtins_cfg) {
2797                let (schema, name, expected_desc) = match builtin {
2798                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2799                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2800                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2801                    Builtin::Log(_)
2802                    | Builtin::Type(_)
2803                    | Builtin::Func(_)
2804                    | Builtin::ContinualTask(_)
2805                    | Builtin::Index(_)
2806                    | Builtin::Connection(_) => continue,
2807                };
2808                let item = conn_catalog
2809                    .resolve_item(&PartialItemName {
2810                        database: None,
2811                        schema: Some(schema.to_string()),
2812                        item: name.to_string(),
2813                    })
2814                    .expect("unable to resolve item")
2815                    .at_version(RelationVersionSelector::Latest);
2816
2817                let full_name = conn_catalog.resolve_full_name(item.name());
2818                let actual_desc = item.desc(&full_name).expect("invalid item type");
2819                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2820                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2821                {
2822                    assert_eq!(
2823                        actual_name, expected_name,
2824                        "item {schema}.{name} column {index} name did not match its expected name"
2825                    );
2826                    assert_eq!(
2827                        actual_typ, expected_typ,
2828                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2829                    );
2830                }
2831                assert_eq!(
2832                    &*actual_desc, expected_desc,
2833                    "item {schema}.{name} did not match its expected RelationDesc"
2834                );
2835            }
2836            catalog.expire().await;
2837        })
2838        .await
2839    }
2840
2841    // Connect to a running Postgres server and verify that our builtin
2842    // types and functions match it, in addition to some other things.
2843    #[mz_ore::test(tokio::test)]
2844    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2845    async fn test_compare_builtins_postgres() {
2846        async fn inner(catalog: Catalog) {
2847            // Verify that all builtin functions:
2848            // - have a unique OID
2849            // - if they have a postgres counterpart (same oid) then they have matching name
2850            let (client, connection) = tokio_postgres::connect(
2851                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2852                NoTls,
2853            )
2854            .await
2855            .expect("failed to connect to Postgres");
2856
2857            task::spawn(|| "compare_builtin_postgres", async move {
2858                if let Err(e) = connection.await {
2859                    panic!("connection error: {}", e);
2860                }
2861            });
2862
2863            struct PgProc {
2864                name: String,
2865                arg_oids: Vec<u32>,
2866                ret_oid: Option<u32>,
2867                ret_set: bool,
2868            }
2869
2870            struct PgType {
2871                name: String,
2872                ty: String,
2873                elem: u32,
2874                array: u32,
2875                input: u32,
2876                receive: u32,
2877            }
2878
2879            struct PgOper {
2880                oprresult: u32,
2881                name: String,
2882            }
2883
2884            let pg_proc: BTreeMap<_, _> = client
2885                .query(
2886                    "SELECT
2887                    p.oid,
2888                    proname,
2889                    proargtypes,
2890                    prorettype,
2891                    proretset
2892                FROM pg_proc p
2893                JOIN pg_namespace n ON p.pronamespace = n.oid",
2894                    &[],
2895                )
2896                .await
2897                .expect("pg query failed")
2898                .into_iter()
2899                .map(|row| {
2900                    let oid: u32 = row.get("oid");
2901                    let pg_proc = PgProc {
2902                        name: row.get("proname"),
2903                        arg_oids: row.get("proargtypes"),
2904                        ret_oid: row.get("prorettype"),
2905                        ret_set: row.get("proretset"),
2906                    };
2907                    (oid, pg_proc)
2908                })
2909                .collect();
2910
2911            let pg_type: BTreeMap<_, _> = client
2912                .query(
2913                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2914                    &[],
2915                )
2916                .await
2917                .expect("pg query failed")
2918                .into_iter()
2919                .map(|row| {
2920                    let oid: u32 = row.get("oid");
2921                    let pg_type = PgType {
2922                        name: row.get("typname"),
2923                        ty: row.get("typtype"),
2924                        elem: row.get("typelem"),
2925                        array: row.get("typarray"),
2926                        input: row.get("typinput"),
2927                        receive: row.get("typreceive"),
2928                    };
2929                    (oid, pg_type)
2930                })
2931                .collect();
2932
2933            let pg_oper: BTreeMap<_, _> = client
2934                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2935                .await
2936                .expect("pg query failed")
2937                .into_iter()
2938                .map(|row| {
2939                    let oid: u32 = row.get("oid");
2940                    let pg_oper = PgOper {
2941                        name: row.get("oprname"),
2942                        oprresult: row.get("oprresult"),
2943                    };
2944                    (oid, pg_oper)
2945                })
2946                .collect();
2947
2948            let conn_catalog = catalog.for_system_session();
2949            let resolve_type_oid = |item: &str| {
2950                conn_catalog
2951                    .resolve_type(&PartialItemName {
2952                        database: None,
2953                        // All functions we check exist in PG, so the types must, as
2954                        // well
2955                        schema: Some(PG_CATALOG_SCHEMA.into()),
2956                        item: item.to_string(),
2957                    })
2958                    .expect("unable to resolve type")
2959                    .oid()
2960            };
2961
2962            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2963                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2964                .collect();
2965
2966            let mut all_oids = BTreeSet::new();
2967
2968            // A function to determine if two oids are equivalent enough for these tests. We don't
2969            // support some types, so map exceptions here.
2970            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2971                [
2972                    // We don't support NAME.
2973                    (Type::NAME, Type::TEXT),
2974                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2975                    // We don't support time with time zone.
2976                    (Type::TIME, Type::TIMETZ),
2977                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2978                ]
2979                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2980            );
2981            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2982                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2983            ]);
2984            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2985                if ignore_return_types.contains(&fn_oid) {
2986                    return true;
2987                }
2988                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2989                    return true;
2990                }
2991                a == b
2992            };
2993
2994            let builtins_cfg = BuiltinsConfig {
2995                include_continual_tasks: true,
2996            };
2997            for builtin in BUILTINS::iter(&builtins_cfg) {
2998                match builtin {
2999                    Builtin::Type(ty) => {
3000                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3001
3002                        if ty.oid >= FIRST_MATERIALIZE_OID {
3003                            // High OIDs are reserved in Materialize and don't have
3004                            // PostgreSQL counterparts.
3005                            continue;
3006                        }
3007
3008                        // For types that have a PostgreSQL counterpart, verify that
3009                        // the name and oid match.
3010                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3011                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3012                        });
3013                        assert_eq!(
3014                            ty.name, pg_ty.name,
3015                            "oid {} has name {} in postgres; expected {}",
3016                            ty.oid, pg_ty.name, ty.name,
3017                        );
3018
3019                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3020                            None => (0, 0),
3021                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3022                        };
3023                        assert_eq!(
3024                            typinput_oid, pg_ty.input,
3025                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3026                            ty.name, typinput_oid, pg_ty.input,
3027                        );
3028                        assert_eq!(
3029                            typreceive_oid, pg_ty.receive,
3030                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3031                            ty.name, typreceive_oid, pg_ty.receive,
3032                        );
3033                        if typinput_oid != 0 {
3034                            assert!(
3035                                func_oids.contains(&typinput_oid),
3036                                "type {} has typinput OID {} that does not exist in pg_proc",
3037                                ty.name,
3038                                typinput_oid,
3039                            );
3040                        }
3041                        if typreceive_oid != 0 {
3042                            assert!(
3043                                func_oids.contains(&typreceive_oid),
3044                                "type {} has typreceive OID {} that does not exist in pg_proc",
3045                                ty.name,
3046                                typreceive_oid,
3047                            );
3048                        }
3049
3050                        // Ensure the type matches.
3051                        match &ty.details.typ {
3052                            CatalogType::Array { element_reference } => {
3053                                let elem_ty = BUILTINS::iter(&builtins_cfg)
3054                                    .filter_map(|builtin| match builtin {
3055                                        Builtin::Type(ty @ BuiltinType { name, .. })
3056                                            if element_reference == name =>
3057                                        {
3058                                            Some(ty)
3059                                        }
3060                                        _ => None,
3061                                    })
3062                                    .next();
3063                                let elem_ty = match elem_ty {
3064                                    Some(ty) => ty,
3065                                    None => {
3066                                        panic!("{} is unexpectedly not a type", element_reference)
3067                                    }
3068                                };
3069                                assert_eq!(
3070                                    pg_ty.elem, elem_ty.oid,
3071                                    "type {} has mismatched element OIDs",
3072                                    ty.name
3073                                )
3074                            }
3075                            CatalogType::Pseudo => {
3076                                assert_eq!(
3077                                    pg_ty.ty, "p",
3078                                    "type {} is not a pseudo type as expected",
3079                                    ty.name
3080                                )
3081                            }
3082                            CatalogType::Range { .. } => {
3083                                assert_eq!(
3084                                    pg_ty.ty, "r",
3085                                    "type {} is not a range type as expected",
3086                                    ty.name
3087                                );
3088                            }
3089                            _ => {
3090                                assert_eq!(
3091                                    pg_ty.ty, "b",
3092                                    "type {} is not a base type as expected",
3093                                    ty.name
3094                                )
3095                            }
3096                        }
3097
3098                        // Ensure the array type reference is correct.
3099                        let schema = catalog
3100                            .resolve_schema_in_database(
3101                                &ResolvedDatabaseSpecifier::Ambient,
3102                                ty.schema,
3103                                &SYSTEM_CONN_ID,
3104                            )
3105                            .expect("unable to resolve schema");
3106                        let allocated_type = catalog
3107                            .resolve_type(
3108                                None,
3109                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3110                                &PartialItemName {
3111                                    database: None,
3112                                    schema: Some(schema.name().schema.clone()),
3113                                    item: ty.name.to_string(),
3114                                },
3115                                &SYSTEM_CONN_ID,
3116                            )
3117                            .expect("unable to resolve type");
3118                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3119                            ty
3120                        } else {
3121                            panic!("unexpectedly not a type")
3122                        };
3123                        match ty.details.array_id {
3124                            Some(array_id) => {
3125                                let array_ty = catalog.get_entry(&array_id);
3126                                assert_eq!(
3127                                    pg_ty.array, array_ty.oid,
3128                                    "type {} has mismatched array OIDs",
3129                                    allocated_type.name.item,
3130                                );
3131                            }
3132                            None => assert_eq!(
3133                                pg_ty.array, 0,
3134                                "type {} does not have an array type in mz but does in pg",
3135                                allocated_type.name.item,
3136                            ),
3137                        }
3138                    }
3139                    Builtin::Func(func) => {
3140                        for imp in func.inner.func_impls() {
3141                            assert!(
3142                                all_oids.insert(imp.oid),
3143                                "{} reused oid {}",
3144                                func.name,
3145                                imp.oid
3146                            );
3147
3148                            assert!(
3149                                imp.oid < FIRST_USER_OID,
3150                                "built-in function {} erroneously has OID in user space ({})",
3151                                func.name,
3152                                imp.oid,
3153                            );
3154
3155                            // For functions that have a postgres counterpart, verify that the name and
3156                            // oid match.
3157                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3158                                continue;
3159                            } else {
3160                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3161                                    panic!(
3162                                        "pg_proc missing function {}: oid {}",
3163                                        func.name, imp.oid
3164                                    )
3165                                })
3166                            };
3167                            assert_eq!(
3168                                func.name, pg_fn.name,
3169                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3170                                imp.oid, func.name, pg_fn.name
3171                            );
3172
3173                            // Complain, but don't fail, if argument oids don't match.
3174                            // TODO: make these match.
3175                            let imp_arg_oids = imp
3176                                .arg_typs
3177                                .iter()
3178                                .map(|item| resolve_type_oid(item))
3179                                .collect::<Vec<_>>();
3180
3181                            if imp_arg_oids != pg_fn.arg_oids {
3182                                println!(
3183                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3184                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3185                                );
3186                            }
3187
3188                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3189
3190                            assert!(
3191                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3192                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3193                                imp.oid,
3194                                func.name,
3195                                imp_return_oid,
3196                                pg_fn.ret_oid
3197                            );
3198
3199                            assert_eq!(
3200                                imp.return_is_set, pg_fn.ret_set,
3201                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3202                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3203                            );
3204                        }
3205                    }
3206                    _ => (),
3207                }
3208            }
3209
3210            for (op, func) in OP_IMPLS.iter() {
3211                for imp in func.func_impls() {
3212                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3213
3214                    // For operators that have a postgres counterpart, verify that the name and oid match.
3215                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3216                        continue;
3217                    } else {
3218                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3219                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3220                        })
3221                    };
3222
3223                    assert_eq!(*op, pg_op.name);
3224
3225                    let imp_return_oid =
3226                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3227                    if imp_return_oid != pg_op.oprresult {
3228                        panic!(
3229                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3230                            imp.oid, op, imp_return_oid, pg_op.oprresult
3231                        );
3232                    }
3233                }
3234            }
3235            catalog.expire().await;
3236        }
3237
3238        Catalog::with_debug(inner).await
3239    }
3240
3241    // Execute all builtin functions with all combinations of arguments from interesting datums.
3242    #[mz_ore::test(tokio::test)]
3243    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3244    async fn test_smoketest_all_builtins() {
3245        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3246            let catalog = Arc::new(catalog);
3247            let conn_catalog = catalog.for_system_session();
3248
3249            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3250            let mut handles = Vec::new();
3251
3252            // Extracted during planning; always panics when executed.
3253            let ignore_names = BTreeSet::from([
3254                "avg",
3255                "avg_internal_v1",
3256                "bool_and",
3257                "bool_or",
3258                "has_table_privilege", // > 3 s each
3259                "has_type_privilege",  // > 3 s each
3260                "mod",
3261                "mz_panic",
3262                "mz_sleep",
3263                "pow",
3264                "stddev_pop",
3265                "stddev_samp",
3266                "stddev",
3267                "var_pop",
3268                "var_samp",
3269                "variance",
3270            ]);
3271
3272            let fns = BUILTINS::funcs()
3273                .map(|func| (&func.name, func.inner))
3274                .chain(OP_IMPLS.iter());
3275
3276            for (name, func) in fns {
3277                if ignore_names.contains(name) {
3278                    continue;
3279                }
3280                let Func::Scalar(impls) = func else {
3281                    continue;
3282                };
3283
3284                'outer: for imp in impls {
3285                    let details = imp.details();
3286                    let mut styps = Vec::new();
3287                    for item in details.arg_typs.iter() {
3288                        let oid = resolve_type_oid(item);
3289                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3290                            continue 'outer;
3291                        };
3292                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3293                    }
3294                    let datums = styps
3295                        .iter()
3296                        .map(|styp| {
3297                            let mut datums = vec![Datum::Null];
3298                            datums.extend(styp.interesting_datums());
3299                            datums
3300                        })
3301                        .collect::<Vec<_>>();
3302                    // Skip nullary fns.
3303                    if datums.is_empty() {
3304                        continue;
3305                    }
3306
3307                    let return_oid = details
3308                        .return_typ
3309                        .map(resolve_type_oid)
3310                        .expect("must exist");
3311                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3312                        .ok()
3313                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3314
3315                    let mut idxs = vec![0; datums.len()];
3316                    while idxs[0] < datums[0].len() {
3317                        let mut args = Vec::with_capacity(idxs.len());
3318                        for i in 0..(datums.len()) {
3319                            args.push(datums[i][idxs[i]]);
3320                        }
3321
3322                        let op = &imp.op;
3323                        let scalars = args
3324                            .iter()
3325                            .enumerate()
3326                            .map(|(i, datum)| {
3327                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3328                                    datum.clone(),
3329                                    styps[i].clone(),
3330                                ))
3331                            })
3332                            .collect();
3333
3334                        let call_name = format!(
3335                            "{name}({}) (oid: {})",
3336                            args.iter()
3337                                .map(|d| d.to_string())
3338                                .collect::<Vec<_>>()
3339                                .join(", "),
3340                            imp.oid
3341                        );
3342                        let catalog = Arc::clone(&catalog);
3343                        let call_name_fn = call_name.clone();
3344                        let return_styp = return_styp.clone();
3345                        let handle = task::spawn_blocking(
3346                            || call_name,
3347                            move || {
3348                                smoketest_fn(
3349                                    name,
3350                                    call_name_fn,
3351                                    op,
3352                                    imp,
3353                                    args,
3354                                    catalog,
3355                                    scalars,
3356                                    return_styp,
3357                                )
3358                            },
3359                        );
3360                        handles.push(handle);
3361
3362                        // Advance to the next datum combination.
3363                        for i in (0..datums.len()).rev() {
3364                            idxs[i] += 1;
3365                            if idxs[i] >= datums[i].len() {
3366                                if i == 0 {
3367                                    break;
3368                                }
3369                                idxs[i] = 0;
3370                                continue;
3371                            } else {
3372                                break;
3373                            }
3374                        }
3375                    }
3376                }
3377            }
3378            handles
3379        }
3380
3381        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3382        for handle in handles {
3383            handle.await;
3384        }
3385    }
3386
3387    fn smoketest_fn(
3388        name: &&str,
3389        call_name: String,
3390        op: &Operation<HirScalarExpr>,
3391        imp: &FuncImpl<HirScalarExpr>,
3392        args: Vec<Datum<'_>>,
3393        catalog: Arc<Catalog>,
3394        scalars: Vec<CoercibleScalarExpr>,
3395        return_styp: Option<SqlScalarType>,
3396    ) {
3397        let conn_catalog = catalog.for_system_session();
3398        let pcx = PlanContext::zero();
3399        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3400        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3401        let ecx = ExprContext {
3402            qcx: &qcx,
3403            name: "smoketest",
3404            scope: &Scope::empty(),
3405            relation_type: &SqlRelationType::empty(),
3406            allow_aggregates: false,
3407            allow_subqueries: false,
3408            allow_parameters: false,
3409            allow_windows: false,
3410        };
3411        let arena = RowArena::new();
3412        let mut session = Session::<Timestamp>::dummy();
3413        session
3414            .start_transaction(to_datetime(0), None, None)
3415            .expect("must succeed");
3416        let prep_style = ExprPrepStyle::OneShot {
3417            logical_time: EvalTime::Time(Timestamp::MIN),
3418            session: &session,
3419            catalog_state: &catalog.state,
3420        };
3421
3422        // Execute the function as much as possible, ensuring no panics occur, but
3423        // otherwise ignoring eval errors. We also do various other checks.
3424        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3425        if let Ok(hir) = res {
3426            if let Ok(mut mir) = hir.lower_uncorrelated() {
3427                // Populate unmaterialized functions.
3428                prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3429
3430                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3431                    if let Some(return_styp) = return_styp {
3432                        let mir_typ = mir.typ(&[]);
3433                        // MIR type inference should be consistent with the type
3434                        // we get from the catalog.
3435                        assert_eq!(mir_typ.scalar_type, return_styp);
3436                        // The following will check not just that the scalar type
3437                        // is ok, but also catches if the function returned a null
3438                        // but the MIR type inference said "non-nullable".
3439                        if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3440                            panic!(
3441                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3442                            );
3443                        }
3444                        // Check the consistency of `introduces_nulls` and
3445                        // `propagates_nulls` with `MirScalarExpr::typ`.
3446                        if let Some((introduces_nulls, propagates_nulls)) =
3447                            call_introduces_propagates_nulls(&mir)
3448                        {
3449                            if introduces_nulls {
3450                                // If the function introduces_nulls, then the return
3451                                // type should always be nullable, regardless of
3452                                // the nullability of the input types.
3453                                assert!(
3454                                    mir_typ.nullable,
3455                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3456                                    name, args, mir, mir_typ.nullable
3457                                );
3458                            } else {
3459                                let any_input_null = args.iter().any(|arg| arg.is_null());
3460                                if !any_input_null {
3461                                    assert!(
3462                                        !mir_typ.nullable,
3463                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3464                                        name, args, mir, mir_typ.nullable
3465                                    );
3466                                } else {
3467                                    assert_eq!(
3468                                        mir_typ.nullable, propagates_nulls,
3469                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3470                                        name, args, mir, mir_typ.nullable
3471                                    );
3472                                }
3473                            }
3474                        }
3475                        // Check that `MirScalarExpr::reduce` yields the same result
3476                        // as the real evaluation.
3477                        let mut reduced = mir.clone();
3478                        reduced.reduce(&[]);
3479                        match reduced {
3480                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3481                                match reduce_result {
3482                                    Ok(reduce_result_row) => {
3483                                        let reduce_result_datum = reduce_result_row.unpack_first();
3484                                        assert_eq!(
3485                                            reduce_result_datum,
3486                                            eval_result_datum,
3487                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3488                                            name,
3489                                            args,
3490                                            mir,
3491                                            eval_result_datum,
3492                                            mir_typ.scalar_type,
3493                                            reduce_result_datum,
3494                                            ctyp.scalar_type
3495                                        );
3496                                        // Let's check that the types also match.
3497                                        // (We are not checking nullability here,
3498                                        // because it's ok when we know a more
3499                                        // precise nullability after actually
3500                                        // evaluating a function than before.)
3501                                        assert_eq!(
3502                                            ctyp.scalar_type,
3503                                            mir_typ.scalar_type,
3504                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3505                                            name,
3506                                            args,
3507                                            mir,
3508                                            eval_result_datum,
3509                                            mir_typ.scalar_type,
3510                                            reduce_result_datum,
3511                                            ctyp.scalar_type
3512                                        );
3513                                    }
3514                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3515                                }
3516                            }
3517                            _ => unreachable!(
3518                                "all args are literals, so should have reduced to a literal"
3519                            ),
3520                        }
3521                    }
3522                }
3523            }
3524        }
3525    }
3526
3527    /// If the given MirScalarExpr
3528    ///  - is a function call, and
3529    ///  - all arguments are literals
3530    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3531    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3532        match mir_func_call {
3533            MirScalarExpr::CallUnary { func, expr } => {
3534                if expr.is_literal() {
3535                    Some((func.introduces_nulls(), func.propagates_nulls()))
3536                } else {
3537                    None
3538                }
3539            }
3540            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3541                if expr1.is_literal() && expr2.is_literal() {
3542                    Some((func.introduces_nulls(), func.propagates_nulls()))
3543                } else {
3544                    None
3545                }
3546            }
3547            MirScalarExpr::CallVariadic { func, exprs } => {
3548                if exprs.iter().all(|arg| arg.is_literal()) {
3549                    Some((func.introduces_nulls(), func.propagates_nulls()))
3550                } else {
3551                    None
3552                }
3553            }
3554            _ => None,
3555        }
3556    }
3557
3558    // Make sure pg views don't use types that only exist in Materialize.
3559    #[mz_ore::test(tokio::test)]
3560    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3561    async fn test_pg_views_forbidden_types() {
3562        Catalog::with_debug(|catalog| async move {
3563            let conn_catalog = catalog.for_system_session();
3564
3565            for view in BUILTINS::views().filter(|view| {
3566                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3567            }) {
3568                let item = conn_catalog
3569                    .resolve_item(&PartialItemName {
3570                        database: None,
3571                        schema: Some(view.schema.to_string()),
3572                        item: view.name.to_string(),
3573                    })
3574                    .expect("unable to resolve view")
3575                    // TODO(alter_table)
3576                    .at_version(RelationVersionSelector::Latest);
3577                let full_name = conn_catalog.resolve_full_name(item.name());
3578                for col_type in item
3579                    .desc(&full_name)
3580                    .expect("invalid item type")
3581                    .iter_types()
3582                {
3583                    match &col_type.scalar_type {
3584                        typ @ SqlScalarType::UInt16
3585                        | typ @ SqlScalarType::UInt32
3586                        | typ @ SqlScalarType::UInt64
3587                        | typ @ SqlScalarType::MzTimestamp
3588                        | typ @ SqlScalarType::List { .. }
3589                        | typ @ SqlScalarType::Map { .. }
3590                        | typ @ SqlScalarType::MzAclItem => {
3591                            panic!("{typ:?} type found in {full_name}");
3592                        }
3593                        SqlScalarType::AclItem
3594                        | SqlScalarType::Bool
3595                        | SqlScalarType::Int16
3596                        | SqlScalarType::Int32
3597                        | SqlScalarType::Int64
3598                        | SqlScalarType::Float32
3599                        | SqlScalarType::Float64
3600                        | SqlScalarType::Numeric { .. }
3601                        | SqlScalarType::Date
3602                        | SqlScalarType::Time
3603                        | SqlScalarType::Timestamp { .. }
3604                        | SqlScalarType::TimestampTz { .. }
3605                        | SqlScalarType::Interval
3606                        | SqlScalarType::PgLegacyChar
3607                        | SqlScalarType::Bytes
3608                        | SqlScalarType::String
3609                        | SqlScalarType::Char { .. }
3610                        | SqlScalarType::VarChar { .. }
3611                        | SqlScalarType::Jsonb
3612                        | SqlScalarType::Uuid
3613                        | SqlScalarType::Array(_)
3614                        | SqlScalarType::Record { .. }
3615                        | SqlScalarType::Oid
3616                        | SqlScalarType::RegProc
3617                        | SqlScalarType::RegType
3618                        | SqlScalarType::RegClass
3619                        | SqlScalarType::Int2Vector
3620                        | SqlScalarType::Range { .. }
3621                        | SqlScalarType::PgLegacyName => {}
3622                    }
3623                }
3624            }
3625            catalog.expire().await;
3626        })
3627        .await
3628    }
3629
3630    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3631    // introspection relations.
3632    #[mz_ore::test(tokio::test)]
3633    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3634    async fn test_mz_introspection_builtins() {
3635        Catalog::with_debug(|catalog| async move {
3636            let conn_catalog = catalog.for_system_session();
3637
3638            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3639            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3640
3641            for entry in catalog.entries() {
3642                let schema_spec = entry.name().qualifiers.schema_spec;
3643                let introspection_deps = catalog.introspection_dependencies(entry.id);
3644                if introspection_deps.is_empty() {
3645                    assert!(
3646                        schema_spec != introspection_schema_spec,
3647                        "entry does not depend on introspection sources but is in \
3648                         `mz_introspection`: {}",
3649                        conn_catalog.resolve_full_name(entry.name()),
3650                    );
3651                } else {
3652                    assert!(
3653                        schema_spec == introspection_schema_spec,
3654                        "entry depends on introspection sources but is not in \
3655                         `mz_introspection`: {}",
3656                        conn_catalog.resolve_full_name(entry.name()),
3657                    );
3658                }
3659            }
3660        })
3661        .await
3662    }
3663
3664    #[mz_ore::test(tokio::test)]
3665    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3666    async fn test_multi_subscriber_catalog() {
3667        let persist_client = PersistClient::new_for_tests().await;
3668        let bootstrap_args = test_bootstrap_args();
3669        let organization_id = Uuid::new_v4();
3670        let db_name = "DB";
3671
3672        let mut writer_catalog = Catalog::open_debug_catalog(
3673            persist_client.clone(),
3674            organization_id.clone(),
3675            &bootstrap_args,
3676        )
3677        .await
3678        .expect("open_debug_catalog");
3679        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3680            persist_client.clone(),
3681            organization_id.clone(),
3682            &bootstrap_args,
3683        )
3684        .await
3685        .expect("open_debug_read_only_catalog");
3686        assert_err!(writer_catalog.resolve_database(db_name));
3687        assert_err!(read_only_catalog.resolve_database(db_name));
3688
3689        let commit_ts = writer_catalog.current_upper().await;
3690        writer_catalog
3691            .transact(
3692                None,
3693                commit_ts,
3694                None,
3695                vec![Op::CreateDatabase {
3696                    name: db_name.to_string(),
3697                    owner_id: MZ_SYSTEM_ROLE_ID,
3698                }],
3699            )
3700            .await
3701            .expect("failed to transact");
3702
3703        let write_db = writer_catalog
3704            .resolve_database(db_name)
3705            .expect("resolve_database");
3706        read_only_catalog
3707            .sync_to_current_updates()
3708            .await
3709            .expect("sync_to_current_updates");
3710        let read_db = read_only_catalog
3711            .resolve_database(db_name)
3712            .expect("resolve_database");
3713
3714        assert_eq!(write_db, read_db);
3715
3716        let writer_catalog_fencer =
3717            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3718                .await
3719                .expect("open_debug_catalog for fencer");
3720        let fencer_db = writer_catalog_fencer
3721            .resolve_database(db_name)
3722            .expect("resolve_database for fencer");
3723        assert_eq!(fencer_db, read_db);
3724
3725        let write_fence_err = writer_catalog
3726            .sync_to_current_updates()
3727            .await
3728            .expect_err("sync_to_current_updates for fencer");
3729        assert!(matches!(
3730            write_fence_err,
3731            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3732        ));
3733        let read_fence_err = read_only_catalog
3734            .sync_to_current_updates()
3735            .await
3736            .expect_err("sync_to_current_updates after fencer");
3737        assert!(matches!(
3738            read_fence_err,
3739            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3740        ));
3741
3742        writer_catalog.expire().await;
3743        read_only_catalog.expire().await;
3744        writer_catalog_fencer.expire().await;
3745    }
3746}