mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67    CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68    DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91// DO NOT add any more imports from `crate` outside of `crate::catalog`.
92pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96    DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100use crate::session::{PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
/// A `Catalog` keeps track of the SQL objects known to the planner.
///
/// For each object, it keeps track of both forward and reverse dependencies:
/// i.e., which objects are depended upon by the object, and which objects
/// depend upon the object. It enforces the SQL rules around dropping: an object
/// cannot be dropped until all of the objects that depend upon it are dropped.
/// It also enforces uniqueness of names.
///
/// SQL mandates a hierarchy of exactly three layers. A catalog contains
/// databases, databases contain schemas, and schemas contain catalog items,
/// like sources, sinks, views, and indexes.
///
/// To the outside world, databases, schemas, and items are all identified by
/// name. Items can be referred to by their [`FullItemName`], which fully and
/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
/// database name and/or the schema name. Partial names can be converted into
/// full names via a complicated resolution process documented by the
/// [`CatalogState::resolve`] method.
///
/// The catalog also maintains special "ambient schemas": virtual schemas,
/// implicitly present in all databases, that house various system views.
/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
#[derive(Debug)]
pub struct Catalog {
    /// The catalog state; session-scoped [`ConnCatalog`]s are built from it.
    state: CatalogState,
    /// Plans, `DataflowMetainfo`s, and optimizer notices keyed by [`GlobalId`].
    plans: CatalogPlans,
    /// Optional handle to the expression cache.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Durable catalog state behind an async mutex. The `Arc` is shared (not
    /// deep-copied) by every clone of this `Catalog`.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// The revision reported by [`Catalog::transient_revision`].
    transient_revision: u64,
}
144
145// Implement our own Clone because derive can't unless S is Clone, which it's
146// not (hence the Arc).
147impl Clone for Catalog {
148    fn clone(&self) -> Self {
149        Self {
150            state: self.state.clone(),
151            plans: self.plans.clone(),
152            expr_cache_handle: self.expr_cache_handle.clone(),
153            storage: Arc::clone(&self.storage),
154            transient_revision: self.transient_revision,
155        }
156    }
157}
158
/// Per-item plans and optimizer metadata held by a [`Catalog`], all keyed by
/// [`GlobalId`].
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized plans, keyed by the id they were saved under via
    /// `Catalog::set_optimized_plan`.
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical plans, keyed by the id they were saved under via
    /// `Catalog::set_physical_plan`.
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// `DataflowMetainfo`s, keyed by the id they were saved under via
    /// `Catalog::set_dataflow_metainfo`.
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Lookup from a dependency id to the optimizer notices that list that id
    /// in their `dependencies`. Maintained by `Catalog::set_dataflow_metainfo`
    /// and `Catalog::drop_plans_and_metainfos`.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
166
167impl Catalog {
168    /// Set the optimized plan for the item identified by `id`.
169    #[mz_ore::instrument(level = "trace")]
170    pub fn set_optimized_plan(
171        &mut self,
172        id: GlobalId,
173        plan: DataflowDescription<OptimizedMirRelationExpr>,
174    ) {
175        self.plans.optimized_plan_by_id.insert(id, plan.into());
176    }
177
178    /// Set the physical plan for the item identified by `id`.
179    #[mz_ore::instrument(level = "trace")]
180    pub fn set_physical_plan(
181        &mut self,
182        id: GlobalId,
183        plan: DataflowDescription<mz_compute_types::plan::Plan>,
184    ) {
185        self.plans.physical_plan_by_id.insert(id, plan.into());
186    }
187
188    /// Try to get the optimized plan for the item identified by `id`.
189    #[mz_ore::instrument(level = "trace")]
190    pub fn try_get_optimized_plan(
191        &self,
192        id: &GlobalId,
193    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
194        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
195    }
196
197    /// Try to get the physical plan for the item identified by `id`.
198    #[mz_ore::instrument(level = "trace")]
199    pub fn try_get_physical_plan(
200        &self,
201        id: &GlobalId,
202    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
203        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
204    }
205
206    /// Set the `DataflowMetainfo` for the item identified by `id`.
207    #[mz_ore::instrument(level = "trace")]
208    pub fn set_dataflow_metainfo(
209        &mut self,
210        id: GlobalId,
211        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
212    ) {
213        // Add entries to the `notices_by_dep_id` lookup map.
214        for notice in metainfo.optimizer_notices.iter() {
215            for dep_id in notice.dependencies.iter() {
216                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
217                entry.push(Arc::clone(notice))
218            }
219            if let Some(item_id) = notice.item_id {
220                soft_assert_eq_or_log!(
221                    item_id,
222                    id,
223                    "notice.item_id should match the id for whom we are saving the notice"
224                );
225            }
226        }
227        // Add the dataflow with the scoped entries.
228        self.plans.dataflow_metainfos.insert(id, metainfo);
229    }
230
231    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
232    #[mz_ore::instrument(level = "trace")]
233    pub fn try_get_dataflow_metainfo(
234        &self,
235        id: &GlobalId,
236    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
237        self.plans.dataflow_metainfos.get(id)
238    }
239
    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
    /// items identified by `drop_ids`.
    ///
    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
    ///
    /// The notice bookkeeping is cleaned up in three passes:
    ///
    ///   1. Notices owned by a dropped metainfo are removed from the
    ///      `notices_by_dep_id` lists of their dependencies.
    ///   2. Notices that depend on a dropped id are removed from the metainfo
    ///      of the item named by their `item_id`.
    ///   3. Every dropped notice is removed from the `notices_by_dep_id` lists
    ///      of its dependencies that are not themselves being dropped.
    ///
    /// Afterwards, an error is logged if any dropped notice is still
    /// referenced from somewhere other than the returned set.
    ///
    /// Return a set containing all dropped notices. Note that if for some
    /// reason we end up with two identical notices being dropped by the same
    /// call, the result will contain only one instance of that notice.
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Collect dropped notices in this set.
        let mut dropped_notices = BTreeSet::new();

        // Remove plans and metainfo.optimizer_notices entries.
        for id in drop_ids {
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Remove the corresponding notices_by_dep_id entries.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            // Only the notice is removed; the (possibly now
                            // empty) list stays in the map under `dep_id`.
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Remove notices_by_dep_id entries.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    // Remove the corresponding metainfo.optimizer_notices entries.
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            // Sanity check: every notice stored in this
                            // metainfo that carries an `item_id` should carry
                            // the id the metainfo is stored under.
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Collect dependency ids not in drop_ids with at least one dropped
        // notice.
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
        // entries in `todo_dep_ids`.
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        // Diagnostics only: at this point `dropped_notices` is expected to hold
        // the last reference to each dropped notice. If not, log where the
        // extra references were found; nothing is repaired here.
        if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
            use mz_ore::str::{bracketed, separated};
            let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
            let bad_notices = bad_notices.map(|n| {
                // Try to find where the bad reference is.
                // Maybe in `dataflow_metainfos`?
                let mut dataflow_metainfo_occurrences = Vec::new();
                for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
                    if meta_info.optimizer_notices.contains(n) {
                        dataflow_metainfo_occurrences.push(id);
                    }
                }
                // Or `notices_by_dep_id`?
                let mut notices_by_dep_id_occurrences = Vec::new();
                for (id, notices) in self.plans.notices_by_dep_id.iter() {
                    if notices.iter().contains(n) {
                        notices_by_dep_id_occurrences.push(id);
                    }
                }
                format!(
                    "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
                    dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
                    n.id,
                    n.kind,
                    n.dependencies,
                    Arc::strong_count(n),
                    dataflow_metainfo_occurrences,
                    notices_by_dep_id_occurrences
                )
            });
            let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
            error!(
                "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
                 bad_notices = {bad_notices}; \
                 drop_ids = {drop_ids:?}"
            );
        }

        dropped_notices
    }
