mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67    CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68    DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use uuid::Uuid;
89
90// DO NOT add any more imports from `crate` outside of `crate::catalog`.
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
92pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
93pub use crate::catalog::state::CatalogState;
94pub use crate::catalog::transact::{
95    DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
96};
97use crate::command::CatalogDump;
98use crate::coord::TargetCluster;
99#[cfg(test)]
100use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
101use crate::session::{Portal, PreparedStatement, Session};
102use crate::util::ResultExt;
103use crate::{AdapterError, AdapterNotice, ExecuteResponse};
104
105mod builtin_table_updates;
106pub(crate) mod consistency;
107mod migrate;
108
109mod apply;
110mod open;
111mod state;
112mod timeline;
113mod transact;
114
115/// A `Catalog` keeps track of the SQL objects known to the planner.
116///
117/// For each object, it keeps track of both forward and reverse dependencies:
118/// i.e., which objects are depended upon by the object, and which objects
119/// depend upon the object. It enforces the SQL rules around dropping: an object
120/// cannot be dropped until all of the objects that depend upon it are dropped.
121/// It also enforces uniqueness of names.
122///
123/// SQL mandates a hierarchy of exactly three layers. A catalog contains
124/// databases, databases contain schemas, and schemas contain catalog items,
125/// like sources, sinks, view, and indexes.
126///
127/// To the outside world, databases, schemas, and items are all identified by
128/// name. Items can be referred to by their [`FullItemName`], which fully and
129/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
130/// database name and/or the schema name. Partial names can be converted into
131/// full names via a complicated resolution process documented by the
132/// [`CatalogState::resolve`] method.
133///
134/// The catalog also maintains special "ambient schemas": virtual schemas,
135/// implicitly present in all databases, that house various system views.
136/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
137#[derive(Debug)]
138pub struct Catalog {
139    state: CatalogState,
140    plans: CatalogPlans,
141    expr_cache_handle: Option<ExpressionCacheHandle>,
142    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
143    transient_revision: u64,
144}
145
146// Implement our own Clone because derive can't unless S is Clone, which it's
147// not (hence the Arc).
148impl Clone for Catalog {
149    fn clone(&self) -> Self {
150        Self {
151            state: self.state.clone(),
152            plans: self.plans.clone(),
153            expr_cache_handle: self.expr_cache_handle.clone(),
154            storage: Arc::clone(&self.storage),
155            transient_revision: self.transient_revision,
156        }
157    }
158}
159
160#[derive(Default, Debug, Clone)]
161pub struct CatalogPlans {
162    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
163    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
164    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
165    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
166}
167
168impl Catalog {
169    /// Set the optimized plan for the item identified by `id`.
170    #[mz_ore::instrument(level = "trace")]
171    pub fn set_optimized_plan(
172        &mut self,
173        id: GlobalId,
174        plan: DataflowDescription<OptimizedMirRelationExpr>,
175    ) {
176        self.plans.optimized_plan_by_id.insert(id, plan.into());
177    }
178
179    /// Set the physical plan for the item identified by `id`.
180    #[mz_ore::instrument(level = "trace")]
181    pub fn set_physical_plan(
182        &mut self,
183        id: GlobalId,
184        plan: DataflowDescription<mz_compute_types::plan::Plan>,
185    ) {
186        self.plans.physical_plan_by_id.insert(id, plan.into());
187    }
188
189    /// Try to get the optimized plan for the item identified by `id`.
190    #[mz_ore::instrument(level = "trace")]
191    pub fn try_get_optimized_plan(
192        &self,
193        id: &GlobalId,
194    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
195        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
196    }
197
198    /// Try to get the physical plan for the item identified by `id`.
199    #[mz_ore::instrument(level = "trace")]
200    pub fn try_get_physical_plan(
201        &self,
202        id: &GlobalId,
203    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
204        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
205    }
206
207    /// Set the `DataflowMetainfo` for the item identified by `id`.
208    #[mz_ore::instrument(level = "trace")]
209    pub fn set_dataflow_metainfo(
210        &mut self,
211        id: GlobalId,
212        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
213    ) {
214        // Add entries to the `notices_by_dep_id` lookup map.
215        for notice in metainfo.optimizer_notices.iter() {
216            for dep_id in notice.dependencies.iter() {
217                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
218                entry.push(Arc::clone(notice))
219            }
220            if let Some(item_id) = notice.item_id {
221                soft_assert_eq_or_log!(
222                    item_id,
223                    id,
224                    "notice.item_id should match the id for whom we are saving the notice"
225                );
226            }
227        }
228        // Add the dataflow with the scoped entries.
229        self.plans.dataflow_metainfos.insert(id, metainfo);
230    }
231
232    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
233    #[mz_ore::instrument(level = "trace")]
234    pub fn try_get_dataflow_metainfo(
235        &self,
236        id: &GlobalId,
237    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
238        self.plans.dataflow_metainfos.get(id)
239    }
240
241    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
242    /// item identified by `id`.
243    ///
244    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
245    ///
246    /// Return a set containing all dropped notices. Note that if for some
247    /// reason we end up with two identical notices being dropped by the same
248    /// call, the result will contain only one instance of that notice.
249    #[mz_ore::instrument(level = "trace")]
250    pub fn drop_plans_and_metainfos(
251        &mut self,
252        drop_ids: &BTreeSet<GlobalId>,
253    ) -> BTreeSet<Arc<OptimizerNotice>> {
254        // Collect dropped notices in this set.
255        let mut dropped_notices = BTreeSet::new();
256
257        // Remove plans and metainfo.optimizer_notices entries.
258        for id in drop_ids {
259            self.plans.optimized_plan_by_id.remove(id);
260            self.plans.physical_plan_by_id.remove(id);
261            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
262                soft_assert_or_log!(
263                    metainfo.optimizer_notices.iter().all_unique(),
264                    "should have been pushed there by `push_optimizer_notice_dedup`"
265                );
266                for n in metainfo.optimizer_notices.drain(..) {
267                    // Remove the corresponding notices_by_dep_id entries.
268                    for dep_id in n.dependencies.iter() {
269                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
270                            soft_assert_or_log!(
271                                notices.iter().any(|x| &n == x),
272                                "corrupt notices_by_dep_id"
273                            );
274                            notices.retain(|x| &n != x)
275                        }
276                    }
277                    dropped_notices.insert(n);
278                }
279            }
280        }
281
282        // Remove notices_by_dep_id entries.
283        for id in drop_ids {
284            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
285                for n in notices.drain(..) {
286                    // Remove the corresponding metainfo.optimizer_notices entries.
287                    if let Some(item_id) = n.item_id.as_ref() {
288                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
289                            metainfo.optimizer_notices.iter().for_each(|n2| {
290                                if let Some(item_id_2) = n2.item_id {
291                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
292                                }
293                            });
294                            metainfo.optimizer_notices.retain(|x| &n != x);
295                        }
296                    }
297                    dropped_notices.insert(n);
298                }
299            }
300        }
301
302        // Collect dependency ids not in drop_ids with at least one dropped
303        // notice.
304        let mut todo_dep_ids = BTreeSet::new();
305        for notice in dropped_notices.iter() {
306            for dep_id in notice.dependencies.iter() {
307                if !drop_ids.contains(dep_id) {
308                    todo_dep_ids.insert(*dep_id);
309                }
310            }
311        }
312        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
313        // entries in `todo_dep_ids`.
314        for id in todo_dep_ids {
315            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
316                notices.retain(|n| !dropped_notices.contains(n))
317            }
318        }
319
320        // (We used to have a sanity check here that
321        // `dropped_notices.iter().any(|n| Arc::strong_count(n) != 1)`
322        // but this is not a correct assertion: There might be other clones of the catalog (e.g. if
323        // a peek is running concurrently to an index drop), in which case those clones also hold
324        // references to the notices.)
325
326        dropped_notices
327    }
328
329    /// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
330    /// as a result of creating new indexes on the items in `ons`.
331    ///
332    /// When creating and inserting a new index, we need to invalidate some entries that may
333    /// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
334    /// the following objects:
335    ///
336    ///   - `o`.
337    ///   - All compute objects that depend directly on `o`.
338    ///   - All compute objects that would directly depend on `o`, if all views were inlined.
339    pub(crate) fn invalidate_for_index(
340        &self,
341        ons: impl Iterator<Item = GlobalId>,
342    ) -> BTreeSet<GlobalId> {
343        let mut dependencies = BTreeSet::new();
344        let mut queue = VecDeque::new();
345        let mut seen = HashSet::new();
346        for on in ons {
347            let entry = self.get_entry_by_global_id(&on);
348            dependencies.insert(on);
349            seen.insert(entry.id);
350            let uses = entry.uses();
351            queue.extend(uses.clone());
352        }
353
354        while let Some(cur) = queue.pop_front() {
355            let entry = self.get_entry(&cur);
356            if seen.insert(cur) {
357                let global_ids = entry.global_ids();
358                match entry.item_type() {
359                    CatalogItemType::Table
360                    | CatalogItemType::Source
361                    | CatalogItemType::MaterializedView
362                    | CatalogItemType::Sink
363                    | CatalogItemType::Index
364                    | CatalogItemType::Type
365                    | CatalogItemType::Func
366                    | CatalogItemType::Secret
367                    | CatalogItemType::Connection
368                    | CatalogItemType::ContinualTask => {
369                        dependencies.extend(global_ids);
370                    }
371                    CatalogItemType::View => {
372                        dependencies.extend(global_ids);
373                        queue.extend(entry.uses());
374                    }
375                }
376            }
377        }
378        dependencies
379    }
380}
381
382#[derive(Debug)]
383pub struct ConnCatalog<'a> {
384    state: Cow<'a, CatalogState>,
385    /// Because we don't have any way of removing items from the catalog
386    /// temporarily, we allow the ConnCatalog to pretend that a set of items
387    /// don't exist during resolution.
388    ///
389    /// This feature is necessary to allow re-planning of statements, which is
390    /// either incredibly useful or required when altering item definitions.
391    ///
392    /// Note that uses of this field should be used by short-lived
393    /// catalogs.
394    unresolvable_ids: BTreeSet<CatalogItemId>,
395    conn_id: ConnectionId,
396    cluster: String,
397    database: Option<DatabaseId>,
398    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
399    role_id: RoleId,
400    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
401    portals: Option<&'a BTreeMap<String, Portal>>,
402    notices_tx: UnboundedSender<AdapterNotice>,
403}
404
405impl ConnCatalog<'_> {
406    pub fn conn_id(&self) -> &ConnectionId {
407        &self.conn_id
408    }
409
410    pub fn state(&self) -> &CatalogState {
411        &*self.state
412    }
413
414    /// Prevent planning from resolving item with the provided ID. Instead,
415    /// return an error as if the item did not exist.
416    ///
417    /// This feature is meant exclusively to permit re-planning statements
418    /// during update operations and should not be used otherwise given its
419    /// extremely "powerful" semantics.
420    ///
421    /// # Panics
422    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
423    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
424        assert_eq!(
425            self.role_id, MZ_SYSTEM_ROLE_ID,
426            "only the system role can mark IDs unresolvable",
427        );
428        self.unresolvable_ids.insert(id);
429    }
430
431    /// Returns the schemas:
432    /// - mz_catalog
433    /// - pg_catalog
434    /// - temp (if requested)
435    /// - all schemas from the session's search_path var that exist
436    pub fn effective_search_path(
437        &self,
438        include_temp_schema: bool,
439    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
440        self.state
441            .effective_search_path(&self.search_path, include_temp_schema)
442    }
443}
444
445impl ConnectionResolver for ConnCatalog<'_> {
446    fn resolve_connection(
447        &self,
448        id: CatalogItemId,
449    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
450        self.state().resolve_connection(id)
451    }
452}
453
454impl Catalog {
455    /// Returns the catalog's transient revision, which starts at 1 and is
456    /// incremented on every change. This is not persisted to disk, and will
457    /// restart on every load.
458    pub fn transient_revision(&self) -> u64 {
459        self.transient_revision
460    }
461
462    /// Creates a debug catalog from the current
463    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
464    /// like in tests.
465    ///
466    /// WARNING! This function can arbitrarily fail because it does not make any
467    /// effort to adjust the catalog's contents' structure or semantics to the
468    /// currently running version, i.e. it does not apply any migrations.
469    ///
470    /// This function must not be called in production contexts. Use
471    /// [`Catalog::open`] with appropriately set configuration parameters
472    /// instead.
473    pub async fn with_debug<F, Fut, T>(f: F) -> T
474    where
475        F: FnOnce(Catalog) -> Fut,
476        Fut: Future<Output = T>,
477    {
478        let persist_client = PersistClient::new_for_tests().await;
479        let organization_id = Uuid::new_v4();
480        let bootstrap_args = test_bootstrap_args();
481        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
482            .await
483            .expect("can open debug catalog");
484        f(catalog).await
485    }
486
487    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
488    /// in progress.
489    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
490    where
491        F: FnOnce(Catalog) -> Fut,
492        Fut: Future<Output = T>,
493    {
494        let persist_client = PersistClient::new_for_tests().await;
495        let organization_id = Uuid::new_v4();
496        let bootstrap_args = test_bootstrap_args();
497        let mut catalog =
498            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
499                .await
500                .expect("can open debug catalog");
501
502        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
503        let now = SYSTEM_TIME.clone();
504        let openable_storage = TestCatalogStateBuilder::new(persist_client)
505            .with_organization_id(organization_id)
506            .with_default_deploy_generation()
507            .build()
508            .await
509            .expect("can create durable catalog");
510        let mut storage = openable_storage
511            .open(now().into(), &bootstrap_args)
512            .await
513            .expect("can open durable catalog")
514            .0;
515        // Drain updates.
516        let _ = storage
517            .sync_to_current_updates()
518            .await
519            .expect("can sync to current updates");
520        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
521
522        f(catalog).await
523    }
524
525    /// Opens a debug catalog.
526    ///
527    /// See [`Catalog::with_debug`].
528    pub async fn open_debug_catalog(
529        persist_client: PersistClient,
530        organization_id: Uuid,
531        bootstrap_args: &BootstrapArgs,
532    ) -> Result<Catalog, anyhow::Error> {
533        let now = SYSTEM_TIME.clone();
534        let environment_id = None;
535        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
536            .with_organization_id(organization_id)
537            .with_default_deploy_generation()
538            .build()
539            .await?;
540        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
541        let system_parameter_defaults = BTreeMap::default();
542        Self::open_debug_catalog_inner(
543            persist_client,
544            storage,
545            now,
546            environment_id,
547            &DUMMY_BUILD_INFO,
548            system_parameter_defaults,
549            bootstrap_args,
550            None,
551        )
552        .await
553    }
554
555    /// Opens a read only debug persist backed catalog defined by `persist_client` and
556    /// `organization_id`.
557    ///
558    /// See [`Catalog::with_debug`].
559    pub async fn open_debug_read_only_catalog(
560        persist_client: PersistClient,
561        organization_id: Uuid,
562        bootstrap_args: &BootstrapArgs,
563    ) -> Result<Catalog, anyhow::Error> {
564        let now = SYSTEM_TIME.clone();
565        let environment_id = None;
566        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
567            .with_organization_id(organization_id)
568            .build()
569            .await?;
570        let storage = openable_storage
571            .open_read_only(&test_bootstrap_args())
572            .await?;
573        let system_parameter_defaults = BTreeMap::default();
574        Self::open_debug_catalog_inner(
575            persist_client,
576            storage,
577            now,
578            environment_id,
579            &DUMMY_BUILD_INFO,
580            system_parameter_defaults,
581            bootstrap_args,
582            None,
583        )
584        .await
585    }
586
587    /// Opens a read only debug persist backed catalog defined by `persist_client` and
588    /// `organization_id`.
589    ///
590    /// See [`Catalog::with_debug`].
591    pub async fn open_debug_read_only_persist_catalog_config(
592        persist_client: PersistClient,
593        now: NowFn,
594        environment_id: EnvironmentId,
595        system_parameter_defaults: BTreeMap<String, String>,
596        build_info: &'static BuildInfo,
597        bootstrap_args: &BootstrapArgs,
598        enable_expression_cache_override: Option<bool>,
599    ) -> Result<Catalog, anyhow::Error> {
600        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
601            .with_organization_id(environment_id.organization_id())
602            .with_version(
603                build_info
604                    .version
605                    .parse()
606                    .expect("build version is parseable"),
607            )
608            .build()
609            .await?;
610        let storage = openable_storage.open_read_only(bootstrap_args).await?;
611        Self::open_debug_catalog_inner(
612            persist_client,
613            storage,
614            now,
615            Some(environment_id),
616            build_info,
617            system_parameter_defaults,
618            bootstrap_args,
619            enable_expression_cache_override,
620        )
621        .await
622    }
623
624    async fn open_debug_catalog_inner(
625        persist_client: PersistClient,
626        storage: Box<dyn DurableCatalogState>,
627        now: NowFn,
628        environment_id: Option<EnvironmentId>,
629        build_info: &'static BuildInfo,
630        system_parameter_defaults: BTreeMap<String, String>,
631        bootstrap_args: &BootstrapArgs,
632        enable_expression_cache_override: Option<bool>,
633    ) -> Result<Catalog, anyhow::Error> {
634        let metrics_registry = &MetricsRegistry::new();
635        let secrets_reader = Arc::new(InMemorySecretsController::new());
636        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
637        // debugging/testing.
638        let previous_ts = now().into();
639        let replica_size = &bootstrap_args.default_cluster_replica_size;
640        let read_only = false;
641
642        let OpenCatalogResult {
643            catalog,
644            migrated_storage_collections_0dt: _,
645            new_builtin_collections: _,
646            builtin_table_updates: _,
647            cached_global_exprs: _,
648            uncached_local_exprs: _,
649        } = Catalog::open(Config {
650            storage,
651            metrics_registry,
652            state: StateConfig {
653                unsafe_mode: true,
654                all_features: false,
655                build_info,
656                deploy_generation: 0,
657                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
658                read_only,
659                now,
660                boot_ts: previous_ts,
661                skip_migrations: true,
662                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
663                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
664                    size: replica_size.clone(),
665                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
666                },
667                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
668                    size: replica_size.clone(),
669                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
670                },
671                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
672                    size: replica_size.clone(),
673                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
674                },
675                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
676                    size: replica_size.clone(),
677                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
678                },
679                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
680                    size: replica_size.clone(),
681                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
682                },
683                system_parameter_defaults,
684                remote_system_parameters: None,
685                availability_zones: vec![],
686                egress_addresses: vec![],
687                aws_principal_context: None,
688                aws_privatelink_availability_zones: None,
689                http_host_name: None,
690                connection_context: ConnectionContext::for_tests(secrets_reader),
691                builtin_item_migration_config: BuiltinItemMigrationConfig {
692                    persist_client: persist_client.clone(),
693                    read_only,
694                    force_migration: None,
695                },
696                persist_client,
697                enable_expression_cache_override,
698                helm_chart_version: None,
699                external_login_password_mz_system: None,
700                license_key: ValidatedLicenseKey::for_tests(),
701            },
702        })
703        .await?;
704        Ok(catalog)
705    }
706
707    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
708        self.state.for_session(session)
709    }
710
711    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
712        self.state.for_sessionless_user(role_id)
713    }
714
715    pub fn for_system_session(&self) -> ConnCatalog<'_> {
716        self.state.for_system_session()
717    }
718
719    async fn storage<'a>(
720        &'a self,
721    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
722        self.storage.lock().await
723    }
724
725    pub async fn current_upper(&self) -> mz_repr::Timestamp {
726        self.storage().await.current_upper().await
727    }
728
729    pub async fn allocate_user_id(
730        &self,
731        commit_ts: mz_repr::Timestamp,
732    ) -> Result<(CatalogItemId, GlobalId), Error> {
733        self.storage()
734            .await
735            .allocate_user_id(commit_ts)
736            .await
737            .maybe_terminate("allocating user ids")
738            .err_into()
739    }
740
741    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
742    pub async fn allocate_user_ids(
743        &self,
744        amount: u64,
745        commit_ts: mz_repr::Timestamp,
746    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
747        self.storage()
748            .await
749            .allocate_user_ids(amount, commit_ts)
750            .await
751            .maybe_terminate("allocating user ids")
752            .err_into()
753    }
754
755    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
756        let commit_ts = self.storage().await.current_upper().await;
757        self.allocate_user_id(commit_ts).await
758    }
759
    /// Get the next user item ID without allocating it.
    ///
    /// # Errors
    ///
    /// Returns an error if the durable catalog cannot report the next ID.
    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_item_id()
            .await
            .err_into()
    }
