mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67    CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68    DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91// DO NOT add any more imports from `crate` outside of `crate::catalog`.
92pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96    DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114/// A `Catalog` keeps track of the SQL objects known to the planner.
115///
116/// For each object, it keeps track of both forward and reverse dependencies:
117/// i.e., which objects are depended upon by the object, and which objects
118/// depend upon the object. It enforces the SQL rules around dropping: an object
119/// cannot be dropped until all of the objects that depend upon it are dropped.
120/// It also enforces uniqueness of names.
121///
122/// SQL mandates a hierarchy of exactly three layers. A catalog contains
123/// databases, databases contain schemas, and schemas contain catalog items,
124/// like sources, sinks, view, and indexes.
125///
126/// To the outside world, databases, schemas, and items are all identified by
127/// name. Items can be referred to by their [`FullItemName`], which fully and
128/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
129/// database name and/or the schema name. Partial names can be converted into
130/// full names via a complicated resolution process documented by the
131/// [`CatalogState::resolve`] method.
132///
133/// The catalog also maintains special "ambient schemas": virtual schemas,
134/// implicitly present in all databases, that house various system views.
135/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
136#[derive(Debug)]
137pub struct Catalog {
138    state: CatalogState,
139    plans: CatalogPlans,
140    expr_cache_handle: Option<ExpressionCacheHandle>,
141    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
142    transient_revision: u64,
143}
144
145// Implement our own Clone because derive can't unless S is Clone, which it's
146// not (hence the Arc).
147impl Clone for Catalog {
148    fn clone(&self) -> Self {
149        Self {
150            state: self.state.clone(),
151            plans: self.plans.clone(),
152            expr_cache_handle: self.expr_cache_handle.clone(),
153            storage: Arc::clone(&self.storage),
154            transient_revision: self.transient_revision,
155        }
156    }
157}
158
/// Per-item optimizer artifacts cached by the [`Catalog`].
///
/// The first three maps are keyed by the [`GlobalId`] of the item the artifact
/// belongs to. `notices_by_dep_id` is a reverse index over optimizer notices.
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized (MIR) dataflow plans, shared via `Arc`.
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) dataflow plans, shared via `Arc`.
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// Dataflow metainfo, including the optimizer notices, for each item.
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Reverse index: for each `GlobalId`, the optimizer notices that list it
    /// among their `dependencies`. Populated by `Catalog::set_dataflow_metainfo`
    /// and pruned by `Catalog::drop_plans_and_metainfos`.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
166
167impl Catalog {
168    /// Set the optimized plan for the item identified by `id`.
169    #[mz_ore::instrument(level = "trace")]
170    pub fn set_optimized_plan(
171        &mut self,
172        id: GlobalId,
173        plan: DataflowDescription<OptimizedMirRelationExpr>,
174    ) {
175        self.plans.optimized_plan_by_id.insert(id, plan.into());
176    }
177
178    /// Set the physical plan for the item identified by `id`.
179    #[mz_ore::instrument(level = "trace")]
180    pub fn set_physical_plan(
181        &mut self,
182        id: GlobalId,
183        plan: DataflowDescription<mz_compute_types::plan::Plan>,
184    ) {
185        self.plans.physical_plan_by_id.insert(id, plan.into());
186    }
187
188    /// Try to get the optimized plan for the item identified by `id`.
189    #[mz_ore::instrument(level = "trace")]
190    pub fn try_get_optimized_plan(
191        &self,
192        id: &GlobalId,
193    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
194        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
195    }
196
197    /// Try to get the physical plan for the item identified by `id`.
198    #[mz_ore::instrument(level = "trace")]
199    pub fn try_get_physical_plan(
200        &self,
201        id: &GlobalId,
202    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
203        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
204    }
205
206    /// Set the `DataflowMetainfo` for the item identified by `id`.
207    #[mz_ore::instrument(level = "trace")]
208    pub fn set_dataflow_metainfo(
209        &mut self,
210        id: GlobalId,
211        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
212    ) {
213        // Add entries to the `notices_by_dep_id` lookup map.
214        for notice in metainfo.optimizer_notices.iter() {
215            for dep_id in notice.dependencies.iter() {
216                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
217                entry.push(Arc::clone(notice))
218            }
219            if let Some(item_id) = notice.item_id {
220                soft_assert_eq_or_log!(
221                    item_id,
222                    id,
223                    "notice.item_id should match the id for whom we are saving the notice"
224                );
225            }
226        }
227        // Add the dataflow with the scoped entries.
228        self.plans.dataflow_metainfos.insert(id, metainfo);
229    }
230
231    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
232    #[mz_ore::instrument(level = "trace")]
233    pub fn try_get_dataflow_metainfo(
234        &self,
235        id: &GlobalId,
236    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
237        self.plans.dataflow_metainfos.get(id)
238    }
239
240    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
241    /// item identified by `id`.
242    ///
243    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
244    ///
245    /// Return a set containing all dropped notices. Note that if for some
246    /// reason we end up with two identical notices being dropped by the same
247    /// call, the result will contain only one instance of that notice.
248    #[mz_ore::instrument(level = "trace")]
249    pub fn drop_plans_and_metainfos(
250        &mut self,
251        drop_ids: &BTreeSet<GlobalId>,
252    ) -> BTreeSet<Arc<OptimizerNotice>> {
253        // Collect dropped notices in this set.
254        let mut dropped_notices = BTreeSet::new();
255
256        // Remove plans and metainfo.optimizer_notices entries.
257        for id in drop_ids {
258            self.plans.optimized_plan_by_id.remove(id);
259            self.plans.physical_plan_by_id.remove(id);
260            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
261                soft_assert_or_log!(
262                    metainfo.optimizer_notices.iter().all_unique(),
263                    "should have been pushed there by `push_optimizer_notice_dedup`"
264                );
265                for n in metainfo.optimizer_notices.drain(..) {
266                    // Remove the corresponding notices_by_dep_id entries.
267                    for dep_id in n.dependencies.iter() {
268                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
269                            soft_assert_or_log!(
270                                notices.iter().any(|x| &n == x),
271                                "corrupt notices_by_dep_id"
272                            );
273                            notices.retain(|x| &n != x)
274                        }
275                    }
276                    dropped_notices.insert(n);
277                }
278            }
279        }
280
281        // Remove notices_by_dep_id entries.
282        for id in drop_ids {
283            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
284                for n in notices.drain(..) {
285                    // Remove the corresponding metainfo.optimizer_notices entries.
286                    if let Some(item_id) = n.item_id.as_ref() {
287                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
288                            metainfo.optimizer_notices.iter().for_each(|n2| {
289                                if let Some(item_id_2) = n2.item_id {
290                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
291                                }
292                            });
293                            metainfo.optimizer_notices.retain(|x| &n != x);
294                        }
295                    }
296                    dropped_notices.insert(n);
297                }
298            }
299        }
300
301        // Collect dependency ids not in drop_ids with at least one dropped
302        // notice.
303        let mut todo_dep_ids = BTreeSet::new();
304        for notice in dropped_notices.iter() {
305            for dep_id in notice.dependencies.iter() {
306                if !drop_ids.contains(dep_id) {
307                    todo_dep_ids.insert(*dep_id);
308                }
309            }
310        }
311        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
312        // entries in `todo_dep_ids`.
313        for id in todo_dep_ids {
314            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
315                notices.retain(|n| !dropped_notices.contains(n))
316            }
317        }
318
319        if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
320            use mz_ore::str::{bracketed, separated};
321            let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
322            let bad_notices = bad_notices.map(|n| {
323                // Try to find where the bad reference is.
324                // Maybe in `dataflow_metainfos`?
325                let mut dataflow_metainfo_occurrences = Vec::new();
326                for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
327                    if meta_info.optimizer_notices.contains(n) {
328                        dataflow_metainfo_occurrences.push(id);
329                    }
330                }
331                // Or `notices_by_dep_id`?
332                let mut notices_by_dep_id_occurrences = Vec::new();
333                for (id, notices) in self.plans.notices_by_dep_id.iter() {
334                    if notices.iter().contains(n) {
335                        notices_by_dep_id_occurrences.push(id);
336                    }
337                }
338                format!(
339                    "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
340                    dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
341                    n.id,
342                    n.kind,
343                    n.dependencies,
344                    Arc::strong_count(n),
345                    dataflow_metainfo_occurrences,
346                    notices_by_dep_id_occurrences
347                )
348            });
349            let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
350            error!(
351                "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
352                 bad_notices = {bad_notices}; \
353                 drop_ids = {drop_ids:?}"
354            );
355        }
356
357        dropped_notices
358    }
359
360    /// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
361    /// as a result of creating new indexes on the items in `ons`.
362    ///
363    /// When creating and inserting a new index, we need to invalidate some entries that may
364    /// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
365    /// the following objects:
366    ///
367    ///   - `o`.
368    ///   - All compute objects that depend directly on `o`.
369    ///   - All compute objects that would directly depend on `o`, if all views were inlined.
370    pub(crate) fn invalidate_for_index(
371        &self,
372        ons: impl Iterator<Item = GlobalId>,
373    ) -> BTreeSet<GlobalId> {
374        let mut dependencies = BTreeSet::new();
375        let mut queue = VecDeque::new();
376        let mut seen = HashSet::new();
377        for on in ons {
378            let entry = self.get_entry_by_global_id(&on);
379            dependencies.insert(on);
380            seen.insert(entry.id);
381            let uses = entry.uses();
382            queue.extend(uses.clone());
383        }
384
385        while let Some(cur) = queue.pop_front() {
386            let entry = self.get_entry(&cur);
387            if seen.insert(cur) {
388                let global_ids = entry.global_ids();
389                match entry.item_type() {
390                    CatalogItemType::Table
391                    | CatalogItemType::Source
392                    | CatalogItemType::MaterializedView
393                    | CatalogItemType::Sink
394                    | CatalogItemType::Index
395                    | CatalogItemType::Type
396                    | CatalogItemType::Func
397                    | CatalogItemType::Secret
398                    | CatalogItemType::Connection
399                    | CatalogItemType::ContinualTask => {
400                        dependencies.extend(global_ids);
401                    }
402                    CatalogItemType::View => {
403                        dependencies.extend(global_ids);
404                        queue.extend(entry.uses());
405                    }
406                }
407            }
408        }
409        dependencies
410    }
411}
412
/// A catalog handle bound to one connection's context (role, current database,
/// search path, and so on). It is produced by `Catalog::for_session`,
/// `Catalog::for_sessionless_user`, and `Catalog::for_system_session`.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The underlying catalog state, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Because we don't have any way of removing items from the catalog
    /// temporarily, we allow the ConnCatalog to pretend that a set of items
    /// don't exist during resolution.
    ///
    /// This feature is necessary to allow re-planning of statements, which is
    /// either incredibly useful or required when altering item definitions.
    ///
    /// Note that this field should only be used by short-lived catalogs.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// The connection this catalog is bound to.
    conn_id: ConnectionId,
    /// The active cluster (presumably its name; TODO confirm).
    cluster: String,
    /// The current database, if any.
    database: Option<DatabaseId>,
    /// The configured search path. `ConnCatalog::effective_search_path` expands it
    /// with the ambient and (optionally) temporary schemas.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// The role this catalog acts as.
    role_id: RoleId,
    /// The session's prepared statements, if available (presumably `None`
    /// without a session; TODO confirm).
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// The session's portals, if available (presumably `None` without a
    /// session; TODO confirm).
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Sender for [`AdapterNotice`]s raised while using this catalog.
    notices_tx: UnboundedSender<AdapterNotice>,
}
435
436impl ConnCatalog<'_> {
437    pub fn conn_id(&self) -> &ConnectionId {
438        &self.conn_id
439    }
440
441    pub fn state(&self) -> &CatalogState {
442        &*self.state
443    }
444
445    /// Prevent planning from resolving item with the provided ID. Instead,
446    /// return an error as if the item did not exist.
447    ///
448    /// This feature is meant exclusively to permit re-planning statements
449    /// during update operations and should not be used otherwise given its
450    /// extremely "powerful" semantics.
451    ///
452    /// # Panics
453    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
454    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
455        assert_eq!(
456            self.role_id, MZ_SYSTEM_ROLE_ID,
457            "only the system role can mark IDs unresolvable",
458        );
459        self.unresolvable_ids.insert(id);
460    }
461
462    /// Returns the schemas:
463    /// - mz_catalog
464    /// - pg_catalog
465    /// - temp (if requested)
466    /// - all schemas from the session's search_path var that exist
467    pub fn effective_search_path(
468        &self,
469        include_temp_schema: bool,
470    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
471        self.state
472            .effective_search_path(&self.search_path, include_temp_schema)
473    }
474}
475
476impl ConnectionResolver for ConnCatalog<'_> {
477    fn resolve_connection(
478        &self,
479        id: CatalogItemId,
480    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
481        self.state().resolve_connection(id)
482    }
483}
484
485impl Catalog {
486    /// Returns the catalog's transient revision, which starts at 1 and is
487    /// incremented on every change. This is not persisted to disk, and will
488    /// restart on every load.
489    pub fn transient_revision(&self) -> u64 {
490        self.transient_revision
491    }
492
493    /// Creates a debug catalog from the current
494    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
495    /// like in tests.
496    ///
497    /// WARNING! This function can arbitrarily fail because it does not make any
498    /// effort to adjust the catalog's contents' structure or semantics to the
499    /// currently running version, i.e. it does not apply any migrations.
500    ///
501    /// This function must not be called in production contexts. Use
502    /// [`Catalog::open`] with appropriately set configuration parameters
503    /// instead.
504    pub async fn with_debug<F, Fut, T>(f: F) -> T
505    where
506        F: FnOnce(Catalog) -> Fut,
507        Fut: Future<Output = T>,
508    {
509        let persist_client = PersistClient::new_for_tests().await;
510        let organization_id = Uuid::new_v4();
511        let bootstrap_args = test_bootstrap_args();
512        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
513            .await
514            .expect("can open debug catalog");
515        f(catalog).await
516    }
517
518    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
519    /// in progress.
520    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
521    where
522        F: FnOnce(Catalog) -> Fut,
523        Fut: Future<Output = T>,
524    {
525        let persist_client = PersistClient::new_for_tests().await;
526        let organization_id = Uuid::new_v4();
527        let bootstrap_args = test_bootstrap_args();
528        let mut catalog =
529            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
530                .await
531                .expect("can open debug catalog");
532
533        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
534        let now = SYSTEM_TIME.clone();
535        let openable_storage = TestCatalogStateBuilder::new(persist_client)
536            .with_organization_id(organization_id)
537            .with_default_deploy_generation()
538            .build()
539            .await
540            .expect("can create durable catalog");
541        let mut storage = openable_storage
542            .open(now().into(), &bootstrap_args)
543            .await
544            .expect("can open durable catalog")
545            .0;
546        // Drain updates.
547        let _ = storage
548            .sync_to_current_updates()
549            .await
550            .expect("can sync to current updates");
551        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
552
553        f(catalog).await
554    }
555
556    /// Opens a debug catalog.
557    ///
558    /// See [`Catalog::with_debug`].
559    pub async fn open_debug_catalog(
560        persist_client: PersistClient,
561        organization_id: Uuid,
562        bootstrap_args: &BootstrapArgs,
563    ) -> Result<Catalog, anyhow::Error> {
564        let now = SYSTEM_TIME.clone();
565        let environment_id = None;
566        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
567            .with_organization_id(organization_id)
568            .with_default_deploy_generation()
569            .build()
570            .await?;
571        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
572        let system_parameter_defaults = BTreeMap::default();
573        Self::open_debug_catalog_inner(
574            persist_client,
575            storage,
576            now,
577            environment_id,
578            &DUMMY_BUILD_INFO,
579            system_parameter_defaults,
580            bootstrap_args,
581            None,
582        )
583        .await
584    }
585
586    /// Opens a read only debug persist backed catalog defined by `persist_client` and
587    /// `organization_id`.
588    ///
589    /// See [`Catalog::with_debug`].
590    pub async fn open_debug_read_only_catalog(
591        persist_client: PersistClient,
592        organization_id: Uuid,
593        bootstrap_args: &BootstrapArgs,
594    ) -> Result<Catalog, anyhow::Error> {
595        let now = SYSTEM_TIME.clone();
596        let environment_id = None;
597        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
598            .with_organization_id(organization_id)
599            .build()
600            .await?;
601        let storage = openable_storage
602            .open_read_only(&test_bootstrap_args())
603            .await?;
604        let system_parameter_defaults = BTreeMap::default();
605        Self::open_debug_catalog_inner(
606            persist_client,
607            storage,
608            now,
609            environment_id,
610            &DUMMY_BUILD_INFO,
611            system_parameter_defaults,
612            bootstrap_args,
613            None,
614        )
615        .await
616    }
617
618    /// Opens a read only debug persist backed catalog defined by `persist_client` and
619    /// `organization_id`.
620    ///
621    /// See [`Catalog::with_debug`].
622    pub async fn open_debug_read_only_persist_catalog_config(
623        persist_client: PersistClient,
624        now: NowFn,
625        environment_id: EnvironmentId,
626        system_parameter_defaults: BTreeMap<String, String>,
627        build_info: &'static BuildInfo,
628        bootstrap_args: &BootstrapArgs,
629        enable_expression_cache_override: Option<bool>,
630    ) -> Result<Catalog, anyhow::Error> {
631        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
632            .with_organization_id(environment_id.organization_id())
633            .with_version(
634                build_info
635                    .version
636                    .parse()
637                    .expect("build version is parseable"),
638            )
639            .build()
640            .await?;
641        let storage = openable_storage.open_read_only(bootstrap_args).await?;
642        Self::open_debug_catalog_inner(
643            persist_client,
644            storage,
645            now,
646            Some(environment_id),
647            build_info,
648            system_parameter_defaults,
649            bootstrap_args,
650            enable_expression_cache_override,
651        )
652        .await
653    }
654
655    async fn open_debug_catalog_inner(
656        persist_client: PersistClient,
657        storage: Box<dyn DurableCatalogState>,
658        now: NowFn,
659        environment_id: Option<EnvironmentId>,
660        build_info: &'static BuildInfo,
661        system_parameter_defaults: BTreeMap<String, String>,
662        bootstrap_args: &BootstrapArgs,
663        enable_expression_cache_override: Option<bool>,
664    ) -> Result<Catalog, anyhow::Error> {
665        let metrics_registry = &MetricsRegistry::new();
666        let secrets_reader = Arc::new(InMemorySecretsController::new());
667        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
668        // debugging/testing.
669        let previous_ts = now().into();
670        let replica_size = &bootstrap_args.default_cluster_replica_size;
671        let read_only = false;
672
673        let OpenCatalogResult {
674            catalog,
675            migrated_storage_collections_0dt: _,
676            new_builtin_collections: _,
677            builtin_table_updates: _,
678            cached_global_exprs: _,
679            uncached_local_exprs: _,
680        } = Catalog::open(Config {
681            storage,
682            metrics_registry,
683            state: StateConfig {
684                unsafe_mode: true,
685                all_features: false,
686                build_info,
687                deploy_generation: 0,
688                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
689                read_only,
690                now,
691                boot_ts: previous_ts,
692                skip_migrations: true,
693                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
694                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
695                    size: replica_size.clone(),
696                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
697                },
698                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
699                    size: replica_size.clone(),
700                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
701                },
702                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
703                    size: replica_size.clone(),
704                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
705                },
706                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
707                    size: replica_size.clone(),
708                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
709                },
710                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
711                    size: replica_size.clone(),
712                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
713                },
714                system_parameter_defaults,
715                remote_system_parameters: None,
716                availability_zones: vec![],
717                egress_addresses: vec![],
718                aws_principal_context: None,
719                aws_privatelink_availability_zones: None,
720                http_host_name: None,
721                connection_context: ConnectionContext::for_tests(secrets_reader),
722                builtin_item_migration_config: BuiltinItemMigrationConfig {
723                    persist_client: persist_client.clone(),
724                    read_only,
725                    force_migration: None,
726                },
727                persist_client,
728                enable_expression_cache_override,
729                helm_chart_version: None,
730                external_login_password_mz_system: None,
731                license_key: ValidatedLicenseKey::for_tests(),
732            },
733        })
734        .await?;
735        Ok(catalog)
736    }
737
738    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
739        self.state.for_session(session)
740    }
741
742    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
743        self.state.for_sessionless_user(role_id)
744    }
745
746    pub fn for_system_session(&self) -> ConnCatalog<'_> {
747        self.state.for_system_session()
748    }
749
    /// Locks the durable catalog state and returns the guard.
    ///
    /// The lock stays held for as long as the returned `MutexGuard` is alive,
    /// so a chained `self.storage().await.<call>().await` keeps it held across
    /// that inner await.
    async fn storage<'a>(
        &'a self,
    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
        self.storage.lock().await
    }