359
360    /// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
361    /// as a result of creating new indexes on the items in `ons`.
362    ///
363    /// When creating and inserting a new index, we need to invalidate some entries that may
364    /// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
365    /// the following objects:
366    ///
367    ///   - `o`.
368    ///   - All compute objects that depend directly on `o`.
369    ///   - All compute objects that would directly depend on `o`, if all views were inlined.
370    pub(crate) fn invalidate_for_index(
371        &self,
372        ons: impl Iterator<Item = GlobalId>,
373    ) -> BTreeSet<GlobalId> {
374        let mut dependencies = BTreeSet::new();
375        let mut queue = VecDeque::new();
376        let mut seen = HashSet::new();
377        for on in ons {
378            let entry = self.get_entry_by_global_id(&on);
379            dependencies.insert(on);
380            seen.insert(entry.id);
381            let uses = entry.uses();
382            queue.extend(uses.clone());
383        }
384
385        while let Some(cur) = queue.pop_front() {
386            let entry = self.get_entry(&cur);
387            if seen.insert(cur) {
388                let global_ids = entry.global_ids();
389                match entry.item_type() {
390                    CatalogItemType::Table
391                    | CatalogItemType::Source
392                    | CatalogItemType::MaterializedView
393                    | CatalogItemType::Sink
394                    | CatalogItemType::Index
395                    | CatalogItemType::Type
396                    | CatalogItemType::Func
397                    | CatalogItemType::Secret
398                    | CatalogItemType::Connection
399                    | CatalogItemType::ContinualTask => {
400                        dependencies.extend(global_ids);
401                    }
402                    CatalogItemType::View => {
403                        dependencies.extend(global_ids);
404                        queue.extend(entry.uses());
405                    }
406                }
407            }
408        }
409        dependencies
410    }
411}
412
/// A catalog scoped to one connection, as returned by
/// `Catalog::for_session`, `Catalog::for_sessionless_user`, and
/// `Catalog::for_system_session`.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The catalog state this catalog reads from, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Because we don't have any way of removing items from the catalog
    /// temporarily, we allow the ConnCatalog to pretend that a set of items
    /// don't exist during resolution.
    ///
    /// This feature is necessary to allow re-planning of statements, which is
    /// either incredibly useful or required when altering item definitions.
    ///
    /// Note that this field should only be used by short-lived catalogs.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// ID of the connection this catalog belongs to.
    conn_id: ConnectionId,
    /// NOTE(review): presumably the name of the connection's active cluster —
    /// confirm against the constructors in `CatalogState`.
    cluster: String,
    /// NOTE(review): presumably the connection's current database, if any —
    /// confirm against the constructors in `CatalogState`.
    database: Option<DatabaseId>,
    /// The search path passed to `CatalogState::effective_search_path`.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// The role this catalog acts as. Must be `MZ_SYSTEM_ROLE_ID` for
    /// `mark_id_unresolvable_for_replanning` to be permitted.
    role_id: RoleId,
    /// Prepared statements keyed by name, when available.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Channel on which `AdapterNotice`s are sent.
    notices_tx: UnboundedSender<AdapterNotice>,
}
434
435impl ConnCatalog<'_> {
436    pub fn conn_id(&self) -> &ConnectionId {
437        &self.conn_id
438    }
439
440    pub fn state(&self) -> &CatalogState {
441        &*self.state
442    }
443
444    /// Prevent planning from resolving item with the provided ID. Instead,
445    /// return an error as if the item did not exist.
446    ///
447    /// This feature is meant exclusively to permit re-planning statements
448    /// during update operations and should not be used otherwise given its
449    /// extremely "powerful" semantics.
450    ///
451    /// # Panics
452    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
453    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
454        assert_eq!(
455            self.role_id, MZ_SYSTEM_ROLE_ID,
456            "only the system role can mark IDs unresolvable",
457        );
458        self.unresolvable_ids.insert(id);
459    }
460
461    /// Returns the schemas:
462    /// - mz_catalog
463    /// - pg_catalog
464    /// - temp (if requested)
465    /// - all schemas from the session's search_path var that exist
466    pub fn effective_search_path(
467        &self,
468        include_temp_schema: bool,
469    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
470        self.state
471            .effective_search_path(&self.search_path, include_temp_schema)
472    }
473}
474
475impl ConnectionResolver for ConnCatalog<'_> {
476    fn resolve_connection(
477        &self,
478        id: CatalogItemId,
479    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
480        self.state().resolve_connection(id)
481    }
482}
483
484impl Catalog {
485    /// Returns the catalog's transient revision, which starts at 1 and is
486    /// incremented on every change. This is not persisted to disk, and will
487    /// restart on every load.
488    pub fn transient_revision(&self) -> u64 {
489        self.transient_revision
490    }
491
492    /// Creates a debug catalog from the current
493    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
494    /// like in tests.
495    ///
496    /// WARNING! This function can arbitrarily fail because it does not make any
497    /// effort to adjust the catalog's contents' structure or semantics to the
498    /// currently running version, i.e. it does not apply any migrations.
499    ///
500    /// This function must not be called in production contexts. Use
501    /// [`Catalog::open`] with appropriately set configuration parameters
502    /// instead.
503    pub async fn with_debug<F, Fut, T>(f: F) -> T
504    where
505        F: FnOnce(Catalog) -> Fut,
506        Fut: Future<Output = T>,
507    {
508        let persist_client = PersistClient::new_for_tests().await;
509        let organization_id = Uuid::new_v4();
510        let bootstrap_args = test_bootstrap_args();
511        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
512            .await
513            .expect("can open debug catalog");
514        f(catalog).await
515    }
516
517    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
518    /// in progress.
519    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
520    where
521        F: FnOnce(Catalog) -> Fut,
522        Fut: Future<Output = T>,
523    {
524        let persist_client = PersistClient::new_for_tests().await;
525        let organization_id = Uuid::new_v4();
526        let bootstrap_args = test_bootstrap_args();
527        let mut catalog =
528            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
529                .await
530                .expect("can open debug catalog");
531
532        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
533        let now = SYSTEM_TIME.clone();
534        let openable_storage = TestCatalogStateBuilder::new(persist_client)
535            .with_organization_id(organization_id)
536            .with_default_deploy_generation()
537            .build()
538            .await
539            .expect("can create durable catalog");
540        let mut storage = openable_storage
541            .open(now().into(), &bootstrap_args)
542            .await
543            .expect("can open durable catalog")
544            .0;
545        // Drain updates.
546        let _ = storage
547            .sync_to_current_updates()
548            .await
549            .expect("can sync to current updates");
550        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
551
552        f(catalog).await
553    }
554
555    /// Opens a debug catalog.
556    ///
557    /// See [`Catalog::with_debug`].
558    pub async fn open_debug_catalog(
559        persist_client: PersistClient,
560        organization_id: Uuid,
561        bootstrap_args: &BootstrapArgs,
562    ) -> Result<Catalog, anyhow::Error> {
563        let now = SYSTEM_TIME.clone();
564        let environment_id = None;
565        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
566            .with_organization_id(organization_id)
567            .with_default_deploy_generation()
568            .build()
569            .await?;
570        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
571        let system_parameter_defaults = BTreeMap::default();
572        Self::open_debug_catalog_inner(
573            persist_client,
574            storage,
575            now,
576            environment_id,
577            &DUMMY_BUILD_INFO,
578            system_parameter_defaults,
579            bootstrap_args,
580            None,
581        )
582        .await
583    }
584
585    /// Opens a read only debug persist backed catalog defined by `persist_client` and
586    /// `organization_id`.
587    ///
588    /// See [`Catalog::with_debug`].
589    pub async fn open_debug_read_only_catalog(
590        persist_client: PersistClient,
591        organization_id: Uuid,
592        bootstrap_args: &BootstrapArgs,
593    ) -> Result<Catalog, anyhow::Error> {
594        let now = SYSTEM_TIME.clone();
595        let environment_id = None;
596        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
597            .with_organization_id(organization_id)
598            .build()
599            .await?;
600        let storage = openable_storage
601            .open_read_only(&test_bootstrap_args())
602            .await?;
603        let system_parameter_defaults = BTreeMap::default();
604        Self::open_debug_catalog_inner(
605            persist_client,
606            storage,
607            now,
608            environment_id,
609            &DUMMY_BUILD_INFO,
610            system_parameter_defaults,
611            bootstrap_args,
612            None,
613        )
614        .await
615    }
616
617    /// Opens a read only debug persist backed catalog defined by `persist_client` and
618    /// `organization_id`.
619    ///
620    /// See [`Catalog::with_debug`].
621    pub async fn open_debug_read_only_persist_catalog_config(
622        persist_client: PersistClient,
623        now: NowFn,
624        environment_id: EnvironmentId,
625        system_parameter_defaults: BTreeMap<String, String>,
626        build_info: &'static BuildInfo,
627        bootstrap_args: &BootstrapArgs,
628        enable_expression_cache_override: Option<bool>,
629    ) -> Result<Catalog, anyhow::Error> {
630        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
631            .with_organization_id(environment_id.organization_id())
632            .with_version(
633                build_info
634                    .version
635                    .parse()
636                    .expect("build version is parseable"),
637            )
638            .build()
639            .await?;
640        let storage = openable_storage.open_read_only(bootstrap_args).await?;
641        Self::open_debug_catalog_inner(
642            persist_client,
643            storage,
644            now,
645            Some(environment_id),
646            build_info,
647            system_parameter_defaults,
648            bootstrap_args,
649            enable_expression_cache_override,
650        )
651        .await
652    }
653
654    async fn open_debug_catalog_inner(
655        persist_client: PersistClient,
656        storage: Box<dyn DurableCatalogState>,
657        now: NowFn,
658        environment_id: Option<EnvironmentId>,
659        build_info: &'static BuildInfo,
660        system_parameter_defaults: BTreeMap<String, String>,
661        bootstrap_args: &BootstrapArgs,
662        enable_expression_cache_override: Option<bool>,
663    ) -> Result<Catalog, anyhow::Error> {
664        let metrics_registry = &MetricsRegistry::new();
665        let secrets_reader = Arc::new(InMemorySecretsController::new());
666        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
667        // debugging/testing.
668        let previous_ts = now().into();
669        let replica_size = &bootstrap_args.default_cluster_replica_size;
670        let read_only = false;
671
672        let OpenCatalogResult {
673            catalog,
674            migrated_storage_collections_0dt: _,
675            new_builtin_collections: _,
676            builtin_table_updates: _,
677            cached_global_exprs: _,
678            uncached_local_exprs: _,
679        } = Catalog::open(Config {
680            storage,
681            metrics_registry,
682            state: StateConfig {
683                unsafe_mode: true,
684                all_features: false,
685                build_info,
686                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
687                read_only,
688                now,
689                boot_ts: previous_ts,
690                skip_migrations: true,
691                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
692                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
693                    size: replica_size.clone(),
694                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
695                },
696                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
697                    size: replica_size.clone(),
698                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
699                },
700                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
701                    size: replica_size.clone(),
702                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
703                },
704                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
705                    size: replica_size.clone(),
706                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
707                },
708                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
709                    size: replica_size.clone(),
710                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711                },
712                system_parameter_defaults,
713                remote_system_parameters: None,
714                availability_zones: vec![],
715                egress_addresses: vec![],
716                aws_principal_context: None,
717                aws_privatelink_availability_zones: None,
718                http_host_name: None,
719                connection_context: ConnectionContext::for_tests(secrets_reader),
720                builtin_item_migration_config: BuiltinItemMigrationConfig {
721                    persist_client: persist_client.clone(),
722                    read_only,
723                },
724                persist_client,
725                enable_expression_cache_override,
726                enable_0dt_deployment: true,
727                helm_chart_version: None,
728                external_login_password_mz_system: None,
729                license_key: ValidatedLicenseKey::for_tests(),
730            },
731        })
732        .await?;
733        Ok(catalog)
734    }
735
736    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
737        self.state.for_session(session)
738    }
739
740    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
741        self.state.for_sessionless_user(role_id)
742    }
743
744    pub fn for_system_session(&self) -> ConnCatalog<'_> {
745        self.state.for_system_session()
746    }
747
748    async fn storage<'a>(
749        &'a self,
750    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
751        self.storage.lock().await
752    }
753
754    pub async fn current_upper(&self) -> mz_repr::Timestamp {
755        self.storage().await.current_upper().await
756    }
757
758    pub async fn allocate_user_id(
759        &self,
760        commit_ts: mz_repr::Timestamp,
761    ) -> Result<(CatalogItemId, GlobalId), Error> {
762        self.storage()
763            .await
764            .allocate_user_id(commit_ts)
765            .await
766            .maybe_terminate("allocating user ids")
767            .err_into()
768    }
769
770    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
771    pub async fn allocate_user_ids(
772        &self,
773        amount: u64,
774        commit_ts: mz_repr::Timestamp,
775    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
776        self.storage()
777            .await
778            .allocate_user_ids(amount, commit_ts)
779            .await
780            .maybe_terminate("allocating user ids")
781            .err_into()
782    }
783
784    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
785        let commit_ts = self.storage().await.current_upper().await;
786        self.allocate_user_id(commit_ts).await
787    }
788
789    /// Get the next user item ID without allocating it.
790    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
791        self.storage()
792            .await
793            .get_next_user_item_id()
794            .await
795            .err_into()
796    }
797
798    #[cfg(test)]
799    pub async fn allocate_system_id(
800        &self,
801        commit_ts: mz_repr::Timestamp,
802    ) -> Result<(CatalogItemId, GlobalId), Error> {
803        use mz_ore::collections::CollectionExt;
804
805        let mut storage = self.storage().await;
806        let mut txn = storage.transaction().await?;
807        let id = txn
808            .allocate_system_item_ids(1)
809            .maybe_terminate("allocating system ids")?
810            .into_element();
811        // Drain transaction.
812        let _ = txn.get_and_commit_op_updates();
813        txn.commit(commit_ts).await?;
814        Ok(id)
815    }
816
817    /// Get the next system item ID without allocating it.
818    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
819        self.storage()
820            .await
821            .get_next_system_item_id()
822            .await
823            .err_into()
824    }
825
826    pub async fn allocate_user_cluster_id(
827        &self,
828        commit_ts: mz_repr::Timestamp,
829    ) -> Result<ClusterId, Error> {
830        self.storage()
831            .await
832            .allocate_user_cluster_id(commit_ts)
833            .await
834            .maybe_terminate("allocating user cluster ids")
835            .err_into()
836    }
837
838    /// Get the next system replica id without allocating it.
839    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
840        self.storage()
841            .await
842            .get_next_system_replica_id()
843            .await
844            .err_into()
845    }
846
847    /// Get the next user replica id without allocating it.
848    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
849        self.storage()
850            .await
851            .get_next_user_replica_id()
852            .await
853            .err_into()
854    }
855
856    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
857        self.state.resolve_database(database_name)
858    }
859
860    pub fn resolve_schema(
861        &self,
862        current_database: Option<&DatabaseId>,
863        database_name: Option<&str>,
864        schema_name: &str,
865        conn_id: &ConnectionId,
866    ) -> Result<&Schema, SqlCatalogError> {
867        self.state
868            .resolve_schema(current_database, database_name, schema_name, conn_id)
869    }
870
871    pub fn resolve_schema_in_database(
872        &self,
873        database_spec: &ResolvedDatabaseSpecifier,
874        schema_name: &str,
875        conn_id: &ConnectionId,
876    ) -> Result<&Schema, SqlCatalogError> {
877        self.state
878            .resolve_schema_in_database(database_spec, schema_name, conn_id)
879    }
880
881    pub fn resolve_replica_in_cluster(
882        &self,
883        cluster_id: &ClusterId,
884        replica_name: &str,
885    ) -> Result<&ClusterReplica, SqlCatalogError> {
886        self.state
887            .resolve_replica_in_cluster(cluster_id, replica_name)
888    }
889
890    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
891        self.state.resolve_system_schema(name)
892    }
893
894    pub fn resolve_search_path(
895        &self,
896        session: &Session,
897    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
898        self.state.resolve_search_path(session)
899    }
900
901    /// Resolves `name` to a non-function [`CatalogEntry`].
902    pub fn resolve_entry(
903        &self,
904        current_database: Option<&DatabaseId>,
905        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
906        name: &PartialItemName,
907        conn_id: &ConnectionId,
908    ) -> Result<&CatalogEntry, SqlCatalogError> {
909        self.state
910            .resolve_entry(current_database, search_path, name, conn_id)
911    }
912
913    /// Resolves a `BuiltinTable`.
914    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
915        self.state.resolve_builtin_table(builtin)
916    }
917
918    /// Resolves a `BuiltinLog`.
919    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
920        self.state.resolve_builtin_log(builtin).0
921    }
922
923    /// Resolves a `BuiltinSource`.
924    pub fn resolve_builtin_storage_collection(
925        &self,
926        builtin: &'static BuiltinSource,
927    ) -> CatalogItemId {
928        self.state.resolve_builtin_source(builtin)
929    }
930
931    /// Resolves `name` to a function [`CatalogEntry`].
932    pub fn resolve_function(
933        &self,
934        current_database: Option<&DatabaseId>,
935        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
936        name: &PartialItemName,
937        conn_id: &ConnectionId,
938    ) -> Result<&CatalogEntry, SqlCatalogError> {
939        self.state
940            .resolve_function(current_database, search_path, name, conn_id)
941    }
942
943    /// Resolves `name` to a type [`CatalogEntry`].
944    pub fn resolve_type(
945        &self,
946        current_database: Option<&DatabaseId>,
947        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
948        name: &PartialItemName,
949        conn_id: &ConnectionId,
950    ) -> Result<&CatalogEntry, SqlCatalogError> {
951        self.state
952            .resolve_type(current_database, search_path, name, conn_id)
953    }
954
955    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
956        self.state.resolve_cluster(name)
957    }
958
959    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
960    ///
961    /// # Panics
962    /// * If the [`BuiltinCluster`] doesn't exist.
963    ///
964    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
965        self.state.resolve_builtin_cluster(cluster)
966    }
967
968    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
969        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
970    }
971
972    /// Resolves a [`Cluster`] for a TargetCluster.
973    pub fn resolve_target_cluster(
974        &self,
975        target_cluster: TargetCluster,
976        session: &Session,
977    ) -> Result<&Cluster, AdapterError> {
978        match target_cluster {
979            TargetCluster::CatalogServer => {
980                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
981            }
982            TargetCluster::Active => self.active_cluster(session),
983            TargetCluster::Transaction(cluster_id) => self
984                .try_get_cluster(cluster_id)
985                .ok_or(AdapterError::ConcurrentClusterDrop),
986        }
987    }
988
989    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
990        // TODO(benesch): this check here is not sufficiently protective. It'd
991        // be very easy for a code path to accidentally avoid this check by
992        // calling `resolve_cluster(session.vars().cluster())`.
993        if session.user().name != SYSTEM_USER.name
994            && session.user().name != SUPPORT_USER.name
995            && session.vars().cluster() == SYSTEM_USER.name
996        {
997            coord_bail!(
998                "system cluster '{}' cannot execute user queries",
999                SYSTEM_USER.name
1000            );
1001        }
1002        let cluster = self.resolve_cluster(session.vars().cluster())?;
1003        Ok(cluster)
1004    }
1005
1006    pub fn state(&self) -> &CatalogState {
1007        &self.state
1008    }
1009
1010    pub fn resolve_full_name(
1011        &self,
1012        name: &QualifiedItemName,
1013        conn_id: Option<&ConnectionId>,
1014    ) -> FullItemName {
1015        self.state.resolve_full_name(name, conn_id)
1016    }
1017
1018    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1019        self.state.try_get_entry(id)
1020    }
1021
1022    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1023        self.state.try_get_entry_by_global_id(id)
1024    }
1025
1026    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1027        self.state.get_entry(id)
1028    }
1029
1030    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1031        self.state.get_entry_by_global_id(id)
1032    }
1033
1034    pub fn get_global_ids<'a>(
1035        &'a self,
1036        id: &CatalogItemId,
1037    ) -> impl Iterator<Item = GlobalId> + use<'a> {
1038        self.get_entry(id).global_ids()
1039    }
1040
1041    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1042        self.get_entry_by_global_id(id).id()
1043    }
1044
1045    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1046        let item = self.try_get_entry_by_global_id(id)?;
1047        Some(item.id())
1048    }
1049
1050    pub fn get_schema(
1051        &self,
1052        database_spec: &ResolvedDatabaseSpecifier,
1053        schema_spec: &SchemaSpecifier,
1054        conn_id: &ConnectionId,
1055    ) -> &Schema {
1056        self.state.get_schema(database_spec, schema_spec, conn_id)
1057    }
1058
1059    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1060        self.state.get_mz_catalog_schema_id()
1061    }
1062
1063    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1064        self.state.get_pg_catalog_schema_id()
1065    }
1066
1067    pub fn get_information_schema_id(&self) -> SchemaId {
1068        self.state.get_information_schema_id()
1069    }
1070
1071    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1072        self.state.get_mz_internal_schema_id()
1073    }
1074
1075    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1076        self.state.get_mz_introspection_schema_id()
1077    }
1078
1079    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1080        self.state.get_mz_unsafe_schema_id()
1081    }
1082
1083    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1084        self.state.system_schema_ids()
1085    }
1086
1087    pub fn get_database(&self, id: &DatabaseId) -> &Database {
1088        self.state.get_database(id)
1089    }
1090
1091    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1092        self.state.try_get_role(id)
1093    }
1094
1095    pub fn get_role(&self, id: &RoleId) -> &Role {
1096        self.state.get_role(id)
1097    }
1098
1099    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1100        self.state.try_get_role_by_name(role_name)
1101    }
1102
1103    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1104        self.state.try_get_role_auth_by_id(id)
1105    }
1106
1107    /// Creates a new schema in the `Catalog` for temporary items
1108    /// indicated by the TEMPORARY or TEMP keywords.
1109    pub fn create_temporary_schema(
1110        &mut self,
1111        conn_id: &ConnectionId,
1112        owner_id: RoleId,
1113    ) -> Result<(), Error> {
1114        self.state.create_temporary_schema(conn_id, owner_id)
1115    }
1116
1117    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1118        self.state.temporary_schemas[conn_id]
1119            .items
1120            .contains_key(item_name)
1121    }
1122
1123    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1124    /// Returns Ok if conn_id's temp schema does not exist.
1125    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1126        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1127            return Ok(());
1128        };
1129        if !schema.items.is_empty() {
1130            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1131        }
1132        Ok(())
1133    }
1134
1135    pub(crate) fn object_dependents(
1136        &self,
1137        object_ids: &Vec<ObjectId>,
1138        conn_id: &ConnectionId,
1139    ) -> Vec<ObjectId> {
1140        let mut seen = BTreeSet::new();
1141        self.state.object_dependents(object_ids, conn_id, &mut seen)
1142    }
1143
1144    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1145        FullNameV1 {
1146            database: name.database.to_string(),
1147            schema: name.schema.clone(),
1148            item: name.item.clone(),
1149        }
1150    }
1151
1152    pub fn find_available_cluster_name(&self, name: &str) -> String {
1153        let mut i = 0;
1154        let mut candidate = name.to_string();
1155        while self.state.clusters_by_name.contains_key(&candidate) {
1156            i += 1;
1157            candidate = format!("{}{}", name, i);
1158        }
1159        candidate
1160    }
1161
1162    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1163        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1164            self.cluster_replica_sizes()
1165                .enabled_allocations()
1166                .map(|a| a.0.to_owned())
1167                .collect::<Vec<_>>()
1168        } else {
1169            self.system_config().allowed_cluster_replica_sizes()
1170        }
1171    }
1172
1173    pub fn concretize_replica_location(
1174        &self,
1175        location: mz_catalog::durable::ReplicaLocation,
1176        allowed_sizes: &Vec<String>,
1177        allowed_availability_zones: Option<&[String]>,
1178    ) -> Result<ReplicaLocation, Error> {
1179        self.state
1180            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1181    }
1182
1183    pub(crate) fn ensure_valid_replica_size(
1184        &self,
1185        allowed_sizes: &[String],
1186        size: &String,
1187    ) -> Result<(), Error> {
1188        self.state.ensure_valid_replica_size(allowed_sizes, size)
1189    }
1190
1191    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1192        &self.state.cluster_replica_sizes
1193    }
1194
1195    /// Returns the privileges of an object by its ID.
1196    pub fn get_privileges(
1197        &self,
1198        id: &SystemObjectId,
1199        conn_id: &ConnectionId,
1200    ) -> Option<&PrivilegeMap> {
1201        match id {
1202            SystemObjectId::Object(id) => match id {
1203                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1204                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1205                ObjectId::Schema((database_spec, schema_spec)) => Some(
1206                    self.get_schema(database_spec, schema_spec, conn_id)
1207                        .privileges(),
1208                ),
1209                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1210                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1211                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1212            },
1213            SystemObjectId::System => Some(&self.state.system_privileges),
1214        }
1215    }
1216
1217    #[mz_ore::instrument(level = "debug")]
1218    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1219        Ok(self.storage().await.confirm_leadership().await?)
1220    }
1221
1222    /// Return the ids of all log sources the given object depends on.
1223    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1224        self.state.introspection_dependencies(id)
1225    }
1226
1227    /// Serializes the catalog's in-memory state.
1228    ///
1229    /// There are no guarantees about the format of the serialized state, except
1230    /// that the serialized state for two identical catalogs will compare
1231    /// identically.
1232    pub fn dump(&self) -> Result<CatalogDump, Error> {
1233        Ok(CatalogDump::new(self.state.dump(None)?))
1234    }
1235
1236    /// Checks the [`Catalog`]s internal consistency.
1237    ///
1238    /// Returns a JSON object describing the inconsistencies, if there are any.
1239    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1240        self.state.check_consistency().map_err(|inconsistencies| {
1241            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1242                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1243            })
1244        })
1245    }
1246
1247    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1248        self.state.config()
1249    }
1250
1251    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1252        self.state.entry_by_id.values()
1253    }
1254
1255    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1256        self.entries()
1257            .filter(|entry| entry.is_connection() && entry.id().is_user())
1258    }
1259
1260    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1261        self.entries()
1262            .filter(|entry| entry.is_table() && entry.id().is_user())
1263    }
1264
1265    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1266        self.entries()
1267            .filter(|entry| entry.is_source() && entry.id().is_user())
1268    }
1269
1270    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1271        self.entries()
1272            .filter(|entry| entry.is_sink() && entry.id().is_user())
1273    }
1274
1275    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1276        self.entries()
1277            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1278    }
1279
1280    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1281        self.entries()
1282            .filter(|entry| entry.is_secret() && entry.id().is_user())
1283    }
1284
1285    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1286        self.state.get_network_policy(&network_policy_id)
1287    }
1288
1289    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1290        self.state.try_get_network_policy_by_name(name)
1291    }
1292
1293    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1294        self.state.clusters_by_id.values()
1295    }
1296
1297    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1298        self.state.get_cluster(cluster_id)
1299    }
1300
1301    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1302        self.state.try_get_cluster(cluster_id)
1303    }
1304
1305    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1306        self.clusters().filter(|cluster| cluster.id.is_user())
1307    }
1308
1309    pub fn get_cluster_replica(
1310        &self,
1311        cluster_id: ClusterId,
1312        replica_id: ReplicaId,
1313    ) -> &ClusterReplica {
1314        self.state.get_cluster_replica(cluster_id, replica_id)
1315    }
1316
1317    pub fn try_get_cluster_replica(
1318        &self,
1319        cluster_id: ClusterId,
1320        replica_id: ReplicaId,
1321    ) -> Option<&ClusterReplica> {
1322        self.state.try_get_cluster_replica(cluster_id, replica_id)
1323    }
1324
1325    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1326        self.user_clusters()
1327            .flat_map(|cluster| cluster.user_replicas())
1328    }
1329
1330    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1331        self.state.database_by_id.values()
1332    }
1333
1334    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1335        self.state
1336            .roles_by_id
1337            .values()
1338            .filter(|role| role.is_user())
1339    }
1340
1341    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1342        self.entries()
1343            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1344    }
1345
1346    pub fn system_privileges(&self) -> &PrivilegeMap {
1347        &self.state.system_privileges
1348    }
1349
1350    pub fn default_privileges(
1351        &self,
1352    ) -> impl Iterator<
1353        Item = (
1354            &DefaultPrivilegeObject,
1355            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1356        ),
1357    > {
1358        self.state.default_privileges.iter()
1359    }
1360
1361    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1362        self.state
1363            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1364    }
1365
1366    pub fn pack_storage_usage_update(
1367        &self,
1368        event: VersionedStorageUsage,
1369        diff: Diff,
1370    ) -> BuiltinTableUpdate {
1371        self.state
1372            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1373    }
1374
1375    pub fn system_config(&self) -> &SystemVars {
1376        self.state.system_config()
1377    }
1378
1379    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1380        self.state.system_config_mut()
1381    }
1382
1383    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1384        self.state.ensure_not_reserved_role(role_id)
1385    }
1386
1387    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1388        self.state.ensure_grantable_role(role_id)
1389    }
1390
1391    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1392        self.state.ensure_not_system_role(role_id)
1393    }
1394
1395    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1396        self.state.ensure_not_predefined_role(role_id)
1397    }
1398
1399    pub fn ensure_not_reserved_network_policy(
1400        &self,
1401        network_policy_id: &NetworkPolicyId,
1402    ) -> Result<(), Error> {
1403        self.state
1404            .ensure_not_reserved_network_policy(network_policy_id)
1405    }
1406
1407    pub fn ensure_not_reserved_object(
1408        &self,
1409        object_id: &ObjectId,
1410        conn_id: &ConnectionId,
1411    ) -> Result<(), Error> {
1412        match object_id {
1413            ObjectId::Cluster(cluster_id) => {
1414                if cluster_id.is_system() {
1415                    let cluster = self.get_cluster(*cluster_id);
1416                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1417                        cluster.name().to_string(),
1418                    )))
1419                } else {
1420                    Ok(())
1421                }
1422            }
1423            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1424                if replica_id.is_system() {
1425                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1426                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1427                        replica.name().to_string(),
1428                    )))
1429                } else {
1430                    Ok(())
1431                }
1432            }
1433            ObjectId::Database(database_id) => {
1434                if database_id.is_system() {
1435                    let database = self.get_database(database_id);
1436                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1437                        database.name().to_string(),
1438                    )))
1439                } else {
1440                    Ok(())
1441                }
1442            }
1443            ObjectId::Schema((database_spec, schema_spec)) => {
1444                if schema_spec.is_system() {
1445                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1446                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1447                        schema.name().schema.clone(),
1448                    )))
1449                } else {
1450                    Ok(())
1451                }
1452            }
1453            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1454            ObjectId::Item(item_id) => {
1455                if item_id.is_system() {
1456                    let item = self.get_entry(item_id);
1457                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1458                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1459                } else {
1460                    Ok(())
1461                }
1462            }
1463            ObjectId::NetworkPolicy(network_policy_id) => {
1464                self.ensure_not_reserved_network_policy(network_policy_id)
1465            }
1466        }
1467    }
1468
1469    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1470    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1471        &mut self,
1472        create_sql: &str,
1473        force_if_exists_skip: bool,
1474    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1475        self.state
1476            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1477    }
1478
1479    pub(crate) fn update_expression_cache<'a, 'b>(
1480        &'a self,
1481        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1482        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1483    ) -> BoxFuture<'b, ()> {
1484        if let Some(expr_cache) = &self.expr_cache_handle {
1485            let ons = new_local_expressions
1486                .iter()
1487                .map(|(id, _)| id)
1488                .chain(new_global_expressions.iter().map(|(id, _)| id))
1489                .map(|id| self.get_entry_by_global_id(id))
1490                .filter_map(|entry| entry.index().map(|index| index.on));
1491            let invalidate_ids = self.invalidate_for_index(ons);
1492            expr_cache
1493                .update(
1494                    new_local_expressions,
1495                    new_global_expressions,
1496                    invalidate_ids,
1497                )
1498                .boxed()
1499        } else {
1500            async {}.boxed()
1501        }
1502    }
1503
1504    /// Listen for and apply all unconsumed updates to the durable catalog state.
1505    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1506    // `#[cfg(test)]` annotation.
1507    #[cfg(test)]
1508    async fn sync_to_current_updates(
1509        &mut self,
1510    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
1511        let updates = self.storage().await.sync_to_current_updates().await?;
1512        let builtin_table_updates = self.state.apply_updates(updates)?;
1513        Ok(builtin_table_updates)
1514    }
1515}
1516
1517pub fn is_reserved_name(name: &str) -> bool {
1518    BUILTIN_PREFIXES
1519        .iter()
1520        .any(|prefix| name.starts_with(prefix))
1521}
1522
1523pub fn is_reserved_role_name(name: &str) -> bool {
1524    is_reserved_name(name) || is_public_role(name)
1525}
1526
1527pub fn is_public_role(name: &str) -> bool {
1528    name == &*PUBLIC_ROLE_NAME
1529}
1530
1531pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1532    object_type_to_audit_object_type(sql_type.into())
1533}
1534
1535pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1536    match id {
1537        CommentObjectId::Table(_) => ObjectType::Table,
1538        CommentObjectId::View(_) => ObjectType::View,
1539        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1540        CommentObjectId::Source(_) => ObjectType::Source,
1541        CommentObjectId::Sink(_) => ObjectType::Sink,
1542        CommentObjectId::Index(_) => ObjectType::Index,
1543        CommentObjectId::Func(_) => ObjectType::Func,
1544        CommentObjectId::Connection(_) => ObjectType::Connection,
1545        CommentObjectId::Type(_) => ObjectType::Type,
1546        CommentObjectId::Secret(_) => ObjectType::Secret,
1547        CommentObjectId::Role(_) => ObjectType::Role,
1548        CommentObjectId::Database(_) => ObjectType::Database,
1549        CommentObjectId::Schema(_) => ObjectType::Schema,
1550        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1551        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1552        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1553        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1554    }
1555}
1556
1557pub(crate) fn object_type_to_audit_object_type(
1558    object_type: mz_sql::catalog::ObjectType,
1559) -> ObjectType {
1560    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1561}
1562
1563pub(crate) fn system_object_type_to_audit_object_type(
1564    system_type: &SystemObjectType,
1565) -> ObjectType {
1566    match system_type {
1567        SystemObjectType::Object(object_type) => match object_type {
1568            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1569            mz_sql::catalog::ObjectType::View => ObjectType::View,
1570            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1571            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1572            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1573            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1574            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1575            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1576            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1577            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1578            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1579            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1580            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1581            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1582            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1583            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1584            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1585        },
1586        SystemObjectType::System => ObjectType::System,
1587    }
1588}
1589
/// Selects which direction a privilege update goes: granting or revoking.
#[derive(Clone, Copy, Debug)]
pub enum UpdatePrivilegeVariant {
    /// The update grants privileges.
    Grant,
    /// The update revokes privileges.
    Revoke,
}
1595
1596impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1597    fn from(variant: UpdatePrivilegeVariant) -> Self {
1598        match variant {
1599            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1600            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1601        }
1602    }
1603}
1604
1605impl From<UpdatePrivilegeVariant> for EventType {
1606    fn from(variant: UpdatePrivilegeVariant) -> Self {
1607        match variant {
1608            UpdatePrivilegeVariant::Grant => EventType::Grant,
1609            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1610        }
1611    }
1612}
1613
1614impl ConnCatalog<'_> {
1615    fn resolve_item_name(
1616        &self,
1617        name: &PartialItemName,
1618    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1619        self.resolve_item(name).map(|entry| entry.name())
1620    }
1621
1622    fn resolve_function_name(
1623        &self,
1624        name: &PartialItemName,
1625    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1626        self.resolve_function(name).map(|entry| entry.name())
1627    }
1628
1629    fn resolve_type_name(
1630        &self,
1631        name: &PartialItemName,
1632    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1633        self.resolve_type(name).map(|entry| entry.name())
1634    }
1635}
1636
1637impl ExprHumanizer for ConnCatalog<'_> {
1638    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1639        let entry = self.state.try_get_entry_by_global_id(&id)?;
1640        Some(self.resolve_full_name(entry.name()).to_string())
1641    }
1642
1643    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1644        let entry = self.state.try_get_entry_by_global_id(&id)?;
1645        Some(entry.name().item.clone())
1646    }
1647
1648    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1649        let entry = self.state.try_get_entry_by_global_id(&id)?;
1650        Some(self.resolve_full_name(entry.name()).into_parts())
1651    }
1652
1653    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1654        use SqlScalarType::*;
1655
1656        match typ {
1657            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1658            List {
1659                custom_id: Some(item_id),
1660                ..
1661            }
1662            | Map {
1663                custom_id: Some(item_id),
1664                ..
1665            } => {
1666                let item = self.get_item(item_id);
1667                self.minimal_qualification(item.name()).to_string()
1668            }
1669            List { element_type, .. } => {
1670                format!(
1671                    "{} list",
1672                    self.humanize_scalar_type(element_type, postgres_compat)
1673                )
1674            }
1675            Map { value_type, .. } => format!(
1676                "map[{}=>{}]",
1677                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1678                self.humanize_scalar_type(value_type, postgres_compat)
1679            ),
1680            Record {
1681                custom_id: Some(item_id),
1682                ..
1683            } => {
1684                let item = self.get_item(item_id);
1685                self.minimal_qualification(item.name()).to_string()
1686            }
1687            Record { fields, .. } => format!(
1688                "record({})",
1689                fields
1690                    .iter()
1691                    .map(|f| format!(
1692                        "{}: {}",
1693                        f.0,
1694                        self.humanize_column_type(&f.1, postgres_compat)
1695                    ))
1696                    .join(",")
1697            ),
1698            PgLegacyChar => "\"char\"".into(),
1699            Char { length } if !postgres_compat => match length {
1700                None => "char".into(),
1701                Some(length) => format!("char({})", length.into_u32()),
1702            },
1703            VarChar { max_length } if !postgres_compat => match max_length {
1704                None => "varchar".into(),
1705                Some(length) => format!("varchar({})", length.into_u32()),
1706            },
1707            UInt16 => "uint2".into(),
1708            UInt32 => "uint4".into(),
1709            UInt64 => "uint8".into(),
1710            ty => {
1711                let pgrepr_type = mz_pgrepr::Type::from(ty);
1712                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1713
1714                let res = if self
1715                    .effective_search_path(true)
1716                    .iter()
1717                    .any(|(_, schema)| schema == &pg_catalog_schema)
1718                {
1719                    pgrepr_type.name().to_string()
1720                } else {
1721                    // If PG_CATALOG_SCHEMA is not in search path, you need
1722                    // qualified object name to refer to type.
1723                    let name = QualifiedItemName {
1724                        qualifiers: ItemQualifiers {
1725                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1726                            schema_spec: pg_catalog_schema,
1727                        },
1728                        item: pgrepr_type.name().to_string(),
1729                    };
1730                    self.resolve_full_name(&name).to_string()
1731                };
1732                res
1733            }
1734        }
1735    }
1736
1737    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1738        let entry = self.state.try_get_entry_by_global_id(&id)?;
1739
1740        match entry.index() {
1741            Some(index) => {
1742                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1743                let mut on_names = on_desc
1744                    .iter_names()
1745                    .map(|col_name| col_name.to_string())
1746                    .collect::<Vec<_>>();
1747
1748                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1749
1750                // Init ix_names with unknown column names. Unknown columns are
1751                // represented as an empty String and rendered as `#c` by the
1752                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1753                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1754                let mut ix_names = vec![String::new(); ix_arity];
1755
1756                // Apply the permutation by swapping on_names with ix_names.
1757                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1758                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1759                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1760                    std::mem::swap(on_name, ix_name);
1761                }
1762
1763                Some(ix_names) // Return the updated ix_names vector.
1764            }
1765            None => {
1766                let desc = self.state.try_get_desc_by_global_id(&id)?;
1767                let column_names = desc
1768                    .iter_names()
1769                    .map(|col_name| col_name.to_string())
1770                    .collect();
1771
1772                Some(column_names)
1773            }
1774        }
1775    }
1776
1777    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1778        let desc = self.state.try_get_desc_by_global_id(&id)?;
1779        Some(desc.get_name(column).to_string())
1780    }
1781
1782    fn id_exists(&self, id: GlobalId) -> bool {
1783        self.state.entry_by_global_id.contains_key(&id)
1784    }
1785}
1786
1787impl SessionCatalog for ConnCatalog<'_> {
1788    fn active_role_id(&self) -> &RoleId {
1789        &self.role_id
1790    }
1791
1792    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1793        self.prepared_statements
1794            .as_ref()
1795            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1796            .flatten()
1797    }
1798
1799    fn active_database(&self) -> Option<&DatabaseId> {
1800        self.database.as_ref()
1801    }
1802
1803    fn active_cluster(&self) -> &str {
1804        &self.cluster
1805    }
1806
1807    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1808        &self.search_path
1809    }
1810
1811    fn resolve_database(
1812        &self,
1813        database_name: &str,
1814    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1815        Ok(self.state.resolve_database(database_name)?)
1816    }
1817
1818    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1819        self.state
1820            .database_by_id
1821            .get(id)
1822            .expect("database doesn't exist")
1823    }
1824
1825    // `as` is ok to use to cast to a trait object.
1826    #[allow(clippy::as_conversions)]
1827    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1828        self.state
1829            .database_by_id
1830            .values()
1831            .map(|database| database as &dyn CatalogDatabase)
1832            .collect()
1833    }
1834
1835    fn resolve_schema(
1836        &self,
1837        database_name: Option<&str>,
1838        schema_name: &str,
1839    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1840        Ok(self.state.resolve_schema(
1841            self.database.as_ref(),
1842            database_name,
1843            schema_name,
1844            &self.conn_id,
1845        )?)
1846    }
1847
1848    fn resolve_schema_in_database(
1849        &self,
1850        database_spec: &ResolvedDatabaseSpecifier,
1851        schema_name: &str,
1852    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1853        Ok(self
1854            .state
1855            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1856    }
1857
1858    fn get_schema(
1859        &self,
1860        database_spec: &ResolvedDatabaseSpecifier,
1861        schema_spec: &SchemaSpecifier,
1862    ) -> &dyn CatalogSchema {
1863        self.state
1864            .get_schema(database_spec, schema_spec, &self.conn_id)
1865    }
1866
1867    // `as` is ok to use to cast to a trait object.
1868    #[allow(clippy::as_conversions)]
1869    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1870        self.get_databases()
1871            .into_iter()
1872            .flat_map(|database| database.schemas().into_iter())
1873            .chain(
1874                self.state
1875                    .ambient_schemas_by_id
1876                    .values()
1877                    .chain(self.state.temporary_schemas.values())
1878                    .map(|schema| schema as &dyn CatalogSchema),
1879            )
1880            .collect()
1881    }
1882
1883    fn get_mz_internal_schema_id(&self) -> SchemaId {
1884        self.state().get_mz_internal_schema_id()
1885    }
1886
1887    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1888        self.state().get_mz_unsafe_schema_id()
1889    }
1890
1891    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1892        self.state.is_system_schema_specifier(schema)
1893    }
1894
1895    fn resolve_role(
1896        &self,
1897        role_name: &str,
1898    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1899        match self.state.try_get_role_by_name(role_name) {
1900            Some(role) => Ok(role),
1901            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1902        }
1903    }
1904
1905    fn resolve_network_policy(
1906        &self,
1907        policy_name: &str,
1908    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1909        match self.state.try_get_network_policy_by_name(policy_name) {
1910            Some(policy) => Ok(policy),
1911            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1912        }
1913    }
1914
1915    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1916        Some(self.state.roles_by_id.get(id)?)
1917    }
1918
1919    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1920        self.state.get_role(id)
1921    }
1922
1923    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1924        // `as` is ok to use to cast to a trait object.
1925        #[allow(clippy::as_conversions)]
1926        self.state
1927            .roles_by_id
1928            .values()
1929            .map(|role| role as &dyn CatalogRole)
1930            .collect()
1931    }
1932
1933    fn mz_system_role_id(&self) -> RoleId {
1934        MZ_SYSTEM_ROLE_ID
1935    }
1936
1937    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1938        self.state.collect_role_membership(id)
1939    }
1940
1941    fn get_network_policy(
1942        &self,
1943        id: &NetworkPolicyId,
1944    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1945        self.state.get_network_policy(id)
1946    }
1947
1948    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1949        // `as` is ok to use to cast to a trait object.
1950        #[allow(clippy::as_conversions)]
1951        self.state
1952            .network_policies_by_id
1953            .values()
1954            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1955            .collect()
1956    }
1957
1958    fn resolve_cluster(
1959        &self,
1960        cluster_name: Option<&str>,
1961    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1962        Ok(self
1963            .state
1964            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1965    }
1966
1967    fn resolve_cluster_replica(
1968        &self,
1969        cluster_replica_name: &QualifiedReplica,
1970    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1971        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1972    }
1973
1974    fn resolve_item(
1975        &self,
1976        name: &PartialItemName,
1977    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1978        let r = self.state.resolve_entry(
1979            self.database.as_ref(),
1980            &self.effective_search_path(true),
1981            name,
1982            &self.conn_id,
1983        )?;
1984        if self.unresolvable_ids.contains(&r.id()) {
1985            Err(SqlCatalogError::UnknownItem(name.to_string()))
1986        } else {
1987            Ok(r)
1988        }
1989    }
1990
1991    fn resolve_function(
1992        &self,
1993        name: &PartialItemName,
1994    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1995        let r = self.state.resolve_function(
1996            self.database.as_ref(),
1997            &self.effective_search_path(false),
1998            name,
1999            &self.conn_id,
2000        )?;
2001
2002        if self.unresolvable_ids.contains(&r.id()) {
2003            Err(SqlCatalogError::UnknownFunction {
2004                name: name.to_string(),
2005                alternative: None,
2006            })
2007        } else {
2008            Ok(r)
2009        }
2010    }
2011
2012    fn resolve_type(
2013        &self,
2014        name: &PartialItemName,
2015    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2016        let r = self.state.resolve_type(
2017            self.database.as_ref(),
2018            &self.effective_search_path(false),
2019            name,
2020            &self.conn_id,
2021        )?;
2022
2023        if self.unresolvable_ids.contains(&r.id()) {
2024            Err(SqlCatalogError::UnknownType {
2025                name: name.to_string(),
2026            })
2027        } else {
2028            Ok(r)
2029        }
2030    }
2031
2032    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2033        self.state.get_system_type(name)
2034    }
2035
2036    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2037        Some(self.state.try_get_entry(id)?)
2038    }
2039
2040    fn try_get_item_by_global_id(
2041        &self,
2042        id: &GlobalId,
2043    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2044        let entry = self.state.try_get_entry_by_global_id(id)?;
2045        let entry = match &entry.item {
2046            CatalogItem::Table(table) => {
2047                let (version, _gid) = table
2048                    .collections
2049                    .iter()
2050                    .find(|(_version, gid)| *gid == id)
2051                    .expect("catalog out of sync, mismatched GlobalId");
2052                entry.at_version(RelationVersionSelector::Specific(*version))
2053            }
2054            _ => entry.at_version(RelationVersionSelector::Latest),
2055        };
2056        Some(entry)
2057    }
2058
2059    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2060        self.state.get_entry(id)
2061    }
2062
2063    fn get_item_by_global_id(
2064        &self,
2065        id: &GlobalId,
2066    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2067        let entry = self.state.get_entry_by_global_id(id);
2068        let entry = match &entry.item {
2069            CatalogItem::Table(table) => {
2070                let (version, _gid) = table
2071                    .collections
2072                    .iter()
2073                    .find(|(_version, gid)| *gid == id)
2074                    .expect("catalog out of sync, mismatched GlobalId");
2075                entry.at_version(RelationVersionSelector::Specific(*version))
2076            }
2077            _ => entry.at_version(RelationVersionSelector::Latest),
2078        };
2079        entry
2080    }
2081
2082    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2083        self.get_schemas()
2084            .into_iter()
2085            .flat_map(|schema| schema.item_ids())
2086            .map(|id| self.get_item(&id))
2087            .collect()
2088    }
2089
2090    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2091        self.state
2092            .get_item_by_name(name, &self.conn_id)
2093            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2094    }
2095
2096    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2097        self.state
2098            .get_type_by_name(name, &self.conn_id)
2099            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2100    }
2101
2102    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2103        &self.state.clusters_by_id[&id]
2104    }
2105
2106    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2107        self.state
2108            .clusters_by_id
2109            .values()
2110            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2111            .collect()
2112    }
2113
2114    fn get_cluster_replica(
2115        &self,
2116        cluster_id: ClusterId,
2117        replica_id: ReplicaId,
2118    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2119        let cluster = self.get_cluster(cluster_id);
2120        cluster.replica(replica_id)
2121    }
2122
2123    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2124        self.get_clusters()
2125            .into_iter()
2126            .flat_map(|cluster| cluster.replicas().into_iter())
2127            .collect()
2128    }
2129
2130    fn get_system_privileges(&self) -> &PrivilegeMap {
2131        &self.state.system_privileges
2132    }
2133
2134    fn get_default_privileges(
2135        &self,
2136    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2137        self.state
2138            .default_privileges
2139            .iter()
2140            .map(|(object, acl_items)| (object, acl_items.collect()))
2141            .collect()
2142    }
2143
2144    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2145        self.state.find_available_name(name, &self.conn_id)
2146    }
2147
2148    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2149        self.state.resolve_full_name(name, Some(&self.conn_id))
2150    }
2151
2152    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2153        self.state.resolve_full_schema_name(name)
2154    }
2155
2156    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2157        self.state.get_entry_by_global_id(global_id).id()
2158    }
2159
2160    fn resolve_global_id(
2161        &self,
2162        item_id: &CatalogItemId,
2163        version: RelationVersionSelector,
2164    ) -> GlobalId {
2165        self.state
2166            .get_entry(item_id)
2167            .at_version(version)
2168            .global_id()
2169    }
2170
2171    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2172        self.state.config()
2173    }
2174
2175    fn now(&self) -> EpochMillis {
2176        (self.state.config().now)()
2177    }
2178
2179    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2180        self.state.aws_privatelink_availability_zones.clone()
2181    }
2182
2183    fn system_vars(&self) -> &SystemVars {
2184        &self.state.system_configuration
2185    }
2186
2187    fn system_vars_mut(&mut self) -> &mut SystemVars {
2188        &mut self.state.to_mut().system_configuration
2189    }
2190
2191    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2192        self.state().get_owner_id(id, self.conn_id())
2193    }
2194
2195    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2196        match id {
2197            SystemObjectId::System => Some(&self.state.system_privileges),
2198            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2199                Some(self.get_cluster(*id).privileges())
2200            }
2201            SystemObjectId::Object(ObjectId::Database(id)) => {
2202                Some(self.get_database(id).privileges())
2203            }
2204            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2205                Some(self.get_schema(database_spec, schema_spec).privileges())
2206            }
2207            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2208            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2209                Some(self.get_network_policy(id).privileges())
2210            }
2211            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2212            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2213        }
2214    }
2215
2216    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2217        let mut seen = BTreeSet::new();
2218        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2219    }
2220
2221    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2222        let mut seen = BTreeSet::new();
2223        self.state.item_dependents(id, &mut seen)
2224    }
2225
2226    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2227        rbac::all_object_privileges(object_type)
2228    }
2229
2230    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2231        self.state.get_object_type(object_id)
2232    }
2233
2234    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2235        self.state.get_system_object_type(id)
2236    }
2237
2238    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2239    /// the object.
2240    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2241        let database_id = match &qualified_name.qualifiers.database_spec {
2242            ResolvedDatabaseSpecifier::Ambient => None,
2243            ResolvedDatabaseSpecifier::Id(id)
2244                if self.database.is_some() && self.database == Some(*id) =>
2245            {
2246                None
2247            }
2248            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2249        };
2250
2251        let schema_spec = if database_id.is_none()
2252            && self.resolve_item_name(&PartialItemName {
2253                database: None,
2254                schema: None,
2255                item: qualified_name.item.clone(),
2256            }) == Ok(qualified_name)
2257            || self.resolve_function_name(&PartialItemName {
2258                database: None,
2259                schema: None,
2260                item: qualified_name.item.clone(),
2261            }) == Ok(qualified_name)
2262            || self.resolve_type_name(&PartialItemName {
2263                database: None,
2264                schema: None,
2265                item: qualified_name.item.clone(),
2266            }) == Ok(qualified_name)
2267        {
2268            None
2269        } else {
2270            // If `search_path` does not contain `full_name.schema`, the
2271            // `PartialName` must contain it.
2272            Some(qualified_name.qualifiers.schema_spec.clone())
2273        };
2274
2275        let res = PartialItemName {
2276            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2277            schema: schema_spec.map(|spec| {
2278                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2279                    .name()
2280                    .schema
2281                    .clone()
2282            }),
2283            item: qualified_name.item.clone(),
2284        };
2285        assert!(
2286            self.resolve_item_name(&res) == Ok(qualified_name)
2287                || self.resolve_function_name(&res) == Ok(qualified_name)
2288                || self.resolve_type_name(&res) == Ok(qualified_name)
2289        );
2290        res
2291    }
2292
2293    fn add_notice(&self, notice: PlanNotice) {
2294        let _ = self.notices_tx.send(notice.into());
2295    }
2296
2297    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2298        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2299        self.state.comments.get_object_comments(comment_id)
2300    }
2301
2302    fn is_cluster_size_cc(&self, size: &str) -> bool {
2303        self.state
2304            .cluster_replica_sizes
2305            .0
2306            .get(size)
2307            .map_or(false, |a| a.is_cc)
2308    }
2309}
2310
2311#[cfg(test)]
2312mod tests {
2313    use std::collections::{BTreeMap, BTreeSet};
2314    use std::sync::Arc;
2315    use std::{env, iter};
2316
2317    use itertools::Itertools;
2318    use mz_catalog::memory::objects::CatalogItem;
2319    use tokio_postgres::NoTls;
2320    use tokio_postgres::types::Type;
2321    use uuid::Uuid;
2322
2323    use mz_catalog::SYSTEM_CONN_ID;
2324    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2325    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2326    use mz_controller_types::{ClusterId, ReplicaId};
2327    use mz_expr::MirScalarExpr;
2328    use mz_ore::now::to_datetime;
2329    use mz_ore::{assert_err, assert_ok, task};
2330    use mz_persist_client::PersistClient;
2331    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2332    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2333    use mz_repr::role_id::RoleId;
2334    use mz_repr::{
2335        CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2336        SqlScalarType, Timestamp,
2337    };
2338    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2339    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2340    use mz_sql::names::{
2341        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2342        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2343    };
2344    use mz_sql::plan::{
2345        CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2346        Scope, StatementContext,
2347    };
2348    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2349    use mz_sql::session::vars::{SystemVars, VarInput};
2350
2351    use crate::catalog::state::LocalExpressionCache;
2352    use crate::catalog::{Catalog, Op};
2353    use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2354    use crate::session::Session;
2355
2356    /// System sessions have an empty `search_path` so it's necessary to
2357    /// schema-qualify all referenced items.
2358    ///
2359    /// Dummy (and ostensibly client) sessions contain system schemas in their
2360    /// search paths, so do not require schema qualification on system objects such
2361    /// as types.
2362    #[mz_ore::test(tokio::test)]
2363    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2364    async fn test_minimal_qualification() {
2365        Catalog::with_debug(|catalog| async move {
2366            struct TestCase {
2367                input: QualifiedItemName,
2368                system_output: PartialItemName,
2369                normal_output: PartialItemName,
2370            }
2371
2372            let test_cases = vec![
2373                TestCase {
2374                    input: QualifiedItemName {
2375                        qualifiers: ItemQualifiers {
2376                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2377                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2378                        },
2379                        item: "numeric".to_string(),
2380                    },
2381                    system_output: PartialItemName {
2382                        database: None,
2383                        schema: None,
2384                        item: "numeric".to_string(),
2385                    },
2386                    normal_output: PartialItemName {
2387                        database: None,
2388                        schema: None,
2389                        item: "numeric".to_string(),
2390                    },
2391                },
2392                TestCase {
2393                    input: QualifiedItemName {
2394                        qualifiers: ItemQualifiers {
2395                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2396                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2397                        },
2398                        item: "mz_array_types".to_string(),
2399                    },
2400                    system_output: PartialItemName {
2401                        database: None,
2402                        schema: None,
2403                        item: "mz_array_types".to_string(),
2404                    },
2405                    normal_output: PartialItemName {
2406                        database: None,
2407                        schema: None,
2408                        item: "mz_array_types".to_string(),
2409                    },
2410                },
2411            ];
2412
2413            for tc in test_cases {
2414                assert_eq!(
2415                    catalog
2416                        .for_system_session()
2417                        .minimal_qualification(&tc.input),
2418                    tc.system_output
2419                );
2420                assert_eq!(
2421                    catalog
2422                        .for_session(&Session::dummy())
2423                        .minimal_qualification(&tc.input),
2424                    tc.normal_output
2425                );
2426            }
2427            catalog.expire().await;
2428        })
2429        .await
2430    }
2431
2432    #[mz_ore::test(tokio::test)]
2433    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2434    async fn test_catalog_revision() {
2435        let persist_client = PersistClient::new_for_tests().await;
2436        let organization_id = Uuid::new_v4();
2437        let bootstrap_args = test_bootstrap_args();
2438        {
2439            let mut catalog = Catalog::open_debug_catalog(
2440                persist_client.clone(),
2441                organization_id.clone(),
2442                &bootstrap_args,
2443            )
2444            .await
2445            .expect("unable to open debug catalog");
2446            assert_eq!(catalog.transient_revision(), 1);
2447            let commit_ts = catalog.current_upper().await;
2448            catalog
2449                .transact(
2450                    None,
2451                    commit_ts,
2452                    None,
2453                    vec![Op::CreateDatabase {
2454                        name: "test".to_string(),
2455                        owner_id: MZ_SYSTEM_ROLE_ID,
2456                    }],
2457                )
2458                .await
2459                .expect("failed to transact");
2460            assert_eq!(catalog.transient_revision(), 2);
2461            catalog.expire().await;
2462        }
2463        {
2464            let catalog =
2465                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2466                    .await
2467                    .expect("unable to open debug catalog");
2468            // Re-opening the same catalog resets the transient_revision to 1.
2469            assert_eq!(catalog.transient_revision(), 1);
2470            catalog.expire().await;
2471        }
2472    }
2473
2474    #[mz_ore::test(tokio::test)]
2475    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2476    async fn test_effective_search_path() {
2477        Catalog::with_debug(|catalog| async move {
2478            let mz_catalog_schema = (
2479                ResolvedDatabaseSpecifier::Ambient,
2480                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2481            );
2482            let pg_catalog_schema = (
2483                ResolvedDatabaseSpecifier::Ambient,
2484                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2485            );
2486            let mz_temp_schema = (
2487                ResolvedDatabaseSpecifier::Ambient,
2488                SchemaSpecifier::Temporary,
2489            );
2490
2491            // Behavior with the default search_schema (public)
2492            let session = Session::dummy();
2493            let conn_catalog = catalog.for_session(&session);
2494            assert_ne!(
2495                conn_catalog.effective_search_path(false),
2496                conn_catalog.search_path
2497            );
2498            assert_ne!(
2499                conn_catalog.effective_search_path(true),
2500                conn_catalog.search_path
2501            );
2502            assert_eq!(
2503                conn_catalog.effective_search_path(false),
2504                vec![
2505                    mz_catalog_schema.clone(),
2506                    pg_catalog_schema.clone(),
2507                    conn_catalog.search_path[0].clone()
2508                ]
2509            );
2510            assert_eq!(
2511                conn_catalog.effective_search_path(true),
2512                vec![
2513                    mz_temp_schema.clone(),
2514                    mz_catalog_schema.clone(),
2515                    pg_catalog_schema.clone(),
2516                    conn_catalog.search_path[0].clone()
2517                ]
2518            );
2519
2520            // missing schemas are added when missing
2521            let mut session = Session::dummy();
2522            session
2523                .vars_mut()
2524                .set(
2525                    &SystemVars::new(),
2526                    "search_path",
2527                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2528                    false,
2529                )
2530                .expect("failed to set search_path");
2531            let conn_catalog = catalog.for_session(&session);
2532            assert_ne!(
2533                conn_catalog.effective_search_path(false),
2534                conn_catalog.search_path
2535            );
2536            assert_ne!(
2537                conn_catalog.effective_search_path(true),
2538                conn_catalog.search_path
2539            );
2540            assert_eq!(
2541                conn_catalog.effective_search_path(false),
2542                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2543            );
2544            assert_eq!(
2545                conn_catalog.effective_search_path(true),
2546                vec![
2547                    mz_temp_schema.clone(),
2548                    mz_catalog_schema.clone(),
2549                    pg_catalog_schema.clone()
2550                ]
2551            );
2552
2553            let mut session = Session::dummy();
2554            session
2555                .vars_mut()
2556                .set(
2557                    &SystemVars::new(),
2558                    "search_path",
2559                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2560                    false,
2561                )
2562                .expect("failed to set search_path");
2563            let conn_catalog = catalog.for_session(&session);
2564            assert_ne!(
2565                conn_catalog.effective_search_path(false),
2566                conn_catalog.search_path
2567            );
2568            assert_ne!(
2569                conn_catalog.effective_search_path(true),
2570                conn_catalog.search_path
2571            );
2572            assert_eq!(
2573                conn_catalog.effective_search_path(false),
2574                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2575            );
2576            assert_eq!(
2577                conn_catalog.effective_search_path(true),
2578                vec![
2579                    mz_temp_schema.clone(),
2580                    pg_catalog_schema.clone(),
2581                    mz_catalog_schema.clone()
2582                ]
2583            );
2584
2585            let mut session = Session::dummy();
2586            session
2587                .vars_mut()
2588                .set(
2589                    &SystemVars::new(),
2590                    "search_path",
2591                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2592                    false,
2593                )
2594                .expect("failed to set search_path");
2595            let conn_catalog = catalog.for_session(&session);
2596            assert_ne!(
2597                conn_catalog.effective_search_path(false),
2598                conn_catalog.search_path
2599            );
2600            assert_ne!(
2601                conn_catalog.effective_search_path(true),
2602                conn_catalog.search_path
2603            );
2604            assert_eq!(
2605                conn_catalog.effective_search_path(false),
2606                vec![
2607                    mz_catalog_schema.clone(),
2608                    pg_catalog_schema.clone(),
2609                    mz_temp_schema.clone()
2610                ]
2611            );
2612            assert_eq!(
2613                conn_catalog.effective_search_path(true),
2614                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2615            );
2616            catalog.expire().await;
2617        })
2618        .await
2619    }
2620
2621    #[mz_ore::test(tokio::test)]
2622    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2623    async fn test_normalized_create() {
2624        use mz_ore::collections::CollectionExt;
2625        Catalog::with_debug(|catalog| async move {
2626            let conn_catalog = catalog.for_system_session();
2627            let scx = &mut StatementContext::new(None, &conn_catalog);
2628
2629            let parsed = mz_sql_parser::parser::parse_statements(
2630                "create view public.foo as select 1 as bar",
2631            )
2632            .expect("")
2633            .into_element()
2634            .ast;
2635
2636            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2637
2638            // Ensure that all identifiers are quoted.
2639            assert_eq!(
2640                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2641                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2642            );
2643            catalog.expire().await;
2644        })
2645        .await;
2646    }
2647
2648    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2649    #[mz_ore::test(tokio::test)]
2650    #[cfg_attr(miri, ignore)] // slow
2651    async fn test_large_catalog_item() {
2652        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2653        let column = ", 1";
2654        let view_def_size = view_def.bytes().count();
2655        let column_size = column.bytes().count();
2656        let column_count =
2657            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2658        let columns = iter::repeat(column).take(column_count).join("");
2659        let create_sql = format!("{view_def}{columns})");
2660        let create_sql_check = create_sql.clone();
2661        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2662        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2663            &create_sql
2664        ));
2665
2666        let persist_client = PersistClient::new_for_tests().await;
2667        let organization_id = Uuid::new_v4();
2668        let id = CatalogItemId::User(1);
2669        let gid = GlobalId::User(1);
2670        let bootstrap_args = test_bootstrap_args();
2671        {
2672            let mut catalog = Catalog::open_debug_catalog(
2673                persist_client.clone(),
2674                organization_id.clone(),
2675                &bootstrap_args,
2676            )
2677            .await
2678            .expect("unable to open debug catalog");
2679            let item = catalog
2680                .state()
2681                .deserialize_item(
2682                    gid,
2683                    &create_sql,
2684                    &BTreeMap::new(),
2685                    &mut LocalExpressionCache::Closed,
2686                    None,
2687                )
2688                .expect("unable to parse view");
2689            let commit_ts = catalog.current_upper().await;
2690            catalog
2691                .transact(
2692                    None,
2693                    commit_ts,
2694                    None,
2695                    vec![Op::CreateItem {
2696                        item,
2697                        name: QualifiedItemName {
2698                            qualifiers: ItemQualifiers {
2699                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2700                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2701                            },
2702                            item: "v".to_string(),
2703                        },
2704                        id,
2705                        owner_id: MZ_SYSTEM_ROLE_ID,
2706                    }],
2707                )
2708                .await
2709                .expect("failed to transact");
2710            catalog.expire().await;
2711        }
2712        {
2713            let catalog =
2714                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2715                    .await
2716                    .expect("unable to open debug catalog");
2717            let view = catalog.get_entry(&id);
2718            assert_eq!("v", view.name.item);
2719            match &view.item {
2720                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2721                item => panic!("expected view, got {}", item.typ()),
2722            }
2723            catalog.expire().await;
2724        }
2725    }
2726
2727    #[mz_ore::test(tokio::test)]
2728    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2729    async fn test_object_type() {
2730        Catalog::with_debug(|catalog| async move {
2731            let conn_catalog = catalog.for_system_session();
2732
2733            assert_eq!(
2734                mz_sql::catalog::ObjectType::ClusterReplica,
2735                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2736                    ClusterId::user(1).expect("1 is a valid ID"),
2737                    ReplicaId::User(1)
2738                )))
2739            );
2740            assert_eq!(
2741                mz_sql::catalog::ObjectType::Role,
2742                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2743            );
2744            catalog.expire().await;
2745        })
2746        .await;
2747    }
2748
2749    #[mz_ore::test(tokio::test)]
2750    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2751    async fn test_get_privileges() {
2752        Catalog::with_debug(|catalog| async move {
2753            let conn_catalog = catalog.for_system_session();
2754
2755            assert_eq!(
2756                None,
2757                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2758                    ClusterId::user(1).expect("1 is a valid ID"),
2759                    ReplicaId::User(1),
2760                ))))
2761            );
2762            assert_eq!(
2763                None,
2764                conn_catalog
2765                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2766            );
2767            catalog.expire().await;
2768        })
2769        .await;
2770    }
2771
2772    #[mz_ore::test(tokio::test)]
2773    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2774    async fn verify_builtin_descs() {
2775        Catalog::with_debug(|catalog| async move {
2776            let conn_catalog = catalog.for_system_session();
2777
2778            let builtins_cfg = BuiltinsConfig {
2779                include_continual_tasks: true,
2780            };
2781            for builtin in BUILTINS::iter(&builtins_cfg) {
2782                let (schema, name, expected_desc) = match builtin {
2783                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2784                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2785                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2786                    Builtin::Log(_)
2787                    | Builtin::Type(_)
2788                    | Builtin::Func(_)
2789                    | Builtin::ContinualTask(_)
2790                    | Builtin::Index(_)
2791                    | Builtin::Connection(_) => continue,
2792                };
2793                let item = conn_catalog
2794                    .resolve_item(&PartialItemName {
2795                        database: None,
2796                        schema: Some(schema.to_string()),
2797                        item: name.to_string(),
2798                    })
2799                    .expect("unable to resolve item")
2800                    .at_version(RelationVersionSelector::Latest);
2801
2802                let full_name = conn_catalog.resolve_full_name(item.name());
2803                let actual_desc = item.desc(&full_name).expect("invalid item type");
2804                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2805                    actual_desc.iter().zip(expected_desc.iter()).enumerate()
2806                {
2807                    assert_eq!(
2808                        actual_name, expected_name,
2809                        "item {schema}.{name} column {index} name did not match its expected name"
2810                    );
2811                    assert_eq!(
2812                        actual_typ, expected_typ,
2813                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2814                    );
2815                }
2816                assert_eq!(
2817                    &*actual_desc, expected_desc,
2818                    "item {schema}.{name} did not match its expected RelationDesc"
2819                );
2820            }
2821            catalog.expire().await;
2822        })
2823        .await
2824    }
2825
2826    // Connect to a running Postgres server and verify that our builtin
2827    // types and functions match it, in addition to some other things.
2828    #[mz_ore::test(tokio::test)]
2829    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2830    async fn test_compare_builtins_postgres() {
2831        async fn inner(catalog: Catalog) {
2832            // Verify that all builtin functions:
2833            // - have a unique OID
2834            // - if they have a postgres counterpart (same oid) then they have matching name
2835            let (client, connection) = tokio_postgres::connect(
2836                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2837                NoTls,
2838            )
2839            .await
2840            .expect("failed to connect to Postgres");
2841
2842            task::spawn(|| "compare_builtin_postgres", async move {
2843                if let Err(e) = connection.await {
2844                    panic!("connection error: {}", e);
2845                }
2846            });
2847
2848            struct PgProc {
2849                name: String,
2850                arg_oids: Vec<u32>,
2851                ret_oid: Option<u32>,
2852                ret_set: bool,
2853            }
2854
2855            struct PgType {
2856                name: String,
2857                ty: String,
2858                elem: u32,
2859                array: u32,
2860                input: u32,
2861                receive: u32,
2862            }
2863
2864            struct PgOper {
2865                oprresult: u32,
2866                name: String,
2867            }
2868
2869            let pg_proc: BTreeMap<_, _> = client
2870                .query(
2871                    "SELECT
2872                    p.oid,
2873                    proname,
2874                    proargtypes,
2875                    prorettype,
2876                    proretset
2877                FROM pg_proc p
2878                JOIN pg_namespace n ON p.pronamespace = n.oid",
2879                    &[],
2880                )
2881                .await
2882                .expect("pg query failed")
2883                .into_iter()
2884                .map(|row| {
2885                    let oid: u32 = row.get("oid");
2886                    let pg_proc = PgProc {
2887                        name: row.get("proname"),
2888                        arg_oids: row.get("proargtypes"),
2889                        ret_oid: row.get("prorettype"),
2890                        ret_set: row.get("proretset"),
2891                    };
2892                    (oid, pg_proc)
2893                })
2894                .collect();
2895
2896            let pg_type: BTreeMap<_, _> = client
2897                .query(
2898                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2899                    &[],
2900                )
2901                .await
2902                .expect("pg query failed")
2903                .into_iter()
2904                .map(|row| {
2905                    let oid: u32 = row.get("oid");
2906                    let pg_type = PgType {
2907                        name: row.get("typname"),
2908                        ty: row.get("typtype"),
2909                        elem: row.get("typelem"),
2910                        array: row.get("typarray"),
2911                        input: row.get("typinput"),
2912                        receive: row.get("typreceive"),
2913                    };
2914                    (oid, pg_type)
2915                })
2916                .collect();
2917
2918            let pg_oper: BTreeMap<_, _> = client
2919                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2920                .await
2921                .expect("pg query failed")
2922                .into_iter()
2923                .map(|row| {
2924                    let oid: u32 = row.get("oid");
2925                    let pg_oper = PgOper {
2926                        name: row.get("oprname"),
2927                        oprresult: row.get("oprresult"),
2928                    };
2929                    (oid, pg_oper)
2930                })
2931                .collect();
2932
2933            let conn_catalog = catalog.for_system_session();
2934            let resolve_type_oid = |item: &str| {
2935                conn_catalog
2936                    .resolve_type(&PartialItemName {
2937                        database: None,
2938                        // All functions we check exist in PG, so the types must, as
2939                        // well
2940                        schema: Some(PG_CATALOG_SCHEMA.into()),
2941                        item: item.to_string(),
2942                    })
2943                    .expect("unable to resolve type")
2944                    .oid()
2945            };
2946
2947            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2948                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2949                .collect();
2950
2951            let mut all_oids = BTreeSet::new();
2952
2953            // A function to determine if two oids are equivalent enough for these tests. We don't
2954            // support some types, so map exceptions here.
2955            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2956                [
2957                    // We don't support NAME.
2958                    (Type::NAME, Type::TEXT),
2959                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2960                    // We don't support time with time zone.
2961                    (Type::TIME, Type::TIMETZ),
2962                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2963                ]
2964                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2965            );
2966            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2967                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2968            ]);
2969            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2970                if ignore_return_types.contains(&fn_oid) {
2971                    return true;
2972                }
2973                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2974                    return true;
2975                }
2976                a == b
2977            };
2978
2979            let builtins_cfg = BuiltinsConfig {
2980                include_continual_tasks: true,
2981            };
2982            for builtin in BUILTINS::iter(&builtins_cfg) {
2983                match builtin {
2984                    Builtin::Type(ty) => {
2985                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2986
2987                        if ty.oid >= FIRST_MATERIALIZE_OID {
2988                            // High OIDs are reserved in Materialize and don't have
2989                            // PostgreSQL counterparts.
2990                            continue;
2991                        }
2992
2993                        // For types that have a PostgreSQL counterpart, verify that
2994                        // the name and oid match.
2995                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2996                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2997                        });
2998                        assert_eq!(
2999                            ty.name, pg_ty.name,
3000                            "oid {} has name {} in postgres; expected {}",
3001                            ty.oid, pg_ty.name, ty.name,
3002                        );
3003
3004                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3005                            None => (0, 0),
3006                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3007                        };
3008                        assert_eq!(
3009                            typinput_oid, pg_ty.input,
3010                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3011                            ty.name, typinput_oid, pg_ty.input,
3012                        );
3013                        assert_eq!(
3014                            typreceive_oid, pg_ty.receive,
3015                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3016                            ty.name, typreceive_oid, pg_ty.receive,
3017                        );
3018                        if typinput_oid != 0 {
3019                            assert!(
3020                                func_oids.contains(&typinput_oid),
3021                                "type {} has typinput OID {} that does not exist in pg_proc",
3022                                ty.name,
3023                                typinput_oid,
3024                            );
3025                        }
3026                        if typreceive_oid != 0 {
3027                            assert!(
3028                                func_oids.contains(&typreceive_oid),
3029                                "type {} has typreceive OID {} that does not exist in pg_proc",
3030                                ty.name,
3031                                typreceive_oid,
3032                            );
3033                        }
3034
3035                        // Ensure the type matches.
3036                        match &ty.details.typ {
3037                            CatalogType::Array { element_reference } => {
3038                                let elem_ty = BUILTINS::iter(&builtins_cfg)
3039                                    .filter_map(|builtin| match builtin {
3040                                        Builtin::Type(ty @ BuiltinType { name, .. })
3041                                            if element_reference == name =>
3042                                        {
3043                                            Some(ty)
3044                                        }
3045                                        _ => None,
3046                                    })
3047                                    .next();
3048                                let elem_ty = match elem_ty {
3049                                    Some(ty) => ty,
3050                                    None => {
3051                                        panic!("{} is unexpectedly not a type", element_reference)
3052                                    }
3053                                };
3054                                assert_eq!(
3055                                    pg_ty.elem, elem_ty.oid,
3056                                    "type {} has mismatched element OIDs",
3057                                    ty.name
3058                                )
3059                            }
3060                            CatalogType::Pseudo => {
3061                                assert_eq!(
3062                                    pg_ty.ty, "p",
3063                                    "type {} is not a pseudo type as expected",
3064                                    ty.name
3065                                )
3066                            }
3067                            CatalogType::Range { .. } => {
3068                                assert_eq!(
3069                                    pg_ty.ty, "r",
3070                                    "type {} is not a range type as expected",
3071                                    ty.name
3072                                );
3073                            }
3074                            _ => {
3075                                assert_eq!(
3076                                    pg_ty.ty, "b",
3077                                    "type {} is not a base type as expected",
3078                                    ty.name
3079                                )
3080                            }
3081                        }
3082
3083                        // Ensure the array type reference is correct.
3084                        let schema = catalog
3085                            .resolve_schema_in_database(
3086                                &ResolvedDatabaseSpecifier::Ambient,
3087                                ty.schema,
3088                                &SYSTEM_CONN_ID,
3089                            )
3090                            .expect("unable to resolve schema");
3091                        let allocated_type = catalog
3092                            .resolve_type(
3093                                None,
3094                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3095                                &PartialItemName {
3096                                    database: None,
3097                                    schema: Some(schema.name().schema.clone()),
3098                                    item: ty.name.to_string(),
3099                                },
3100                                &SYSTEM_CONN_ID,
3101                            )
3102                            .expect("unable to resolve type");
3103                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3104                            ty
3105                        } else {
3106                            panic!("unexpectedly not a type")
3107                        };
3108                        match ty.details.array_id {
3109                            Some(array_id) => {
3110                                let array_ty = catalog.get_entry(&array_id);
3111                                assert_eq!(
3112                                    pg_ty.array, array_ty.oid,
3113                                    "type {} has mismatched array OIDs",
3114                                    allocated_type.name.item,
3115                                );
3116                            }
3117                            None => assert_eq!(
3118                                pg_ty.array, 0,
3119                                "type {} does not have an array type in mz but does in pg",
3120                                allocated_type.name.item,
3121                            ),
3122                        }
3123                    }
3124                    Builtin::Func(func) => {
3125                        for imp in func.inner.func_impls() {
3126                            assert!(
3127                                all_oids.insert(imp.oid),
3128                                "{} reused oid {}",
3129                                func.name,
3130                                imp.oid
3131                            );
3132
3133                            assert!(
3134                                imp.oid < FIRST_USER_OID,
3135                                "built-in function {} erroneously has OID in user space ({})",
3136                                func.name,
3137                                imp.oid,
3138                            );
3139
3140                            // For functions that have a postgres counterpart, verify that the name and
3141                            // oid match.
3142                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3143                                continue;
3144                            } else {
3145                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3146                                    panic!(
3147                                        "pg_proc missing function {}: oid {}",
3148                                        func.name, imp.oid
3149                                    )
3150                                })
3151                            };
3152                            assert_eq!(
3153                                func.name, pg_fn.name,
3154                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3155                                imp.oid, func.name, pg_fn.name
3156                            );
3157
3158                            // Complain, but don't fail, if argument oids don't match.
3159                            // TODO: make these match.
3160                            let imp_arg_oids = imp
3161                                .arg_typs
3162                                .iter()
3163                                .map(|item| resolve_type_oid(item))
3164                                .collect::<Vec<_>>();
3165
3166                            if imp_arg_oids != pg_fn.arg_oids {
3167                                println!(
3168                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3169                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3170                                );
3171                            }
3172
3173                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3174
3175                            assert!(
3176                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3177                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3178                                imp.oid,
3179                                func.name,
3180                                imp_return_oid,
3181                                pg_fn.ret_oid
3182                            );
3183
3184                            assert_eq!(
3185                                imp.return_is_set, pg_fn.ret_set,
3186                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3187                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3188                            );
3189                        }
3190                    }
3191                    _ => (),
3192                }
3193            }
3194
3195            for (op, func) in OP_IMPLS.iter() {
3196                for imp in func.func_impls() {
3197                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3198
3199                    // For operators that have a postgres counterpart, verify that the name and oid match.
3200                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3201                        continue;
3202                    } else {
3203                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3204                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3205                        })
3206                    };
3207
3208                    assert_eq!(*op, pg_op.name);
3209
3210                    let imp_return_oid =
3211                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3212                    if imp_return_oid != pg_op.oprresult {
3213                        panic!(
3214                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3215                            imp.oid, op, imp_return_oid, pg_op.oprresult
3216                        );
3217                    }
3218                }
3219            }
3220            catalog.expire().await;
3221        }
3222
3223        Catalog::with_debug(inner).await
3224    }
3225
3226    // Execute all builtin functions with all combinations of arguments from interesting datums.
3227    #[mz_ore::test(tokio::test)]
3228    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3229    async fn test_smoketest_all_builtins() {
3230        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3231            let catalog = Arc::new(catalog);
3232            let conn_catalog = catalog.for_system_session();
3233
3234            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3235            let mut handles = Vec::new();
3236
3237            // Extracted during planning; always panics when executed.
3238            let ignore_names = BTreeSet::from([
3239                "avg",
3240                "avg_internal_v1",
3241                "bool_and",
3242                "bool_or",
3243                "has_table_privilege", // > 3 s each
3244                "has_type_privilege",  // > 3 s each
3245                "mod",
3246                "mz_panic",
3247                "mz_sleep",
3248                "pow",
3249                "stddev_pop",
3250                "stddev_samp",
3251                "stddev",
3252                "var_pop",
3253                "var_samp",
3254                "variance",
3255            ]);
3256
3257            let fns = BUILTINS::funcs()
3258                .map(|func| (&func.name, func.inner))
3259                .chain(OP_IMPLS.iter());
3260
3261            for (name, func) in fns {
3262                if ignore_names.contains(name) {
3263                    continue;
3264                }
3265                let Func::Scalar(impls) = func else {
3266                    continue;
3267                };
3268
3269                'outer: for imp in impls {
3270                    let details = imp.details();
3271                    let mut styps = Vec::new();
3272                    for item in details.arg_typs.iter() {
3273                        let oid = resolve_type_oid(item);
3274                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3275                            continue 'outer;
3276                        };
3277                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3278                    }
3279                    let datums = styps
3280                        .iter()
3281                        .map(|styp| {
3282                            let mut datums = vec![Datum::Null];
3283                            datums.extend(styp.interesting_datums());
3284                            datums
3285                        })
3286                        .collect::<Vec<_>>();
3287                    // Skip nullary fns.
3288                    if datums.is_empty() {
3289                        continue;
3290                    }
3291
3292                    let return_oid = details
3293                        .return_typ
3294                        .map(resolve_type_oid)
3295                        .expect("must exist");
3296                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3297                        .ok()
3298                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3299
3300                    let mut idxs = vec![0; datums.len()];
3301                    while idxs[0] < datums[0].len() {
3302                        let mut args = Vec::with_capacity(idxs.len());
3303                        for i in 0..(datums.len()) {
3304                            args.push(datums[i][idxs[i]]);
3305                        }
3306
3307                        let op = &imp.op;
3308                        let scalars = args
3309                            .iter()
3310                            .enumerate()
3311                            .map(|(i, datum)| {
3312                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3313                                    datum.clone(),
3314                                    styps[i].clone(),
3315                                ))
3316                            })
3317                            .collect();
3318
3319                        let call_name = format!(
3320                            "{name}({}) (oid: {})",
3321                            args.iter()
3322                                .map(|d| d.to_string())
3323                                .collect::<Vec<_>>()
3324                                .join(", "),
3325                            imp.oid
3326                        );
3327                        let catalog = Arc::clone(&catalog);
3328                        let call_name_fn = call_name.clone();
3329                        let return_styp = return_styp.clone();
3330                        let handle = task::spawn_blocking(
3331                            || call_name,
3332                            move || {
3333                                smoketest_fn(
3334                                    name,
3335                                    call_name_fn,
3336                                    op,
3337                                    imp,
3338                                    args,
3339                                    catalog,
3340                                    scalars,
3341                                    return_styp,
3342                                )
3343                            },
3344                        );
3345                        handles.push(handle);
3346
3347                        // Advance to the next datum combination.
3348                        for i in (0..datums.len()).rev() {
3349                            idxs[i] += 1;
3350                            if idxs[i] >= datums[i].len() {
3351                                if i == 0 {
3352                                    break;
3353                                }
3354                                idxs[i] = 0;
3355                                continue;
3356                            } else {
3357                                break;
3358                            }
3359                        }
3360                    }
3361                }
3362            }
3363            handles
3364        }
3365
3366        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3367        for handle in handles {
3368            handle.await.expect("must succeed");
3369        }
3370    }
3371
3372    fn smoketest_fn(
3373        name: &&str,
3374        call_name: String,
3375        op: &Operation<HirScalarExpr>,
3376        imp: &FuncImpl<HirScalarExpr>,
3377        args: Vec<Datum<'_>>,
3378        catalog: Arc<Catalog>,
3379        scalars: Vec<CoercibleScalarExpr>,
3380        return_styp: Option<SqlScalarType>,
3381    ) {
3382        let conn_catalog = catalog.for_system_session();
3383        let pcx = PlanContext::zero();
3384        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3385        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3386        let ecx = ExprContext {
3387            qcx: &qcx,
3388            name: "smoketest",
3389            scope: &Scope::empty(),
3390            relation_type: &SqlRelationType::empty(),
3391            allow_aggregates: false,
3392            allow_subqueries: false,
3393            allow_parameters: false,
3394            allow_windows: false,
3395        };
3396        let arena = RowArena::new();
3397        let mut session = Session::<Timestamp>::dummy();
3398        session
3399            .start_transaction(to_datetime(0), None, None)
3400            .expect("must succeed");
3401        let prep_style = ExprPrepStyle::OneShot {
3402            logical_time: EvalTime::Time(Timestamp::MIN),
3403            session: &session,
3404            catalog_state: &catalog.state,
3405        };
3406
3407        // Execute the function as much as possible, ensuring no panics occur, but
3408        // otherwise ignoring eval errors. We also do various other checks.
3409        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3410        if let Ok(hir) = res {
3411            if let Ok(mut mir) = hir.lower_uncorrelated() {
3412                // Populate unmaterialized functions.
3413                prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3414
3415                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3416                    if let Some(return_styp) = return_styp {
3417                        let mir_typ = mir.typ(&[]);
3418                        // MIR type inference should be consistent with the type
3419                        // we get from the catalog.
3420                        assert_eq!(mir_typ.scalar_type, return_styp);
3421                        // The following will check not just that the scalar type
3422                        // is ok, but also catches if the function returned a null
3423                        // but the MIR type inference said "non-nullable".
3424                        if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3425                            panic!(
3426                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3427                            );
3428                        }
3429                        // Check the consistency of `introduces_nulls` and
3430                        // `propagates_nulls` with `MirScalarExpr::typ`.
3431                        if let Some((introduces_nulls, propagates_nulls)) =
3432                            call_introduces_propagates_nulls(&mir)
3433                        {
3434                            if introduces_nulls {
3435                                // If the function introduces_nulls, then the return
3436                                // type should always be nullable, regardless of
3437                                // the nullability of the input types.
3438                                assert!(
3439                                    mir_typ.nullable,
3440                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3441                                    name, args, mir, mir_typ.nullable
3442                                );
3443                            } else {
3444                                let any_input_null = args.iter().any(|arg| arg.is_null());
3445                                if !any_input_null {
3446                                    assert!(
3447                                        !mir_typ.nullable,
3448                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3449                                        name, args, mir, mir_typ.nullable
3450                                    );
3451                                } else {
3452                                    assert_eq!(
3453                                        mir_typ.nullable, propagates_nulls,
3454                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3455                                        name, args, mir, mir_typ.nullable
3456                                    );
3457                                }
3458                            }
3459                        }
3460                        // Check that `MirScalarExpr::reduce` yields the same result
3461                        // as the real evaluation.
3462                        let mut reduced = mir.clone();
3463                        reduced.reduce(&[]);
3464                        match reduced {
3465                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3466                                match reduce_result {
3467                                    Ok(reduce_result_row) => {
3468                                        let reduce_result_datum = reduce_result_row.unpack_first();
3469                                        assert_eq!(
3470                                            reduce_result_datum,
3471                                            eval_result_datum,
3472                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3473                                            name,
3474                                            args,
3475                                            mir,
3476                                            eval_result_datum,
3477                                            mir_typ.scalar_type,
3478                                            reduce_result_datum,
3479                                            ctyp.scalar_type
3480                                        );
3481                                        // Let's check that the types also match.
3482                                        // (We are not checking nullability here,
3483                                        // because it's ok when we know a more
3484                                        // precise nullability after actually
3485                                        // evaluating a function than before.)
3486                                        assert_eq!(
3487                                            ctyp.scalar_type,
3488                                            mir_typ.scalar_type,
3489                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3490                                            name,
3491                                            args,
3492                                            mir,
3493                                            eval_result_datum,
3494                                            mir_typ.scalar_type,
3495                                            reduce_result_datum,
3496                                            ctyp.scalar_type
3497                                        );
3498                                    }
3499                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3500                                }
3501                            }
3502                            _ => unreachable!(
3503                                "all args are literals, so should have reduced to a literal"
3504                            ),
3505                        }
3506                    }
3507                }
3508            }
3509        }
3510    }
3511
3512    /// If the given MirScalarExpr
3513    ///  - is a function call, and
3514    ///  - all arguments are literals
3515    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3516    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3517        match mir_func_call {
3518            MirScalarExpr::CallUnary { func, expr } => {
3519                if expr.is_literal() {
3520                    Some((func.introduces_nulls(), func.propagates_nulls()))
3521                } else {
3522                    None
3523                }
3524            }
3525            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3526                if expr1.is_literal() && expr2.is_literal() {
3527                    Some((func.introduces_nulls(), func.propagates_nulls()))
3528                } else {
3529                    None
3530                }
3531            }
3532            MirScalarExpr::CallVariadic { func, exprs } => {
3533                if exprs.iter().all(|arg| arg.is_literal()) {
3534                    Some((func.introduces_nulls(), func.propagates_nulls()))
3535                } else {
3536                    None
3537                }
3538            }
3539            _ => None,
3540        }
3541    }
3542
3543    // Make sure pg views don't use types that only exist in Materialize.
3544    #[mz_ore::test(tokio::test)]
3545    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3546    async fn test_pg_views_forbidden_types() {
3547        Catalog::with_debug(|catalog| async move {
3548            let conn_catalog = catalog.for_system_session();
3549
3550            for view in BUILTINS::views().filter(|view| {
3551                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3552            }) {
3553                let item = conn_catalog
3554                    .resolve_item(&PartialItemName {
3555                        database: None,
3556                        schema: Some(view.schema.to_string()),
3557                        item: view.name.to_string(),
3558                    })
3559                    .expect("unable to resolve view")
3560                    // TODO(alter_table)
3561                    .at_version(RelationVersionSelector::Latest);
3562                let full_name = conn_catalog.resolve_full_name(item.name());
3563                for col_type in item
3564                    .desc(&full_name)
3565                    .expect("invalid item type")
3566                    .iter_types()
3567                {
3568                    match &col_type.scalar_type {
3569                        typ @ SqlScalarType::UInt16
3570                        | typ @ SqlScalarType::UInt32
3571                        | typ @ SqlScalarType::UInt64
3572                        | typ @ SqlScalarType::MzTimestamp
3573                        | typ @ SqlScalarType::List { .. }
3574                        | typ @ SqlScalarType::Map { .. }
3575                        | typ @ SqlScalarType::MzAclItem => {
3576                            panic!("{typ:?} type found in {full_name}");
3577                        }
3578                        SqlScalarType::AclItem
3579                        | SqlScalarType::Bool
3580                        | SqlScalarType::Int16
3581                        | SqlScalarType::Int32
3582                        | SqlScalarType::Int64
3583                        | SqlScalarType::Float32
3584                        | SqlScalarType::Float64
3585                        | SqlScalarType::Numeric { .. }
3586                        | SqlScalarType::Date
3587                        | SqlScalarType::Time
3588                        | SqlScalarType::Timestamp { .. }
3589                        | SqlScalarType::TimestampTz { .. }
3590                        | SqlScalarType::Interval
3591                        | SqlScalarType::PgLegacyChar
3592                        | SqlScalarType::Bytes
3593                        | SqlScalarType::String
3594                        | SqlScalarType::Char { .. }
3595                        | SqlScalarType::VarChar { .. }
3596                        | SqlScalarType::Jsonb
3597                        | SqlScalarType::Uuid
3598                        | SqlScalarType::Array(_)
3599                        | SqlScalarType::Record { .. }
3600                        | SqlScalarType::Oid
3601                        | SqlScalarType::RegProc
3602                        | SqlScalarType::RegType
3603                        | SqlScalarType::RegClass
3604                        | SqlScalarType::Int2Vector
3605                        | SqlScalarType::Range { .. }
3606                        | SqlScalarType::PgLegacyName => {}
3607                    }
3608                }
3609            }
3610            catalog.expire().await;
3611        })
3612        .await
3613    }
3614
3615    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3616    // introspection relations.
3617    #[mz_ore::test(tokio::test)]
3618    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3619    async fn test_mz_introspection_builtins() {
3620        Catalog::with_debug(|catalog| async move {
3621            let conn_catalog = catalog.for_system_session();
3622
3623            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3624            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3625
3626            for entry in catalog.entries() {
3627                let schema_spec = entry.name().qualifiers.schema_spec;
3628                let introspection_deps = catalog.introspection_dependencies(entry.id);
3629                if introspection_deps.is_empty() {
3630                    assert!(
3631                        schema_spec != introspection_schema_spec,
3632                        "entry does not depend on introspection sources but is in \
3633                         `mz_introspection`: {}",
3634                        conn_catalog.resolve_full_name(entry.name()),
3635                    );
3636                } else {
3637                    assert!(
3638                        schema_spec == introspection_schema_spec,
3639                        "entry depends on introspection sources but is not in \
3640                         `mz_introspection`: {}",
3641                        conn_catalog.resolve_full_name(entry.name()),
3642                    );
3643                }
3644            }
3645        })
3646        .await
3647    }
3648
3649    #[mz_ore::test(tokio::test)]
3650    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3651    async fn test_multi_subscriber_catalog() {
3652        let persist_client = PersistClient::new_for_tests().await;
3653        let bootstrap_args = test_bootstrap_args();
3654        let organization_id = Uuid::new_v4();
3655        let db_name = "DB";
3656
3657        let mut writer_catalog = Catalog::open_debug_catalog(
3658            persist_client.clone(),
3659            organization_id.clone(),
3660            &bootstrap_args,
3661        )
3662        .await
3663        .expect("open_debug_catalog");
3664        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3665            persist_client.clone(),
3666            organization_id.clone(),
3667            &bootstrap_args,
3668        )
3669        .await
3670        .expect("open_debug_read_only_catalog");
3671        assert_err!(writer_catalog.resolve_database(db_name));
3672        assert_err!(read_only_catalog.resolve_database(db_name));
3673
3674        let commit_ts = writer_catalog.current_upper().await;
3675        writer_catalog
3676            .transact(
3677                None,
3678                commit_ts,
3679                None,
3680                vec![Op::CreateDatabase {
3681                    name: db_name.to_string(),
3682                    owner_id: MZ_SYSTEM_ROLE_ID,
3683                }],
3684            )
3685            .await
3686            .expect("failed to transact");
3687
3688        let write_db = writer_catalog
3689            .resolve_database(db_name)
3690            .expect("resolve_database");
3691        read_only_catalog
3692            .sync_to_current_updates()
3693            .await
3694            .expect("sync_to_current_updates");
3695        let read_db = read_only_catalog
3696            .resolve_database(db_name)
3697            .expect("resolve_database");
3698
3699        assert_eq!(write_db, read_db);
3700
3701        let writer_catalog_fencer =
3702            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3703                .await
3704                .expect("open_debug_catalog for fencer");
3705        let fencer_db = writer_catalog_fencer
3706            .resolve_database(db_name)
3707            .expect("resolve_database for fencer");
3708        assert_eq!(fencer_db, read_db);
3709
3710        let write_fence_err = writer_catalog
3711            .sync_to_current_updates()
3712            .await
3713            .expect_err("sync_to_current_updates for fencer");
3714        assert!(matches!(
3715            write_fence_err,
3716            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3717        ));
3718        let read_fence_err = read_only_catalog
3719            .sync_to_current_updates()
3720            .await
3721            .expect_err("sync_to_current_updates after fencer");
3722        assert!(matches!(
3723            read_fence_err,
3724            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3725        ));
3726
3727        writer_catalog.expire().await;
3728        read_only_catalog.expire().await;
3729        writer_catalog_fencer.expire().await;
3730    }
3731}