768
    /// Allocates a single system item ID in its own durable transaction, committed at
    /// `commit_ts`. Only compiled for tests.
    ///
    /// # Errors
    ///
    /// Returns an error if opening the transaction, allocating the ID, or committing fails.
    #[cfg(test)]
    pub async fn allocate_system_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        use mz_ore::collections::CollectionExt;

        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;
        // Exactly one ID is requested; `into_element` extracts it from the returned collection.
        let id = txn
            .allocate_system_item_ids(1)
            .maybe_terminate("allocating system ids")?
            .into_element();
        // Drain transaction.
        // The returned op updates are intentionally discarded.
        let _ = txn.get_and_commit_op_updates();
        txn.commit(commit_ts).await?;
        Ok(id)
    }
787
    /// Get the next system item ID without allocating it.
    ///
    /// # Errors
    ///
    /// Returns an error if the durable catalog cannot report the next ID.
    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_item_id()
            .await
            .err_into()
    }

    /// Allocates a new user [`ClusterId`] in the durable catalog, committing at `commit_ts`.
    ///
    /// # Errors
    ///
    /// Returns an error if the allocation fails. The error is first passed through
    /// `maybe_terminate("allocating user cluster ids")`.
    pub async fn allocate_user_cluster_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<ClusterId, Error> {
        self.storage()
            .await
            .allocate_user_cluster_id(commit_ts)
            .await
            .maybe_terminate("allocating user cluster ids")
            .err_into()
    }

    /// Get the next system replica id without allocating it.
    ///
    /// # Errors
    ///
    /// Returns an error if the durable catalog cannot report the next ID.
    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_replica_id()
            .await
            .err_into()
    }

    /// Get the next user replica id without allocating it.
    ///
    /// # Errors
    ///
    /// Returns an error if the durable catalog cannot report the next ID.
    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_replica_id()
            .await
            .err_into()
    }
826
    /// Resolves a database by `database_name`.
    ///
    /// Delegates to `CatalogState::resolve_database`; returns a [`SqlCatalogError`] if the
    /// name cannot be resolved.
    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        self.state.resolve_database(database_name)
    }

    /// Resolves a schema by name, optionally qualified by `database_name`.
    ///
    /// `current_database` is presumably consulted when `database_name` is `None`, and
    /// `conn_id` allows connection-scoped schemas to be found (TODO confirm). Delegates to
    /// `CatalogState::resolve_schema`.
    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema(current_database, database_name, schema_name, conn_id)
    }

    /// Resolves `schema_name` within the database identified by `database_spec`.
    ///
    /// Delegates to `CatalogState::resolve_schema_in_database`.
    pub fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema_in_database(database_spec, schema_name, conn_id)
    }

    /// Resolves the replica named `replica_name` within cluster `cluster_id`.
    ///
    /// Delegates to `CatalogState::resolve_replica_in_cluster`.
    pub fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        self.state
            .resolve_replica_in_cluster(cluster_id, replica_name)
    }

    /// Resolves the [`SchemaId`] of the system schema called `name`.
    ///
    /// Delegates to `CatalogState::resolve_system_schema`.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.state.resolve_system_schema(name)
    }

    /// Returns the resolved schema search path for `session`, as
    /// `(database, schema)` specifier pairs.
    ///
    /// Delegates to `CatalogState::resolve_search_path`.
    pub fn resolve_search_path(
        &self,
        session: &Session,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state.resolve_search_path(session)
    }
871
    /// Resolves `name` to a non-function [`CatalogEntry`].
    ///
    /// Unqualified names are looked up through `search_path`. Delegates to
    /// `CatalogState::resolve_entry`.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_entry(current_database, search_path, name, conn_id)
    }

    /// Resolves a `BuiltinTable`.
    ///
    /// Returns the [`CatalogItemId`] assigned to `builtin`.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.state.resolve_builtin_table(builtin)
    }

    /// Resolves a `BuiltinLog`.
    ///
    /// Returns the [`CatalogItemId`] assigned to `builtin`.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
        // The state method returns a tuple; only its first element (the item ID) is needed.
        self.state.resolve_builtin_log(builtin).0
    }

    /// Resolves a `BuiltinSource`.
    ///
    /// Returns the [`CatalogItemId`] assigned to `builtin`.
    pub fn resolve_builtin_storage_collection(
        &self,
        builtin: &'static BuiltinSource,
    ) -> CatalogItemId {
        self.state.resolve_builtin_source(builtin)
    }

    /// Resolves `name` to a function [`CatalogEntry`].
    ///
    /// Delegates to `CatalogState::resolve_function`.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_function(current_database, search_path, name, conn_id)
    }

    /// Resolves `name` to a type [`CatalogEntry`].
    ///
    /// Delegates to `CatalogState::resolve_type`.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_type(current_database, search_path, name, conn_id)
    }
925
    /// Resolves a cluster by `name`.
    ///
    /// Delegates to `CatalogState::resolve_cluster`; returns a [`SqlCatalogError`] if the
    /// name cannot be resolved.
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        self.state.resolve_cluster(name)
    }

    /// Resolves a [`Cluster`] for a [`BuiltinCluster`].
    ///
    /// # Panics
    /// * If the [`BuiltinCluster`] doesn't exist.
    ///
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        self.state.resolve_builtin_cluster(cluster)
    }

    /// Returns the [`ClusterId`] of the builtin `MZ_CATALOG_SERVER_CLUSTER`.
    ///
    /// # Panics
    /// * If that builtin cluster doesn't exist (see [`Self::resolve_builtin_cluster`]).
    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
    }