755
    /// Returns the current upper reported by the durable catalog state.
    ///
    /// The durable-catalog lock is held while the upper is fetched.
    pub async fn current_upper(&self) -> mz_repr::Timestamp {
        self.storage().await.current_upper().await
    }
759
    /// Allocates a single user item ID from the durable catalog, passing
    /// `commit_ts` along to the allocation.
    ///
    /// The durable-catalog lock is held for the whole call. Failures are routed
    /// through `maybe_terminate("allocating user ids")` and then converted into
    /// [`Error`]. NOTE(review): `maybe_terminate` presumably halts the process
    /// on fatal durable-catalog errors — confirm.
    pub async fn allocate_user_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        self.storage()
            .await
            .allocate_user_id(commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }
771
    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
    ///
    /// `commit_ts` is passed through to the durable catalog. The durable-catalog
    /// lock is held for the whole call. Failures are routed through
    /// `maybe_terminate("allocating user ids")` and then converted into [`Error`].
    pub async fn allocate_user_ids(
        &self,
        amount: u64,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
        self.storage()
            .await
            .allocate_user_ids(amount, commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }
785
786    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
787        let commit_ts = self.storage().await.current_upper().await;
788        self.allocate_user_id(commit_ts).await
789    }
790
    /// Get the next user item ID without allocating it.
    ///
    /// Durable-catalog errors are converted into [`Error`]. The storage lock is
    /// held while the ID is read.
    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_item_id()
            .await
            .err_into()
    }
799
    /// Allocates a single system item ID in a dedicated durable-catalog
    /// transaction that is committed at `commit_ts`. Test-only.
    #[cfg(test)]
    pub async fn allocate_system_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        use mz_ore::collections::CollectionExt;

        // The storage guard is held for the lifetime of the transaction.
        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;
        // Exactly one ID is requested; `into_element` unwraps that single ID.
        let id = txn
            .allocate_system_item_ids(1)
            .maybe_terminate("allocating system ids")?
            .into_element();
        // Drain transaction.
        let _ = txn.get_and_commit_op_updates();
        txn.commit(commit_ts).await?;
        Ok(id)
    }
818
    /// Get the next system item ID without allocating it.
    ///
    /// Durable-catalog errors are converted into [`Error`]. The storage lock is
    /// held while the ID is read.
    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_item_id()
            .await
            .err_into()
    }
827
    /// Allocates a single user cluster ID from the durable catalog, passing
    /// `commit_ts` along to the allocation.
    ///
    /// The durable-catalog lock is held for the whole call. Failures are routed
    /// through `maybe_terminate("allocating user cluster ids")` and then
    /// converted into [`Error`].
    pub async fn allocate_user_cluster_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<ClusterId, Error> {
        self.storage()
            .await
            .allocate_user_cluster_id(commit_ts)
            .await
            .maybe_terminate("allocating user cluster ids")
            .err_into()
    }
839
    /// Get the next system replica id without allocating it.
    ///
    /// Durable-catalog errors are converted into [`Error`]. The storage lock is
    /// held while the ID is read.
    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_replica_id()
            .await
            .err_into()
    }
848
    /// Get the next user replica id without allocating it.
    ///
    /// Durable-catalog errors are converted into [`Error`]. The storage lock is
    /// held while the ID is read.
    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_replica_id()
            .await
            .err_into()
    }
857
858    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
859        self.state.resolve_database(database_name)
860    }
861
862    pub fn resolve_schema(
863        &self,
864        current_database: Option<&DatabaseId>,
865        database_name: Option<&str>,
866        schema_name: &str,
867        conn_id: &ConnectionId,
868    ) -> Result<&Schema, SqlCatalogError> {
869        self.state
870            .resolve_schema(current_database, database_name, schema_name, conn_id)
871    }
872
873    pub fn resolve_schema_in_database(
874        &self,
875        database_spec: &ResolvedDatabaseSpecifier,
876        schema_name: &str,
877        conn_id: &ConnectionId,
878    ) -> Result<&Schema, SqlCatalogError> {
879        self.state
880            .resolve_schema_in_database(database_spec, schema_name, conn_id)
881    }
882
883    pub fn resolve_replica_in_cluster(
884        &self,
885        cluster_id: &ClusterId,
886        replica_name: &str,
887    ) -> Result<&ClusterReplica, SqlCatalogError> {
888        self.state
889            .resolve_replica_in_cluster(cluster_id, replica_name)
890    }
891
892    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
893        self.state.resolve_system_schema(name)
894    }
895
896    pub fn resolve_search_path(
897        &self,
898        session: &Session,
899    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
900        self.state.resolve_search_path(session)
901    }
902
903    /// Resolves `name` to a non-function [`CatalogEntry`].
904    pub fn resolve_entry(
905        &self,
906        current_database: Option<&DatabaseId>,
907        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
908        name: &PartialItemName,
909        conn_id: &ConnectionId,
910    ) -> Result<&CatalogEntry, SqlCatalogError> {
911        self.state
912            .resolve_entry(current_database, search_path, name, conn_id)
913    }
914
915    /// Resolves a `BuiltinTable`.
916    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
917        self.state.resolve_builtin_table(builtin)
918    }
919
920    /// Resolves a `BuiltinLog`.
921    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
922        self.state.resolve_builtin_log(builtin).0
923    }
924
925    /// Resolves a `BuiltinSource`.
926    pub fn resolve_builtin_storage_collection(
927        &self,
928        builtin: &'static BuiltinSource,
929    ) -> CatalogItemId {
930        self.state.resolve_builtin_source(builtin)
931    }
932
933    /// Resolves `name` to a function [`CatalogEntry`].
934    pub fn resolve_function(
935        &self,
936        current_database: Option<&DatabaseId>,
937        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
938        name: &PartialItemName,
939        conn_id: &ConnectionId,
940    ) -> Result<&CatalogEntry, SqlCatalogError> {
941        self.state
942            .resolve_function(current_database, search_path, name, conn_id)
943    }
944
945    /// Resolves `name` to a type [`CatalogEntry`].
946    pub fn resolve_type(
947        &self,
948        current_database: Option<&DatabaseId>,
949        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
950        name: &PartialItemName,
951        conn_id: &ConnectionId,
952    ) -> Result<&CatalogEntry, SqlCatalogError> {
953        self.state
954            .resolve_type(current_database, search_path, name, conn_id)
955    }
956
957    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
958        self.state.resolve_cluster(name)
959    }
960
961    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
962    ///
963    /// # Panics
964    /// * If the [`BuiltinCluster`] doesn't exist.
965    ///
966    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
967        self.state.resolve_builtin_cluster(cluster)
968    }
969
970    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
971        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
972    }
973
974    /// Resolves a [`Cluster`] for a TargetCluster.
975    pub fn resolve_target_cluster(
976        &self,
977        target_cluster: TargetCluster,
978        session: &Session,
979    ) -> Result<&Cluster, AdapterError> {
980        match target_cluster {
981            TargetCluster::CatalogServer => {
982                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
983            }
984            TargetCluster::Active => self.active_cluster(session),
985            TargetCluster::Transaction(cluster_id) => self
986                .try_get_cluster(cluster_id)
987                .ok_or(AdapterError::ConcurrentClusterDrop),
988        }
989    }
990
991    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
992        // TODO(benesch): this check here is not sufficiently protective. It'd
993        // be very easy for a code path to accidentally avoid this check by
994        // calling `resolve_cluster(session.vars().cluster())`.
995        if session.user().name != SYSTEM_USER.name
996            && session.user().name != SUPPORT_USER.name
997            && session.vars().cluster() == SYSTEM_USER.name
998        {
999            coord_bail!(
1000                "system cluster '{}' cannot execute user queries",
1001                SYSTEM_USER.name
1002            );
1003        }
1004        let cluster = self.resolve_cluster(session.vars().cluster())?;
1005        Ok(cluster)
1006    }
1007
    /// Returns the in-memory [`CatalogState`] backing this catalog.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }
1011
1012    pub fn resolve_full_name(
1013        &self,
1014        name: &QualifiedItemName,
1015        conn_id: Option<&ConnectionId>,
1016    ) -> FullItemName {
1017        self.state.resolve_full_name(name, conn_id)
1018    }
1019
1020    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1021        self.state.try_get_entry(id)
1022    }
1023
1024    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1025        self.state.try_get_entry_by_global_id(id)
1026    }
1027
1028    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1029        self.state.get_entry(id)
1030    }
1031
1032    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1033        self.state.get_entry_by_global_id(id)
1034    }
1035
1036    pub fn get_global_ids<'a>(
1037        &'a self,
1038        id: &CatalogItemId,
1039    ) -> impl Iterator<Item = GlobalId> + use<'a> {
1040        self.get_entry(id).global_ids()
1041    }
1042
1043    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1044        self.get_entry_by_global_id(id).id()
1045    }
1046
1047    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1048        let item = self.try_get_entry_by_global_id(id)?;
1049        Some(item.id())
1050    }
1051
1052    pub fn get_schema(
1053        &self,
1054        database_spec: &ResolvedDatabaseSpecifier,
1055        schema_spec: &SchemaSpecifier,
1056        conn_id: &ConnectionId,
1057    ) -> &Schema {
1058        self.state.get_schema(database_spec, schema_spec, conn_id)
1059    }
1060
1061    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1062        self.state.get_mz_catalog_schema_id()
1063    }
1064
1065    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1066        self.state.get_pg_catalog_schema_id()
1067    }
1068
1069    pub fn get_information_schema_id(&self) -> SchemaId {
1070        self.state.get_information_schema_id()
1071    }
1072
1073    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1074        self.state.get_mz_internal_schema_id()
1075    }
1076
1077    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1078        self.state.get_mz_introspection_schema_id()
1079    }
1080
1081    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1082        self.state.get_mz_unsafe_schema_id()
1083    }
1084
1085    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1086        self.state.system_schema_ids()
1087    }
1088
1089    pub fn get_database(&self, id: &DatabaseId) -> &Database {
1090        self.state.get_database(id)
1091    }
1092
1093    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1094        self.state.try_get_role(id)
1095    }
1096
1097    pub fn get_role(&self, id: &RoleId) -> &Role {
1098        self.state.get_role(id)
1099    }
1100
1101    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1102        self.state.try_get_role_by_name(role_name)
1103    }
1104
1105    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1106        self.state.try_get_role_auth_by_id(id)
1107    }
1108
1109    /// Creates a new schema in the `Catalog` for temporary items
1110    /// indicated by the TEMPORARY or TEMP keywords.
1111    pub fn create_temporary_schema(
1112        &mut self,
1113        conn_id: &ConnectionId,
1114        owner_id: RoleId,
1115    ) -> Result<(), Error> {
1116        self.state.create_temporary_schema(conn_id, owner_id)
1117    }
1118
1119    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1120        self.state.temporary_schemas[conn_id]
1121            .items
1122            .contains_key(item_name)
1123    }
1124
1125    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1126    /// Returns Ok if conn_id's temp schema does not exist.
1127    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1128        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1129            return Ok(());
1130        };
1131        if !schema.items.is_empty() {
1132            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1133        }
1134        Ok(())
1135    }
1136
1137    pub(crate) fn object_dependents(
1138        &self,
1139        object_ids: &Vec<ObjectId>,
1140        conn_id: &ConnectionId,
1141    ) -> Vec<ObjectId> {
1142        let mut seen = BTreeSet::new();
1143        self.state.object_dependents(object_ids, conn_id, &mut seen)
1144    }
1145
1146    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1147        FullNameV1 {
1148            database: name.database.to_string(),
1149            schema: name.schema.clone(),
1150            item: name.item.clone(),
1151        }
1152    }
1153
1154    pub fn find_available_cluster_name(&self, name: &str) -> String {
1155        let mut i = 0;
1156        let mut candidate = name.to_string();
1157        while self.state.clusters_by_name.contains_key(&candidate) {
1158            i += 1;
1159            candidate = format!("{}{}", name, i);
1160        }
1161        candidate
1162    }
1163
1164    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1165        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1166            self.cluster_replica_sizes()
1167                .enabled_allocations()
1168                .map(|a| a.0.to_owned())
1169                .collect::<Vec<_>>()
1170        } else {
1171            self.system_config().allowed_cluster_replica_sizes()
1172        }
1173    }
1174
1175    pub fn concretize_replica_location(
1176        &self,
1177        location: mz_catalog::durable::ReplicaLocation,
1178        allowed_sizes: &Vec<String>,
1179        allowed_availability_zones: Option<&[String]>,
1180    ) -> Result<ReplicaLocation, Error> {
1181        self.state
1182            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1183    }
1184
1185    pub(crate) fn ensure_valid_replica_size(
1186        &self,
1187        allowed_sizes: &[String],
1188        size: &String,
1189    ) -> Result<(), Error> {
1190        self.state.ensure_valid_replica_size(allowed_sizes, size)
1191    }
1192
    /// Returns the catalog's map of cluster replica sizes.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1196
1197    /// Returns the privileges of an object by its ID.
1198    pub fn get_privileges(
1199        &self,
1200        id: &SystemObjectId,
1201        conn_id: &ConnectionId,
1202    ) -> Option<&PrivilegeMap> {
1203        match id {
1204            SystemObjectId::Object(id) => match id {
1205                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1206                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1207                ObjectId::Schema((database_spec, schema_spec)) => Some(
1208                    self.get_schema(database_spec, schema_spec, conn_id)
1209                        .privileges(),
1210                ),
1211                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1212                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1213                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1214            },
1215            SystemObjectId::System => Some(&self.state.system_privileges),
1216        }
1217    }
1218
    /// Asks the durable catalog state to confirm leadership, converting any
    /// durable-catalog error into an [`AdapterError`] via `?`.
    ///
    /// NOTE(review): presumably fails if another process has taken over the
    /// catalog — confirm against `DurableCatalogState::confirm_leadership`.
    #[mz_ore::instrument(level = "debug")]
    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
        Ok(self.storage().await.confirm_leadership().await?)
    }
1223
1224    /// Return the ids of all log sources the given object depends on.
1225    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1226        self.state.introspection_dependencies(id)
1227    }
1228
1229    /// Serializes the catalog's in-memory state.
1230    ///
1231    /// There are no guarantees about the format of the serialized state, except
1232    /// that the serialized state for two identical catalogs will compare
1233    /// identically.
1234    pub fn dump(&self) -> Result<CatalogDump, Error> {
1235        Ok(CatalogDump::new(self.state.dump(None)?))
1236    }
1237
1238    /// Checks the [`Catalog`]s internal consistency.
1239    ///
1240    /// Returns a JSON object describing the inconsistencies, if there are any.
1241    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1242        self.state.check_consistency().map_err(|inconsistencies| {
1243            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1244                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1245            })
1246        })
1247    }
1248
1249    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1250        self.state.config()
1251    }
1252
1253    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1254        self.state.entry_by_id.values()
1255    }
1256
1257    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1258        self.entries()
1259            .filter(|entry| entry.is_connection() && entry.id().is_user())
1260    }
1261
1262    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1263        self.entries()
1264            .filter(|entry| entry.is_table() && entry.id().is_user())
1265    }
1266
1267    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1268        self.entries()
1269            .filter(|entry| entry.is_source() && entry.id().is_user())
1270    }
1271
1272    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1273        self.entries()
1274            .filter(|entry| entry.is_sink() && entry.id().is_user())
1275    }
1276
1277    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1278        self.entries()
1279            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1280    }
1281
1282    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1283        self.entries()
1284            .filter(|entry| entry.is_secret() && entry.id().is_user())
1285    }
1286
1287    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1288        self.state.get_network_policy(&network_policy_id)
1289    }
1290
1291    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1292        self.state.try_get_network_policy_by_name(name)
1293    }
1294
1295    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1296        self.state.clusters_by_id.values()
1297    }
1298
1299    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1300        self.state.get_cluster(cluster_id)
1301    }
1302
1303    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1304        self.state.try_get_cluster(cluster_id)
1305    }
1306
1307    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1308        self.clusters().filter(|cluster| cluster.id.is_user())
1309    }
1310
1311    pub fn get_cluster_replica(
1312        &self,
1313        cluster_id: ClusterId,
1314        replica_id: ReplicaId,
1315    ) -> &ClusterReplica {
1316        self.state.get_cluster_replica(cluster_id, replica_id)
1317    }
1318
1319    pub fn try_get_cluster_replica(
1320        &self,
1321        cluster_id: ClusterId,
1322        replica_id: ReplicaId,
1323    ) -> Option<&ClusterReplica> {
1324        self.state.try_get_cluster_replica(cluster_id, replica_id)
1325    }
1326
1327    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1328        self.user_clusters()
1329            .flat_map(|cluster| cluster.user_replicas())
1330    }
1331
1332    pub fn databases(&self) -> impl Iterator<Item = &Database> {
1333        self.state.database_by_id.values()
1334    }
1335
1336    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1337        self.state
1338            .roles_by_id
1339            .values()
1340            .filter(|role| role.is_user())
1341    }
1342
1343    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1344        self.entries()
1345            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1346    }
1347
    /// Returns the system-wide privilege map. This is the same map
    /// `get_privileges` returns for `SystemObjectId::System`.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1351
1352    pub fn default_privileges(
1353        &self,
1354    ) -> impl Iterator<
1355        Item = (
1356            &DefaultPrivilegeObject,
1357            impl Iterator<Item = &DefaultPrivilegeAclItem>,
1358        ),
1359    > {
1360        self.state.default_privileges.iter()
1361    }
1362
1363    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1364        self.state
1365            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1366    }
1367
1368    pub fn pack_storage_usage_update(
1369        &self,
1370        event: VersionedStorageUsage,
1371        diff: Diff,
1372    ) -> BuiltinTableUpdate {
1373        self.state
1374            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1375    }
1376
1377    pub fn system_config(&self) -> &SystemVars {
1378        self.state.system_config()
1379    }
1380
1381    pub fn system_config_mut(&mut self) -> &mut SystemVars {
1382        self.state.system_config_mut()
1383    }
1384
1385    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1386        self.state.ensure_not_reserved_role(role_id)
1387    }
1388
1389    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1390        self.state.ensure_grantable_role(role_id)
1391    }
1392
1393    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1394        self.state.ensure_not_system_role(role_id)
1395    }
1396
1397    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1398        self.state.ensure_not_predefined_role(role_id)
1399    }
1400
1401    pub fn ensure_not_reserved_network_policy(
1402        &self,
1403        network_policy_id: &NetworkPolicyId,
1404    ) -> Result<(), Error> {
1405        self.state
1406            .ensure_not_reserved_network_policy(network_policy_id)
1407    }
1408
1409    pub fn ensure_not_reserved_object(
1410        &self,
1411        object_id: &ObjectId,
1412        conn_id: &ConnectionId,
1413    ) -> Result<(), Error> {
1414        match object_id {
1415            ObjectId::Cluster(cluster_id) => {
1416                if cluster_id.is_system() {
1417                    let cluster = self.get_cluster(*cluster_id);
1418                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1419                        cluster.name().to_string(),
1420                    )))
1421                } else {
1422                    Ok(())
1423                }
1424            }
1425            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1426                if replica_id.is_system() {
1427                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1428                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1429                        replica.name().to_string(),
1430                    )))
1431                } else {
1432                    Ok(())
1433                }
1434            }
1435            ObjectId::Database(database_id) => {
1436                if database_id.is_system() {
1437                    let database = self.get_database(database_id);
1438                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1439                        database.name().to_string(),
1440                    )))
1441                } else {
1442                    Ok(())
1443                }
1444            }
1445            ObjectId::Schema((database_spec, schema_spec)) => {
1446                if schema_spec.is_system() {
1447                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1448                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1449                        schema.name().schema.clone(),
1450                    )))
1451                } else {
1452                    Ok(())
1453                }
1454            }
1455            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1456            ObjectId::Item(item_id) => {
1457                if item_id.is_system() {
1458                    let item = self.get_entry(item_id);
1459                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1460                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1461                } else {
1462                    Ok(())
1463                }
1464            }
1465            ObjectId::NetworkPolicy(network_policy_id) => {
1466                self.ensure_not_reserved_network_policy(network_policy_id)
1467            }
1468        }
1469    }
1470
1471    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
1472    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1473        &mut self,
1474        create_sql: &str,
1475        force_if_exists_skip: bool,
1476    ) -> Result<(Plan, ResolvedIds), AdapterError> {
1477        self.state
1478            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1479    }
1480
    /// Writes newly computed local and global expressions to the expression
    /// cache, if one is configured, and returns the future that performs the
    /// write.
    ///
    /// For every new expression whose entry is an index, the index's `on` ID is
    /// collected. Those IDs are passed to `self.invalidate_for_index`, and the
    /// result is handed to the cache as the IDs to invalidate. Without an
    /// expression cache handle, a no-op future is returned.
    pub(crate) fn update_expression_cache<'a, 'b>(
        &'a self,
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
    ) -> BoxFuture<'b, ()> {
        if let Some(expr_cache) = &self.expr_cache_handle {
            let ons = new_local_expressions
                .iter()
                .map(|(id, _)| id)
                .chain(new_global_expressions.iter().map(|(id, _)| id))
                .map(|id| self.get_entry_by_global_id(id))
                // Only index entries contribute: keep the ID each index is built on.
                .filter_map(|entry| entry.index().map(|index| index.on));
            let invalidate_ids = self.invalidate_for_index(ons);
            expr_cache
                .update(
                    new_local_expressions,
                    new_global_expressions,
                    invalidate_ids,
                )
                .boxed()
        } else {
            async {}.boxed()
        }
    }