942
943    /// Resolves a [`Cluster`] for a TargetCluster.
944    pub fn resolve_target_cluster(
945        &self,
946        target_cluster: TargetCluster,
947        session: &Session,
948    ) -> Result<&Cluster, AdapterError> {
949        match target_cluster {
950            TargetCluster::CatalogServer => {
951                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
952            }
953            TargetCluster::Active => self.active_cluster(session),
954            TargetCluster::Transaction(cluster_id) => self
955                .try_get_cluster(cluster_id)
956                .ok_or(AdapterError::ConcurrentClusterDrop),
957        }
958    }
959
960    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
961        // TODO(benesch): this check here is not sufficiently protective. It'd
962        // be very easy for a code path to accidentally avoid this check by
963        // calling `resolve_cluster(session.vars().cluster())`.
964        if session.user().name != SYSTEM_USER.name
965            && session.user().name != SUPPORT_USER.name
966            && session.vars().cluster() == SYSTEM_USER.name
967        {
968            coord_bail!(
969                "system cluster '{}' cannot execute user queries",
970                SYSTEM_USER.name
971            );
972        }
973        let cluster = self.resolve_cluster(session.vars().cluster())?;
974        Ok(cluster)
975    }
976
    /// Returns the in-memory [`CatalogState`] backing this catalog.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }

    /// Resolves a qualified item name to its fully spelled-out [`FullItemName`].
    ///
    /// `conn_id` is forwarded to `CatalogState::resolve_full_name`. Presumably it is needed
    /// to name connection-scoped (temporary) schemas (TODO confirm).
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    /// Looks up the entry for `id`, returning `None` if it does not exist.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }

    /// Looks up the entry that owns the global ID `id`, returning `None` if there is none.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }

    /// Returns the entry for `id`.
    ///
    /// Presumably panics if `id` is unknown; use [`Self::try_get_entry`] otherwise
    /// (TODO confirm).
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    /// Returns the collection entry that owns the global ID `id`.
    ///
    /// Presumably panics if `id` is unknown; see [`Self::try_get_entry_by_global_id`].
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    /// Iterates over every [`GlobalId`] belonging to the entry for `id`.
    pub fn get_global_ids<'a>(
        &'a self,
        id: &CatalogItemId,
    ) -> impl Iterator<Item = GlobalId> + use<'a> {
        self.get_entry(id).global_ids()
    }

    /// Maps a [`GlobalId`] to the [`CatalogItemId`] of the item that owns it.
    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
        self.get_entry_by_global_id(id).id()
    }
1015
1016    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1017        let item = self.try_get_entry_by_global_id(id)?;
1018        Some(item.id())
1019    }
1020
    /// Returns the schema identified by `database_spec` / `schema_spec`.
    ///
    /// Presumably panics if no such schema exists; see [`Self::try_get_schema`]
    /// (TODO confirm).
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }

    /// Returns the schema identified by `database_spec` / `schema_spec`, or `None`.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }

    /// Returns the [`SchemaId`] of the `mz_catalog` system schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }

    /// Returns the [`SchemaId`] of the `pg_catalog` system schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }

    /// Returns the [`SchemaId`] of the `information_schema` system schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }

    /// Returns the [`SchemaId`] of the `mz_internal` system schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }

    /// Returns the [`SchemaId`] of the `mz_introspection` system schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }

    /// Returns the [`SchemaId`] of the `mz_unsafe` system schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }

    /// Iterates over the IDs of all system schemas, as reported by the catalog state.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }
1067
    /// Returns the database with `id`.
    ///
    /// Presumably panics if it does not exist (TODO confirm).
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }

    /// Returns the role with `id`, or `None` if it does not exist.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }

    /// Returns the role with `id`.
    ///
    /// Presumably panics if it does not exist; see [`Self::try_get_role`].
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }

    /// Returns the role called `role_name`, or `None` if there is no such role.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }

    /// Returns the authentication record for role `id`, or `None` if there is none.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }
1087
    /// Creates a new schema in the `Catalog` for temporary items
    /// indicated by the TEMPORARY or TEMP keywords.
    ///
    /// The schema belongs to connection `conn_id` and is owned by `owner_id`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `CatalogState::create_temporary_schema`.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
1097
1098    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1099        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
1100        self.state
1101            .temporary_schemas
1102            .get(conn_id)
1103            .map(|schema| schema.items.contains_key(item_name))
1104            .unwrap_or(false)
1105    }
1106
1107    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1108    /// Returns Ok if conn_id's temp schema does not exist.
1109    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1110        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1111            return Ok(());
1112        };
1113        if !schema.items.is_empty() {
1114            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1115        }
1116        Ok(())
1117    }
1118
    /// Returns the objects that depend on `object_ids`, as computed by
    /// `CatalogState::object_dependents`.
    ///
    /// Whether `object_ids` themselves appear in the result is decided by the state method.
    /// `conn_id` is forwarded, presumably so connection-scoped objects can be resolved
    /// (TODO confirm).
    pub(crate) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
    ) -> Vec<ObjectId> {
        // Fresh visited set for this traversal, filled in by the state method.
        let mut seen = BTreeSet::new();
        self.state.object_dependents(object_ids, conn_id, &mut seen)
    }
1127
1128    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1129        FullNameV1 {
1130            database: name.database.to_string(),
1131            schema: name.schema.clone(),
1132            item: name.item.clone(),
1133        }
1134    }
1135
1136    pub fn find_available_cluster_name(&self, name: &str) -> String {
1137        let mut i = 0;
1138        let mut candidate = name.to_string();
1139        while self.state.clusters_by_name.contains_key(&candidate) {
1140            i += 1;
1141            candidate = format!("{}{}", name, i);
1142        }
1143        candidate
1144    }
1145
1146    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1147        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1148            self.cluster_replica_sizes()
1149                .enabled_allocations()
1150                .map(|a| a.0.to_owned())
1151                .collect::<Vec<_>>()
1152        } else {
1153            self.system_config().allowed_cluster_replica_sizes()
1154        }
1155    }
1156
    /// Converts a durable replica location into a concrete [`ReplicaLocation`].
    ///
    /// The conversion is validated against `allowed_sizes` and, if given,
    /// `allowed_availability_zones`. Delegates to `CatalogState::concretize_replica_location`.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }

    /// Returns an error unless `size` is a valid replica size given `allowed_sizes`.
    ///
    /// Delegates to `CatalogState::ensure_valid_replica_size`.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }

    /// Returns the configured map of cluster replica sizes.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1178
1179    /// Returns the privileges of an object by its ID.
1180    pub fn get_privileges(
1181        &self,
1182        id: &SystemObjectId,
1183        conn_id: &ConnectionId,
1184    ) -> Option<&PrivilegeMap> {
1185        match id {
1186            SystemObjectId::Object(id) => match id {
1187                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1188                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1189                ObjectId::Schema((database_spec, schema_spec)) => Some(
1190                    self.get_schema(database_spec, schema_spec, conn_id)
1191                        .privileges(),
1192                ),
1193                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1194                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1195                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1196            },
1197            SystemObjectId::System => Some(&self.state.system_privileges),
1198        }
1199    }
1200
1201    #[mz_ore::instrument(level = "debug")]
1202    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1203        Ok(self.storage().await.confirm_leadership().await?)
1204    }
1205
    /// Return the ids of all log sources the given object depends on.
    ///
    /// Delegates to `CatalogState::introspection_dependencies`.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }

    /// Serializes the catalog's in-memory state.
    ///
    /// There are no guarantees about the format of the serialized state, except
    /// that the serialized state for two identical catalogs will compare
    /// identically.
    ///
    /// # Errors
    ///
    /// Propagates any error from `CatalogState::dump`.
    pub fn dump(&self) -> Result<CatalogDump, Error> {
        Ok(CatalogDump::new(self.state.dump(None)?))
    }
1219
1220    /// Checks the [`Catalog`]s internal consistency.
1221    ///
1222    /// Returns a JSON object describing the inconsistencies, if there are any.
1223    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1224        self.state.check_consistency().map_err(|inconsistencies| {
1225            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1226                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1227            })
1228        })
1229    }
1230
    /// Returns the catalog's SQL-layer configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }

    /// Iterates over every catalog entry, both system and user, in `entry_by_id`.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }
1238
    /// Iterates over connection entries whose ID is a user ID.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }

    /// Iterates over table entries whose ID is a user ID.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }

    /// Iterates over source entries whose ID is a user ID.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }

    /// Iterates over sink entries whose ID is a user ID.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }

    /// Iterates over materialized view entries whose ID is a user ID.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }

    /// Iterates over secret entries whose ID is a user ID.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1268
    /// Returns the network policy with `network_policy_id`.
    ///
    /// Presumably panics if it does not exist (TODO confirm).
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }

    /// Returns the network policy called `name`, or `None` if there is none.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }

    /// Iterates over all clusters, both system and user.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }

    /// Returns the cluster with `cluster_id`.
    ///
    /// Presumably panics if it does not exist; see [`Self::try_get_cluster`].
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }

    /// Returns the cluster with `cluster_id`, or `None` if it does not exist.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }
1288
    /// Iterates over clusters whose ID is a user ID.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }

    /// Returns replica `replica_id` of cluster `cluster_id`.
    ///
    /// Presumably panics if it does not exist; see [`Self::try_get_cluster_replica`].
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }

    /// Returns replica `replica_id` of cluster `cluster_id`, or `None` if it does not exist.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }

    /// Iterates over the `user_replicas()` of every user cluster.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }

    /// Iterates over all databases.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1317
    /// Iterates over roles for which `is_user()` holds.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }

    /// Iterates over continual task entries whose ID is a user ID.
    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
    }

    /// Returns the system-wide privileges.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }

    /// Iterates over all default privileges, grouped by the object they apply to.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1344
    /// Builds the builtin-table updates describing item `id` with multiplicity `diff`.
    ///
    /// Packs the item via `CatalogState::pack_item_update`, then resolves the result into
    /// concrete [`BuiltinTableUpdate`]s.
    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
        self.state
            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
    }

    /// Builds the builtin-table update recording storage usage `event` with
    /// multiplicity `diff`.
    pub fn pack_storage_usage_update(
        &self,
        event: VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate {
        self.state
            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
    }
1358
    /// Returns the system configuration variables.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }

    /// Returns the system configuration variables for mutation.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }

    /// Returns an error if `role_id` is a reserved role.
    ///
    /// Delegates to `CatalogState::ensure_not_reserved_role`.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }

    /// Returns an error if `role_id` may not be granted.
    ///
    /// Delegates to `CatalogState::ensure_grantable_role`.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }

    /// Returns an error if `role_id` is a system role.
    ///
    /// Delegates to `CatalogState::ensure_not_system_role`.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }

    /// Returns an error if `role_id` is a predefined role.
    ///
    /// Delegates to `CatalogState::ensure_not_predefined_role`.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }

    /// Returns an error if `network_policy_id` is a reserved network policy.
    ///
    /// Delegates to `CatalogState::ensure_not_reserved_network_policy`.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1390
1391    pub fn ensure_not_reserved_object(
1392        &self,
1393        object_id: &ObjectId,
1394        conn_id: &ConnectionId,
1395    ) -> Result<(), Error> {
1396        match object_id {
1397            ObjectId::Cluster(cluster_id) => {
1398                if cluster_id.is_system() {
1399                    let cluster = self.get_cluster(*cluster_id);
1400                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1401                        cluster.name().to_string(),
1402                    )))
1403                } else {
1404                    Ok(())
1405                }
1406            }
1407            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1408                if replica_id.is_system() {
1409                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1410                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1411                        replica.name().to_string(),
1412                    )))
1413                } else {
1414                    Ok(())
1415                }
1416            }
1417            ObjectId::Database(database_id) => {
1418                if database_id.is_system() {
1419                    let database = self.get_database(database_id);
1420                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1421                        database.name().to_string(),
1422                    )))
1423                } else {
1424                    Ok(())
1425                }
1426            }
1427            ObjectId::Schema((database_spec, schema_spec)) => {
1428                if schema_spec.is_system() {
1429                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1430                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1431                        schema.name().schema.clone(),
1432                    )))
1433                } else {
1434                    Ok(())
1435                }
1436            }
1437            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1438            ObjectId::Item(item_id) => {
1439                if item_id.is_system() {
1440                    let item = self.get_entry(item_id);
1441                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1442                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1443                } else {
1444                    Ok(())
1445                }
1446            }
1447            ObjectId::NetworkPolicy(network_policy_id) => {
1448                self.ensure_not_reserved_network_policy(network_policy_id)
1449            }
1450        }
1451    }
1452
    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
    ///
    /// Plans `create_sql` and returns the plan together with the IDs it resolved.
    /// `force_if_exists_skip` is forwarded unchanged. This method takes `&mut self`, which
    /// presumably lets the state method adjust configuration while parsing (TODO confirm).
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1462
1463    pub(crate) fn update_expression_cache<'a, 'b>(
1464        &'a self,
1465        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1466        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1467    ) -> BoxFuture<'b, ()> {
1468        if let Some(expr_cache) = &self.expr_cache_handle {
1469            let ons = new_local_expressions
1470                .iter()
1471                .map(|(id, _)| id)
1472                .chain(new_global_expressions.iter().map(|(id, _)| id))
1473                .map(|id| self.get_entry_by_global_id(id))
1474                .filter_map(|entry| entry.index().map(|index| index.on));
1475            let invalidate_ids = self.invalidate_for_index(ons);
1476            expr_cache
1477                .update(
1478                    new_local_expressions,
1479                    new_global_expressions,
1480                    invalidate_ids,
1481                )
1482                .boxed()
1483        } else {
1484            async {}.boxed()
1485        }
1486    }
1487
    /// Listen for and apply all unconsumed updates to the durable catalog state.
    ///
    /// Reads the pending durable updates and applies them to the in-memory state with the
    /// local expression cache closed. Returns the resulting builtin-table updates and
    /// parsed catalog updates.
    ///
    /// # Errors
    ///
    /// Returns a [`CatalogError`] if reading updates from durable storage fails.
    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
    // `#[cfg(test)]` annotation.
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        // The storage guard is a statement temporary, so the lock is released before applying.
        let updates = self.storage().await.sync_to_current_updates().await?;
        let (builtin_table_updates, catalog_updates) = self
            .state
            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
            .await;
        Ok((builtin_table_updates, catalog_updates))
    }