1505
    /// Listen for and apply all unconsumed updates to the durable catalog state.
    ///
    /// Returns the builtin table updates produced by applying them to the
    /// in-memory state.
    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
    // `#[cfg(test)]` annotation.
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
        // The storage guard is a temporary of this statement, so it is dropped
        // here, before `self.state` is borrowed mutably below.
        let updates = self.storage().await.sync_to_current_updates().await?;
        let builtin_table_updates = self.state.apply_updates(updates)?;
        Ok(builtin_table_updates)
    }
1517}
1518
1519pub fn is_reserved_name(name: &str) -> bool {
1520    BUILTIN_PREFIXES
1521        .iter()
1522        .any(|prefix| name.starts_with(prefix))
1523}
1524
1525pub fn is_reserved_role_name(name: &str) -> bool {
1526    is_reserved_name(name) || is_public_role(name)
1527}
1528
/// Reports whether `name` equals `PUBLIC_ROLE_NAME`.
///
/// NOTE(review): case sensitivity follows `PUBLIC_ROLE_NAME`'s `PartialEq`
/// impl, which is presumably case-insensitive — confirm.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1532
1533pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1534    object_type_to_audit_object_type(sql_type.into())
1535}
1536
1537pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1538    match id {
1539        CommentObjectId::Table(_) => ObjectType::Table,
1540        CommentObjectId::View(_) => ObjectType::View,
1541        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1542        CommentObjectId::Source(_) => ObjectType::Source,
1543        CommentObjectId::Sink(_) => ObjectType::Sink,
1544        CommentObjectId::Index(_) => ObjectType::Index,
1545        CommentObjectId::Func(_) => ObjectType::Func,
1546        CommentObjectId::Connection(_) => ObjectType::Connection,
1547        CommentObjectId::Type(_) => ObjectType::Type,
1548        CommentObjectId::Secret(_) => ObjectType::Secret,
1549        CommentObjectId::Role(_) => ObjectType::Role,
1550        CommentObjectId::Database(_) => ObjectType::Database,
1551        CommentObjectId::Schema(_) => ObjectType::Schema,
1552        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1553        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1554        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1555        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1556    }
1557}
1558
1559pub(crate) fn object_type_to_audit_object_type(
1560    object_type: mz_sql::catalog::ObjectType,
1561) -> ObjectType {
1562    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1563}
1564
1565pub(crate) fn system_object_type_to_audit_object_type(
1566    system_type: &SystemObjectType,
1567) -> ObjectType {
1568    match system_type {
1569        SystemObjectType::Object(object_type) => match object_type {
1570            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1571            mz_sql::catalog::ObjectType::View => ObjectType::View,
1572            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1573            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1574            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1575            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1576            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1577            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1578            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1579            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1580            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1581            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1582            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1583            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1584            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1585            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1586            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1587        },
1588        SystemObjectType::System => ObjectType::System,
1589    }
1590}
1591
/// Whether a privilege update grants or revokes privileges.
///
/// Converts via `From` into the matching [`ExecuteResponse`]
/// (`GrantedPrivilege` / `RevokedPrivilege`) and the matching audit-log
/// [`EventType`] (`Grant` / `Revoke`).
#[derive(Clone, Copy, Debug)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1597
1598impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1599    fn from(variant: UpdatePrivilegeVariant) -> Self {
1600        match variant {
1601            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1602            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1603        }
1604    }
1605}
1606
1607impl From<UpdatePrivilegeVariant> for EventType {
1608    fn from(variant: UpdatePrivilegeVariant) -> Self {
1609        match variant {
1610            UpdatePrivilegeVariant::Grant => EventType::Grant,
1611            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1612        }
1613    }
1614}
1615
1616impl ConnCatalog<'_> {
1617    fn resolve_item_name(
1618        &self,
1619        name: &PartialItemName,
1620    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1621        self.resolve_item(name).map(|entry| entry.name())
1622    }
1623
1624    fn resolve_function_name(
1625        &self,
1626        name: &PartialItemName,
1627    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1628        self.resolve_function(name).map(|entry| entry.name())
1629    }
1630
1631    fn resolve_type_name(
1632        &self,
1633        name: &PartialItemName,
1634    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1635        self.resolve_type(name).map(|entry| entry.name())
1636    }
1637}
1638
1639impl ExprHumanizer for ConnCatalog<'_> {
1640    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1641        let entry = self.state.try_get_entry_by_global_id(&id)?;
1642        Some(self.resolve_full_name(entry.name()).to_string())
1643    }
1644
1645    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1646        let entry = self.state.try_get_entry_by_global_id(&id)?;
1647        Some(entry.name().item.clone())
1648    }
1649
1650    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1651        let entry = self.state.try_get_entry_by_global_id(&id)?;
1652        Some(self.resolve_full_name(entry.name()).into_parts())
1653    }
1654
1655    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1656        use SqlScalarType::*;
1657
1658        match typ {
1659            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1660            List {
1661                custom_id: Some(item_id),
1662                ..
1663            }
1664            | Map {
1665                custom_id: Some(item_id),
1666                ..
1667            } => {
1668                let item = self.get_item(item_id);
1669                self.minimal_qualification(item.name()).to_string()
1670            }
1671            List { element_type, .. } => {
1672                format!(
1673                    "{} list",
1674                    self.humanize_scalar_type(element_type, postgres_compat)
1675                )
1676            }
1677            Map { value_type, .. } => format!(
1678                "map[{}=>{}]",
1679                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1680                self.humanize_scalar_type(value_type, postgres_compat)
1681            ),
1682            Record {
1683                custom_id: Some(item_id),
1684                ..
1685            } => {
1686                let item = self.get_item(item_id);
1687                self.minimal_qualification(item.name()).to_string()
1688            }
1689            Record { fields, .. } => format!(
1690                "record({})",
1691                fields
1692                    .iter()
1693                    .map(|f| format!(
1694                        "{}: {}",
1695                        f.0,
1696                        self.humanize_column_type(&f.1, postgres_compat)
1697                    ))
1698                    .join(",")
1699            ),
1700            PgLegacyChar => "\"char\"".into(),
1701            Char { length } if !postgres_compat => match length {
1702                None => "char".into(),
1703                Some(length) => format!("char({})", length.into_u32()),
1704            },
1705            VarChar { max_length } if !postgres_compat => match max_length {
1706                None => "varchar".into(),
1707                Some(length) => format!("varchar({})", length.into_u32()),
1708            },
1709            UInt16 => "uint2".into(),
1710            UInt32 => "uint4".into(),
1711            UInt64 => "uint8".into(),
1712            ty => {
1713                let pgrepr_type = mz_pgrepr::Type::from(ty);
1714                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1715
1716                let res = if self
1717                    .effective_search_path(true)
1718                    .iter()
1719                    .any(|(_, schema)| schema == &pg_catalog_schema)
1720                {
1721                    pgrepr_type.name().to_string()
1722                } else {
1723                    // If PG_CATALOG_SCHEMA is not in search path, you need
1724                    // qualified object name to refer to type.
1725                    let name = QualifiedItemName {
1726                        qualifiers: ItemQualifiers {
1727                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1728                            schema_spec: pg_catalog_schema,
1729                        },
1730                        item: pgrepr_type.name().to_string(),
1731                    };
1732                    self.resolve_full_name(&name).to_string()
1733                };
1734                res
1735            }
1736        }
1737    }
1738
1739    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1740        let entry = self.state.try_get_entry_by_global_id(&id)?;
1741
1742        match entry.index() {
1743            Some(index) => {
1744                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1745                let mut on_names = on_desc
1746                    .iter_names()
1747                    .map(|col_name| col_name.to_string())
1748                    .collect::<Vec<_>>();
1749
1750                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1751
1752                // Init ix_names with unknown column names. Unknown columns are
1753                // represented as an empty String and rendered as `#c` by the
1754                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1755                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1756                let mut ix_names = vec![String::new(); ix_arity];
1757
1758                // Apply the permutation by swapping on_names with ix_names.
1759                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1760                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1761                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1762                    std::mem::swap(on_name, ix_name);
1763                }
1764
1765                Some(ix_names) // Return the updated ix_names vector.
1766            }
1767            None => {
1768                let desc = self.state.try_get_desc_by_global_id(&id)?;
1769                let column_names = desc
1770                    .iter_names()
1771                    .map(|col_name| col_name.to_string())
1772                    .collect();
1773
1774                Some(column_names)
1775            }
1776        }
1777    }
1778
1779    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1780        let desc = self.state.try_get_desc_by_global_id(&id)?;
1781        Some(desc.get_name(column).to_string())
1782    }
1783
1784    fn id_exists(&self, id: GlobalId) -> bool {
1785        self.state.entry_by_global_id.contains_key(&id)
1786    }
1787}
1788
1789impl SessionCatalog for ConnCatalog<'_> {
1790    fn active_role_id(&self) -> &RoleId {
1791        &self.role_id
1792    }
1793
1794    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1795        self.prepared_statements
1796            .as_ref()
1797            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1798            .flatten()
1799    }
1800
1801    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1802        self.portals
1803            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1804    }
1805
1806    fn active_database(&self) -> Option<&DatabaseId> {
1807        self.database.as_ref()
1808    }
1809
1810    fn active_cluster(&self) -> &str {
1811        &self.cluster
1812    }
1813
1814    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1815        &self.search_path
1816    }
1817
1818    fn resolve_database(
1819        &self,
1820        database_name: &str,
1821    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1822        Ok(self.state.resolve_database(database_name)?)
1823    }
1824
1825    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1826        self.state
1827            .database_by_id
1828            .get(id)
1829            .expect("database doesn't exist")
1830    }
1831
1832    // `as` is ok to use to cast to a trait object.
1833    #[allow(clippy::as_conversions)]
1834    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1835        self.state
1836            .database_by_id
1837            .values()
1838            .map(|database| database as &dyn CatalogDatabase)
1839            .collect()
1840    }
1841
1842    fn resolve_schema(
1843        &self,
1844        database_name: Option<&str>,
1845        schema_name: &str,
1846    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1847        Ok(self.state.resolve_schema(
1848            self.database.as_ref(),
1849            database_name,
1850            schema_name,
1851            &self.conn_id,
1852        )?)
1853    }
1854
1855    fn resolve_schema_in_database(
1856        &self,
1857        database_spec: &ResolvedDatabaseSpecifier,
1858        schema_name: &str,
1859    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1860        Ok(self
1861            .state
1862            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1863    }
1864
1865    fn get_schema(
1866        &self,
1867        database_spec: &ResolvedDatabaseSpecifier,
1868        schema_spec: &SchemaSpecifier,
1869    ) -> &dyn CatalogSchema {
1870        self.state
1871            .get_schema(database_spec, schema_spec, &self.conn_id)
1872    }
1873
1874    // `as` is ok to use to cast to a trait object.
1875    #[allow(clippy::as_conversions)]
1876    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1877        self.get_databases()
1878            .into_iter()
1879            .flat_map(|database| database.schemas().into_iter())
1880            .chain(
1881                self.state
1882                    .ambient_schemas_by_id
1883                    .values()
1884                    .chain(self.state.temporary_schemas.values())
1885                    .map(|schema| schema as &dyn CatalogSchema),
1886            )
1887            .collect()
1888    }
1889
1890    fn get_mz_internal_schema_id(&self) -> SchemaId {
1891        self.state().get_mz_internal_schema_id()
1892    }
1893
1894    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1895        self.state().get_mz_unsafe_schema_id()
1896    }
1897
1898    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1899        self.state.is_system_schema_specifier(schema)
1900    }
1901
1902    fn resolve_role(
1903        &self,
1904        role_name: &str,
1905    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1906        match self.state.try_get_role_by_name(role_name) {
1907            Some(role) => Ok(role),
1908            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1909        }
1910    }
1911
1912    fn resolve_network_policy(
1913        &self,
1914        policy_name: &str,
1915    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1916        match self.state.try_get_network_policy_by_name(policy_name) {
1917            Some(policy) => Ok(policy),
1918            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1919        }
1920    }
1921
1922    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1923        Some(self.state.roles_by_id.get(id)?)
1924    }
1925
1926    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1927        self.state.get_role(id)
1928    }
1929
1930    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1931        // `as` is ok to use to cast to a trait object.
1932        #[allow(clippy::as_conversions)]
1933        self.state
1934            .roles_by_id
1935            .values()
1936            .map(|role| role as &dyn CatalogRole)
1937            .collect()
1938    }
1939
1940    fn mz_system_role_id(&self) -> RoleId {
1941        MZ_SYSTEM_ROLE_ID
1942    }
1943
1944    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1945        self.state.collect_role_membership(id)
1946    }
1947
1948    fn get_network_policy(
1949        &self,
1950        id: &NetworkPolicyId,
1951    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1952        self.state.get_network_policy(id)
1953    }
1954
1955    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1956        // `as` is ok to use to cast to a trait object.
1957        #[allow(clippy::as_conversions)]
1958        self.state
1959            .network_policies_by_id
1960            .values()
1961            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1962            .collect()
1963    }
1964
1965    fn resolve_cluster(
1966        &self,
1967        cluster_name: Option<&str>,
1968    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1969        Ok(self
1970            .state
1971            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1972    }
1973
1974    fn resolve_cluster_replica(
1975        &self,
1976        cluster_replica_name: &QualifiedReplica,
1977    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1978        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1979    }
1980
1981    fn resolve_item(
1982        &self,
1983        name: &PartialItemName,
1984    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1985        let r = self.state.resolve_entry(
1986            self.database.as_ref(),
1987            &self.effective_search_path(true),
1988            name,
1989            &self.conn_id,
1990        )?;
1991        if self.unresolvable_ids.contains(&r.id()) {
1992            Err(SqlCatalogError::UnknownItem(name.to_string()))
1993        } else {
1994            Ok(r)
1995        }
1996    }
1997
1998    fn resolve_function(
1999        &self,
2000        name: &PartialItemName,
2001    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2002        let r = self.state.resolve_function(
2003            self.database.as_ref(),
2004            &self.effective_search_path(false),
2005            name,
2006            &self.conn_id,
2007        )?;
2008
2009        if self.unresolvable_ids.contains(&r.id()) {
2010            Err(SqlCatalogError::UnknownFunction {
2011                name: name.to_string(),
2012                alternative: None,
2013            })
2014        } else {
2015            Ok(r)
2016        }
2017    }
2018
2019    fn resolve_type(
2020        &self,
2021        name: &PartialItemName,
2022    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2023        let r = self.state.resolve_type(
2024            self.database.as_ref(),
2025            &self.effective_search_path(false),
2026            name,
2027            &self.conn_id,
2028        )?;
2029
2030        if self.unresolvable_ids.contains(&r.id()) {
2031            Err(SqlCatalogError::UnknownType {
2032                name: name.to_string(),
2033            })
2034        } else {
2035            Ok(r)
2036        }
2037    }
2038
2039    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2040        self.state.get_system_type(name)
2041    }
2042
2043    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2044        Some(self.state.try_get_entry(id)?)
2045    }
2046
2047    fn try_get_item_by_global_id(
2048        &self,
2049        id: &GlobalId,
2050    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2051        let entry = self.state.try_get_entry_by_global_id(id)?;
2052        let entry = match &entry.item {
2053            CatalogItem::Table(table) => {
2054                let (version, _gid) = table
2055                    .collections
2056                    .iter()
2057                    .find(|(_version, gid)| *gid == id)
2058                    .expect("catalog out of sync, mismatched GlobalId");
2059                entry.at_version(RelationVersionSelector::Specific(*version))
2060            }
2061            _ => entry.at_version(RelationVersionSelector::Latest),
2062        };
2063        Some(entry)
2064    }
2065
2066    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2067        self.state.get_entry(id)
2068    }
2069
2070    fn get_item_by_global_id(
2071        &self,
2072        id: &GlobalId,
2073    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2074        let entry = self.state.get_entry_by_global_id(id);
2075        let entry = match &entry.item {
2076            CatalogItem::Table(table) => {
2077                let (version, _gid) = table
2078                    .collections
2079                    .iter()
2080                    .find(|(_version, gid)| *gid == id)
2081                    .expect("catalog out of sync, mismatched GlobalId");
2082                entry.at_version(RelationVersionSelector::Specific(*version))
2083            }
2084            _ => entry.at_version(RelationVersionSelector::Latest),
2085        };
2086        entry
2087    }
2088
2089    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2090        self.get_schemas()
2091            .into_iter()
2092            .flat_map(|schema| schema.item_ids())
2093            .map(|id| self.get_item(&id))
2094            .collect()
2095    }
2096
2097    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2098        self.state
2099            .get_item_by_name(name, &self.conn_id)
2100            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2101    }
2102
2103    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2104        self.state
2105            .get_type_by_name(name, &self.conn_id)
2106            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2107    }
2108
2109    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2110        &self.state.clusters_by_id[&id]
2111    }
2112
2113    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2114        self.state
2115            .clusters_by_id
2116            .values()
2117            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2118            .collect()
2119    }
2120
2121    fn get_cluster_replica(
2122        &self,
2123        cluster_id: ClusterId,
2124        replica_id: ReplicaId,
2125    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2126        let cluster = self.get_cluster(cluster_id);
2127        cluster.replica(replica_id)
2128    }
2129
2130    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2131        self.get_clusters()
2132            .into_iter()
2133            .flat_map(|cluster| cluster.replicas().into_iter())
2134            .collect()
2135    }
2136
2137    fn get_system_privileges(&self) -> &PrivilegeMap {
2138        &self.state.system_privileges
2139    }
2140
2141    fn get_default_privileges(
2142        &self,
2143    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2144        self.state
2145            .default_privileges
2146            .iter()
2147            .map(|(object, acl_items)| (object, acl_items.collect()))
2148            .collect()
2149    }
2150
2151    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2152        self.state.find_available_name(name, &self.conn_id)
2153    }
2154
2155    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2156        self.state.resolve_full_name(name, Some(&self.conn_id))
2157    }
2158
2159    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2160        self.state.resolve_full_schema_name(name)
2161    }
2162
2163    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2164        self.state.get_entry_by_global_id(global_id).id()
2165    }
2166
2167    fn resolve_global_id(
2168        &self,
2169        item_id: &CatalogItemId,
2170        version: RelationVersionSelector,
2171    ) -> GlobalId {
2172        self.state
2173            .get_entry(item_id)
2174            .at_version(version)
2175            .global_id()
2176    }
2177
2178    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2179        self.state.config()
2180    }
2181
2182    fn now(&self) -> EpochMillis {
2183        (self.state.config().now)()
2184    }
2185
2186    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2187        self.state.aws_privatelink_availability_zones.clone()
2188    }
2189
2190    fn system_vars(&self) -> &SystemVars {
2191        &self.state.system_configuration
2192    }
2193
2194    fn system_vars_mut(&mut self) -> &mut SystemVars {
2195        &mut self.state.to_mut().system_configuration
2196    }
2197
2198    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2199        self.state().get_owner_id(id, self.conn_id())
2200    }
2201
2202    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2203        match id {
2204            SystemObjectId::System => Some(&self.state.system_privileges),
2205            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2206                Some(self.get_cluster(*id).privileges())
2207            }
2208            SystemObjectId::Object(ObjectId::Database(id)) => {
2209                Some(self.get_database(id).privileges())
2210            }
2211            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2212                Some(self.get_schema(database_spec, schema_spec).privileges())
2213            }
2214            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2215            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2216                Some(self.get_network_policy(id).privileges())
2217            }
2218            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2219            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2220        }
2221    }
2222
2223    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2224        let mut seen = BTreeSet::new();
2225        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2226    }
2227
2228    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2229        let mut seen = BTreeSet::new();
2230        self.state.item_dependents(id, &mut seen)
2231    }
2232
2233    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2234        rbac::all_object_privileges(object_type)
2235    }
2236
2237    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2238        self.state.get_object_type(object_id)
2239    }
2240
2241    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2242        self.state.get_system_object_type(id)
2243    }
2244
2245    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2246    /// the object.
2247    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2248        let database_id = match &qualified_name.qualifiers.database_spec {
2249            ResolvedDatabaseSpecifier::Ambient => None,
2250            ResolvedDatabaseSpecifier::Id(id)
2251                if self.database.is_some() && self.database == Some(*id) =>
2252            {
2253                None
2254            }
2255            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2256        };
2257
2258        let schema_spec = if database_id.is_none()
2259            && self.resolve_item_name(&PartialItemName {
2260                database: None,
2261                schema: None,
2262                item: qualified_name.item.clone(),
2263            }) == Ok(qualified_name)
2264            || self.resolve_function_name(&PartialItemName {
2265                database: None,
2266                schema: None,
2267                item: qualified_name.item.clone(),
2268            }) == Ok(qualified_name)
2269            || self.resolve_type_name(&PartialItemName {
2270                database: None,
2271                schema: None,
2272                item: qualified_name.item.clone(),
2273            }) == Ok(qualified_name)
2274        {
2275            None
2276        } else {
2277            // If `search_path` does not contain `full_name.schema`, the
2278            // `PartialName` must contain it.
2279            Some(qualified_name.qualifiers.schema_spec.clone())
2280        };
2281
2282        let res = PartialItemName {
2283            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2284            schema: schema_spec.map(|spec| {
2285                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2286                    .name()
2287                    .schema
2288                    .clone()
2289            }),
2290            item: qualified_name.item.clone(),
2291        };
2292        assert!(
2293            self.resolve_item_name(&res) == Ok(qualified_name)
2294                || self.resolve_function_name(&res) == Ok(qualified_name)
2295                || self.resolve_type_name(&res) == Ok(qualified_name)
2296        );
2297        res
2298    }
2299
2300    fn add_notice(&self, notice: PlanNotice) {
2301        let _ = self.notices_tx.send(notice.into());
2302    }
2303
2304    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2305        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2306        self.state.comments.get_object_comments(comment_id)
2307    }
2308
2309    fn is_cluster_size_cc(&self, size: &str) -> bool {
2310        self.state
2311            .cluster_replica_sizes
2312            .0
2313            .get(size)
2314            .map_or(false, |a| a.is_cc)
2315    }
2316}
2317
2318#[cfg(test)]
2319mod tests {
2320    use std::collections::{BTreeMap, BTreeSet};
2321    use std::sync::Arc;
2322    use std::{env, iter};
2323
2324    use itertools::Itertools;
2325    use mz_catalog::memory::objects::CatalogItem;
2326    use tokio_postgres::NoTls;
2327    use tokio_postgres::types::Type;
2328    use uuid::Uuid;
2329
2330    use mz_catalog::SYSTEM_CONN_ID;
2331    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2332    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2333    use mz_controller_types::{ClusterId, ReplicaId};
2334    use mz_expr::MirScalarExpr;
2335    use mz_ore::now::to_datetime;
2336    use mz_ore::{assert_err, assert_ok, task};
2337    use mz_persist_client::PersistClient;
2338    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2339    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2340    use mz_repr::role_id::RoleId;
2341    use mz_repr::{
2342        CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2343        SqlScalarType, Timestamp,
2344    };
2345    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2346    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2347    use mz_sql::names::{
2348        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2349        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2350    };
2351    use mz_sql::plan::{
2352        CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2353        Scope, StatementContext,
2354    };
2355    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2356    use mz_sql::session::vars::{SystemVars, VarInput};
2357
2358    use crate::catalog::state::LocalExpressionCache;
2359    use crate::catalog::{Catalog, Op};
2360    use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2361    use crate::session::Session;
2362
2363    /// System sessions have an empty `search_path` so it's necessary to
2364    /// schema-qualify all referenced items.
2365    ///
2366    /// Dummy (and ostensibly client) sessions contain system schemas in their
2367    /// search paths, so do not require schema qualification on system objects such
2368    /// as types.
2369    #[mz_ore::test(tokio::test)]
2370    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2371    async fn test_minimal_qualification() {
2372        Catalog::with_debug(|catalog| async move {
2373            struct TestCase {
2374                input: QualifiedItemName,
2375                system_output: PartialItemName,
2376                normal_output: PartialItemName,
2377            }
2378
2379            let test_cases = vec![
2380                TestCase {
2381                    input: QualifiedItemName {
2382                        qualifiers: ItemQualifiers {
2383                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2384                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2385                        },
2386                        item: "numeric".to_string(),
2387                    },
2388                    system_output: PartialItemName {
2389                        database: None,
2390                        schema: None,
2391                        item: "numeric".to_string(),
2392                    },
2393                    normal_output: PartialItemName {
2394                        database: None,
2395                        schema: None,
2396                        item: "numeric".to_string(),
2397                    },
2398                },
2399                TestCase {
2400                    input: QualifiedItemName {
2401                        qualifiers: ItemQualifiers {
2402                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2403                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2404                        },
2405                        item: "mz_array_types".to_string(),
2406                    },
2407                    system_output: PartialItemName {
2408                        database: None,
2409                        schema: None,
2410                        item: "mz_array_types".to_string(),
2411                    },
2412                    normal_output: PartialItemName {
2413                        database: None,
2414                        schema: None,
2415                        item: "mz_array_types".to_string(),
2416                    },
2417                },
2418            ];
2419
2420            for tc in test_cases {
2421                assert_eq!(
2422                    catalog
2423                        .for_system_session()
2424                        .minimal_qualification(&tc.input),
2425                    tc.system_output
2426                );
2427                assert_eq!(
2428                    catalog
2429                        .for_session(&Session::dummy())
2430                        .minimal_qualification(&tc.input),
2431                    tc.normal_output
2432                );
2433            }
2434            catalog.expire().await;
2435        })
2436        .await
2437    }
2438
2439    #[mz_ore::test(tokio::test)]
2440    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2441    async fn test_catalog_revision() {
2442        let persist_client = PersistClient::new_for_tests().await;
2443        let organization_id = Uuid::new_v4();
2444        let bootstrap_args = test_bootstrap_args();
2445        {
2446            let mut catalog = Catalog::open_debug_catalog(
2447                persist_client.clone(),
2448                organization_id.clone(),
2449                &bootstrap_args,
2450            )
2451            .await
2452            .expect("unable to open debug catalog");
2453            assert_eq!(catalog.transient_revision(), 1);
2454            let commit_ts = catalog.current_upper().await;
2455            catalog
2456                .transact(
2457                    None,
2458                    commit_ts,
2459                    None,
2460                    vec![Op::CreateDatabase {
2461                        name: "test".to_string(),
2462                        owner_id: MZ_SYSTEM_ROLE_ID,
2463                    }],
2464                )
2465                .await
2466                .expect("failed to transact");
2467            assert_eq!(catalog.transient_revision(), 2);
2468            catalog.expire().await;
2469        }
2470        {
2471            let catalog =
2472                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2473                    .await
2474                    .expect("unable to open debug catalog");
2475            // Re-opening the same catalog resets the transient_revision to 1.
2476            assert_eq!(catalog.transient_revision(), 1);
2477            catalog.expire().await;
2478        }
2479    }
2480
2481    #[mz_ore::test(tokio::test)]
2482    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2483    async fn test_effective_search_path() {
2484        Catalog::with_debug(|catalog| async move {
2485            let mz_catalog_schema = (
2486                ResolvedDatabaseSpecifier::Ambient,
2487                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2488            );
2489            let pg_catalog_schema = (
2490                ResolvedDatabaseSpecifier::Ambient,
2491                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2492            );
2493            let mz_temp_schema = (
2494                ResolvedDatabaseSpecifier::Ambient,
2495                SchemaSpecifier::Temporary,
2496            );
2497
2498            // Behavior with the default search_schema (public)
2499            let session = Session::dummy();
2500            let conn_catalog = catalog.for_session(&session);
2501            assert_ne!(
2502                conn_catalog.effective_search_path(false),
2503                conn_catalog.search_path
2504            );
2505            assert_ne!(
2506                conn_catalog.effective_search_path(true),
2507                conn_catalog.search_path
2508            );
2509            assert_eq!(
2510                conn_catalog.effective_search_path(false),
2511                vec![
2512                    mz_catalog_schema.clone(),
2513                    pg_catalog_schema.clone(),
2514                    conn_catalog.search_path[0].clone()
2515                ]
2516            );
2517            assert_eq!(
2518                conn_catalog.effective_search_path(true),
2519                vec![
2520                    mz_temp_schema.clone(),
2521                    mz_catalog_schema.clone(),
2522                    pg_catalog_schema.clone(),
2523                    conn_catalog.search_path[0].clone()
2524                ]
2525            );
2526
2527            // missing schemas are added when missing
2528            let mut session = Session::dummy();
2529            session
2530                .vars_mut()
2531                .set(
2532                    &SystemVars::new(),
2533                    "search_path",
2534                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2535                    false,
2536                )
2537                .expect("failed to set search_path");
2538            let conn_catalog = catalog.for_session(&session);
2539            assert_ne!(
2540                conn_catalog.effective_search_path(false),
2541                conn_catalog.search_path
2542            );
2543            assert_ne!(
2544                conn_catalog.effective_search_path(true),
2545                conn_catalog.search_path
2546            );
2547            assert_eq!(
2548                conn_catalog.effective_search_path(false),
2549                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2550            );
2551            assert_eq!(
2552                conn_catalog.effective_search_path(true),
2553                vec![
2554                    mz_temp_schema.clone(),
2555                    mz_catalog_schema.clone(),
2556                    pg_catalog_schema.clone()
2557                ]
2558            );
2559
2560            let mut session = Session::dummy();
2561            session
2562                .vars_mut()
2563                .set(
2564                    &SystemVars::new(),
2565                    "search_path",
2566                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2567                    false,
2568                )
2569                .expect("failed to set search_path");
2570            let conn_catalog = catalog.for_session(&session);
2571            assert_ne!(
2572                conn_catalog.effective_search_path(false),
2573                conn_catalog.search_path
2574            );
2575            assert_ne!(
2576                conn_catalog.effective_search_path(true),
2577                conn_catalog.search_path
2578            );
2579            assert_eq!(
2580                conn_catalog.effective_search_path(false),
2581                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2582            );
2583            assert_eq!(
2584                conn_catalog.effective_search_path(true),
2585                vec![
2586                    mz_temp_schema.clone(),
2587                    pg_catalog_schema.clone(),
2588                    mz_catalog_schema.clone()
2589                ]
2590            );
2591
2592            let mut session = Session::dummy();
2593            session
2594                .vars_mut()
2595                .set(
2596                    &SystemVars::new(),
2597                    "search_path",
2598                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2599                    false,
2600                )
2601                .expect("failed to set search_path");
2602            let conn_catalog = catalog.for_session(&session);
2603            assert_ne!(
2604                conn_catalog.effective_search_path(false),
2605                conn_catalog.search_path
2606            );
2607            assert_ne!(
2608                conn_catalog.effective_search_path(true),
2609                conn_catalog.search_path
2610            );
2611            assert_eq!(
2612                conn_catalog.effective_search_path(false),
2613                vec![
2614                    mz_catalog_schema.clone(),
2615                    pg_catalog_schema.clone(),
2616                    mz_temp_schema.clone()
2617                ]
2618            );
2619            assert_eq!(
2620                conn_catalog.effective_search_path(true),
2621                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2622            );
2623            catalog.expire().await;
2624        })
2625        .await
2626    }
2627
2628    #[mz_ore::test(tokio::test)]
2629    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2630    async fn test_normalized_create() {
2631        use mz_ore::collections::CollectionExt;
2632        Catalog::with_debug(|catalog| async move {
2633            let conn_catalog = catalog.for_system_session();
2634            let scx = &mut StatementContext::new(None, &conn_catalog);
2635
2636            let parsed = mz_sql_parser::parser::parse_statements(
2637                "create view public.foo as select 1 as bar",
2638            )
2639            .expect("")
2640            .into_element()
2641            .ast;
2642
2643            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2644
2645            // Ensure that all identifiers are quoted.
2646            assert_eq!(
2647                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2648                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2649            );
2650            catalog.expire().await;
2651        })
2652        .await;
2653    }
2654
2655    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2656    #[mz_ore::test(tokio::test)]
2657    #[cfg_attr(miri, ignore)] // slow
2658    async fn test_large_catalog_item() {
2659        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2660        let column = ", 1";
2661        let view_def_size = view_def.bytes().count();
2662        let column_size = column.bytes().count();
2663        let column_count =
2664            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2665        let columns = iter::repeat(column).take(column_count).join("");
2666        let create_sql = format!("{view_def}{columns})");
2667        let create_sql_check = create_sql.clone();
2668        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2669        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2670            &create_sql
2671        ));
2672
2673        let persist_client = PersistClient::new_for_tests().await;
2674        let organization_id = Uuid::new_v4();
2675        let id = CatalogItemId::User(1);
2676        let gid = GlobalId::User(1);
2677        let bootstrap_args = test_bootstrap_args();
2678        {
2679            let mut catalog = Catalog::open_debug_catalog(
2680                persist_client.clone(),
2681                organization_id.clone(),
2682                &bootstrap_args,
2683            )
2684            .await
2685            .expect("unable to open debug catalog");
2686            let item = catalog
2687                .state()
2688                .deserialize_item(
2689                    gid,
2690                    &create_sql,
2691                    &BTreeMap::new(),
2692                    &mut LocalExpressionCache::Closed,
2693                    None,
2694                )
2695                .expect("unable to parse view");
2696            let commit_ts = catalog.current_upper().await;
2697            catalog
2698                .transact(
2699                    None,
2700                    commit_ts,
2701                    None,
2702                    vec![Op::CreateItem {
2703                        item,
2704                        name: QualifiedItemName {
2705                            qualifiers: ItemQualifiers {
2706                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2707                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2708                            },
2709                            item: "v".to_string(),
2710                        },
2711                        id,
2712                        owner_id: MZ_SYSTEM_ROLE_ID,
2713                    }],
2714                )
2715                .await
2716                .expect("failed to transact");
2717            catalog.expire().await;
2718        }
2719        {
2720            let catalog =
2721                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2722                    .await
2723                    .expect("unable to open debug catalog");
2724            let view = catalog.get_entry(&id);
2725            assert_eq!("v", view.name.item);
2726            match &view.item {
2727                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2728                item => panic!("expected view, got {}", item.typ()),
2729            }
2730            catalog.expire().await;
2731        }
2732    }
2733
2734    #[mz_ore::test(tokio::test)]
2735    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2736    async fn test_object_type() {
2737        Catalog::with_debug(|catalog| async move {
2738            let conn_catalog = catalog.for_system_session();
2739
2740            assert_eq!(
2741                mz_sql::catalog::ObjectType::ClusterReplica,
2742                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2743                    ClusterId::user(1).expect("1 is a valid ID"),
2744                    ReplicaId::User(1)
2745                )))
2746            );
2747            assert_eq!(
2748                mz_sql::catalog::ObjectType::Role,
2749                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2750            );
2751            catalog.expire().await;
2752        })
2753        .await;
2754    }
2755
2756    #[mz_ore::test(tokio::test)]
2757    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2758    async fn test_get_privileges() {
2759        Catalog::with_debug(|catalog| async move {
2760            let conn_catalog = catalog.for_system_session();
2761
2762            assert_eq!(
2763                None,
2764                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2765                    ClusterId::user(1).expect("1 is a valid ID"),
2766                    ReplicaId::User(1),
2767                ))))
2768            );
2769            assert_eq!(
2770                None,
2771                conn_catalog
2772                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2773            );
2774            catalog.expire().await;
2775        })
2776        .await;
2777    }
2778
2779    #[mz_ore::test(tokio::test)]
2780    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2781    async fn verify_builtin_descs() {
2782        Catalog::with_debug(|catalog| async move {
2783            let conn_catalog = catalog.for_system_session();
2784
2785            let builtins_cfg = BuiltinsConfig {
2786                include_continual_tasks: true,
2787            };
2788            for builtin in BUILTINS::iter(&builtins_cfg) {
2789                let (schema, name, expected_desc) = match builtin {
2790                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2791                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2792                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2793                    Builtin::Log(_)
2794                    | Builtin::Type(_)
2795                    | Builtin::Func(_)
2796                    | Builtin::ContinualTask(_)
2797                    | Builtin::Index(_)
2798                    | Builtin::Connection(_) => continue,
2799                };
2800                let item = conn_catalog
2801                    .resolve_item(&PartialItemName {
2802                        database: None,
2803                        schema: Some(schema.to_string()),
2804                        item: name.to_string(),
2805                    })
2806                    .expect("unable to resolve item")
2807                    .at_version(RelationVersionSelector::Latest);
2808
2809                let full_name = conn_catalog.resolve_full_name(item.name());
2810                let actual_desc = item.desc(&full_name).expect("invalid item type");
2811                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2812                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2813                {
2814                    assert_eq!(
2815                        actual_name, expected_name,
2816                        "item {schema}.{name} column {index} name did not match its expected name"
2817                    );
2818                    assert_eq!(
2819                        actual_typ, expected_typ,
2820                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2821                    );
2822                }
2823                assert_eq!(
2824                    &*actual_desc, expected_desc,
2825                    "item {schema}.{name} did not match its expected RelationDesc"
2826                );
2827            }
2828            catalog.expire().await;
2829        })
2830        .await
2831    }
2832
2833    // Connect to a running Postgres server and verify that our builtin
2834    // types and functions match it, in addition to some other things.
2835    #[mz_ore::test(tokio::test)]
2836    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2837    async fn test_compare_builtins_postgres() {
2838        async fn inner(catalog: Catalog) {
2839            // Verify that all builtin functions:
2840            // - have a unique OID
2841            // - if they have a postgres counterpart (same oid) then they have matching name
2842            let (client, connection) = tokio_postgres::connect(
2843                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2844                NoTls,
2845            )
2846            .await
2847            .expect("failed to connect to Postgres");
2848
2849            task::spawn(|| "compare_builtin_postgres", async move {
2850                if let Err(e) = connection.await {
2851                    panic!("connection error: {}", e);
2852                }
2853            });
2854
2855            struct PgProc {
2856                name: String,
2857                arg_oids: Vec<u32>,
2858                ret_oid: Option<u32>,
2859                ret_set: bool,
2860            }
2861
2862            struct PgType {
2863                name: String,
2864                ty: String,
2865                elem: u32,
2866                array: u32,
2867                input: u32,
2868                receive: u32,
2869            }
2870
2871            struct PgOper {
2872                oprresult: u32,
2873                name: String,
2874            }
2875
2876            let pg_proc: BTreeMap<_, _> = client
2877                .query(
2878                    "SELECT
2879                    p.oid,
2880                    proname,
2881                    proargtypes,
2882                    prorettype,
2883                    proretset
2884                FROM pg_proc p
2885                JOIN pg_namespace n ON p.pronamespace = n.oid",
2886                    &[],
2887                )
2888                .await
2889                .expect("pg query failed")
2890                .into_iter()
2891                .map(|row| {
2892                    let oid: u32 = row.get("oid");
2893                    let pg_proc = PgProc {
2894                        name: row.get("proname"),
2895                        arg_oids: row.get("proargtypes"),
2896                        ret_oid: row.get("prorettype"),
2897                        ret_set: row.get("proretset"),
2898                    };
2899                    (oid, pg_proc)
2900                })
2901                .collect();
2902
2903            let pg_type: BTreeMap<_, _> = client
2904                .query(
2905                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2906                    &[],
2907                )
2908                .await
2909                .expect("pg query failed")
2910                .into_iter()
2911                .map(|row| {
2912                    let oid: u32 = row.get("oid");
2913                    let pg_type = PgType {
2914                        name: row.get("typname"),
2915                        ty: row.get("typtype"),
2916                        elem: row.get("typelem"),
2917                        array: row.get("typarray"),
2918                        input: row.get("typinput"),
2919                        receive: row.get("typreceive"),
2920                    };
2921                    (oid, pg_type)
2922                })
2923                .collect();
2924
2925            let pg_oper: BTreeMap<_, _> = client
2926                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2927                .await
2928                .expect("pg query failed")
2929                .into_iter()
2930                .map(|row| {
2931                    let oid: u32 = row.get("oid");
2932                    let pg_oper = PgOper {
2933                        name: row.get("oprname"),
2934                        oprresult: row.get("oprresult"),
2935                    };
2936                    (oid, pg_oper)
2937                })
2938                .collect();
2939
2940            let conn_catalog = catalog.for_system_session();
2941            let resolve_type_oid = |item: &str| {
2942                conn_catalog
2943                    .resolve_type(&PartialItemName {
2944                        database: None,
2945                        // All functions we check exist in PG, so the types must, as
2946                        // well
2947                        schema: Some(PG_CATALOG_SCHEMA.into()),
2948                        item: item.to_string(),
2949                    })
2950                    .expect("unable to resolve type")
2951                    .oid()
2952            };
2953
2954            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2955                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2956                .collect();
2957
2958            let mut all_oids = BTreeSet::new();
2959
2960            // A function to determine if two oids are equivalent enough for these tests. We don't
2961            // support some types, so map exceptions here.
2962            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2963                [
2964                    // We don't support NAME.
2965                    (Type::NAME, Type::TEXT),
2966                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2967                    // We don't support time with time zone.
2968                    (Type::TIME, Type::TIMETZ),
2969                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2970                ]
2971                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2972            );
2973            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2974                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2975            ]);
2976            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2977                if ignore_return_types.contains(&fn_oid) {
2978                    return true;
2979                }
2980                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2981                    return true;
2982                }
2983                a == b
2984            };
2985
2986            let builtins_cfg = BuiltinsConfig {
2987                include_continual_tasks: true,
2988            };
2989            for builtin in BUILTINS::iter(&builtins_cfg) {
2990                match builtin {
2991                    Builtin::Type(ty) => {
2992                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2993
2994                        if ty.oid >= FIRST_MATERIALIZE_OID {
2995                            // High OIDs are reserved in Materialize and don't have
2996                            // PostgreSQL counterparts.
2997                            continue;
2998                        }
2999
3000                        // For types that have a PostgreSQL counterpart, verify that
3001                        // the name and oid match.
3002                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3003                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3004                        });
3005                        assert_eq!(
3006                            ty.name, pg_ty.name,
3007                            "oid {} has name {} in postgres; expected {}",
3008                            ty.oid, pg_ty.name, ty.name,
3009                        );
3010
3011                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3012                            None => (0, 0),
3013                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3014                        };
3015                        assert_eq!(
3016                            typinput_oid, pg_ty.input,
3017                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3018                            ty.name, typinput_oid, pg_ty.input,
3019                        );
3020                        assert_eq!(
3021                            typreceive_oid, pg_ty.receive,
3022                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3023                            ty.name, typreceive_oid, pg_ty.receive,
3024                        );
3025                        if typinput_oid != 0 {
3026                            assert!(
3027                                func_oids.contains(&typinput_oid),
3028                                "type {} has typinput OID {} that does not exist in pg_proc",
3029                                ty.name,
3030                                typinput_oid,
3031                            );
3032                        }
3033                        if typreceive_oid != 0 {
3034                            assert!(
3035                                func_oids.contains(&typreceive_oid),
3036                                "type {} has typreceive OID {} that does not exist in pg_proc",
3037                                ty.name,
3038                                typreceive_oid,
3039                            );
3040                        }
3041
3042                        // Ensure the type matches.
3043                        match &ty.details.typ {
3044                            CatalogType::Array { element_reference } => {
3045                                let elem_ty = BUILTINS::iter(&builtins_cfg)
3046                                    .filter_map(|builtin| match builtin {
3047                                        Builtin::Type(ty @ BuiltinType { name, .. })
3048                                            if element_reference == name =>
3049                                        {
3050                                            Some(ty)
3051                                        }
3052                                        _ => None,
3053                                    })
3054                                    .next();
3055                                let elem_ty = match elem_ty {
3056                                    Some(ty) => ty,
3057                                    None => {
3058                                        panic!("{} is unexpectedly not a type", element_reference)
3059                                    }
3060                                };
3061                                assert_eq!(
3062                                    pg_ty.elem, elem_ty.oid,
3063                                    "type {} has mismatched element OIDs",
3064                                    ty.name
3065                                )
3066                            }
3067                            CatalogType::Pseudo => {
3068                                assert_eq!(
3069                                    pg_ty.ty, "p",
3070                                    "type {} is not a pseudo type as expected",
3071                                    ty.name
3072                                )
3073                            }
3074                            CatalogType::Range { .. } => {
3075                                assert_eq!(
3076                                    pg_ty.ty, "r",
3077                                    "type {} is not a range type as expected",
3078                                    ty.name
3079                                );
3080                            }
3081                            _ => {
3082                                assert_eq!(
3083                                    pg_ty.ty, "b",
3084                                    "type {} is not a base type as expected",
3085                                    ty.name
3086                                )
3087                            }
3088                        }
3089
3090                        // Ensure the array type reference is correct.
3091                        let schema = catalog
3092                            .resolve_schema_in_database(
3093                                &ResolvedDatabaseSpecifier::Ambient,
3094                                ty.schema,
3095                                &SYSTEM_CONN_ID,
3096                            )
3097                            .expect("unable to resolve schema");
3098                        let allocated_type = catalog
3099                            .resolve_type(
3100                                None,
3101                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3102                                &PartialItemName {
3103                                    database: None,
3104                                    schema: Some(schema.name().schema.clone()),
3105                                    item: ty.name.to_string(),
3106                                },
3107                                &SYSTEM_CONN_ID,
3108                            )
3109                            .expect("unable to resolve type");
3110                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3111                            ty
3112                        } else {
3113                            panic!("unexpectedly not a type")
3114                        };
3115                        match ty.details.array_id {
3116                            Some(array_id) => {
3117                                let array_ty = catalog.get_entry(&array_id);
3118                                assert_eq!(
3119                                    pg_ty.array, array_ty.oid,
3120                                    "type {} has mismatched array OIDs",
3121                                    allocated_type.name.item,
3122                                );
3123                            }
3124                            None => assert_eq!(
3125                                pg_ty.array, 0,
3126                                "type {} does not have an array type in mz but does in pg",
3127                                allocated_type.name.item,
3128                            ),
3129                        }
3130                    }
3131                    Builtin::Func(func) => {
3132                        for imp in func.inner.func_impls() {
3133                            assert!(
3134                                all_oids.insert(imp.oid),
3135                                "{} reused oid {}",
3136                                func.name,
3137                                imp.oid
3138                            );
3139
3140                            assert!(
3141                                imp.oid < FIRST_USER_OID,
3142                                "built-in function {} erroneously has OID in user space ({})",
3143                                func.name,
3144                                imp.oid,
3145                            );
3146
3147                            // For functions that have a postgres counterpart, verify that the name and
3148                            // oid match.
3149                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3150                                continue;
3151                            } else {
3152                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3153                                    panic!(
3154                                        "pg_proc missing function {}: oid {}",
3155                                        func.name, imp.oid
3156                                    )
3157                                })
3158                            };
3159                            assert_eq!(
3160                                func.name, pg_fn.name,
3161                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3162                                imp.oid, func.name, pg_fn.name
3163                            );
3164
3165                            // Complain, but don't fail, if argument oids don't match.
3166                            // TODO: make these match.
3167                            let imp_arg_oids = imp
3168                                .arg_typs
3169                                .iter()
3170                                .map(|item| resolve_type_oid(item))
3171                                .collect::<Vec<_>>();
3172
3173                            if imp_arg_oids != pg_fn.arg_oids {
3174                                println!(
3175                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3176                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3177                                );
3178                            }
3179
3180                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3181
3182                            assert!(
3183                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3184                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3185                                imp.oid,
3186                                func.name,
3187                                imp_return_oid,
3188                                pg_fn.ret_oid
3189                            );
3190
3191                            assert_eq!(
3192                                imp.return_is_set, pg_fn.ret_set,
3193                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3194                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3195                            );
3196                        }
3197                    }
3198                    _ => (),
3199                }
3200            }
3201
3202            for (op, func) in OP_IMPLS.iter() {
3203                for imp in func.func_impls() {
3204                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3205
3206                    // For operators that have a postgres counterpart, verify that the name and oid match.
3207                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3208                        continue;
3209                    } else {
3210                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3211                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3212                        })
3213                    };
3214
3215                    assert_eq!(*op, pg_op.name);
3216
3217                    let imp_return_oid =
3218                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3219                    if imp_return_oid != pg_op.oprresult {
3220                        panic!(
3221                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3222                            imp.oid, op, imp_return_oid, pg_op.oprresult
3223                        );
3224                    }
3225                }
3226            }
3227            catalog.expire().await;
3228        }
3229
3230        Catalog::with_debug(inner).await
3231    }
3232
3233    // Execute all builtin functions with all combinations of arguments from interesting datums.
3234    #[mz_ore::test(tokio::test)]
3235    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3236    async fn test_smoketest_all_builtins() {
3237        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3238            let catalog = Arc::new(catalog);
3239            let conn_catalog = catalog.for_system_session();
3240
3241            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3242            let mut handles = Vec::new();
3243
3244            // Extracted during planning; always panics when executed.
3245            let ignore_names = BTreeSet::from([
3246                "avg",
3247                "avg_internal_v1",
3248                "bool_and",
3249                "bool_or",
3250                "has_table_privilege", // > 3 s each
3251                "has_type_privilege",  // > 3 s each
3252                "mod",
3253                "mz_panic",
3254                "mz_sleep",
3255                "pow",
3256                "stddev_pop",
3257                "stddev_samp",
3258                "stddev",
3259                "var_pop",
3260                "var_samp",
3261                "variance",
3262            ]);
3263
3264            let fns = BUILTINS::funcs()
3265                .map(|func| (&func.name, func.inner))
3266                .chain(OP_IMPLS.iter());
3267
3268            for (name, func) in fns {
3269                if ignore_names.contains(name) {
3270                    continue;
3271                }
3272                let Func::Scalar(impls) = func else {
3273                    continue;
3274                };
3275
3276                'outer: for imp in impls {
3277                    let details = imp.details();
3278                    let mut styps = Vec::new();
3279                    for item in details.arg_typs.iter() {
3280                        let oid = resolve_type_oid(item);
3281                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3282                            continue 'outer;
3283                        };
3284                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3285                    }
3286                    let datums = styps
3287                        .iter()
3288                        .map(|styp| {
3289                            let mut datums = vec![Datum::Null];
3290                            datums.extend(styp.interesting_datums());
3291                            datums
3292                        })
3293                        .collect::<Vec<_>>();
3294                    // Skip nullary fns.
3295                    if datums.is_empty() {
3296                        continue;
3297                    }
3298
3299                    let return_oid = details
3300                        .return_typ
3301                        .map(resolve_type_oid)
3302                        .expect("must exist");
3303                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3304                        .ok()
3305                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3306
3307                    let mut idxs = vec![0; datums.len()];
3308                    while idxs[0] < datums[0].len() {
3309                        let mut args = Vec::with_capacity(idxs.len());
3310                        for i in 0..(datums.len()) {
3311                            args.push(datums[i][idxs[i]]);
3312                        }
3313
3314                        let op = &imp.op;
3315                        let scalars = args
3316                            .iter()
3317                            .enumerate()
3318                            .map(|(i, datum)| {
3319                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3320                                    datum.clone(),
3321                                    styps[i].clone(),
3322                                ))
3323                            })
3324                            .collect();
3325
3326                        let call_name = format!(
3327                            "{name}({}) (oid: {})",
3328                            args.iter()
3329                                .map(|d| d.to_string())
3330                                .collect::<Vec<_>>()
3331                                .join(", "),
3332                            imp.oid
3333                        );
3334                        let catalog = Arc::clone(&catalog);
3335                        let call_name_fn = call_name.clone();
3336                        let return_styp = return_styp.clone();
3337                        let handle = task::spawn_blocking(
3338                            || call_name,
3339                            move || {
3340                                smoketest_fn(
3341                                    name,
3342                                    call_name_fn,
3343                                    op,
3344                                    imp,
3345                                    args,
3346                                    catalog,
3347                                    scalars,
3348                                    return_styp,
3349                                )
3350                            },
3351                        );
3352                        handles.push(handle);
3353
3354                        // Advance to the next datum combination.
3355                        for i in (0..datums.len()).rev() {
3356                            idxs[i] += 1;
3357                            if idxs[i] >= datums[i].len() {
3358                                if i == 0 {
3359                                    break;
3360                                }
3361                                idxs[i] = 0;
3362                                continue;
3363                            } else {
3364                                break;
3365                            }
3366                        }
3367                    }
3368                }
3369            }
3370            handles
3371        }
3372
3373        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3374        for handle in handles {
3375            handle.await.expect("must succeed");
3376        }
3377    }
3378
3379    fn smoketest_fn(
3380        name: &&str,
3381        call_name: String,
3382        op: &Operation<HirScalarExpr>,
3383        imp: &FuncImpl<HirScalarExpr>,
3384        args: Vec<Datum<'_>>,
3385        catalog: Arc<Catalog>,
3386        scalars: Vec<CoercibleScalarExpr>,
3387        return_styp: Option<SqlScalarType>,
3388    ) {
3389        let conn_catalog = catalog.for_system_session();
3390        let pcx = PlanContext::zero();
3391        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3392        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3393        let ecx = ExprContext {
3394            qcx: &qcx,
3395            name: "smoketest",
3396            scope: &Scope::empty(),
3397            relation_type: &SqlRelationType::empty(),
3398            allow_aggregates: false,
3399            allow_subqueries: false,
3400            allow_parameters: false,
3401            allow_windows: false,
3402        };
3403        let arena = RowArena::new();
3404        let mut session = Session::<Timestamp>::dummy();
3405        session
3406            .start_transaction(to_datetime(0), None, None)
3407            .expect("must succeed");
3408        let prep_style = ExprPrepStyle::OneShot {
3409            logical_time: EvalTime::Time(Timestamp::MIN),
3410            session: &session,
3411            catalog_state: &catalog.state,
3412        };
3413
3414        // Execute the function as much as possible, ensuring no panics occur, but
3415        // otherwise ignoring eval errors. We also do various other checks.
3416        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3417        if let Ok(hir) = res {
3418            if let Ok(mut mir) = hir.lower_uncorrelated() {
3419                // Populate unmaterialized functions.
3420                prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3421
3422                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3423                    if let Some(return_styp) = return_styp {
3424                        let mir_typ = mir.typ(&[]);
3425                        // MIR type inference should be consistent with the type
3426                        // we get from the catalog.
3427                        assert_eq!(mir_typ.scalar_type, return_styp);
3428                        // The following will check not just that the scalar type
3429                        // is ok, but also catches if the function returned a null
3430                        // but the MIR type inference said "non-nullable".
3431                        if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3432                            panic!(
3433                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3434                            );
3435                        }
3436                        // Check the consistency of `introduces_nulls` and
3437                        // `propagates_nulls` with `MirScalarExpr::typ`.
3438                        if let Some((introduces_nulls, propagates_nulls)) =
3439                            call_introduces_propagates_nulls(&mir)
3440                        {
3441                            if introduces_nulls {
3442                                // If the function introduces_nulls, then the return
3443                                // type should always be nullable, regardless of
3444                                // the nullability of the input types.
3445                                assert!(
3446                                    mir_typ.nullable,
3447                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3448                                    name, args, mir, mir_typ.nullable
3449                                );
3450                            } else {
3451                                let any_input_null = args.iter().any(|arg| arg.is_null());
3452                                if !any_input_null {
3453                                    assert!(
3454                                        !mir_typ.nullable,
3455                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3456                                        name, args, mir, mir_typ.nullable
3457                                    );
3458                                } else {
3459                                    assert_eq!(
3460                                        mir_typ.nullable, propagates_nulls,
3461                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3462                                        name, args, mir, mir_typ.nullable
3463                                    );
3464                                }
3465                            }
3466                        }
3467                        // Check that `MirScalarExpr::reduce` yields the same result
3468                        // as the real evaluation.
3469                        let mut reduced = mir.clone();
3470                        reduced.reduce(&[]);
3471                        match reduced {
3472                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3473                                match reduce_result {
3474                                    Ok(reduce_result_row) => {
3475                                        let reduce_result_datum = reduce_result_row.unpack_first();
3476                                        assert_eq!(
3477                                            reduce_result_datum,
3478                                            eval_result_datum,
3479                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3480                                            name,
3481                                            args,
3482                                            mir,
3483                                            eval_result_datum,
3484                                            mir_typ.scalar_type,
3485                                            reduce_result_datum,
3486                                            ctyp.scalar_type
3487                                        );
3488                                        // Let's check that the types also match.
3489                                        // (We are not checking nullability here,
3490                                        // because it's ok when we know a more
3491                                        // precise nullability after actually
3492                                        // evaluating a function than before.)
3493                                        assert_eq!(
3494                                            ctyp.scalar_type,
3495                                            mir_typ.scalar_type,
3496                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3497                                            name,
3498                                            args,
3499                                            mir,
3500                                            eval_result_datum,
3501                                            mir_typ.scalar_type,
3502                                            reduce_result_datum,
3503                                            ctyp.scalar_type
3504                                        );
3505                                    }
3506                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3507                                }
3508                            }
3509                            _ => unreachable!(
3510                                "all args are literals, so should have reduced to a literal"
3511                            ),
3512                        }
3513                    }
3514                }
3515            }
3516        }
3517    }
3518
3519    /// If the given MirScalarExpr
3520    ///  - is a function call, and
3521    ///  - all arguments are literals
3522    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3523    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3524        match mir_func_call {
3525            MirScalarExpr::CallUnary { func, expr } => {
3526                if expr.is_literal() {
3527                    Some((func.introduces_nulls(), func.propagates_nulls()))
3528                } else {
3529                    None
3530                }
3531            }
3532            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3533                if expr1.is_literal() && expr2.is_literal() {
3534                    Some((func.introduces_nulls(), func.propagates_nulls()))
3535                } else {
3536                    None
3537                }
3538            }
3539            MirScalarExpr::CallVariadic { func, exprs } => {
3540                if exprs.iter().all(|arg| arg.is_literal()) {
3541                    Some((func.introduces_nulls(), func.propagates_nulls()))
3542                } else {
3543                    None
3544                }
3545            }
3546            _ => None,
3547        }
3548    }
3549
3550    // Make sure pg views don't use types that only exist in Materialize.
3551    #[mz_ore::test(tokio::test)]
3552    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3553    async fn test_pg_views_forbidden_types() {
3554        Catalog::with_debug(|catalog| async move {
3555            let conn_catalog = catalog.for_system_session();
3556
3557            for view in BUILTINS::views().filter(|view| {
3558                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3559            }) {
3560                let item = conn_catalog
3561                    .resolve_item(&PartialItemName {
3562                        database: None,
3563                        schema: Some(view.schema.to_string()),
3564                        item: view.name.to_string(),
3565                    })
3566                    .expect("unable to resolve view")
3567                    // TODO(alter_table)
3568                    .at_version(RelationVersionSelector::Latest);
3569                let full_name = conn_catalog.resolve_full_name(item.name());
3570                for col_type in item
3571                    .desc(&full_name)
3572                    .expect("invalid item type")
3573                    .iter_types()
3574                {
3575                    match &col_type.scalar_type {
3576                        typ @ SqlScalarType::UInt16
3577                        | typ @ SqlScalarType::UInt32
3578                        | typ @ SqlScalarType::UInt64
3579                        | typ @ SqlScalarType::MzTimestamp
3580                        | typ @ SqlScalarType::List { .. }
3581                        | typ @ SqlScalarType::Map { .. }
3582                        | typ @ SqlScalarType::MzAclItem => {
3583                            panic!("{typ:?} type found in {full_name}");
3584                        }
3585                        SqlScalarType::AclItem
3586                        | SqlScalarType::Bool
3587                        | SqlScalarType::Int16
3588                        | SqlScalarType::Int32
3589                        | SqlScalarType::Int64
3590                        | SqlScalarType::Float32
3591                        | SqlScalarType::Float64
3592                        | SqlScalarType::Numeric { .. }
3593                        | SqlScalarType::Date
3594                        | SqlScalarType::Time
3595                        | SqlScalarType::Timestamp { .. }
3596                        | SqlScalarType::TimestampTz { .. }
3597                        | SqlScalarType::Interval
3598                        | SqlScalarType::PgLegacyChar
3599                        | SqlScalarType::Bytes
3600                        | SqlScalarType::String
3601                        | SqlScalarType::Char { .. }
3602                        | SqlScalarType::VarChar { .. }
3603                        | SqlScalarType::Jsonb
3604                        | SqlScalarType::Uuid
3605                        | SqlScalarType::Array(_)
3606                        | SqlScalarType::Record { .. }
3607                        | SqlScalarType::Oid
3608                        | SqlScalarType::RegProc
3609                        | SqlScalarType::RegType
3610                        | SqlScalarType::RegClass
3611                        | SqlScalarType::Int2Vector
3612                        | SqlScalarType::Range { .. }
3613                        | SqlScalarType::PgLegacyName => {}
3614                    }
3615                }
3616            }
3617            catalog.expire().await;
3618        })
3619        .await
3620    }
3621
3622    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3623    // introspection relations.
3624    #[mz_ore::test(tokio::test)]
3625    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3626    async fn test_mz_introspection_builtins() {
3627        Catalog::with_debug(|catalog| async move {
3628            let conn_catalog = catalog.for_system_session();
3629
3630            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3631            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3632
3633            for entry in catalog.entries() {
3634                let schema_spec = entry.name().qualifiers.schema_spec;
3635                let introspection_deps = catalog.introspection_dependencies(entry.id);
3636                if introspection_deps.is_empty() {
3637                    assert!(
3638                        schema_spec != introspection_schema_spec,
3639                        "entry does not depend on introspection sources but is in \
3640                         `mz_introspection`: {}",
3641                        conn_catalog.resolve_full_name(entry.name()),
3642                    );
3643                } else {
3644                    assert!(
3645                        schema_spec == introspection_schema_spec,
3646                        "entry depends on introspection sources but is not in \
3647                         `mz_introspection`: {}",
3648                        conn_catalog.resolve_full_name(entry.name()),
3649                    );
3650                }
3651            }
3652        })
3653        .await
3654    }
3655
3656    #[mz_ore::test(tokio::test)]
3657    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3658    async fn test_multi_subscriber_catalog() {
3659        let persist_client = PersistClient::new_for_tests().await;
3660        let bootstrap_args = test_bootstrap_args();
3661        let organization_id = Uuid::new_v4();
3662        let db_name = "DB";
3663
3664        let mut writer_catalog = Catalog::open_debug_catalog(
3665            persist_client.clone(),
3666            organization_id.clone(),
3667            &bootstrap_args,
3668        )
3669        .await
3670        .expect("open_debug_catalog");
3671        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3672            persist_client.clone(),
3673            organization_id.clone(),
3674            &bootstrap_args,
3675        )
3676        .await
3677        .expect("open_debug_read_only_catalog");
3678        assert_err!(writer_catalog.resolve_database(db_name));
3679        assert_err!(read_only_catalog.resolve_database(db_name));
3680
3681        let commit_ts = writer_catalog.current_upper().await;
3682        writer_catalog
3683            .transact(
3684                None,
3685                commit_ts,
3686                None,
3687                vec![Op::CreateDatabase {
3688                    name: db_name.to_string(),
3689                    owner_id: MZ_SYSTEM_ROLE_ID,
3690                }],
3691            )
3692            .await
3693            .expect("failed to transact");
3694
3695        let write_db = writer_catalog
3696            .resolve_database(db_name)
3697            .expect("resolve_database");
3698        read_only_catalog
3699            .sync_to_current_updates()
3700            .await
3701            .expect("sync_to_current_updates");
3702        let read_db = read_only_catalog
3703            .resolve_database(db_name)
3704            .expect("resolve_database");
3705
3706        assert_eq!(write_db, read_db);
3707
3708        let writer_catalog_fencer =
3709            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3710                .await
3711                .expect("open_debug_catalog for fencer");
3712        let fencer_db = writer_catalog_fencer
3713            .resolve_database(db_name)
3714            .expect("resolve_database for fencer");
3715        assert_eq!(fencer_db, read_db);
3716
3717        let write_fence_err = writer_catalog
3718            .sync_to_current_updates()
3719            .await
3720            .expect_err("sync_to_current_updates for fencer");
3721        assert!(matches!(
3722            write_fence_err,
3723            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3724        ));
3725        let read_fence_err = read_only_catalog
3726            .sync_to_current_updates()
3727            .await
3728            .expect_err("sync_to_current_updates after fencer");
3729        assert!(matches!(
3730            read_fence_err,
3731            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3732        ));
3733
3734        writer_catalog.expire().await;
3735        read_only_catalog.expire().await;
3736        writer_catalog_fencer.expire().await;
3737    }
3738}