1508}
1509
/// Returns `true` if `name` starts with any of the `BUILTIN_PREFIXES`.
pub fn is_reserved_name(name: &str) -> bool {
    BUILTIN_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(prefix))
}

/// Returns `true` if `name` may not be used for a user-created role.
///
/// That is the case when it is a reserved name (see [`is_reserved_name`]) or the
/// `PUBLIC` role's name (see [`is_public_role`]).
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}

/// Returns `true` if `name` equals `PUBLIC_ROLE_NAME`.
///
/// The comparison uses the equality defined by `PUBLIC_ROLE_NAME`'s type, which may be
/// case-insensitive (TODO confirm).
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1523
/// Maps a SQL catalog item type to the audit log's [`ObjectType`].
///
/// The item type is first converted into a `mz_sql::catalog::ObjectType`.
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}

/// Maps the object identified by a [`CommentObjectId`] to the audit log's [`ObjectType`].
///
/// Each variant maps to the audit variant of the same name.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}

/// Maps a `mz_sql::catalog::ObjectType` to the audit log's [`ObjectType`].
///
/// Wraps the type in `SystemObjectType::Object` and reuses
/// [`system_object_type_to_audit_object_type`].
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1555
1556pub(crate) fn system_object_type_to_audit_object_type(
1557    system_type: &SystemObjectType,
1558) -> ObjectType {
1559    match system_type {
1560        SystemObjectType::Object(object_type) => match object_type {
1561            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1562            mz_sql::catalog::ObjectType::View => ObjectType::View,
1563            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1564            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1565            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1566            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1567            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1568            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1569            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1570            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1571            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1572            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1573            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1574            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1575            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1576            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1577            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1578        },
1579        SystemObjectType::System => ObjectType::System,
1580    }
1581}
1582
/// Whether a privilege update grants or revokes privileges.
///
/// Converts into the matching [`ExecuteResponse`] and audit-log [`EventType`].
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1588
1589impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1590    fn from(variant: UpdatePrivilegeVariant) -> Self {
1591        match variant {
1592            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1593            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1594        }
1595    }
1596}
1597
1598impl From<UpdatePrivilegeVariant> for EventType {
1599    fn from(variant: UpdatePrivilegeVariant) -> Self {
1600        match variant {
1601            UpdatePrivilegeVariant::Grant => EventType::Grant,
1602            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1603        }
1604    }
1605}
1606
1607impl ConnCatalog<'_> {
1608    fn resolve_item_name(
1609        &self,
1610        name: &PartialItemName,
1611    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1612        self.resolve_item(name).map(|entry| entry.name())
1613    }
1614
1615    fn resolve_function_name(
1616        &self,
1617        name: &PartialItemName,
1618    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1619        self.resolve_function(name).map(|entry| entry.name())
1620    }
1621
1622    fn resolve_type_name(
1623        &self,
1624        name: &PartialItemName,
1625    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1626        self.resolve_type(name).map(|entry| entry.name())
1627    }
1628}
1629
1630impl ExprHumanizer for ConnCatalog<'_> {
1631    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1632        let entry = self.state.try_get_entry_by_global_id(&id)?;
1633        Some(self.resolve_full_name(entry.name()).to_string())
1634    }
1635
1636    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1637        let entry = self.state.try_get_entry_by_global_id(&id)?;
1638        Some(entry.name().item.clone())
1639    }
1640
1641    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1642        let entry = self.state.try_get_entry_by_global_id(&id)?;
1643        Some(self.resolve_full_name(entry.name()).into_parts())
1644    }
1645
1646    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1647        use SqlScalarType::*;
1648
1649        match typ {
1650            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1651            List {
1652                custom_id: Some(item_id),
1653                ..
1654            }
1655            | Map {
1656                custom_id: Some(item_id),
1657                ..
1658            } => {
1659                let item = self.get_item(item_id);
1660                self.minimal_qualification(item.name()).to_string()
1661            }
1662            List { element_type, .. } => {
1663                format!(
1664                    "{} list",
1665                    self.humanize_scalar_type(element_type, postgres_compat)
1666                )
1667            }
1668            Map { value_type, .. } => format!(
1669                "map[{}=>{}]",
1670                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1671                self.humanize_scalar_type(value_type, postgres_compat)
1672            ),
1673            Record {
1674                custom_id: Some(item_id),
1675                ..
1676            } => {
1677                let item = self.get_item(item_id);
1678                self.minimal_qualification(item.name()).to_string()
1679            }
1680            Record { fields, .. } => format!(
1681                "record({})",
1682                fields
1683                    .iter()
1684                    .map(|f| format!(
1685                        "{}: {}",
1686                        f.0,
1687                        self.humanize_column_type(&f.1, postgres_compat)
1688                    ))
1689                    .join(",")
1690            ),
1691            PgLegacyChar => "\"char\"".into(),
1692            Char { length } if !postgres_compat => match length {
1693                None => "char".into(),
1694                Some(length) => format!("char({})", length.into_u32()),
1695            },
1696            VarChar { max_length } if !postgres_compat => match max_length {
1697                None => "varchar".into(),
1698                Some(length) => format!("varchar({})", length.into_u32()),
1699            },
1700            UInt16 => "uint2".into(),
1701            UInt32 => "uint4".into(),
1702            UInt64 => "uint8".into(),
1703            ty => {
1704                let pgrepr_type = mz_pgrepr::Type::from(ty);
1705                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1706
1707                let res = if self
1708                    .effective_search_path(true)
1709                    .iter()
1710                    .any(|(_, schema)| schema == &pg_catalog_schema)
1711                {
1712                    pgrepr_type.name().to_string()
1713                } else {
1714                    // If PG_CATALOG_SCHEMA is not in search path, you need
1715                    // qualified object name to refer to type.
1716                    let name = QualifiedItemName {
1717                        qualifiers: ItemQualifiers {
1718                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1719                            schema_spec: pg_catalog_schema,
1720                        },
1721                        item: pgrepr_type.name().to_string(),
1722                    };
1723                    self.resolve_full_name(&name).to_string()
1724                };
1725                res
1726            }
1727        }
1728    }
1729
1730    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1731        let entry = self.state.try_get_entry_by_global_id(&id)?;
1732
1733        match entry.index() {
1734            Some(index) => {
1735                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1736                let mut on_names = on_desc
1737                    .iter_names()
1738                    .map(|col_name| col_name.to_string())
1739                    .collect::<Vec<_>>();
1740
1741                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1742
1743                // Init ix_names with unknown column names. Unknown columns are
1744                // represented as an empty String and rendered as `#c` by the
1745                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1746                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1747                let mut ix_names = vec![String::new(); ix_arity];
1748
1749                // Apply the permutation by swapping on_names with ix_names.
1750                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1751                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1752                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1753                    std::mem::swap(on_name, ix_name);
1754                }
1755
1756                Some(ix_names) // Return the updated ix_names vector.
1757            }
1758            None => {
1759                let desc = self.state.try_get_desc_by_global_id(&id)?;
1760                let column_names = desc
1761                    .iter_names()
1762                    .map(|col_name| col_name.to_string())
1763                    .collect();
1764
1765                Some(column_names)
1766            }
1767        }
1768    }
1769
1770    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1771        let desc = self.state.try_get_desc_by_global_id(&id)?;
1772        Some(desc.get_name(column).to_string())
1773    }
1774
1775    fn id_exists(&self, id: GlobalId) -> bool {
1776        self.state.entry_by_global_id.contains_key(&id)
1777    }
1778}
1779
1780impl SessionCatalog for ConnCatalog<'_> {
1781    fn active_role_id(&self) -> &RoleId {
1782        &self.role_id
1783    }
1784
1785    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1786        self.prepared_statements
1787            .as_ref()
1788            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1789            .flatten()
1790    }
1791
1792    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1793        self.portals
1794            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1795    }
1796
1797    fn active_database(&self) -> Option<&DatabaseId> {
1798        self.database.as_ref()
1799    }
1800
1801    fn active_cluster(&self) -> &str {
1802        &self.cluster
1803    }
1804
1805    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1806        &self.search_path
1807    }
1808
1809    fn resolve_database(
1810        &self,
1811        database_name: &str,
1812    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1813        Ok(self.state.resolve_database(database_name)?)
1814    }
1815
1816    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1817        self.state
1818            .database_by_id
1819            .get(id)
1820            .expect("database doesn't exist")
1821    }
1822
1823    // `as` is ok to use to cast to a trait object.
1824    #[allow(clippy::as_conversions)]
1825    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1826        self.state
1827            .database_by_id
1828            .values()
1829            .map(|database| database as &dyn CatalogDatabase)
1830            .collect()
1831    }
1832
1833    fn resolve_schema(
1834        &self,
1835        database_name: Option<&str>,
1836        schema_name: &str,
1837    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1838        Ok(self.state.resolve_schema(
1839            self.database.as_ref(),
1840            database_name,
1841            schema_name,
1842            &self.conn_id,
1843        )?)
1844    }
1845
1846    fn resolve_schema_in_database(
1847        &self,
1848        database_spec: &ResolvedDatabaseSpecifier,
1849        schema_name: &str,
1850    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1851        Ok(self
1852            .state
1853            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1854    }
1855
1856    fn get_schema(
1857        &self,
1858        database_spec: &ResolvedDatabaseSpecifier,
1859        schema_spec: &SchemaSpecifier,
1860    ) -> &dyn CatalogSchema {
1861        self.state
1862            .get_schema(database_spec, schema_spec, &self.conn_id)
1863    }
1864
1865    // `as` is ok to use to cast to a trait object.
1866    #[allow(clippy::as_conversions)]
1867    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1868        self.get_databases()
1869            .into_iter()
1870            .flat_map(|database| database.schemas().into_iter())
1871            .chain(
1872                self.state
1873                    .ambient_schemas_by_id
1874                    .values()
1875                    .chain(self.state.temporary_schemas.values())
1876                    .map(|schema| schema as &dyn CatalogSchema),
1877            )
1878            .collect()
1879    }
1880
1881    fn get_mz_internal_schema_id(&self) -> SchemaId {
1882        self.state().get_mz_internal_schema_id()
1883    }
1884
1885    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1886        self.state().get_mz_unsafe_schema_id()
1887    }
1888
1889    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1890        self.state.is_system_schema_specifier(schema)
1891    }
1892
1893    fn resolve_role(
1894        &self,
1895        role_name: &str,
1896    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1897        match self.state.try_get_role_by_name(role_name) {
1898            Some(role) => Ok(role),
1899            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1900        }
1901    }
1902
1903    fn resolve_network_policy(
1904        &self,
1905        policy_name: &str,
1906    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1907        match self.state.try_get_network_policy_by_name(policy_name) {
1908            Some(policy) => Ok(policy),
1909            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1910        }
1911    }
1912
1913    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1914        Some(self.state.roles_by_id.get(id)?)
1915    }
1916
1917    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1918        self.state.get_role(id)
1919    }
1920
1921    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1922        // `as` is ok to use to cast to a trait object.
1923        #[allow(clippy::as_conversions)]
1924        self.state
1925            .roles_by_id
1926            .values()
1927            .map(|role| role as &dyn CatalogRole)
1928            .collect()
1929    }
1930
1931    fn mz_system_role_id(&self) -> RoleId {
1932        MZ_SYSTEM_ROLE_ID
1933    }
1934
1935    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1936        self.state.collect_role_membership(id)
1937    }
1938
1939    fn get_network_policy(
1940        &self,
1941        id: &NetworkPolicyId,
1942    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1943        self.state.get_network_policy(id)
1944    }
1945
1946    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1947        // `as` is ok to use to cast to a trait object.
1948        #[allow(clippy::as_conversions)]
1949        self.state
1950            .network_policies_by_id
1951            .values()
1952            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1953            .collect()
1954    }
1955
1956    fn resolve_cluster(
1957        &self,
1958        cluster_name: Option<&str>,
1959    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1960        Ok(self
1961            .state
1962            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1963    }
1964
1965    fn resolve_cluster_replica(
1966        &self,
1967        cluster_replica_name: &QualifiedReplica,
1968    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1969        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1970    }
1971
1972    fn resolve_item(
1973        &self,
1974        name: &PartialItemName,
1975    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1976        let r = self.state.resolve_entry(
1977            self.database.as_ref(),
1978            &self.effective_search_path(true),
1979            name,
1980            &self.conn_id,
1981        )?;
1982        if self.unresolvable_ids.contains(&r.id()) {
1983            Err(SqlCatalogError::UnknownItem(name.to_string()))
1984        } else {
1985            Ok(r)
1986        }
1987    }
1988
1989    fn resolve_function(
1990        &self,
1991        name: &PartialItemName,
1992    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1993        let r = self.state.resolve_function(
1994            self.database.as_ref(),
1995            &self.effective_search_path(false),
1996            name,
1997            &self.conn_id,
1998        )?;
1999
2000        if self.unresolvable_ids.contains(&r.id()) {
2001            Err(SqlCatalogError::UnknownFunction {
2002                name: name.to_string(),
2003                alternative: None,
2004            })
2005        } else {
2006            Ok(r)
2007        }
2008    }
2009
2010    fn resolve_type(
2011        &self,
2012        name: &PartialItemName,
2013    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2014        let r = self.state.resolve_type(
2015            self.database.as_ref(),
2016            &self.effective_search_path(false),
2017            name,
2018            &self.conn_id,
2019        )?;
2020
2021        if self.unresolvable_ids.contains(&r.id()) {
2022            Err(SqlCatalogError::UnknownType {
2023                name: name.to_string(),
2024            })
2025        } else {
2026            Ok(r)
2027        }
2028    }
2029
2030    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2031        self.state.get_system_type(name)
2032    }
2033
2034    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2035        Some(self.state.try_get_entry(id)?)
2036    }
2037
2038    fn try_get_item_by_global_id(
2039        &self,
2040        id: &GlobalId,
2041    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2042        let entry = self.state.try_get_entry_by_global_id(id)?;
2043        let entry = match &entry.item {
2044            CatalogItem::Table(table) => {
2045                let (version, _gid) = table
2046                    .collections
2047                    .iter()
2048                    .find(|(_version, gid)| *gid == id)
2049                    .expect("catalog out of sync, mismatched GlobalId");
2050                entry.at_version(RelationVersionSelector::Specific(*version))
2051            }
2052            _ => entry.at_version(RelationVersionSelector::Latest),
2053        };
2054        Some(entry)
2055    }
2056
2057    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2058        self.state.get_entry(id)
2059    }
2060
2061    fn get_item_by_global_id(
2062        &self,
2063        id: &GlobalId,
2064    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2065        let entry = self.state.get_entry_by_global_id(id);
2066        let entry = match &entry.item {
2067            CatalogItem::Table(table) => {
2068                let (version, _gid) = table
2069                    .collections
2070                    .iter()
2071                    .find(|(_version, gid)| *gid == id)
2072                    .expect("catalog out of sync, mismatched GlobalId");
2073                entry.at_version(RelationVersionSelector::Specific(*version))
2074            }
2075            _ => entry.at_version(RelationVersionSelector::Latest),
2076        };
2077        entry
2078    }
2079
2080    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2081        self.get_schemas()
2082            .into_iter()
2083            .flat_map(|schema| schema.item_ids())
2084            .map(|id| self.get_item(&id))
2085            .collect()
2086    }
2087
2088    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2089        self.state
2090            .get_item_by_name(name, &self.conn_id)
2091            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2092    }
2093
2094    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2095        self.state
2096            .get_type_by_name(name, &self.conn_id)
2097            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2098    }
2099
2100    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2101        &self.state.clusters_by_id[&id]
2102    }
2103
2104    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2105        self.state
2106            .clusters_by_id
2107            .values()
2108            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2109            .collect()
2110    }
2111
2112    fn get_cluster_replica(
2113        &self,
2114        cluster_id: ClusterId,
2115        replica_id: ReplicaId,
2116    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2117        let cluster = self.get_cluster(cluster_id);
2118        cluster.replica(replica_id)
2119    }
2120
2121    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2122        self.get_clusters()
2123            .into_iter()
2124            .flat_map(|cluster| cluster.replicas().into_iter())
2125            .collect()
2126    }
2127
2128    fn get_system_privileges(&self) -> &PrivilegeMap {
2129        &self.state.system_privileges
2130    }
2131
2132    fn get_default_privileges(
2133        &self,
2134    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2135        self.state
2136            .default_privileges
2137            .iter()
2138            .map(|(object, acl_items)| (object, acl_items.collect()))
2139            .collect()
2140    }
2141
2142    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2143        self.state.find_available_name(name, &self.conn_id)
2144    }
2145
2146    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2147        self.state.resolve_full_name(name, Some(&self.conn_id))
2148    }
2149
2150    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2151        self.state.resolve_full_schema_name(name)
2152    }
2153
2154    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2155        self.state.get_entry_by_global_id(global_id).id()
2156    }
2157
2158    fn resolve_global_id(
2159        &self,
2160        item_id: &CatalogItemId,
2161        version: RelationVersionSelector,
2162    ) -> GlobalId {
2163        self.state
2164            .get_entry(item_id)
2165            .at_version(version)
2166            .global_id()
2167    }
2168
2169    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2170        self.state.config()
2171    }
2172
2173    fn now(&self) -> EpochMillis {
2174        (self.state.config().now)()
2175    }
2176
2177    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2178        self.state.aws_privatelink_availability_zones.clone()
2179    }
2180
2181    fn system_vars(&self) -> &SystemVars {
2182        &self.state.system_configuration
2183    }
2184
2185    fn system_vars_mut(&mut self) -> &mut SystemVars {
2186        &mut self.state.to_mut().system_configuration
2187    }
2188
2189    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2190        self.state().get_owner_id(id, self.conn_id())
2191    }
2192
2193    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2194        match id {
2195            SystemObjectId::System => Some(&self.state.system_privileges),
2196            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2197                Some(self.get_cluster(*id).privileges())
2198            }
2199            SystemObjectId::Object(ObjectId::Database(id)) => {
2200                Some(self.get_database(id).privileges())
2201            }
2202            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2203                // For temporary schemas that haven't been created yet (lazy creation),
2204                // we return None - the RBAC check will need to handle this case.
2205                self.state
2206                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2207                    .map(|schema| schema.privileges())
2208            }
2209            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2210            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2211                Some(self.get_network_policy(id).privileges())
2212            }
2213            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2214            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2215        }
2216    }
2217
2218    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2219        let mut seen = BTreeSet::new();
2220        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2221    }
2222
2223    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2224        let mut seen = BTreeSet::new();
2225        self.state.item_dependents(id, &mut seen)
2226    }
2227
2228    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2229        rbac::all_object_privileges(object_type)
2230    }
2231
2232    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2233        self.state.get_object_type(object_id)
2234    }
2235
2236    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2237        self.state.get_system_object_type(id)
2238    }
2239
2240    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2241    /// the object.
2242    ///
2243    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2244    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2245    /// only for error msgs and other humanizations.
2246    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2247        if qualified_name.qualifiers.schema_spec.is_temporary() {
2248            // All bets are off. Just give up and return the qualified name as is.
2249            // TODO: Figure out what's going on with temporary objects.
2250
2251            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2252            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2253            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2254            // which we don't have a simple repro.
2255            return qualified_name.item.clone().into();
2256        }
2257
2258        let database_id = match &qualified_name.qualifiers.database_spec {
2259            ResolvedDatabaseSpecifier::Ambient => None,
2260            ResolvedDatabaseSpecifier::Id(id)
2261                if self.database.is_some() && self.database == Some(*id) =>
2262            {
2263                None
2264            }
2265            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2266        };
2267
2268        let schema_spec = if database_id.is_none()
2269            && self.resolve_item_name(&PartialItemName {
2270                database: None,
2271                schema: None,
2272                item: qualified_name.item.clone(),
2273            }) == Ok(qualified_name)
2274            || self.resolve_function_name(&PartialItemName {
2275                database: None,
2276                schema: None,
2277                item: qualified_name.item.clone(),
2278            }) == Ok(qualified_name)
2279            || self.resolve_type_name(&PartialItemName {
2280                database: None,
2281                schema: None,
2282                item: qualified_name.item.clone(),
2283            }) == Ok(qualified_name)
2284        {
2285            None
2286        } else {
2287            // If `search_path` does not contain `full_name.schema`, the
2288            // `PartialName` must contain it.
2289            Some(qualified_name.qualifiers.schema_spec.clone())
2290        };
2291
2292        let res = PartialItemName {
2293            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2294            schema: schema_spec.map(|spec| {
2295                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2296                    .name()
2297                    .schema
2298                    .clone()
2299            }),
2300            item: qualified_name.item.clone(),
2301        };
2302        assert!(
2303            self.resolve_item_name(&res) == Ok(qualified_name)
2304                || self.resolve_function_name(&res) == Ok(qualified_name)
2305                || self.resolve_type_name(&res) == Ok(qualified_name)
2306        );
2307        res
2308    }
2309
2310    fn add_notice(&self, notice: PlanNotice) {
2311        let _ = self.notices_tx.send(notice.into());
2312    }
2313
2314    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2315        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2316        self.state.comments.get_object_comments(comment_id)
2317    }
2318
2319    fn is_cluster_size_cc(&self, size: &str) -> bool {
2320        self.state
2321            .cluster_replica_sizes
2322            .0
2323            .get(size)
2324            .map_or(false, |a| a.is_cc)
2325    }
2326}
2327
2328#[cfg(test)]
2329mod tests {
2330    use std::collections::{BTreeMap, BTreeSet};
2331    use std::sync::Arc;
2332    use std::{env, iter};
2333
2334    use itertools::Itertools;
2335    use mz_catalog::memory::objects::CatalogItem;
2336    use tokio_postgres::NoTls;
2337    use tokio_postgres::types::Type;
2338    use uuid::Uuid;
2339
2340    use mz_catalog::SYSTEM_CONN_ID;
2341    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2342    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2343    use mz_controller_types::{ClusterId, ReplicaId};
2344    use mz_expr::MirScalarExpr;
2345    use mz_ore::now::to_datetime;
2346    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2347    use mz_persist_client::PersistClient;
2348    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2349    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2350    use mz_repr::role_id::RoleId;
2351    use mz_repr::{
2352        CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2353        SqlScalarType, Timestamp,
2354    };
2355    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2356    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2357    use mz_sql::names::{
2358        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2359        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2360    };
2361    use mz_sql::plan::{
2362        CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2363        Scope, StatementContext,
2364    };
2365    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2366    use mz_sql::session::vars::{SystemVars, VarInput};
2367
2368    use crate::catalog::state::LocalExpressionCache;
2369    use crate::catalog::{Catalog, Op};
2370    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2371    use crate::session::Session;
2372
2373    /// System sessions have an empty `search_path` so it's necessary to
2374    /// schema-qualify all referenced items.
2375    ///
2376    /// Dummy (and ostensibly client) sessions contain system schemas in their
2377    /// search paths, so do not require schema qualification on system objects such
2378    /// as types.
2379    #[mz_ore::test(tokio::test)]
2380    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2381    async fn test_minimal_qualification() {
2382        Catalog::with_debug(|catalog| async move {
2383            struct TestCase {
2384                input: QualifiedItemName,
2385                system_output: PartialItemName,
2386                normal_output: PartialItemName,
2387            }
2388
2389            let test_cases = vec![
2390                TestCase {
2391                    input: QualifiedItemName {
2392                        qualifiers: ItemQualifiers {
2393                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2394                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2395                        },
2396                        item: "numeric".to_string(),
2397                    },
2398                    system_output: PartialItemName {
2399                        database: None,
2400                        schema: None,
2401                        item: "numeric".to_string(),
2402                    },
2403                    normal_output: PartialItemName {
2404                        database: None,
2405                        schema: None,
2406                        item: "numeric".to_string(),
2407                    },
2408                },
2409                TestCase {
2410                    input: QualifiedItemName {
2411                        qualifiers: ItemQualifiers {
2412                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2413                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2414                        },
2415                        item: "mz_array_types".to_string(),
2416                    },
2417                    system_output: PartialItemName {
2418                        database: None,
2419                        schema: None,
2420                        item: "mz_array_types".to_string(),
2421                    },
2422                    normal_output: PartialItemName {
2423                        database: None,
2424                        schema: None,
2425                        item: "mz_array_types".to_string(),
2426                    },
2427                },
2428            ];
2429
2430            for tc in test_cases {
2431                assert_eq!(
2432                    catalog
2433                        .for_system_session()
2434                        .minimal_qualification(&tc.input),
2435                    tc.system_output
2436                );
2437                assert_eq!(
2438                    catalog
2439                        .for_session(&Session::dummy())
2440                        .minimal_qualification(&tc.input),
2441                    tc.normal_output
2442                );
2443            }
2444            catalog.expire().await;
2445        })
2446        .await
2447    }
2448
2449    #[mz_ore::test(tokio::test)]
2450    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2451    async fn test_catalog_revision() {
2452        let persist_client = PersistClient::new_for_tests().await;
2453        let organization_id = Uuid::new_v4();
2454        let bootstrap_args = test_bootstrap_args();
2455        {
2456            let mut catalog = Catalog::open_debug_catalog(
2457                persist_client.clone(),
2458                organization_id.clone(),
2459                &bootstrap_args,
2460            )
2461            .await
2462            .expect("unable to open debug catalog");
2463            assert_eq!(catalog.transient_revision(), 1);
2464            let commit_ts = catalog.current_upper().await;
2465            catalog
2466                .transact(
2467                    None,
2468                    commit_ts,
2469                    None,
2470                    vec![Op::CreateDatabase {
2471                        name: "test".to_string(),
2472                        owner_id: MZ_SYSTEM_ROLE_ID,
2473                    }],
2474                )
2475                .await
2476                .expect("failed to transact");
2477            assert_eq!(catalog.transient_revision(), 2);
2478            catalog.expire().await;
2479        }
2480        {
2481            let catalog =
2482                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2483                    .await
2484                    .expect("unable to open debug catalog");
2485            // Re-opening the same catalog resets the transient_revision to 1.
2486            assert_eq!(catalog.transient_revision(), 1);
2487            catalog.expire().await;
2488        }
2489    }
2490
2491    #[mz_ore::test(tokio::test)]
2492    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2493    async fn test_effective_search_path() {
2494        Catalog::with_debug(|catalog| async move {
2495            let mz_catalog_schema = (
2496                ResolvedDatabaseSpecifier::Ambient,
2497                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2498            );
2499            let pg_catalog_schema = (
2500                ResolvedDatabaseSpecifier::Ambient,
2501                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2502            );
2503            let mz_temp_schema = (
2504                ResolvedDatabaseSpecifier::Ambient,
2505                SchemaSpecifier::Temporary,
2506            );
2507
2508            // Behavior with the default search_schema (public)
2509            let session = Session::dummy();
2510            let conn_catalog = catalog.for_session(&session);
2511            assert_ne!(
2512                conn_catalog.effective_search_path(false),
2513                conn_catalog.search_path
2514            );
2515            assert_ne!(
2516                conn_catalog.effective_search_path(true),
2517                conn_catalog.search_path
2518            );
2519            assert_eq!(
2520                conn_catalog.effective_search_path(false),
2521                vec![
2522                    mz_catalog_schema.clone(),
2523                    pg_catalog_schema.clone(),
2524                    conn_catalog.search_path[0].clone()
2525                ]
2526            );
2527            assert_eq!(
2528                conn_catalog.effective_search_path(true),
2529                vec![
2530                    mz_temp_schema.clone(),
2531                    mz_catalog_schema.clone(),
2532                    pg_catalog_schema.clone(),
2533                    conn_catalog.search_path[0].clone()
2534                ]
2535            );
2536
2537            // missing schemas are added when missing
2538            let mut session = Session::dummy();
2539            session
2540                .vars_mut()
2541                .set(
2542                    &SystemVars::new(),
2543                    "search_path",
2544                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2545                    false,
2546                )
2547                .expect("failed to set search_path");
2548            let conn_catalog = catalog.for_session(&session);
2549            assert_ne!(
2550                conn_catalog.effective_search_path(false),
2551                conn_catalog.search_path
2552            );
2553            assert_ne!(
2554                conn_catalog.effective_search_path(true),
2555                conn_catalog.search_path
2556            );
2557            assert_eq!(
2558                conn_catalog.effective_search_path(false),
2559                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2560            );
2561            assert_eq!(
2562                conn_catalog.effective_search_path(true),
2563                vec![
2564                    mz_temp_schema.clone(),
2565                    mz_catalog_schema.clone(),
2566                    pg_catalog_schema.clone()
2567                ]
2568            );
2569
2570            let mut session = Session::dummy();
2571            session
2572                .vars_mut()
2573                .set(
2574                    &SystemVars::new(),
2575                    "search_path",
2576                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2577                    false,
2578                )
2579                .expect("failed to set search_path");
2580            let conn_catalog = catalog.for_session(&session);
2581            assert_ne!(
2582                conn_catalog.effective_search_path(false),
2583                conn_catalog.search_path
2584            );
2585            assert_ne!(
2586                conn_catalog.effective_search_path(true),
2587                conn_catalog.search_path
2588            );
2589            assert_eq!(
2590                conn_catalog.effective_search_path(false),
2591                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2592            );
2593            assert_eq!(
2594                conn_catalog.effective_search_path(true),
2595                vec![
2596                    mz_temp_schema.clone(),
2597                    pg_catalog_schema.clone(),
2598                    mz_catalog_schema.clone()
2599                ]
2600            );
2601
2602            let mut session = Session::dummy();
2603            session
2604                .vars_mut()
2605                .set(
2606                    &SystemVars::new(),
2607                    "search_path",
2608                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2609                    false,
2610                )
2611                .expect("failed to set search_path");
2612            let conn_catalog = catalog.for_session(&session);
2613            assert_ne!(
2614                conn_catalog.effective_search_path(false),
2615                conn_catalog.search_path
2616            );
2617            assert_ne!(
2618                conn_catalog.effective_search_path(true),
2619                conn_catalog.search_path
2620            );
2621            assert_eq!(
2622                conn_catalog.effective_search_path(false),
2623                vec![
2624                    mz_catalog_schema.clone(),
2625                    pg_catalog_schema.clone(),
2626                    mz_temp_schema.clone()
2627                ]
2628            );
2629            assert_eq!(
2630                conn_catalog.effective_search_path(true),
2631                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2632            );
2633            catalog.expire().await;
2634        })
2635        .await
2636    }
2637
2638    #[mz_ore::test(tokio::test)]
2639    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2640    async fn test_normalized_create() {
2641        use mz_ore::collections::CollectionExt;
2642        Catalog::with_debug(|catalog| async move {
2643            let conn_catalog = catalog.for_system_session();
2644            let scx = &mut StatementContext::new(None, &conn_catalog);
2645
2646            let parsed = mz_sql_parser::parser::parse_statements(
2647                "create view public.foo as select 1 as bar",
2648            )
2649            .expect("")
2650            .into_element()
2651            .ast;
2652
2653            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2654
2655            // Ensure that all identifiers are quoted.
2656            assert_eq!(
2657                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2658                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2659            );
2660            catalog.expire().await;
2661        })
2662        .await;
2663    }
2664
2665    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2666    #[mz_ore::test(tokio::test)]
2667    #[cfg_attr(miri, ignore)] // slow
2668    async fn test_large_catalog_item() {
2669        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2670        let column = ", 1";
2671        let view_def_size = view_def.bytes().count();
2672        let column_size = column.bytes().count();
2673        let column_count =
2674            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2675        let columns = iter::repeat(column).take(column_count).join("");
2676        let create_sql = format!("{view_def}{columns})");
2677        let create_sql_check = create_sql.clone();
2678        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2679        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2680            &create_sql
2681        ));
2682
2683        let persist_client = PersistClient::new_for_tests().await;
2684        let organization_id = Uuid::new_v4();
2685        let id = CatalogItemId::User(1);
2686        let gid = GlobalId::User(1);
2687        let bootstrap_args = test_bootstrap_args();
2688        {
2689            let mut catalog = Catalog::open_debug_catalog(
2690                persist_client.clone(),
2691                organization_id.clone(),
2692                &bootstrap_args,
2693            )
2694            .await
2695            .expect("unable to open debug catalog");
2696            let item = catalog
2697                .state()
2698                .deserialize_item(
2699                    gid,
2700                    &create_sql,
2701                    &BTreeMap::new(),
2702                    &mut LocalExpressionCache::Closed,
2703                    None,
2704                )
2705                .expect("unable to parse view");
2706            let commit_ts = catalog.current_upper().await;
2707            catalog
2708                .transact(
2709                    None,
2710                    commit_ts,
2711                    None,
2712                    vec![Op::CreateItem {
2713                        item,
2714                        name: QualifiedItemName {
2715                            qualifiers: ItemQualifiers {
2716                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2717                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2718                            },
2719                            item: "v".to_string(),
2720                        },
2721                        id,
2722                        owner_id: MZ_SYSTEM_ROLE_ID,
2723                    }],
2724                )
2725                .await
2726                .expect("failed to transact");
2727            catalog.expire().await;
2728        }
2729        {
2730            let catalog =
2731                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2732                    .await
2733                    .expect("unable to open debug catalog");
2734            let view = catalog.get_entry(&id);
2735            assert_eq!("v", view.name.item);
2736            match &view.item {
2737                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2738                item => panic!("expected view, got {}", item.typ()),
2739            }
2740            catalog.expire().await;
2741        }
2742    }
2743
2744    #[mz_ore::test(tokio::test)]
2745    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2746    async fn test_object_type() {
2747        Catalog::with_debug(|catalog| async move {
2748            let conn_catalog = catalog.for_system_session();
2749
2750            assert_eq!(
2751                mz_sql::catalog::ObjectType::ClusterReplica,
2752                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2753                    ClusterId::user(1).expect("1 is a valid ID"),
2754                    ReplicaId::User(1)
2755                )))
2756            );
2757            assert_eq!(
2758                mz_sql::catalog::ObjectType::Role,
2759                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2760            );
2761            catalog.expire().await;
2762        })
2763        .await;
2764    }
2765
2766    #[mz_ore::test(tokio::test)]
2767    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2768    async fn test_get_privileges() {
2769        Catalog::with_debug(|catalog| async move {
2770            let conn_catalog = catalog.for_system_session();
2771
2772            assert_eq!(
2773                None,
2774                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2775                    ClusterId::user(1).expect("1 is a valid ID"),
2776                    ReplicaId::User(1),
2777                ))))
2778            );
2779            assert_eq!(
2780                None,
2781                conn_catalog
2782                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2783            );
2784            catalog.expire().await;
2785        })
2786        .await;
2787    }
2788
2789    #[mz_ore::test(tokio::test)]
2790    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2791    async fn verify_builtin_descs() {
2792        Catalog::with_debug(|catalog| async move {
2793            let conn_catalog = catalog.for_system_session();
2794
2795            let builtins_cfg = BuiltinsConfig {
2796                include_continual_tasks: true,
2797            };
2798            for builtin in BUILTINS::iter(&builtins_cfg) {
2799                let (schema, name, expected_desc) = match builtin {
2800                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2801                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2802                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2803                    Builtin::Log(_)
2804                    | Builtin::Type(_)
2805                    | Builtin::Func(_)
2806                    | Builtin::ContinualTask(_)
2807                    | Builtin::Index(_)
2808                    | Builtin::Connection(_) => continue,
2809                };
2810                let item = conn_catalog
2811                    .resolve_item(&PartialItemName {
2812                        database: None,
2813                        schema: Some(schema.to_string()),
2814                        item: name.to_string(),
2815                    })
2816                    .expect("unable to resolve item")
2817                    .at_version(RelationVersionSelector::Latest);
2818
2819                let actual_desc = item.relation_desc().expect("invalid item type");
2820                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2821                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2822                {
2823                    assert_eq!(
2824                        actual_name, expected_name,
2825                        "item {schema}.{name} column {index} name did not match its expected name"
2826                    );
2827                    assert_eq!(
2828                        actual_typ, expected_typ,
2829                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2830                    );
2831                }
2832                assert_eq!(
2833                    &*actual_desc, expected_desc,
2834                    "item {schema}.{name} did not match its expected RelationDesc"
2835                );
2836            }
2837            catalog.expire().await;
2838        })
2839        .await
2840    }
2841
2842    // Connect to a running Postgres server and verify that our builtin
2843    // types and functions match it, in addition to some other things.
2844    #[mz_ore::test(tokio::test)]
2845    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2846    async fn test_compare_builtins_postgres() {
2847        async fn inner(catalog: Catalog) {
2848            // Verify that all builtin functions:
2849            // - have a unique OID
2850            // - if they have a postgres counterpart (same oid) then they have matching name
2851            let (client, connection) = tokio_postgres::connect(
2852                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2853                NoTls,
2854            )
2855            .await
2856            .expect("failed to connect to Postgres");
2857
2858            task::spawn(|| "compare_builtin_postgres", async move {
2859                if let Err(e) = connection.await {
2860                    panic!("connection error: {}", e);
2861                }
2862            });
2863
2864            struct PgProc {
2865                name: String,
2866                arg_oids: Vec<u32>,
2867                ret_oid: Option<u32>,
2868                ret_set: bool,
2869            }
2870
2871            struct PgType {
2872                name: String,
2873                ty: String,
2874                elem: u32,
2875                array: u32,
2876                input: u32,
2877                receive: u32,
2878            }
2879
2880            struct PgOper {
2881                oprresult: u32,
2882                name: String,
2883            }
2884
2885            let pg_proc: BTreeMap<_, _> = client
2886                .query(
2887                    "SELECT
2888                    p.oid,
2889                    proname,
2890                    proargtypes,
2891                    prorettype,
2892                    proretset
2893                FROM pg_proc p
2894                JOIN pg_namespace n ON p.pronamespace = n.oid",
2895                    &[],
2896                )
2897                .await
2898                .expect("pg query failed")
2899                .into_iter()
2900                .map(|row| {
2901                    let oid: u32 = row.get("oid");
2902                    let pg_proc = PgProc {
2903                        name: row.get("proname"),
2904                        arg_oids: row.get("proargtypes"),
2905                        ret_oid: row.get("prorettype"),
2906                        ret_set: row.get("proretset"),
2907                    };
2908                    (oid, pg_proc)
2909                })
2910                .collect();
2911
2912            let pg_type: BTreeMap<_, _> = client
2913                .query(
2914                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2915                    &[],
2916                )
2917                .await
2918                .expect("pg query failed")
2919                .into_iter()
2920                .map(|row| {
2921                    let oid: u32 = row.get("oid");
2922                    let pg_type = PgType {
2923                        name: row.get("typname"),
2924                        ty: row.get("typtype"),
2925                        elem: row.get("typelem"),
2926                        array: row.get("typarray"),
2927                        input: row.get("typinput"),
2928                        receive: row.get("typreceive"),
2929                    };
2930                    (oid, pg_type)
2931                })
2932                .collect();
2933
2934            let pg_oper: BTreeMap<_, _> = client
2935                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2936                .await
2937                .expect("pg query failed")
2938                .into_iter()
2939                .map(|row| {
2940                    let oid: u32 = row.get("oid");
2941                    let pg_oper = PgOper {
2942                        name: row.get("oprname"),
2943                        oprresult: row.get("oprresult"),
2944                    };
2945                    (oid, pg_oper)
2946                })
2947                .collect();
2948
2949            let conn_catalog = catalog.for_system_session();
2950            let resolve_type_oid = |item: &str| {
2951                conn_catalog
2952                    .resolve_type(&PartialItemName {
2953                        database: None,
2954                        // All functions we check exist in PG, so the types must, as
2955                        // well
2956                        schema: Some(PG_CATALOG_SCHEMA.into()),
2957                        item: item.to_string(),
2958                    })
2959                    .expect("unable to resolve type")
2960                    .oid()
2961            };
2962
2963            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2964                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2965                .collect();
2966
2967            let mut all_oids = BTreeSet::new();
2968
2969            // A function to determine if two oids are equivalent enough for these tests. We don't
2970            // support some types, so map exceptions here.
2971            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2972                [
2973                    // We don't support NAME.
2974                    (Type::NAME, Type::TEXT),
2975                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2976                    // We don't support time with time zone.
2977                    (Type::TIME, Type::TIMETZ),
2978                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2979                ]
2980                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2981            );
2982            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2983                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2984            ]);
2985            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2986                if ignore_return_types.contains(&fn_oid) {
2987                    return true;
2988                }
2989                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2990                    return true;
2991                }
2992                a == b
2993            };
2994
2995            let builtins_cfg = BuiltinsConfig {
2996                include_continual_tasks: true,
2997            };
2998            for builtin in BUILTINS::iter(&builtins_cfg) {
2999                match builtin {
3000                    Builtin::Type(ty) => {
3001                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3002
3003                        if ty.oid >= FIRST_MATERIALIZE_OID {
3004                            // High OIDs are reserved in Materialize and don't have
3005                            // PostgreSQL counterparts.
3006                            continue;
3007                        }
3008
3009                        // For types that have a PostgreSQL counterpart, verify that
3010                        // the name and oid match.
3011                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3012                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3013                        });
3014                        assert_eq!(
3015                            ty.name, pg_ty.name,
3016                            "oid {} has name {} in postgres; expected {}",
3017                            ty.oid, pg_ty.name, ty.name,
3018                        );
3019
3020                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3021                            None => (0, 0),
3022                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3023                        };
3024                        assert_eq!(
3025                            typinput_oid, pg_ty.input,
3026                            "type {} has typinput OID {:?} in mz but {:?} in pg",
3027                            ty.name, typinput_oid, pg_ty.input,
3028                        );
3029                        assert_eq!(
3030                            typreceive_oid, pg_ty.receive,
3031                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
3032                            ty.name, typreceive_oid, pg_ty.receive,
3033                        );
3034                        if typinput_oid != 0 {
3035                            assert!(
3036                                func_oids.contains(&typinput_oid),
3037                                "type {} has typinput OID {} that does not exist in pg_proc",
3038                                ty.name,
3039                                typinput_oid,
3040                            );
3041                        }
3042                        if typreceive_oid != 0 {
3043                            assert!(
3044                                func_oids.contains(&typreceive_oid),
3045                                "type {} has typreceive OID {} that does not exist in pg_proc",
3046                                ty.name,
3047                                typreceive_oid,
3048                            );
3049                        }
3050
3051                        // Ensure the type matches.
3052                        match &ty.details.typ {
3053                            CatalogType::Array { element_reference } => {
3054                                let elem_ty = BUILTINS::iter(&builtins_cfg)
3055                                    .filter_map(|builtin| match builtin {
3056                                        Builtin::Type(ty @ BuiltinType { name, .. })
3057                                            if element_reference == name =>
3058                                        {
3059                                            Some(ty)
3060                                        }
3061                                        _ => None,
3062                                    })
3063                                    .next();
3064                                let elem_ty = match elem_ty {
3065                                    Some(ty) => ty,
3066                                    None => {
3067                                        panic!("{} is unexpectedly not a type", element_reference)
3068                                    }
3069                                };
3070                                assert_eq!(
3071                                    pg_ty.elem, elem_ty.oid,
3072                                    "type {} has mismatched element OIDs",
3073                                    ty.name
3074                                )
3075                            }
3076                            CatalogType::Pseudo => {
3077                                assert_eq!(
3078                                    pg_ty.ty, "p",
3079                                    "type {} is not a pseudo type as expected",
3080                                    ty.name
3081                                )
3082                            }
3083                            CatalogType::Range { .. } => {
3084                                assert_eq!(
3085                                    pg_ty.ty, "r",
3086                                    "type {} is not a range type as expected",
3087                                    ty.name
3088                                );
3089                            }
3090                            _ => {
3091                                assert_eq!(
3092                                    pg_ty.ty, "b",
3093                                    "type {} is not a base type as expected",
3094                                    ty.name
3095                                )
3096                            }
3097                        }
3098
3099                        // Ensure the array type reference is correct.
3100                        let schema = catalog
3101                            .resolve_schema_in_database(
3102                                &ResolvedDatabaseSpecifier::Ambient,
3103                                ty.schema,
3104                                &SYSTEM_CONN_ID,
3105                            )
3106                            .expect("unable to resolve schema");
3107                        let allocated_type = catalog
3108                            .resolve_type(
3109                                None,
3110                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3111                                &PartialItemName {
3112                                    database: None,
3113                                    schema: Some(schema.name().schema.clone()),
3114                                    item: ty.name.to_string(),
3115                                },
3116                                &SYSTEM_CONN_ID,
3117                            )
3118                            .expect("unable to resolve type");
3119                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3120                            ty
3121                        } else {
3122                            panic!("unexpectedly not a type")
3123                        };
3124                        match ty.details.array_id {
3125                            Some(array_id) => {
3126                                let array_ty = catalog.get_entry(&array_id);
3127                                assert_eq!(
3128                                    pg_ty.array, array_ty.oid,
3129                                    "type {} has mismatched array OIDs",
3130                                    allocated_type.name.item,
3131                                );
3132                            }
3133                            None => assert_eq!(
3134                                pg_ty.array, 0,
3135                                "type {} does not have an array type in mz but does in pg",
3136                                allocated_type.name.item,
3137                            ),
3138                        }
3139                    }
3140                    Builtin::Func(func) => {
3141                        for imp in func.inner.func_impls() {
3142                            assert!(
3143                                all_oids.insert(imp.oid),
3144                                "{} reused oid {}",
3145                                func.name,
3146                                imp.oid
3147                            );
3148
3149                            assert!(
3150                                imp.oid < FIRST_USER_OID,
3151                                "built-in function {} erroneously has OID in user space ({})",
3152                                func.name,
3153                                imp.oid,
3154                            );
3155
3156                            // For functions that have a postgres counterpart, verify that the name and
3157                            // oid match.
3158                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3159                                continue;
3160                            } else {
3161                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3162                                    panic!(
3163                                        "pg_proc missing function {}: oid {}",
3164                                        func.name, imp.oid
3165                                    )
3166                                })
3167                            };
3168                            assert_eq!(
3169                                func.name, pg_fn.name,
3170                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3171                                imp.oid, func.name, pg_fn.name
3172                            );
3173
3174                            // Complain, but don't fail, if argument oids don't match.
3175                            // TODO: make these match.
3176                            let imp_arg_oids = imp
3177                                .arg_typs
3178                                .iter()
3179                                .map(|item| resolve_type_oid(item))
3180                                .collect::<Vec<_>>();
3181
3182                            if imp_arg_oids != pg_fn.arg_oids {
3183                                println!(
3184                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3185                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3186                                );
3187                            }
3188
3189                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3190
3191                            assert!(
3192                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3193                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3194                                imp.oid,
3195                                func.name,
3196                                imp_return_oid,
3197                                pg_fn.ret_oid
3198                            );
3199
3200                            assert_eq!(
3201                                imp.return_is_set, pg_fn.ret_set,
3202                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3203                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3204                            );
3205                        }
3206                    }
3207                    _ => (),
3208                }
3209            }
3210
3211            for (op, func) in OP_IMPLS.iter() {
3212                for imp in func.func_impls() {
3213                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3214
3215                    // For operators that have a postgres counterpart, verify that the name and oid match.
3216                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3217                        continue;
3218                    } else {
3219                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3220                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3221                        })
3222                    };
3223
3224                    assert_eq!(*op, pg_op.name);
3225
3226                    let imp_return_oid =
3227                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3228                    if imp_return_oid != pg_op.oprresult {
3229                        panic!(
3230                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3231                            imp.oid, op, imp_return_oid, pg_op.oprresult
3232                        );
3233                    }
3234                }
3235            }
3236            catalog.expire().await;
3237        }
3238
3239        Catalog::with_debug(inner).await
3240    }
3241
3242    // Execute all builtin functions with all combinations of arguments from interesting datums.
3243    #[mz_ore::test(tokio::test)]
3244    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3245    async fn test_smoketest_all_builtins() {
3246        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3247            let catalog = Arc::new(catalog);
3248            let conn_catalog = catalog.for_system_session();
3249
3250            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3251            let mut handles = Vec::new();
3252
3253            // Extracted during planning; always panics when executed.
3254            let ignore_names = BTreeSet::from([
3255                "avg",
3256                "avg_internal_v1",
3257                "bool_and",
3258                "bool_or",
3259                "has_table_privilege", // > 3 s each
3260                "has_type_privilege",  // > 3 s each
3261                "mod",
3262                "mz_panic",
3263                "mz_sleep",
3264                "pow",
3265                "stddev_pop",
3266                "stddev_samp",
3267                "stddev",
3268                "var_pop",
3269                "var_samp",
3270                "variance",
3271            ]);
3272
3273            let fns = BUILTINS::funcs()
3274                .map(|func| (&func.name, func.inner))
3275                .chain(OP_IMPLS.iter());
3276
3277            for (name, func) in fns {
3278                if ignore_names.contains(name) {
3279                    continue;
3280                }
3281                let Func::Scalar(impls) = func else {
3282                    continue;
3283                };
3284
3285                'outer: for imp in impls {
3286                    let details = imp.details();
3287                    let mut styps = Vec::new();
3288                    for item in details.arg_typs.iter() {
3289                        let oid = resolve_type_oid(item);
3290                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3291                            continue 'outer;
3292                        };
3293                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3294                    }
3295                    let datums = styps
3296                        .iter()
3297                        .map(|styp| {
3298                            let mut datums = vec![Datum::Null];
3299                            datums.extend(styp.interesting_datums());
3300                            datums
3301                        })
3302                        .collect::<Vec<_>>();
3303                    // Skip nullary fns.
3304                    if datums.is_empty() {
3305                        continue;
3306                    }
3307
3308                    let return_oid = details
3309                        .return_typ
3310                        .map(resolve_type_oid)
3311                        .expect("must exist");
3312                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3313                        .ok()
3314                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3315
3316                    let mut idxs = vec![0; datums.len()];
3317                    while idxs[0] < datums[0].len() {
3318                        let mut args = Vec::with_capacity(idxs.len());
3319                        for i in 0..(datums.len()) {
3320                            args.push(datums[i][idxs[i]]);
3321                        }
3322
3323                        let op = &imp.op;
3324                        let scalars = args
3325                            .iter()
3326                            .enumerate()
3327                            .map(|(i, datum)| {
3328                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3329                                    datum.clone(),
3330                                    styps[i].clone(),
3331                                ))
3332                            })
3333                            .collect();
3334
3335                        let call_name = format!(
3336                            "{name}({}) (oid: {})",
3337                            args.iter()
3338                                .map(|d| d.to_string())
3339                                .collect::<Vec<_>>()
3340                                .join(", "),
3341                            imp.oid
3342                        );
3343                        let catalog = Arc::clone(&catalog);
3344                        let call_name_fn = call_name.clone();
3345                        let return_styp = return_styp.clone();
3346                        let handle = task::spawn_blocking(
3347                            || call_name,
3348                            move || {
3349                                smoketest_fn(
3350                                    name,
3351                                    call_name_fn,
3352                                    op,
3353                                    imp,
3354                                    args,
3355                                    catalog,
3356                                    scalars,
3357                                    return_styp,
3358                                )
3359                            },
3360                        );
3361                        handles.push(handle);
3362
3363                        // Advance to the next datum combination.
3364                        for i in (0..datums.len()).rev() {
3365                            idxs[i] += 1;
3366                            if idxs[i] >= datums[i].len() {
3367                                if i == 0 {
3368                                    break;
3369                                }
3370                                idxs[i] = 0;
3371                                continue;
3372                            } else {
3373                                break;
3374                            }
3375                        }
3376                    }
3377                }
3378            }
3379            handles
3380        }
3381
3382        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3383        for handle in handles {
3384            handle.await;
3385        }
3386    }
3387
3388    fn smoketest_fn(
3389        name: &&str,
3390        call_name: String,
3391        op: &Operation<HirScalarExpr>,
3392        imp: &FuncImpl<HirScalarExpr>,
3393        args: Vec<Datum<'_>>,
3394        catalog: Arc<Catalog>,
3395        scalars: Vec<CoercibleScalarExpr>,
3396        return_styp: Option<SqlScalarType>,
3397    ) {
3398        let conn_catalog = catalog.for_system_session();
3399        let pcx = PlanContext::zero();
3400        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3401        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3402        let ecx = ExprContext {
3403            qcx: &qcx,
3404            name: "smoketest",
3405            scope: &Scope::empty(),
3406            relation_type: &SqlRelationType::empty(),
3407            allow_aggregates: false,
3408            allow_subqueries: false,
3409            allow_parameters: false,
3410            allow_windows: false,
3411        };
3412        let arena = RowArena::new();
3413        let mut session = Session::<Timestamp>::dummy();
3414        session
3415            .start_transaction(to_datetime(0), None, None)
3416            .expect("must succeed");
3417        let prep_style = ExprPrepOneShot {
3418            logical_time: EvalTime::Time(Timestamp::MIN),
3419            session: &session,
3420            catalog_state: &catalog.state,
3421        };
3422
3423        // Execute the function as much as possible, ensuring no panics occur, but
3424        // otherwise ignoring eval errors. We also do various other checks.
3425        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3426        if let Ok(hir) = res {
3427            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3428                // Populate unmaterialized functions.
3429                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3430
3431                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3432                    if let Some(return_styp) = return_styp {
3433                        let mir_typ = mir.typ(&[]);
3434                        // MIR type inference should be consistent with the type
3435                        // we get from the catalog.
3436                        soft_assert_eq_or_log!(
3437                            mir_typ.scalar_type,
3438                            return_styp,
3439                            "MIR type did not match the catalog type (cast elimination/repr type error)"
3440                        );
3441                        // The following will check not just that the scalar type
3442                        // is ok, but also catches if the function returned a null
3443                        // but the MIR type inference said "non-nullable".
3444                        if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3445                            panic!(
3446                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3447                            );
3448                        }
3449                        // Check the consistency of `introduces_nulls` and
3450                        // `propagates_nulls` with `MirScalarExpr::typ`.
3451                        if let Some((introduces_nulls, propagates_nulls)) =
3452                            call_introduces_propagates_nulls(&mir)
3453                        {
3454                            if introduces_nulls {
3455                                // If the function introduces_nulls, then the return
3456                                // type should always be nullable, regardless of
3457                                // the nullability of the input types.
3458                                assert!(
3459                                    mir_typ.nullable,
3460                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3461                                    name, args, mir, mir_typ.nullable
3462                                );
3463                            } else {
3464                                let any_input_null = args.iter().any(|arg| arg.is_null());
3465                                if !any_input_null {
3466                                    assert!(
3467                                        !mir_typ.nullable,
3468                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3469                                        name, args, mir, mir_typ.nullable
3470                                    );
3471                                } else {
3472                                    assert_eq!(
3473                                        mir_typ.nullable, propagates_nulls,
3474                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3475                                        name, args, mir, mir_typ.nullable
3476                                    );
3477                                }
3478                            }
3479                        }
3480                        // Check that `MirScalarExpr::reduce` yields the same result
3481                        // as the real evaluation.
3482                        let mut reduced = mir.clone();
3483                        reduced.reduce(&[]);
3484                        match reduced {
3485                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3486                                match reduce_result {
3487                                    Ok(reduce_result_row) => {
3488                                        let reduce_result_datum = reduce_result_row.unpack_first();
3489                                        assert_eq!(
3490                                            reduce_result_datum,
3491                                            eval_result_datum,
3492                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3493                                            name,
3494                                            args,
3495                                            mir,
3496                                            eval_result_datum,
3497                                            mir_typ.scalar_type,
3498                                            reduce_result_datum,
3499                                            ctyp.scalar_type
3500                                        );
3501                                        // Let's check that the types also match.
3502                                        // (We are not checking nullability here,
3503                                        // because it's ok when we know a more
3504                                        // precise nullability after actually
3505                                        // evaluating a function than before.)
3506                                        assert_eq!(
3507                                            ctyp.scalar_type,
3508                                            mir_typ.scalar_type,
3509                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3510                                            name,
3511                                            args,
3512                                            mir,
3513                                            eval_result_datum,
3514                                            mir_typ.scalar_type,
3515                                            reduce_result_datum,
3516                                            ctyp.scalar_type
3517                                        );
3518                                    }
3519                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3520                                }
3521                            }
3522                            _ => unreachable!(
3523                                "all args are literals, so should have reduced to a literal"
3524                            ),
3525                        }
3526                    }
3527                }
3528            }
3529        }
3530    }
3531
3532    /// If the given MirScalarExpr
3533    ///  - is a function call, and
3534    ///  - all arguments are literals
3535    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3536    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3537        match mir_func_call {
3538            MirScalarExpr::CallUnary { func, expr } => {
3539                if expr.is_literal() {
3540                    Some((func.introduces_nulls(), func.propagates_nulls()))
3541                } else {
3542                    None
3543                }
3544            }
3545            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3546                if expr1.is_literal() && expr2.is_literal() {
3547                    Some((func.introduces_nulls(), func.propagates_nulls()))
3548                } else {
3549                    None
3550                }
3551            }
3552            MirScalarExpr::CallVariadic { func, exprs } => {
3553                if exprs.iter().all(|arg| arg.is_literal()) {
3554                    Some((func.introduces_nulls(), func.propagates_nulls()))
3555                } else {
3556                    None
3557                }
3558            }
3559            _ => None,
3560        }
3561    }
3562
3563    // Make sure pg views don't use types that only exist in Materialize.
3564    #[mz_ore::test(tokio::test)]
3565    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3566    async fn test_pg_views_forbidden_types() {
3567        Catalog::with_debug(|catalog| async move {
3568            let conn_catalog = catalog.for_system_session();
3569
3570            for view in BUILTINS::views().filter(|view| {
3571                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3572            }) {
3573                let item = conn_catalog
3574                    .resolve_item(&PartialItemName {
3575                        database: None,
3576                        schema: Some(view.schema.to_string()),
3577                        item: view.name.to_string(),
3578                    })
3579                    .expect("unable to resolve view")
3580                    // TODO(alter_table)
3581                    .at_version(RelationVersionSelector::Latest);
3582                let full_name = conn_catalog.resolve_full_name(item.name());
3583                let desc = item.relation_desc().expect("invalid item type");
3584                for col_type in desc.iter_types() {
3585                    match &col_type.scalar_type {
3586                        typ @ SqlScalarType::UInt16
3587                        | typ @ SqlScalarType::UInt32
3588                        | typ @ SqlScalarType::UInt64
3589                        | typ @ SqlScalarType::MzTimestamp
3590                        | typ @ SqlScalarType::List { .. }
3591                        | typ @ SqlScalarType::Map { .. }
3592                        | typ @ SqlScalarType::MzAclItem => {
3593                            panic!("{typ:?} type found in {full_name}");
3594                        }
3595                        SqlScalarType::AclItem
3596                        | SqlScalarType::Bool
3597                        | SqlScalarType::Int16
3598                        | SqlScalarType::Int32
3599                        | SqlScalarType::Int64
3600                        | SqlScalarType::Float32
3601                        | SqlScalarType::Float64
3602                        | SqlScalarType::Numeric { .. }
3603                        | SqlScalarType::Date
3604                        | SqlScalarType::Time
3605                        | SqlScalarType::Timestamp { .. }
3606                        | SqlScalarType::TimestampTz { .. }
3607                        | SqlScalarType::Interval
3608                        | SqlScalarType::PgLegacyChar
3609                        | SqlScalarType::Bytes
3610                        | SqlScalarType::String
3611                        | SqlScalarType::Char { .. }
3612                        | SqlScalarType::VarChar { .. }
3613                        | SqlScalarType::Jsonb
3614                        | SqlScalarType::Uuid
3615                        | SqlScalarType::Array(_)
3616                        | SqlScalarType::Record { .. }
3617                        | SqlScalarType::Oid
3618                        | SqlScalarType::RegProc
3619                        | SqlScalarType::RegType
3620                        | SqlScalarType::RegClass
3621                        | SqlScalarType::Int2Vector
3622                        | SqlScalarType::Range { .. }
3623                        | SqlScalarType::PgLegacyName => {}
3624                    }
3625                }
3626            }
3627            catalog.expire().await;
3628        })
3629        .await
3630    }
3631
3632    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3633    // introspection relations.
3634    #[mz_ore::test(tokio::test)]
3635    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3636    async fn test_mz_introspection_builtins() {
3637        Catalog::with_debug(|catalog| async move {
3638            let conn_catalog = catalog.for_system_session();
3639
3640            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3641            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3642
3643            for entry in catalog.entries() {
3644                let schema_spec = entry.name().qualifiers.schema_spec;
3645                let introspection_deps = catalog.introspection_dependencies(entry.id);
3646                if introspection_deps.is_empty() {
3647                    assert!(
3648                        schema_spec != introspection_schema_spec,
3649                        "entry does not depend on introspection sources but is in \
3650                         `mz_introspection`: {}",
3651                        conn_catalog.resolve_full_name(entry.name()),
3652                    );
3653                } else {
3654                    assert!(
3655                        schema_spec == introspection_schema_spec,
3656                        "entry depends on introspection sources but is not in \
3657                         `mz_introspection`: {}",
3658                        conn_catalog.resolve_full_name(entry.name()),
3659                    );
3660                }
3661            }
3662        })
3663        .await
3664    }
3665
3666    #[mz_ore::test(tokio::test)]
3667    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3668    async fn test_multi_subscriber_catalog() {
3669        let persist_client = PersistClient::new_for_tests().await;
3670        let bootstrap_args = test_bootstrap_args();
3671        let organization_id = Uuid::new_v4();
3672        let db_name = "DB";
3673
3674        let mut writer_catalog = Catalog::open_debug_catalog(
3675            persist_client.clone(),
3676            organization_id.clone(),
3677            &bootstrap_args,
3678        )
3679        .await
3680        .expect("open_debug_catalog");
3681        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3682            persist_client.clone(),
3683            organization_id.clone(),
3684            &bootstrap_args,
3685        )
3686        .await
3687        .expect("open_debug_read_only_catalog");
3688        assert_err!(writer_catalog.resolve_database(db_name));
3689        assert_err!(read_only_catalog.resolve_database(db_name));
3690
3691        let commit_ts = writer_catalog.current_upper().await;
3692        writer_catalog
3693            .transact(
3694                None,
3695                commit_ts,
3696                None,
3697                vec![Op::CreateDatabase {
3698                    name: db_name.to_string(),
3699                    owner_id: MZ_SYSTEM_ROLE_ID,
3700                }],
3701            )
3702            .await
3703            .expect("failed to transact");
3704
3705        let write_db = writer_catalog
3706            .resolve_database(db_name)
3707            .expect("resolve_database");
3708        read_only_catalog
3709            .sync_to_current_updates()
3710            .await
3711            .expect("sync_to_current_updates");
3712        let read_db = read_only_catalog
3713            .resolve_database(db_name)
3714            .expect("resolve_database");
3715
3716        assert_eq!(write_db, read_db);
3717
3718        let writer_catalog_fencer =
3719            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3720                .await
3721                .expect("open_debug_catalog for fencer");
3722        let fencer_db = writer_catalog_fencer
3723            .resolve_database(db_name)
3724            .expect("resolve_database for fencer");
3725        assert_eq!(fencer_db, read_db);
3726
3727        let write_fence_err = writer_catalog
3728            .sync_to_current_updates()
3729            .await
3730            .expect_err("sync_to_current_updates for fencer");
3731        assert!(matches!(
3732            write_fence_err,
3733            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3734        ));
3735        let read_fence_err = read_only_catalog
3736            .sync_to_current_updates()
3737            .await
3738            .expect_err("sync_to_current_updates after fencer");
3739        assert!(matches!(
3740            read_fence_err,
3741            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3742        ));
3743
3744        writer_catalog.expire().await;
3745        read_only_catalog.expire().await;
3746        writer_catalog_fencer.expire().await;
3747    }
3748}