Skip to main content

mz_adapter/
catalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// TODO(jkosh44) Move to mz_catalog crate.
11
12//! Persistent metadata storage for the coordinator.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31    BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32    MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38    BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44    NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65    CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
66    CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
67    SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70    CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71    PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_transform::dataflow::DataflowMetainfo;
83use mz_transform::notice::OptimizerNotice;
84use smallvec::SmallVec;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89// DO NOT add any more imports from `crate` outside of `crate::catalog`.
90pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94    DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
/// A `Catalog` keeps track of the SQL objects known to the planner.
///
/// For each object, it keeps track of both forward and reverse dependencies:
/// i.e., which objects are depended upon by the object, and which objects
/// depend upon the object. It enforces the SQL rules around dropping: an object
/// cannot be dropped until all of the objects that depend upon it are dropped.
/// It also enforces uniqueness of names.
///
/// SQL mandates a hierarchy of exactly three layers. A catalog contains
/// databases, databases contain schemas, and schemas contain catalog items,
/// like sources, sinks, views, and indexes.
///
/// To the outside world, databases, schemas, and items are all identified by
/// name. Items can be referred to by their [`FullItemName`], which fully and
/// unambiguously specifies the item, or a [`PartialItemName`], which can omit the
/// database name and/or the schema name. Partial names can be converted into
/// full names via a complicated resolution process documented by the
/// [`CatalogState::resolve`] method.
///
/// The catalog also maintains special "ambient schemas": virtual schemas,
/// implicitly present in all databases, that house various system views.
/// The big examples of ambient schemas are `pg_catalog` and `mz_catalog`.
#[derive(Debug)]
pub struct Catalog {
    /// In-memory catalog state. Name resolution and the `for_*` constructors
    /// on `Catalog` delegate to it.
    state: CatalogState,
    /// Optimized plans, physical plans, and dataflow metainfo (including
    /// optimizer notices) saved per [`GlobalId`].
    plans: CatalogPlans,
    /// Handle to the expression cache, if there is one.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Durable catalog storage, guarded by an async mutex. It sits behind an
    /// `Arc`, so every clone of a `Catalog` shares the same storage handle.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// Revision counter reported by `transient_revision()`. It is not
    /// persisted to disk.
    transient_revision: u64,
}
144
145// Implement our own Clone because derive can't unless S is Clone, which it's
146// not (hence the Arc).
147impl Clone for Catalog {
148    fn clone(&self) -> Self {
149        Self {
150            state: self.state.clone(),
151            plans: self.plans.clone(),
152            expr_cache_handle: self.expr_cache_handle.clone(),
153            storage: Arc::clone(&self.storage),
154            transient_revision: self.transient_revision,
155        }
156    }
157}
158
/// Plans and optimizer metadata that the [`Catalog`] keeps per [`GlobalId`].
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized (MIR) dataflow description saved for each id.
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical dataflow plan saved for each id.
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// `DataflowMetainfo` (which carries the optimizer notices) saved for each
    /// id.
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Reverse lookup: for each id that some notice lists in its
    /// `dependencies`, the notices that depend on it.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
166
167impl Catalog {
168    /// Set the optimized plan for the item identified by `id`.
169    #[mz_ore::instrument(level = "trace")]
170    pub fn set_optimized_plan(
171        &mut self,
172        id: GlobalId,
173        plan: DataflowDescription<OptimizedMirRelationExpr>,
174    ) {
175        self.plans.optimized_plan_by_id.insert(id, plan.into());
176    }
177
178    /// Set the physical plan for the item identified by `id`.
179    #[mz_ore::instrument(level = "trace")]
180    pub fn set_physical_plan(
181        &mut self,
182        id: GlobalId,
183        plan: DataflowDescription<mz_compute_types::plan::Plan>,
184    ) {
185        self.plans.physical_plan_by_id.insert(id, plan.into());
186    }
187
188    /// Try to get the optimized plan for the item identified by `id`.
189    #[mz_ore::instrument(level = "trace")]
190    pub fn try_get_optimized_plan(
191        &self,
192        id: &GlobalId,
193    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
194        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
195    }
196
197    /// Try to get the physical plan for the item identified by `id`.
198    #[mz_ore::instrument(level = "trace")]
199    pub fn try_get_physical_plan(
200        &self,
201        id: &GlobalId,
202    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
203        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
204    }
205
206    /// Set the `DataflowMetainfo` for the item identified by `id`.
207    #[mz_ore::instrument(level = "trace")]
208    pub fn set_dataflow_metainfo(
209        &mut self,
210        id: GlobalId,
211        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
212    ) {
213        // Add entries to the `notices_by_dep_id` lookup map.
214        for notice in metainfo.optimizer_notices.iter() {
215            for dep_id in notice.dependencies.iter() {
216                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
217                entry.push(Arc::clone(notice))
218            }
219            if let Some(item_id) = notice.item_id {
220                soft_assert_eq_or_log!(
221                    item_id,
222                    id,
223                    "notice.item_id should match the id for whom we are saving the notice"
224                );
225            }
226        }
227        // Add the dataflow with the scoped entries.
228        self.plans.dataflow_metainfos.insert(id, metainfo);
229    }
230
231    /// Try to get the `DataflowMetainfo` for the item identified by `id`.
232    #[mz_ore::instrument(level = "trace")]
233    pub fn try_get_dataflow_metainfo(
234        &self,
235        id: &GlobalId,
236    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
237        self.plans.dataflow_metainfos.get(id)
238    }
239
    /// Drop all optimized and physical plans and `DataflowMetainfo`s for the
    /// items identified by `drop_ids`.
    ///
    /// Ignore requests for non-existing plans or `DataflowMetainfo`s.
    ///
    /// The removal proceeds in three passes:
    /// 1. For every id in `drop_ids`, remove its plans and its metainfo, and
    ///    remove each of that metainfo's notices from the `notices_by_dep_id`
    ///    entries of the notice's dependencies.
    /// 2. For every id in `drop_ids`, remove its `notices_by_dep_id` entry, and
    ///    remove each notice found there from the metainfo of the notice's
    ///    `item_id` (if that metainfo still exists).
    /// 3. For every dependency of a dropped notice that is not itself in
    ///    `drop_ids`, remove the dropped notices from that dependency's
    ///    `notices_by_dep_id` entry.
    ///
    /// Return a set containing all dropped notices. Note that if for some
    /// reason we end up with two identical notices being dropped by the same
    /// call, the result will contain only one instance of that notice.
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Collect dropped notices in this set.
        let mut dropped_notices = BTreeSet::new();

        // Remove plans and metainfo.optimizer_notices entries.
        for id in drop_ids {
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                // The notices of a single metainfo are expected to be
                // pairwise distinct.
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Remove the corresponding notices_by_dep_id entries.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            // The reverse lookup should still contain the
                            // notice we are about to remove from it.
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Remove notices_by_dep_id entries.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    // Remove the corresponding metainfo.optimizer_notices entries.
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            // Sanity check only: every notice stored in this
                            // metainfo that names an item should name
                            // `item_id`.
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Collect dependency ids not in drop_ids with at least one dropped
        // notice.
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        // Remove notices in `dropped_notices` for all `notices_by_dep_id`
        // entries in `todo_dep_ids`.
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        // (We used to have a sanity check here that
        // `dropped_notices.iter().any(|n| Arc::strong_count(n) != 1)`
        // but this is not a correct assertion: There might be other clones of the catalog (e.g. if
        // a peek is running concurrently to an index drop), in which case those clones also hold
        // references to the notices.)

        dropped_notices
    }
327}
328
/// A [`CatalogState`] (borrowed or owned) bundled with per-connection context:
/// connection id, role, cluster, current database, search path, and optional
/// references to prepared statements and portals.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The underlying catalog state, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Because we don't have any way of removing items from the catalog
    /// temporarily, we allow the ConnCatalog to pretend that a set of items
    /// don't exist during resolution.
    ///
    /// This feature is necessary to allow re-planning of statements, which is
    /// either incredibly useful or required when altering item definitions.
    ///
    /// Note that this field should only be populated on short-lived
    /// catalogs.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// ID of the connection; exposed through `conn_id()`.
    conn_id: ConnectionId,
    // NOTE(review): presumably the name of the connection's active cluster —
    // confirm against the code that constructs `ConnCatalog`.
    cluster: String,
    // NOTE(review): presumably the connection's current database, if any —
    // confirm against the code that constructs `ConnCatalog`.
    database: Option<DatabaseId>,
    /// Search path handed to `CatalogState::effective_search_path` by
    /// `effective_search_path()`.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// Role of this catalog. Must be [`MZ_SYSTEM_ROLE_ID`] in order to mark
    /// ids as unresolvable.
    role_id: RoleId,
    /// Prepared statements by name, when available.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Portals by name, when available.
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Sender half of a channel carrying [`AdapterNotice`]s.
    notices_tx: UnboundedSender<AdapterNotice>,
}
351
352impl ConnCatalog<'_> {
353    pub fn conn_id(&self) -> &ConnectionId {
354        &self.conn_id
355    }
356
357    pub fn state(&self) -> &CatalogState {
358        &*self.state
359    }
360
361    /// Prevent planning from resolving item with the provided ID. Instead,
362    /// return an error as if the item did not exist.
363    ///
364    /// This feature is meant exclusively to permit re-planning statements
365    /// during update operations and should not be used otherwise given its
366    /// extremely "powerful" semantics.
367    ///
368    /// # Panics
369    /// If the catalog's role ID is not [`MZ_SYSTEM_ROLE_ID`].
370    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
371        assert_eq!(
372            self.role_id, MZ_SYSTEM_ROLE_ID,
373            "only the system role can mark IDs unresolvable",
374        );
375        self.unresolvable_ids.insert(id);
376    }
377
378    /// Returns the schemas:
379    /// - mz_catalog
380    /// - pg_catalog
381    /// - temp (if requested)
382    /// - all schemas from the session's search_path var that exist
383    pub fn effective_search_path(
384        &self,
385        include_temp_schema: bool,
386    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
387        self.state
388            .effective_search_path(&self.search_path, include_temp_schema)
389    }
390}
391
392impl ConnectionResolver for ConnCatalog<'_> {
393    fn resolve_connection(
394        &self,
395        id: CatalogItemId,
396    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
397        self.state().resolve_connection(id)
398    }
399}
400
401impl Catalog {
402    /// Returns the catalog's transient revision, which starts at 1 and is
403    /// incremented on every change. This is not persisted to disk, and will
404    /// restart on every load.
405    pub fn transient_revision(&self) -> u64 {
406        self.transient_revision
407    }
408
409    /// Creates a debug catalog from the current
410    /// `METADATA_BACKEND_URL` with parameters set appropriately for debug contexts,
411    /// like in tests.
412    ///
413    /// WARNING! This function can arbitrarily fail because it does not make any
414    /// effort to adjust the catalog's contents' structure or semantics to the
415    /// currently running version, i.e. it does not apply any migrations.
416    ///
417    /// This function must not be called in production contexts. Use
418    /// [`Catalog::open`] with appropriately set configuration parameters
419    /// instead.
420    pub async fn with_debug<F, Fut, T>(f: F) -> T
421    where
422        F: FnOnce(Catalog) -> Fut,
423        Fut: Future<Output = T>,
424    {
425        let persist_client = PersistClient::new_for_tests().await;
426        let organization_id = Uuid::new_v4();
427        let bootstrap_args = test_bootstrap_args();
428        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
429            .await
430            .expect("can open debug catalog");
431        f(catalog).await
432    }
433
434    /// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
435    /// in progress.
436    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
437    where
438        F: FnOnce(Catalog) -> Fut,
439        Fut: Future<Output = T>,
440    {
441        let persist_client = PersistClient::new_for_tests().await;
442        let organization_id = Uuid::new_v4();
443        let bootstrap_args = test_bootstrap_args();
444        let mut catalog =
445            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
446                .await
447                .expect("can open debug catalog");
448
449        // Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
450        let now = SYSTEM_TIME.clone();
451        let openable_storage = TestCatalogStateBuilder::new(persist_client)
452            .with_organization_id(organization_id)
453            .with_default_deploy_generation()
454            .build()
455            .await
456            .expect("can create durable catalog");
457        let mut storage = openable_storage
458            .open(now().into(), &bootstrap_args)
459            .await
460            .expect("can open durable catalog")
461            .0;
462        // Drain updates.
463        let _ = storage
464            .sync_to_current_updates()
465            .await
466            .expect("can sync to current updates");
467        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
468
469        f(catalog).await
470    }
471
472    /// Opens a debug catalog.
473    ///
474    /// See [`Catalog::with_debug`].
475    pub async fn open_debug_catalog(
476        persist_client: PersistClient,
477        organization_id: Uuid,
478        bootstrap_args: &BootstrapArgs,
479    ) -> Result<Catalog, anyhow::Error> {
480        let now = SYSTEM_TIME.clone();
481        let environment_id = None;
482        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
483            .with_organization_id(organization_id)
484            .with_default_deploy_generation()
485            .build()
486            .await?;
487        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
488        let system_parameter_defaults = BTreeMap::default();
489        Self::open_debug_catalog_inner(
490            persist_client,
491            storage,
492            now,
493            environment_id,
494            &DUMMY_BUILD_INFO,
495            system_parameter_defaults,
496            bootstrap_args,
497            None,
498        )
499        .await
500    }
501
502    /// Opens a read only debug persist backed catalog defined by `persist_client` and
503    /// `organization_id`.
504    ///
505    /// See [`Catalog::with_debug`].
506    pub async fn open_debug_read_only_catalog(
507        persist_client: PersistClient,
508        organization_id: Uuid,
509        bootstrap_args: &BootstrapArgs,
510    ) -> Result<Catalog, anyhow::Error> {
511        let now = SYSTEM_TIME.clone();
512        let environment_id = None;
513        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
514            .with_organization_id(organization_id)
515            .build()
516            .await?;
517        let storage = openable_storage
518            .open_read_only(&test_bootstrap_args())
519            .await?;
520        let system_parameter_defaults = BTreeMap::default();
521        Self::open_debug_catalog_inner(
522            persist_client,
523            storage,
524            now,
525            environment_id,
526            &DUMMY_BUILD_INFO,
527            system_parameter_defaults,
528            bootstrap_args,
529            None,
530        )
531        .await
532    }
533
534    /// Opens a read only debug persist backed catalog defined by `persist_client` and
535    /// `organization_id`.
536    ///
537    /// See [`Catalog::with_debug`].
538    pub async fn open_debug_read_only_persist_catalog_config(
539        persist_client: PersistClient,
540        now: NowFn,
541        environment_id: EnvironmentId,
542        system_parameter_defaults: BTreeMap<String, String>,
543        build_info: &'static BuildInfo,
544        bootstrap_args: &BootstrapArgs,
545        enable_expression_cache_override: Option<bool>,
546    ) -> Result<Catalog, anyhow::Error> {
547        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
548            .with_organization_id(environment_id.organization_id())
549            .with_version(
550                build_info
551                    .version
552                    .parse()
553                    .expect("build version is parseable"),
554            )
555            .build()
556            .await?;
557        let storage = openable_storage.open_read_only(bootstrap_args).await?;
558        Self::open_debug_catalog_inner(
559            persist_client,
560            storage,
561            now,
562            Some(environment_id),
563            build_info,
564            system_parameter_defaults,
565            bootstrap_args,
566            enable_expression_cache_override,
567        )
568        .await
569    }
570
571    async fn open_debug_catalog_inner(
572        persist_client: PersistClient,
573        storage: Box<dyn DurableCatalogState>,
574        now: NowFn,
575        environment_id: Option<EnvironmentId>,
576        build_info: &'static BuildInfo,
577        system_parameter_defaults: BTreeMap<String, String>,
578        bootstrap_args: &BootstrapArgs,
579        enable_expression_cache_override: Option<bool>,
580    ) -> Result<Catalog, anyhow::Error> {
581        let metrics_registry = &MetricsRegistry::new();
582        let secrets_reader = Arc::new(InMemorySecretsController::new());
583        // Used as a lower boundary of the boot_ts, but it's ok to use now() for
584        // debugging/testing.
585        let previous_ts = now().into();
586        let replica_size = &bootstrap_args.default_cluster_replica_size;
587        let read_only = false;
588
589        let OpenCatalogResult {
590            catalog,
591            migrated_storage_collections_0dt: _,
592            new_builtin_collections: _,
593            builtin_table_updates: _,
594            cached_global_exprs: _,
595            uncached_local_exprs: _,
596        } = Catalog::open(Config {
597            storage,
598            metrics_registry,
599            state: StateConfig {
600                unsafe_mode: true,
601                all_features: false,
602                build_info,
603                deploy_generation: 0,
604                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
605                read_only,
606                now,
607                boot_ts: previous_ts,
608                skip_migrations: true,
609                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
610                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
611                    size: replica_size.clone(),
612                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
613                },
614                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
615                    size: replica_size.clone(),
616                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
617                },
618                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
619                    size: replica_size.clone(),
620                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
621                },
622                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
623                    size: replica_size.clone(),
624                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
625                },
626                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
627                    size: replica_size.clone(),
628                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
629                },
630                system_parameter_defaults,
631                remote_system_parameters: None,
632                availability_zones: vec![],
633                egress_addresses: vec![],
634                aws_principal_context: None,
635                aws_privatelink_availability_zones: None,
636                http_host_name: None,
637                connection_context: ConnectionContext::for_tests(secrets_reader),
638                builtin_item_migration_config: BuiltinItemMigrationConfig {
639                    persist_client: persist_client.clone(),
640                    read_only,
641                    force_migration: None,
642                },
643                persist_client,
644                enable_expression_cache_override,
645                helm_chart_version: None,
646                external_login_password_mz_system: None,
647                license_key: ValidatedLicenseKey::for_tests(),
648            },
649        })
650        .await?;
651        Ok(catalog)
652    }
653
654    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
655        self.state.for_session(session)
656    }
657
658    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
659        self.state.for_sessionless_user(role_id)
660    }
661
662    pub fn for_system_session(&self) -> ConnCatalog<'_> {
663        self.state.for_system_session()
664    }
665
666    async fn storage<'a>(
667        &'a self,
668    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
669        self.storage.lock().await
670    }
671
672    pub async fn current_upper(&self) -> mz_repr::Timestamp {
673        self.storage().await.current_upper().await
674    }
675
676    pub async fn allocate_user_id(
677        &self,
678        commit_ts: mz_repr::Timestamp,
679    ) -> Result<(CatalogItemId, GlobalId), Error> {
680        self.storage()
681            .await
682            .allocate_user_id(commit_ts)
683            .await
684            .maybe_terminate("allocating user ids")
685            .err_into()
686    }
687
688    /// Allocate `amount` many user IDs. See [`DurableCatalogState::allocate_user_ids`].
689    pub async fn allocate_user_ids(
690        &self,
691        amount: u64,
692        commit_ts: mz_repr::Timestamp,
693    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
694        self.storage()
695            .await
696            .allocate_user_ids(amount, commit_ts)
697            .await
698            .maybe_terminate("allocating user ids")
699            .err_into()
700    }
701
702    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
703        let commit_ts = self.storage().await.current_upper().await;
704        self.allocate_user_id(commit_ts).await
705    }
706
707    /// Get the next user item ID without allocating it.
708    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
709        self.storage()
710            .await
711            .get_next_user_item_id()
712            .await
713            .err_into()
714    }
715
716    #[cfg(test)]
717    pub async fn allocate_system_id(
718        &self,
719        commit_ts: mz_repr::Timestamp,
720    ) -> Result<(CatalogItemId, GlobalId), Error> {
721        use mz_ore::collections::CollectionExt;
722
723        let mut storage = self.storage().await;
724        let mut txn = storage.transaction().await?;
725        let id = txn
726            .allocate_system_item_ids(1)
727            .maybe_terminate("allocating system ids")?
728            .into_element();
729        // Drain transaction.
730        let _ = txn.get_and_commit_op_updates();
731        txn.commit(commit_ts).await?;
732        Ok(id)
733    }
734
735    /// Get the next system item ID without allocating it.
736    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
737        self.storage()
738            .await
739            .get_next_system_item_id()
740            .await
741            .err_into()
742    }
743
744    pub async fn allocate_user_cluster_id(
745        &self,
746        commit_ts: mz_repr::Timestamp,
747    ) -> Result<ClusterId, Error> {
748        self.storage()
749            .await
750            .allocate_user_cluster_id(commit_ts)
751            .await
752            .maybe_terminate("allocating user cluster ids")
753            .err_into()
754    }
755
756    /// Get the next system replica id without allocating it.
757    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
758        self.storage()
759            .await
760            .get_next_system_replica_id()
761            .await
762            .err_into()
763    }
764
765    /// Get the next user replica id without allocating it.
766    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
767        self.storage()
768            .await
769            .get_next_user_replica_id()
770            .await
771            .err_into()
772    }
773
    /// Resolves `database_name` to a [`Database`] using the in-memory catalog state.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve the name.
    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        self.state.resolve_database(database_name)
    }
777
    /// Resolves `schema_name` to a [`Schema`] using the in-memory catalog state.
    ///
    /// `current_database` and `database_name` are both optional and are forwarded
    /// unchanged, together with `conn_id`, to the state's resolver.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve the schema.
    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema(current_database, database_name, schema_name, conn_id)
    }
788
    /// Resolves `schema_name` within the database identified by `database_spec`.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve the schema.
    pub fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema_in_database(database_spec, schema_name, conn_id)
    }
798
    /// Resolves `replica_name` to a [`ClusterReplica`] of the cluster `cluster_id`.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve the replica.
    pub fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        self.state
            .resolve_replica_in_cluster(cluster_id, replica_name)
    }
807
    /// Returns the [`SchemaId`] that the catalog state resolves for the system schema `name`.
    ///
    /// The lookup is infallible from the caller's perspective (no `Result`/`Option`).
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.state.resolve_system_schema(name)
    }
811
    /// Resolves the search path for `session` into `(database, schema)` specifier pairs,
    /// as computed by the catalog state.
    pub fn resolve_search_path(
        &self,
        session: &Session,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state.resolve_search_path(session)
    }
818
    /// Resolves `name` to a non-function [`CatalogEntry`].
    ///
    /// `current_database`, `search_path`, and `conn_id` are forwarded unchanged to the
    /// catalog state's resolver.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve `name`.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_entry(current_database, search_path, name, conn_id)
    }
830
    /// Resolves a `BuiltinTable` to the [`CatalogItemId`] recorded for it in the catalog state.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.state.resolve_builtin_table(builtin)
    }
835
    /// Resolves a `BuiltinLog` to its [`CatalogItemId`].
    ///
    /// The state lookup yields a tuple; only its first element is returned here.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
        self.state.resolve_builtin_log(builtin).0
    }
840
    /// Resolves a `BuiltinSource` to its [`CatalogItemId`].
    ///
    /// Despite the method name, this forwards to the state's `resolve_builtin_source`.
    pub fn resolve_builtin_storage_collection(
        &self,
        builtin: &'static BuiltinSource,
    ) -> CatalogItemId {
        self.state.resolve_builtin_source(builtin)
    }
848
    /// Resolves `name` to a function [`CatalogEntry`].
    ///
    /// `current_database`, `search_path`, and `conn_id` are forwarded unchanged to the
    /// catalog state's resolver.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve `name`.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_function(current_database, search_path, name, conn_id)
    }
860
    /// Resolves `name` to a type [`CatalogEntry`].
    ///
    /// `current_database`, `search_path`, and `conn_id` are forwarded unchanged to the
    /// catalog state's resolver.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve `name`.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_type(current_database, search_path, name, conn_id)
    }
872
    /// Resolves the cluster called `name` using the in-memory catalog state.
    ///
    /// # Errors
    /// Returns a [`SqlCatalogError`] when the state cannot resolve the name.
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        self.state.resolve_cluster(name)
    }
876
    /// Resolves a [`Cluster`] for a [`BuiltinCluster`] by delegating to the catalog state.
    ///
    /// # Panics
    /// * If the [`BuiltinCluster`] doesn't exist.
    ///
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        self.state.resolve_builtin_cluster(cluster)
    }
885
    /// Returns the [`ClusterId`] of the builtin `MZ_CATALOG_SERVER_CLUSTER`.
    ///
    /// Goes through [`Self::resolve_builtin_cluster`], so it shares that method's
    /// behavior when the builtin cluster is missing.
    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
    }
889
890    /// Resolves a [`Cluster`] for a TargetCluster.
891    pub fn resolve_target_cluster(
892        &self,
893        target_cluster: TargetCluster,
894        session: &Session,
895    ) -> Result<&Cluster, AdapterError> {
896        match target_cluster {
897            TargetCluster::CatalogServer => {
898                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
899            }
900            TargetCluster::Active => self.active_cluster(session),
901            TargetCluster::Transaction(cluster_id) => self
902                .try_get_cluster(cluster_id)
903                .ok_or(AdapterError::ConcurrentClusterDrop),
904        }
905    }
906
907    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
908        // TODO(benesch): this check here is not sufficiently protective. It'd
909        // be very easy for a code path to accidentally avoid this check by
910        // calling `resolve_cluster(session.vars().cluster())`.
911        if session.user().name != SYSTEM_USER.name
912            && session.user().name != SUPPORT_USER.name
913            && session.vars().cluster() == SYSTEM_USER.name
914        {
915            coord_bail!(
916                "system cluster '{}' cannot execute user queries",
917                SYSTEM_USER.name
918            );
919        }
920        let cluster = self.resolve_cluster(session.vars().cluster())?;
921        Ok(cluster)
922    }
923
    /// Borrows the in-memory [`CatalogState`] backing this catalog.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }
927
    /// Converts a [`QualifiedItemName`] into a [`FullItemName`] via the catalog state.
    ///
    /// `conn_id` is optional and forwarded unchanged to the state.
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }
935
    /// Returns the [`CatalogEntry`] for `id`, or `None` if the state has no such entry.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }
939
    /// Returns the [`CatalogEntry`] associated with the [`GlobalId`] `id`, or `None` if the
    /// state has no such entry.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }
943
    /// Returns the [`CatalogEntry`] for `id`.
    ///
    /// NOTE(review): unlike [`Self::try_get_entry`] this has no `None` case, so it
    /// presumably panics for an unknown ID — confirm against `CatalogState::get_entry`.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }
947
    /// Returns the [`CatalogCollectionEntry`] associated with the [`GlobalId`] `id`.
    ///
    /// NOTE(review): there is no `None` case, so this presumably panics for an unknown
    /// ID — confirm against `CatalogState::get_entry_by_global_id`.
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }
951
    /// Iterates over the [`GlobalId`]s of the entry identified by `id`.
    ///
    /// The returned iterator borrows from `self` (lifetime `'a`) but not from `id`.
    /// The entry is fetched with [`Self::get_entry`].
    pub fn get_global_ids<'a>(
        &'a self,
        id: &CatalogItemId,
    ) -> impl Iterator<Item = GlobalId> + use<'a> {
        self.get_entry(id).global_ids()
    }
958
    /// Maps a [`GlobalId`] to the [`CatalogItemId`] of the entry it belongs to.
    ///
    /// The entry is fetched with [`Self::get_entry_by_global_id`]; see
    /// [`Self::try_resolve_item_id`] for the `Option`-returning variant.
    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
        self.get_entry_by_global_id(id).id()
    }
962
963    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
964        let item = self.try_get_entry_by_global_id(id)?;
965        Some(item.id())
966    }
967
    /// Returns the [`Schema`] identified by `database_spec` and `schema_spec`.
    ///
    /// `conn_id` is forwarded to the state alongside the specifiers.
    ///
    /// NOTE(review): there is no `None` case, so this presumably panics for an unknown
    /// schema — confirm against `CatalogState::get_schema`.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }
976
    /// Returns the [`Schema`] identified by `database_spec` and `schema_spec`, or `None`
    /// if the state has no such schema.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }
986
    /// Returns the [`SchemaId`] the catalog state reports for the `mz_catalog` schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }
990
    /// Returns the [`SchemaId`] the catalog state reports for the `pg_catalog` schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }
994
    /// Returns the [`SchemaId`] the catalog state reports for the information schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }
998
    /// Returns the [`SchemaId`] the catalog state reports for the `mz_internal` schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }
1002
    /// Returns the [`SchemaId`] the catalog state reports for the `mz_introspection` schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }
1006
    /// Returns the [`SchemaId`] the catalog state reports for the `mz_unsafe` schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }
1010
    /// Iterates over the [`SchemaId`]s of the system schemas known to the catalog state.
    ///
    /// The iterator borrows from `self`.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }
1014
    /// Returns the [`Database`] with the given `id`.
    ///
    /// NOTE(review): there is no `None` case, so this presumably panics for an unknown
    /// ID — confirm against `CatalogState::get_database`.
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }
1018
    /// Returns the [`Role`] with the given `id`, or `None` if the state has no such role.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }
1022
    /// Returns the [`Role`] with the given `id`.
    ///
    /// NOTE(review): unlike [`Self::try_get_role`] there is no `None` case, so this
    /// presumably panics for an unknown ID — confirm against `CatalogState::get_role`.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }
1026
    /// Returns the [`Role`] called `role_name`, or `None` if the state has no such role.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }
1030
    /// Returns the [`RoleAuth`] recorded for the role `id`, or `None` if the state has none.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }
1034
    /// Creates a new schema in the `Catalog` for temporary items
    /// indicated by the TEMPORARY or TEMP keywords.
    ///
    /// The schema is created for connection `conn_id` and owned by `owner_id`; the work
    /// is done by the catalog state.
    ///
    /// # Errors
    /// Propagates any [`Error`] reported by the catalog state.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
1044
1045    fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1046        // Temporary schemas are created lazily, so it's valid for one to not exist yet.
1047        self.state
1048            .temporary_schemas
1049            .get(conn_id)
1050            .map(|schema| schema.items.contains_key(item_name))
1051            .unwrap_or(false)
1052    }
1053
1054    /// Drops schema for connection if it exists. Returns an error if it exists and has items.
1055    /// Returns Ok if conn_id's temp schema does not exist.
1056    pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1057        let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1058            return Ok(());
1059        };
1060        if !schema.items.is_empty() {
1061            return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1062        }
1063        Ok(())
1064    }
1065
    /// Computes the dependents of `object_ids` as seen by connection `conn_id`.
    ///
    /// Starts the state's traversal with a fresh, empty `seen` set.
    pub(crate) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
    ) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.object_dependents(object_ids, conn_id, &mut seen)
    }
1074
1075    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1076        FullNameV1 {
1077            database: name.database.to_string(),
1078            schema: name.schema.clone(),
1079            item: name.item.clone(),
1080        }
1081    }
1082
1083    pub fn find_available_cluster_name(&self, name: &str) -> String {
1084        let mut i = 0;
1085        let mut candidate = name.to_string();
1086        while self.state.clusters_by_name.contains_key(&candidate) {
1087            i += 1;
1088            candidate = format!("{}{}", name, i);
1089        }
1090        candidate
1091    }
1092
1093    pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1094        if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1095            self.cluster_replica_sizes()
1096                .enabled_allocations()
1097                .map(|a| a.0.to_owned())
1098                .collect::<Vec<_>>()
1099        } else {
1100            self.system_config().allowed_cluster_replica_sizes()
1101        }
1102    }
1103
    /// Converts a durable `ReplicaLocation` into an in-memory [`ReplicaLocation`] via the
    /// catalog state.
    ///
    /// `allowed_sizes` and the optional `allowed_availability_zones` are forwarded
    /// unchanged to the state.
    ///
    /// # Errors
    /// Propagates any [`Error`] reported by the catalog state.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }
1113
    /// Validates the replica size `size` against `allowed_sizes` using the catalog state.
    ///
    /// # Errors
    /// Propagates the [`Error`] the state reports for a rejected size.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }
1121
    /// Borrows the [`ClusterReplicaSizeMap`] stored in the catalog state.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1125
1126    /// Returns the privileges of an object by its ID.
1127    pub fn get_privileges(
1128        &self,
1129        id: &SystemObjectId,
1130        conn_id: &ConnectionId,
1131    ) -> Option<&PrivilegeMap> {
1132        match id {
1133            SystemObjectId::Object(id) => match id {
1134                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1135                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1136                ObjectId::Schema((database_spec, schema_spec)) => Some(
1137                    self.get_schema(database_spec, schema_spec, conn_id)
1138                        .privileges(),
1139                ),
1140                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1141                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1142                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1143            },
1144            SystemObjectId::System => Some(&self.state.system_privileges),
1145        }
1146    }
1147
1148    #[mz_ore::instrument(level = "debug")]
1149    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1150        Ok(self.storage().await.confirm_leadership().await?)
1151    }
1152
    /// Returns the IDs of all log sources the given object depends on, as computed by the
    /// catalog state.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }
1157
1158    /// Serializes the catalog's in-memory state.
1159    ///
1160    /// There are no guarantees about the format of the serialized state, except
1161    /// that the serialized state for two identical catalogs will compare
1162    /// identically.
1163    pub fn dump(&self) -> Result<CatalogDump, Error> {
1164        Ok(CatalogDump::new(self.state.dump(None)?))
1165    }
1166
1167    /// Checks the [`Catalog`]s internal consistency.
1168    ///
1169    /// Returns a JSON object describing the inconsistencies, if there are any.
1170    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1171        self.state.check_consistency().map_err(|inconsistencies| {
1172            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1173                serde_json::Value::String("failed to serialize inconsistencies".to_string())
1174            })
1175        })
1176    }
1177
    /// Borrows the `CatalogConfig` held by the catalog state.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
1181
    /// Iterates over every [`CatalogEntry`] in the state's `entry_by_id` map.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }
1185
1186    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1187        self.entries()
1188            .filter(|entry| entry.is_connection() && entry.id().is_user())
1189    }
1190
1191    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1192        self.entries()
1193            .filter(|entry| entry.is_table() && entry.id().is_user())
1194    }
1195
1196    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1197        self.entries()
1198            .filter(|entry| entry.is_source() && entry.id().is_user())
1199    }
1200
1201    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1202        self.entries()
1203            .filter(|entry| entry.is_sink() && entry.id().is_user())
1204    }
1205
1206    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1207        self.entries()
1208            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1209    }
1210
1211    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1212        self.entries()
1213            .filter(|entry| entry.is_secret() && entry.id().is_user())
1214    }
1215
    /// Returns the [`NetworkPolicy`] with the given ID.
    ///
    /// NOTE(review): there is no `None` case, so this presumably panics for an unknown
    /// ID — confirm against `CatalogState::get_network_policy`.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }
1219
    /// Returns the [`NetworkPolicy`] called `name`, or `None` if the state has no such policy.
    ///
    /// Forwards to the state's `try_get_network_policy_by_name`.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }
1223
    /// Iterates over every [`Cluster`] in the state's `clusters_by_id` map.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }
1227
    /// Returns the [`Cluster`] with the given `cluster_id`.
    ///
    /// NOTE(review): unlike [`Self::try_get_cluster`] there is no `None` case, so this
    /// presumably panics for an unknown ID — confirm against `CatalogState::get_cluster`.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }
1231
    /// Returns the [`Cluster`] with the given `cluster_id`, or `None` if the state has no
    /// such cluster.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }
1235
1236    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1237        self.clusters().filter(|cluster| cluster.id.is_user())
1238    }
1239
    /// Returns the [`ClusterReplica`] `replica_id` of cluster `cluster_id`.
    ///
    /// NOTE(review): there is no `None` case, so this presumably panics for an unknown
    /// cluster or replica — confirm against `CatalogState::get_cluster_replica`.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }
1247
    /// Returns the [`ClusterReplica`] `replica_id` of cluster `cluster_id`, or `None` if the
    /// state cannot find it.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }
1255
    /// Iterates over the user replicas of every user cluster.
    ///
    /// Flattens each cluster's `user_replicas()` across [`Self::user_clusters`].
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }
1260
    /// Iterates over every [`Database`] in the state's `database_by_id` map.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1264
1265    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1266        self.state
1267            .roles_by_id
1268            .values()
1269            .filter(|role| role.is_user())
1270    }
1271
1272    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1273        self.entries()
1274            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1275    }
1276
    /// Borrows the system-wide [`PrivilegeMap`] stored in the catalog state.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1280
    /// Iterates over the default privileges stored in the catalog state.
    ///
    /// Each item pairs a [`DefaultPrivilegeObject`] with an iterator over its
    /// [`DefaultPrivilegeAclItem`]s.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1291
1292    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1293        self.state
1294            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1295    }
1296
1297    pub fn pack_storage_usage_update(
1298        &self,
1299        event: VersionedStorageUsage,
1300        diff: Diff,
1301    ) -> BuiltinTableUpdate {
1302        self.state
1303            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1304    }
1305
    /// Borrows the [`SystemVars`] held by the catalog state.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1309
    /// Mutably borrows the [`SystemVars`] held by the catalog state.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1313
    /// Checks, via the catalog state, that `role_id` is not a reserved role.
    ///
    /// # Errors
    /// Propagates the [`Error`] the state reports when the check fails.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1317
    /// Checks, via the catalog state, that `role_id` is a grantable role.
    ///
    /// # Errors
    /// Propagates the [`Error`] the state reports when the check fails.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1321
    /// Checks, via the catalog state, that `role_id` is not a system role.
    ///
    /// # Errors
    /// Propagates the [`Error`] the state reports when the check fails.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1325
    /// Checks, via the catalog state, that `role_id` is not a predefined role.
    ///
    /// # Errors
    /// Propagates the [`Error`] the state reports when the check fails.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1329
    /// Checks, via the catalog state, that `network_policy_id` is not a reserved network
    /// policy.
    ///
    /// # Errors
    /// Propagates the [`Error`] the state reports when the check fails.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1337
1338    pub fn ensure_not_reserved_object(
1339        &self,
1340        object_id: &ObjectId,
1341        conn_id: &ConnectionId,
1342    ) -> Result<(), Error> {
1343        match object_id {
1344            ObjectId::Cluster(cluster_id) => {
1345                if cluster_id.is_system() {
1346                    let cluster = self.get_cluster(*cluster_id);
1347                    Err(Error::new(ErrorKind::ReadOnlyCluster(
1348                        cluster.name().to_string(),
1349                    )))
1350                } else {
1351                    Ok(())
1352                }
1353            }
1354            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1355                if replica_id.is_system() {
1356                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1357                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1358                        replica.name().to_string(),
1359                    )))
1360                } else {
1361                    Ok(())
1362                }
1363            }
1364            ObjectId::Database(database_id) => {
1365                if database_id.is_system() {
1366                    let database = self.get_database(database_id);
1367                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
1368                        database.name().to_string(),
1369                    )))
1370                } else {
1371                    Ok(())
1372                }
1373            }
1374            ObjectId::Schema((database_spec, schema_spec)) => {
1375                if schema_spec.is_system() {
1376                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
1377                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1378                        schema.name().schema.clone(),
1379                    )))
1380                } else {
1381                    Ok(())
1382                }
1383            }
1384            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1385            ObjectId::Item(item_id) => {
1386                if item_id.is_system() {
1387                    let item = self.get_entry(item_id);
1388                    let name = self.resolve_full_name(item.name(), Some(conn_id));
1389                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1390                } else {
1391                    Ok(())
1392                }
1393            }
1394            ObjectId::NetworkPolicy(network_policy_id) => {
1395                self.ensure_not_reserved_network_policy(network_policy_id)
1396            }
1397        }
1398    }
1399
    /// See [`CatalogState::deserialize_plan_with_enable_for_item_parsing`].
    ///
    /// Forwards `create_sql` and `force_if_exists_skip` unchanged to the catalog state and
    /// returns its `(Plan, ResolvedIds)` result.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1409
1410    pub(crate) fn update_expression_cache<'a, 'b>(
1411        &'a self,
1412        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1413        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1414    ) -> BoxFuture<'b, ()> {
1415        if let Some(expr_cache) = &self.expr_cache_handle {
1416            expr_cache
1417                .update(
1418                    new_local_expressions,
1419                    new_global_expressions,
1420                    Default::default(),
1421                )
1422                .boxed()
1423        } else {
1424            async {}.boxed()
1425        }
1426    }
1427
1428    /// Listen for and apply all unconsumed updates to the durable catalog state.
1429    // TODO(jkosh44) When this method is actually used outside of a test we can remove the
1430    // `#[cfg(test)]` annotation.
1431    #[cfg(test)]
1432    async fn sync_to_current_updates(
1433        &mut self,
1434    ) -> Result<
1435        (
1436            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1437            Vec<ParsedStateUpdate>,
1438        ),
1439        CatalogError,
1440    > {
1441        let updates = self.storage().await.sync_to_current_updates().await?;
1442        let (builtin_table_updates, catalog_updates) = self
1443            .state
1444            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1445            .await;
1446        Ok((builtin_table_updates, catalog_updates))
1447    }
1448}
1449
1450pub fn is_reserved_name(name: &str) -> bool {
1451    BUILTIN_PREFIXES
1452        .iter()
1453        .any(|prefix| name.starts_with(prefix))
1454}
1455
/// Reports whether `name` is reserved for roles: either a reserved name (see
/// [`is_reserved_name`]) or the public role's name (see [`is_public_role`]).
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1459
/// Reports whether `name` is exactly `PUBLIC_ROLE_NAME`.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1463
/// Converts a SQL catalog item type into the audit log's [`ObjectType`].
///
/// The item type is first converted with `.into()` and then mapped by
/// [`object_type_to_audit_object_type`].
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1467
1468pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1469    match id {
1470        CommentObjectId::Table(_) => ObjectType::Table,
1471        CommentObjectId::View(_) => ObjectType::View,
1472        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1473        CommentObjectId::Source(_) => ObjectType::Source,
1474        CommentObjectId::Sink(_) => ObjectType::Sink,
1475        CommentObjectId::Index(_) => ObjectType::Index,
1476        CommentObjectId::Func(_) => ObjectType::Func,
1477        CommentObjectId::Connection(_) => ObjectType::Connection,
1478        CommentObjectId::Type(_) => ObjectType::Type,
1479        CommentObjectId::Secret(_) => ObjectType::Secret,
1480        CommentObjectId::Role(_) => ObjectType::Role,
1481        CommentObjectId::Database(_) => ObjectType::Database,
1482        CommentObjectId::Schema(_) => ObjectType::Schema,
1483        CommentObjectId::Cluster(_) => ObjectType::Cluster,
1484        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1485        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1486        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1487    }
1488}
1489
/// Converts a SQL-layer object type into the audit log's [`ObjectType`].
///
/// Wraps `object_type` in `SystemObjectType::Object` and reuses
/// [`system_object_type_to_audit_object_type`].
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1495
1496pub(crate) fn system_object_type_to_audit_object_type(
1497    system_type: &SystemObjectType,
1498) -> ObjectType {
1499    match system_type {
1500        SystemObjectType::Object(object_type) => match object_type {
1501            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1502            mz_sql::catalog::ObjectType::View => ObjectType::View,
1503            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1504            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1505            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1506            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1507            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1508            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1509            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1510            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1511            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1512            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1513            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1514            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1515            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1516            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1517            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1518        },
1519        SystemObjectType::System => ObjectType::System,
1520    }
1521}
1522
/// Distinguishes the two kinds of privilege update: granting and revoking.
///
/// Converts into an [`ExecuteResponse`] and into an audit [`EventType`].
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// A privilege is being granted.
    Grant,
    /// A privilege is being revoked.
    Revoke,
}
1528
1529impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1530    fn from(variant: UpdatePrivilegeVariant) -> Self {
1531        match variant {
1532            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1533            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1534        }
1535    }
1536}
1537
1538impl From<UpdatePrivilegeVariant> for EventType {
1539    fn from(variant: UpdatePrivilegeVariant) -> Self {
1540        match variant {
1541            UpdatePrivilegeVariant::Grant => EventType::Grant,
1542            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1543        }
1544    }
1545}
1546
1547impl ConnCatalog<'_> {
1548    fn resolve_item_name(
1549        &self,
1550        name: &PartialItemName,
1551    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1552        self.resolve_item(name).map(|entry| entry.name())
1553    }
1554
1555    fn resolve_function_name(
1556        &self,
1557        name: &PartialItemName,
1558    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1559        self.resolve_function(name).map(|entry| entry.name())
1560    }
1561
1562    fn resolve_type_name(
1563        &self,
1564        name: &PartialItemName,
1565    ) -> Result<&QualifiedItemName, SqlCatalogError> {
1566        self.resolve_type(name).map(|entry| entry.name())
1567    }
1568}
1569
1570impl ExprHumanizer for ConnCatalog<'_> {
1571    fn humanize_id(&self, id: GlobalId) -> Option<String> {
1572        let entry = self.state.try_get_entry_by_global_id(&id)?;
1573        Some(self.resolve_full_name(entry.name()).to_string())
1574    }
1575
1576    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1577        let entry = self.state.try_get_entry_by_global_id(&id)?;
1578        Some(entry.name().item.clone())
1579    }
1580
1581    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1582        let entry = self.state.try_get_entry_by_global_id(&id)?;
1583        Some(self.resolve_full_name(entry.name()).into_parts())
1584    }
1585
1586    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1587        use SqlScalarType::*;
1588
1589        match typ {
1590            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1591            List {
1592                custom_id: Some(item_id),
1593                ..
1594            }
1595            | Map {
1596                custom_id: Some(item_id),
1597                ..
1598            } => {
1599                let item = self.get_item(item_id);
1600                self.minimal_qualification(item.name()).to_string()
1601            }
1602            List { element_type, .. } => {
1603                format!(
1604                    "{} list",
1605                    self.humanize_scalar_type(element_type, postgres_compat)
1606                )
1607            }
1608            Map { value_type, .. } => format!(
1609                "map[{}=>{}]",
1610                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1611                self.humanize_scalar_type(value_type, postgres_compat)
1612            ),
1613            Record {
1614                custom_id: Some(item_id),
1615                ..
1616            } => {
1617                let item = self.get_item(item_id);
1618                self.minimal_qualification(item.name()).to_string()
1619            }
1620            Record { fields, .. } => format!(
1621                "record({})",
1622                fields
1623                    .iter()
1624                    .map(|f| format!(
1625                        "{}: {}",
1626                        f.0,
1627                        self.humanize_column_type(&f.1, postgres_compat)
1628                    ))
1629                    .join(",")
1630            ),
1631            PgLegacyChar => "\"char\"".into(),
1632            Char { length } if !postgres_compat => match length {
1633                None => "char".into(),
1634                Some(length) => format!("char({})", length.into_u32()),
1635            },
1636            VarChar { max_length } if !postgres_compat => match max_length {
1637                None => "varchar".into(),
1638                Some(length) => format!("varchar({})", length.into_u32()),
1639            },
1640            UInt16 => "uint2".into(),
1641            UInt32 => "uint4".into(),
1642            UInt64 => "uint8".into(),
1643            ty => {
1644                let pgrepr_type = mz_pgrepr::Type::from(ty);
1645                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1646
1647                let res = if self
1648                    .effective_search_path(true)
1649                    .iter()
1650                    .any(|(_, schema)| schema == &pg_catalog_schema)
1651                {
1652                    pgrepr_type.name().to_string()
1653                } else {
1654                    // If PG_CATALOG_SCHEMA is not in search path, you need
1655                    // qualified object name to refer to type.
1656                    let name = QualifiedItemName {
1657                        qualifiers: ItemQualifiers {
1658                            database_spec: ResolvedDatabaseSpecifier::Ambient,
1659                            schema_spec: pg_catalog_schema,
1660                        },
1661                        item: pgrepr_type.name().to_string(),
1662                    };
1663                    self.resolve_full_name(&name).to_string()
1664                };
1665                res
1666            }
1667        }
1668    }
1669
1670    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1671        let entry = self.state.try_get_entry_by_global_id(&id)?;
1672
1673        match entry.index() {
1674            Some(index) => {
1675                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1676                let mut on_names = on_desc
1677                    .iter_names()
1678                    .map(|col_name| col_name.to_string())
1679                    .collect::<Vec<_>>();
1680
1681                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1682
1683                // Init ix_names with unknown column names. Unknown columns are
1684                // represented as an empty String and rendered as `#c` by the
1685                // Display::fmt implementation for HumanizedExpr<'a, usize, M>.
1686                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1687                let mut ix_names = vec![String::new(); ix_arity];
1688
1689                // Apply the permutation by swapping on_names with ix_names.
1690                for (on_pos, ix_pos) in p.into_iter().enumerate() {
1691                    let on_name = on_names.get_mut(on_pos).expect("on_name");
1692                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1693                    std::mem::swap(on_name, ix_name);
1694                }
1695
1696                Some(ix_names) // Return the updated ix_names vector.
1697            }
1698            None => {
1699                let desc = self.state.try_get_desc_by_global_id(&id)?;
1700                let column_names = desc
1701                    .iter_names()
1702                    .map(|col_name| col_name.to_string())
1703                    .collect();
1704
1705                Some(column_names)
1706            }
1707        }
1708    }
1709
1710    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1711        let desc = self.state.try_get_desc_by_global_id(&id)?;
1712        Some(desc.get_name(column).to_string())
1713    }
1714
1715    fn id_exists(&self, id: GlobalId) -> bool {
1716        self.state.entry_by_global_id.contains_key(&id)
1717    }
1718}
1719
1720impl SessionCatalog for ConnCatalog<'_> {
1721    fn active_role_id(&self) -> &RoleId {
1722        &self.role_id
1723    }
1724
1725    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1726        self.prepared_statements
1727            .as_ref()
1728            .map(|ps| ps.get(name).map(|ps| ps.desc()))
1729            .flatten()
1730    }
1731
1732    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1733        self.portals
1734            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1735    }
1736
1737    fn active_database(&self) -> Option<&DatabaseId> {
1738        self.database.as_ref()
1739    }
1740
1741    fn active_cluster(&self) -> &str {
1742        &self.cluster
1743    }
1744
1745    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1746        &self.search_path
1747    }
1748
1749    fn resolve_database(
1750        &self,
1751        database_name: &str,
1752    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1753        Ok(self.state.resolve_database(database_name)?)
1754    }
1755
1756    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1757        self.state
1758            .database_by_id
1759            .get(id)
1760            .expect("database doesn't exist")
1761    }
1762
1763    // `as` is ok to use to cast to a trait object.
1764    #[allow(clippy::as_conversions)]
1765    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1766        self.state
1767            .database_by_id
1768            .values()
1769            .map(|database| database as &dyn CatalogDatabase)
1770            .collect()
1771    }
1772
1773    fn resolve_schema(
1774        &self,
1775        database_name: Option<&str>,
1776        schema_name: &str,
1777    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1778        Ok(self.state.resolve_schema(
1779            self.database.as_ref(),
1780            database_name,
1781            schema_name,
1782            &self.conn_id,
1783        )?)
1784    }
1785
1786    fn resolve_schema_in_database(
1787        &self,
1788        database_spec: &ResolvedDatabaseSpecifier,
1789        schema_name: &str,
1790    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1791        Ok(self
1792            .state
1793            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1794    }
1795
1796    fn get_schema(
1797        &self,
1798        database_spec: &ResolvedDatabaseSpecifier,
1799        schema_spec: &SchemaSpecifier,
1800    ) -> &dyn CatalogSchema {
1801        self.state
1802            .get_schema(database_spec, schema_spec, &self.conn_id)
1803    }
1804
1805    // `as` is ok to use to cast to a trait object.
1806    #[allow(clippy::as_conversions)]
1807    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1808        self.get_databases()
1809            .into_iter()
1810            .flat_map(|database| database.schemas().into_iter())
1811            .chain(
1812                self.state
1813                    .ambient_schemas_by_id
1814                    .values()
1815                    .chain(self.state.temporary_schemas.values())
1816                    .map(|schema| schema as &dyn CatalogSchema),
1817            )
1818            .collect()
1819    }
1820
1821    fn get_mz_internal_schema_id(&self) -> SchemaId {
1822        self.state().get_mz_internal_schema_id()
1823    }
1824
1825    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1826        self.state().get_mz_unsafe_schema_id()
1827    }
1828
1829    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1830        self.state.is_system_schema_specifier(schema)
1831    }
1832
1833    fn resolve_role(
1834        &self,
1835        role_name: &str,
1836    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1837        match self.state.try_get_role_by_name(role_name) {
1838            Some(role) => Ok(role),
1839            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1840        }
1841    }
1842
1843    fn resolve_network_policy(
1844        &self,
1845        policy_name: &str,
1846    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1847        match self.state.try_get_network_policy_by_name(policy_name) {
1848            Some(policy) => Ok(policy),
1849            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1850        }
1851    }
1852
1853    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1854        Some(self.state.roles_by_id.get(id)?)
1855    }
1856
1857    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1858        self.state.get_role(id)
1859    }
1860
1861    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1862        // `as` is ok to use to cast to a trait object.
1863        #[allow(clippy::as_conversions)]
1864        self.state
1865            .roles_by_id
1866            .values()
1867            .map(|role| role as &dyn CatalogRole)
1868            .collect()
1869    }
1870
1871    fn mz_system_role_id(&self) -> RoleId {
1872        MZ_SYSTEM_ROLE_ID
1873    }
1874
1875    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1876        self.state.collect_role_membership(id)
1877    }
1878
1879    fn get_network_policy(
1880        &self,
1881        id: &NetworkPolicyId,
1882    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1883        self.state.get_network_policy(id)
1884    }
1885
1886    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1887        // `as` is ok to use to cast to a trait object.
1888        #[allow(clippy::as_conversions)]
1889        self.state
1890            .network_policies_by_id
1891            .values()
1892            .map(|policy| policy as &dyn CatalogNetworkPolicy)
1893            .collect()
1894    }
1895
1896    fn resolve_cluster(
1897        &self,
1898        cluster_name: Option<&str>,
1899    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1900        Ok(self
1901            .state
1902            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1903    }
1904
1905    fn resolve_cluster_replica(
1906        &self,
1907        cluster_replica_name: &QualifiedReplica,
1908    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1909        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1910    }
1911
1912    fn resolve_item(
1913        &self,
1914        name: &PartialItemName,
1915    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1916        let r = self.state.resolve_entry(
1917            self.database.as_ref(),
1918            &self.effective_search_path(true),
1919            name,
1920            &self.conn_id,
1921        )?;
1922        if self.unresolvable_ids.contains(&r.id()) {
1923            Err(SqlCatalogError::UnknownItem(name.to_string()))
1924        } else {
1925            Ok(r)
1926        }
1927    }
1928
1929    fn resolve_function(
1930        &self,
1931        name: &PartialItemName,
1932    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1933        let r = self.state.resolve_function(
1934            self.database.as_ref(),
1935            &self.effective_search_path(false),
1936            name,
1937            &self.conn_id,
1938        )?;
1939
1940        if self.unresolvable_ids.contains(&r.id()) {
1941            Err(SqlCatalogError::UnknownFunction {
1942                name: name.to_string(),
1943                alternative: None,
1944            })
1945        } else {
1946            Ok(r)
1947        }
1948    }
1949
1950    fn resolve_type(
1951        &self,
1952        name: &PartialItemName,
1953    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1954        let r = self.state.resolve_type(
1955            self.database.as_ref(),
1956            &self.effective_search_path(false),
1957            name,
1958            &self.conn_id,
1959        )?;
1960
1961        if self.unresolvable_ids.contains(&r.id()) {
1962            Err(SqlCatalogError::UnknownType {
1963                name: name.to_string(),
1964            })
1965        } else {
1966            Ok(r)
1967        }
1968    }
1969
1970    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1971        self.state.get_system_type(name)
1972    }
1973
1974    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1975        Some(self.state.try_get_entry(id)?)
1976    }
1977
1978    fn try_get_item_by_global_id(
1979        &self,
1980        id: &GlobalId,
1981    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1982        let entry = self.state.try_get_entry_by_global_id(id)?;
1983        let entry = match &entry.item {
1984            CatalogItem::Table(table) => {
1985                let (version, _gid) = table
1986                    .collections
1987                    .iter()
1988                    .find(|(_version, gid)| *gid == id)
1989                    .expect("catalog out of sync, mismatched GlobalId");
1990                entry.at_version(RelationVersionSelector::Specific(*version))
1991            }
1992            _ => entry.at_version(RelationVersionSelector::Latest),
1993        };
1994        Some(entry)
1995    }
1996
1997    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1998        self.state.get_entry(id)
1999    }
2000
2001    fn get_item_by_global_id(
2002        &self,
2003        id: &GlobalId,
2004    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2005        let entry = self.state.get_entry_by_global_id(id);
2006        let entry = match &entry.item {
2007            CatalogItem::Table(table) => {
2008                let (version, _gid) = table
2009                    .collections
2010                    .iter()
2011                    .find(|(_version, gid)| *gid == id)
2012                    .expect("catalog out of sync, mismatched GlobalId");
2013                entry.at_version(RelationVersionSelector::Specific(*version))
2014            }
2015            _ => entry.at_version(RelationVersionSelector::Latest),
2016        };
2017        entry
2018    }
2019
2020    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2021        self.get_schemas()
2022            .into_iter()
2023            .flat_map(|schema| schema.item_ids())
2024            .map(|id| self.get_item(&id))
2025            .collect()
2026    }
2027
2028    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2029        self.state
2030            .get_item_by_name(name, &self.conn_id)
2031            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2032    }
2033
2034    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2035        self.state
2036            .get_type_by_name(name, &self.conn_id)
2037            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2038    }
2039
2040    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2041        &self.state.clusters_by_id[&id]
2042    }
2043
2044    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2045        self.state
2046            .clusters_by_id
2047            .values()
2048            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2049            .collect()
2050    }
2051
2052    fn get_cluster_replica(
2053        &self,
2054        cluster_id: ClusterId,
2055        replica_id: ReplicaId,
2056    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2057        let cluster = self.get_cluster(cluster_id);
2058        cluster.replica(replica_id)
2059    }
2060
2061    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2062        self.get_clusters()
2063            .into_iter()
2064            .flat_map(|cluster| cluster.replicas().into_iter())
2065            .collect()
2066    }
2067
2068    fn get_system_privileges(&self) -> &PrivilegeMap {
2069        &self.state.system_privileges
2070    }
2071
2072    fn get_default_privileges(
2073        &self,
2074    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2075        self.state
2076            .default_privileges
2077            .iter()
2078            .map(|(object, acl_items)| (object, acl_items.collect()))
2079            .collect()
2080    }
2081
2082    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2083        self.state.find_available_name(name, &self.conn_id)
2084    }
2085
2086    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2087        self.state.resolve_full_name(name, Some(&self.conn_id))
2088    }
2089
2090    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2091        self.state.resolve_full_schema_name(name)
2092    }
2093
2094    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2095        self.state.get_entry_by_global_id(global_id).id()
2096    }
2097
2098    fn resolve_global_id(
2099        &self,
2100        item_id: &CatalogItemId,
2101        version: RelationVersionSelector,
2102    ) -> GlobalId {
2103        self.state
2104            .get_entry(item_id)
2105            .at_version(version)
2106            .global_id()
2107    }
2108
2109    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2110        self.state.config()
2111    }
2112
2113    fn now(&self) -> EpochMillis {
2114        (self.state.config().now)()
2115    }
2116
2117    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2118        self.state.aws_privatelink_availability_zones.clone()
2119    }
2120
2121    fn system_vars(&self) -> &SystemVars {
2122        &self.state.system_configuration
2123    }
2124
2125    fn system_vars_mut(&mut self) -> &mut SystemVars {
2126        &mut self.state.to_mut().system_configuration
2127    }
2128
2129    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2130        self.state().get_owner_id(id, self.conn_id())
2131    }
2132
2133    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2134        match id {
2135            SystemObjectId::System => Some(&self.state.system_privileges),
2136            SystemObjectId::Object(ObjectId::Cluster(id)) => {
2137                Some(self.get_cluster(*id).privileges())
2138            }
2139            SystemObjectId::Object(ObjectId::Database(id)) => {
2140                Some(self.get_database(id).privileges())
2141            }
2142            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2143                // For temporary schemas that haven't been created yet (lazy creation),
2144                // we return None - the RBAC check will need to handle this case.
2145                self.state
2146                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
2147                    .map(|schema| schema.privileges())
2148            }
2149            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2150            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2151                Some(self.get_network_policy(id).privileges())
2152            }
2153            SystemObjectId::Object(ObjectId::ClusterReplica(_))
2154            | SystemObjectId::Object(ObjectId::Role(_)) => None,
2155        }
2156    }
2157
2158    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2159        let mut seen = BTreeSet::new();
2160        self.state.object_dependents(ids, &self.conn_id, &mut seen)
2161    }
2162
2163    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2164        let mut seen = BTreeSet::new();
2165        self.state.item_dependents(id, &mut seen)
2166    }
2167
2168    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2169        rbac::all_object_privileges(object_type)
2170    }
2171
2172    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2173        self.state.get_object_type(object_id)
2174    }
2175
2176    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2177        self.state.get_system_object_type(id)
2178    }
2179
2180    /// Returns a [`PartialItemName`] with the minimum amount of qualifiers to unambiguously resolve
2181    /// the object.
2182    ///
2183    /// Warning: This is broken for temporary objects. Don't use this function for serious stuff,
2184    /// i.e., don't expect that what you get back is a thing you can resolve. Current usages are
2185    /// only for error msgs and other humanizations.
2186    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2187        if qualified_name.qualifiers.schema_spec.is_temporary() {
2188            // All bets are off. Just give up and return the qualified name as is.
2189            // TODO: Figure out what's going on with temporary objects.
2190
2191            // See e.g. `temporary_objects.slt` fail if you comment this out, which has the repro
2192            // from https://github.com/MaterializeInc/database-issues/issues/9973#issuecomment-3646382143
2193            // There is also https://github.com/MaterializeInc/database-issues/issues/9974, for
2194            // which we don't have a simple repro.
2195            return qualified_name.item.clone().into();
2196        }
2197
2198        let database_id = match &qualified_name.qualifiers.database_spec {
2199            ResolvedDatabaseSpecifier::Ambient => None,
2200            ResolvedDatabaseSpecifier::Id(id)
2201                if self.database.is_some() && self.database == Some(*id) =>
2202            {
2203                None
2204            }
2205            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2206        };
2207
2208        let schema_spec = if database_id.is_none()
2209            && self.resolve_item_name(&PartialItemName {
2210                database: None,
2211                schema: None,
2212                item: qualified_name.item.clone(),
2213            }) == Ok(qualified_name)
2214            || self.resolve_function_name(&PartialItemName {
2215                database: None,
2216                schema: None,
2217                item: qualified_name.item.clone(),
2218            }) == Ok(qualified_name)
2219            || self.resolve_type_name(&PartialItemName {
2220                database: None,
2221                schema: None,
2222                item: qualified_name.item.clone(),
2223            }) == Ok(qualified_name)
2224        {
2225            None
2226        } else {
2227            // If `search_path` does not contain `full_name.schema`, the
2228            // `PartialName` must contain it.
2229            Some(qualified_name.qualifiers.schema_spec.clone())
2230        };
2231
2232        let res = PartialItemName {
2233            database: database_id.map(|id| self.get_database(&id).name().to_string()),
2234            schema: schema_spec.map(|spec| {
2235                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2236                    .name()
2237                    .schema
2238                    .clone()
2239            }),
2240            item: qualified_name.item.clone(),
2241        };
2242        assert!(
2243            self.resolve_item_name(&res) == Ok(qualified_name)
2244                || self.resolve_function_name(&res) == Ok(qualified_name)
2245                || self.resolve_type_name(&res) == Ok(qualified_name)
2246        );
2247        res
2248    }
2249
2250    fn add_notice(&self, notice: PlanNotice) {
2251        let _ = self.notices_tx.send(notice.into());
2252    }
2253
2254    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2255        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2256        self.state.comments.get_object_comments(comment_id)
2257    }
2258
2259    fn is_cluster_size_cc(&self, size: &str) -> bool {
2260        self.state
2261            .cluster_replica_sizes
2262            .0
2263            .get(size)
2264            .map_or(false, |a| a.is_cc)
2265    }
2266}
2267
2268#[cfg(test)]
2269mod tests {
2270    use std::collections::{BTreeMap, BTreeSet};
2271    use std::sync::Arc;
2272    use std::{env, iter};
2273
2274    use itertools::Itertools;
2275    use mz_catalog::memory::objects::CatalogItem;
2276    use tokio_postgres::NoTls;
2277    use tokio_postgres::types::Type;
2278    use uuid::Uuid;
2279
2280    use mz_catalog::SYSTEM_CONN_ID;
2281    use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2282    use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2283    use mz_controller_types::{ClusterId, ReplicaId};
2284    use mz_expr::MirScalarExpr;
2285    use mz_ore::now::to_datetime;
2286    use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2287    use mz_persist_client::PersistClient;
2288    use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2289    use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2290    use mz_repr::role_id::RoleId;
2291    use mz_repr::{
2292        CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2293        SqlScalarType, Timestamp,
2294    };
2295    use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2296    use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2297    use mz_sql::names::{
2298        self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2299        ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2300    };
2301    use mz_sql::plan::{
2302        CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2303        Scope, StatementContext,
2304    };
2305    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2306    use mz_sql::session::vars::{SystemVars, VarInput};
2307
2308    use crate::catalog::state::LocalExpressionCache;
2309    use crate::catalog::{Catalog, Op};
2310    use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2311    use crate::session::Session;
2312
2313    /// System sessions have an empty `search_path` so it's necessary to
2314    /// schema-qualify all referenced items.
2315    ///
2316    /// Dummy (and ostensibly client) sessions contain system schemas in their
2317    /// search paths, so do not require schema qualification on system objects such
2318    /// as types.
2319    #[mz_ore::test(tokio::test)]
2320    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2321    async fn test_minimal_qualification() {
2322        Catalog::with_debug(|catalog| async move {
2323            struct TestCase {
2324                input: QualifiedItemName,
2325                system_output: PartialItemName,
2326                normal_output: PartialItemName,
2327            }
2328
2329            let test_cases = vec![
2330                TestCase {
2331                    input: QualifiedItemName {
2332                        qualifiers: ItemQualifiers {
2333                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2334                            schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2335                        },
2336                        item: "numeric".to_string(),
2337                    },
2338                    system_output: PartialItemName {
2339                        database: None,
2340                        schema: None,
2341                        item: "numeric".to_string(),
2342                    },
2343                    normal_output: PartialItemName {
2344                        database: None,
2345                        schema: None,
2346                        item: "numeric".to_string(),
2347                    },
2348                },
2349                TestCase {
2350                    input: QualifiedItemName {
2351                        qualifiers: ItemQualifiers {
2352                            database_spec: ResolvedDatabaseSpecifier::Ambient,
2353                            schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2354                        },
2355                        item: "mz_array_types".to_string(),
2356                    },
2357                    system_output: PartialItemName {
2358                        database: None,
2359                        schema: None,
2360                        item: "mz_array_types".to_string(),
2361                    },
2362                    normal_output: PartialItemName {
2363                        database: None,
2364                        schema: None,
2365                        item: "mz_array_types".to_string(),
2366                    },
2367                },
2368            ];
2369
2370            for tc in test_cases {
2371                assert_eq!(
2372                    catalog
2373                        .for_system_session()
2374                        .minimal_qualification(&tc.input),
2375                    tc.system_output
2376                );
2377                assert_eq!(
2378                    catalog
2379                        .for_session(&Session::dummy())
2380                        .minimal_qualification(&tc.input),
2381                    tc.normal_output
2382                );
2383            }
2384            catalog.expire().await;
2385        })
2386        .await
2387    }
2388
2389    #[mz_ore::test(tokio::test)]
2390    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2391    async fn test_catalog_revision() {
2392        let persist_client = PersistClient::new_for_tests().await;
2393        let organization_id = Uuid::new_v4();
2394        let bootstrap_args = test_bootstrap_args();
2395        {
2396            let mut catalog = Catalog::open_debug_catalog(
2397                persist_client.clone(),
2398                organization_id.clone(),
2399                &bootstrap_args,
2400            )
2401            .await
2402            .expect("unable to open debug catalog");
2403            assert_eq!(catalog.transient_revision(), 1);
2404            let commit_ts = catalog.current_upper().await;
2405            catalog
2406                .transact(
2407                    None,
2408                    commit_ts,
2409                    None,
2410                    vec![Op::CreateDatabase {
2411                        name: "test".to_string(),
2412                        owner_id: MZ_SYSTEM_ROLE_ID,
2413                    }],
2414                )
2415                .await
2416                .expect("failed to transact");
2417            assert_eq!(catalog.transient_revision(), 2);
2418            catalog.expire().await;
2419        }
2420        {
2421            let catalog =
2422                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2423                    .await
2424                    .expect("unable to open debug catalog");
2425            // Re-opening the same catalog resets the transient_revision to 1.
2426            assert_eq!(catalog.transient_revision(), 1);
2427            catalog.expire().await;
2428        }
2429    }
2430
2431    #[mz_ore::test(tokio::test)]
2432    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2433    async fn test_effective_search_path() {
2434        Catalog::with_debug(|catalog| async move {
2435            let mz_catalog_schema = (
2436                ResolvedDatabaseSpecifier::Ambient,
2437                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2438            );
2439            let pg_catalog_schema = (
2440                ResolvedDatabaseSpecifier::Ambient,
2441                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2442            );
2443            let mz_temp_schema = (
2444                ResolvedDatabaseSpecifier::Ambient,
2445                SchemaSpecifier::Temporary,
2446            );
2447
2448            // Behavior with the default search_schema (public)
2449            let session = Session::dummy();
2450            let conn_catalog = catalog.for_session(&session);
2451            assert_ne!(
2452                conn_catalog.effective_search_path(false),
2453                conn_catalog.search_path
2454            );
2455            assert_ne!(
2456                conn_catalog.effective_search_path(true),
2457                conn_catalog.search_path
2458            );
2459            assert_eq!(
2460                conn_catalog.effective_search_path(false),
2461                vec![
2462                    mz_catalog_schema.clone(),
2463                    pg_catalog_schema.clone(),
2464                    conn_catalog.search_path[0].clone()
2465                ]
2466            );
2467            assert_eq!(
2468                conn_catalog.effective_search_path(true),
2469                vec![
2470                    mz_temp_schema.clone(),
2471                    mz_catalog_schema.clone(),
2472                    pg_catalog_schema.clone(),
2473                    conn_catalog.search_path[0].clone()
2474                ]
2475            );
2476
2477            // missing schemas are added when missing
2478            let mut session = Session::dummy();
2479            session
2480                .vars_mut()
2481                .set(
2482                    &SystemVars::new(),
2483                    "search_path",
2484                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2485                    false,
2486                )
2487                .expect("failed to set search_path");
2488            let conn_catalog = catalog.for_session(&session);
2489            assert_ne!(
2490                conn_catalog.effective_search_path(false),
2491                conn_catalog.search_path
2492            );
2493            assert_ne!(
2494                conn_catalog.effective_search_path(true),
2495                conn_catalog.search_path
2496            );
2497            assert_eq!(
2498                conn_catalog.effective_search_path(false),
2499                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2500            );
2501            assert_eq!(
2502                conn_catalog.effective_search_path(true),
2503                vec![
2504                    mz_temp_schema.clone(),
2505                    mz_catalog_schema.clone(),
2506                    pg_catalog_schema.clone()
2507                ]
2508            );
2509
2510            let mut session = Session::dummy();
2511            session
2512                .vars_mut()
2513                .set(
2514                    &SystemVars::new(),
2515                    "search_path",
2516                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2517                    false,
2518                )
2519                .expect("failed to set search_path");
2520            let conn_catalog = catalog.for_session(&session);
2521            assert_ne!(
2522                conn_catalog.effective_search_path(false),
2523                conn_catalog.search_path
2524            );
2525            assert_ne!(
2526                conn_catalog.effective_search_path(true),
2527                conn_catalog.search_path
2528            );
2529            assert_eq!(
2530                conn_catalog.effective_search_path(false),
2531                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2532            );
2533            assert_eq!(
2534                conn_catalog.effective_search_path(true),
2535                vec![
2536                    mz_temp_schema.clone(),
2537                    pg_catalog_schema.clone(),
2538                    mz_catalog_schema.clone()
2539                ]
2540            );
2541
2542            let mut session = Session::dummy();
2543            session
2544                .vars_mut()
2545                .set(
2546                    &SystemVars::new(),
2547                    "search_path",
2548                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2549                    false,
2550                )
2551                .expect("failed to set search_path");
2552            let conn_catalog = catalog.for_session(&session);
2553            assert_ne!(
2554                conn_catalog.effective_search_path(false),
2555                conn_catalog.search_path
2556            );
2557            assert_ne!(
2558                conn_catalog.effective_search_path(true),
2559                conn_catalog.search_path
2560            );
2561            assert_eq!(
2562                conn_catalog.effective_search_path(false),
2563                vec![
2564                    mz_catalog_schema.clone(),
2565                    pg_catalog_schema.clone(),
2566                    mz_temp_schema.clone()
2567                ]
2568            );
2569            assert_eq!(
2570                conn_catalog.effective_search_path(true),
2571                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2572            );
2573            catalog.expire().await;
2574        })
2575        .await
2576    }
2577
2578    #[mz_ore::test(tokio::test)]
2579    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2580    async fn test_normalized_create() {
2581        use mz_ore::collections::CollectionExt;
2582        Catalog::with_debug(|catalog| async move {
2583            let conn_catalog = catalog.for_system_session();
2584            let scx = &mut StatementContext::new(None, &conn_catalog);
2585
2586            let parsed = mz_sql_parser::parser::parse_statements(
2587                "create view public.foo as select 1 as bar",
2588            )
2589            .expect("")
2590            .into_element()
2591            .ast;
2592
2593            let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2594
2595            // Ensure that all identifiers are quoted.
2596            assert_eq!(
2597                r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2598                mz_sql::normalize::create_statement(scx, stmt).expect(""),
2599            );
2600            catalog.expire().await;
2601        })
2602        .await;
2603    }
2604
2605    // Test that if a large catalog item is somehow committed, then we can still load the catalog.
2606    #[mz_ore::test(tokio::test)]
2607    #[cfg_attr(miri, ignore)] // slow
2608    async fn test_large_catalog_item() {
2609        let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2610        let column = ", 1";
2611        let view_def_size = view_def.bytes().count();
2612        let column_size = column.bytes().count();
2613        let column_count =
2614            (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2615        let columns = iter::repeat(column).take(column_count).join("");
2616        let create_sql = format!("{view_def}{columns})");
2617        let create_sql_check = create_sql.clone();
2618        assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2619        assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2620            &create_sql
2621        ));
2622
2623        let persist_client = PersistClient::new_for_tests().await;
2624        let organization_id = Uuid::new_v4();
2625        let id = CatalogItemId::User(1);
2626        let gid = GlobalId::User(1);
2627        let bootstrap_args = test_bootstrap_args();
2628        {
2629            let mut catalog = Catalog::open_debug_catalog(
2630                persist_client.clone(),
2631                organization_id.clone(),
2632                &bootstrap_args,
2633            )
2634            .await
2635            .expect("unable to open debug catalog");
2636            let item = catalog
2637                .state()
2638                .deserialize_item(
2639                    gid,
2640                    &create_sql,
2641                    &BTreeMap::new(),
2642                    &mut LocalExpressionCache::Closed,
2643                    None,
2644                )
2645                .expect("unable to parse view");
2646            let commit_ts = catalog.current_upper().await;
2647            catalog
2648                .transact(
2649                    None,
2650                    commit_ts,
2651                    None,
2652                    vec![Op::CreateItem {
2653                        item,
2654                        name: QualifiedItemName {
2655                            qualifiers: ItemQualifiers {
2656                                database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2657                                schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2658                            },
2659                            item: "v".to_string(),
2660                        },
2661                        id,
2662                        owner_id: MZ_SYSTEM_ROLE_ID,
2663                    }],
2664                )
2665                .await
2666                .expect("failed to transact");
2667            catalog.expire().await;
2668        }
2669        {
2670            let catalog =
2671                Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2672                    .await
2673                    .expect("unable to open debug catalog");
2674            let view = catalog.get_entry(&id);
2675            assert_eq!("v", view.name.item);
2676            match &view.item {
2677                CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2678                item => panic!("expected view, got {}", item.typ()),
2679            }
2680            catalog.expire().await;
2681        }
2682    }
2683
2684    #[mz_ore::test(tokio::test)]
2685    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2686    async fn test_object_type() {
2687        Catalog::with_debug(|catalog| async move {
2688            let conn_catalog = catalog.for_system_session();
2689
2690            assert_eq!(
2691                mz_sql::catalog::ObjectType::ClusterReplica,
2692                conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2693                    ClusterId::user(1).expect("1 is a valid ID"),
2694                    ReplicaId::User(1)
2695                )))
2696            );
2697            assert_eq!(
2698                mz_sql::catalog::ObjectType::Role,
2699                conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2700            );
2701            catalog.expire().await;
2702        })
2703        .await;
2704    }
2705
2706    #[mz_ore::test(tokio::test)]
2707    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2708    async fn test_get_privileges() {
2709        Catalog::with_debug(|catalog| async move {
2710            let conn_catalog = catalog.for_system_session();
2711
2712            assert_eq!(
2713                None,
2714                conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2715                    ClusterId::user(1).expect("1 is a valid ID"),
2716                    ReplicaId::User(1),
2717                ))))
2718            );
2719            assert_eq!(
2720                None,
2721                conn_catalog
2722                    .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2723            );
2724            catalog.expire().await;
2725        })
2726        .await;
2727    }
2728
2729    #[mz_ore::test(tokio::test)]
2730    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2731    async fn verify_builtin_descs() {
2732        Catalog::with_debug(|catalog| async move {
2733            let conn_catalog = catalog.for_system_session();
2734
2735            let builtins_cfg = BuiltinsConfig {
2736                include_continual_tasks: true,
2737            };
2738            for builtin in BUILTINS::iter(&builtins_cfg) {
2739                let (schema, name, expected_desc) = match builtin {
2740                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2741                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2742                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2743                    Builtin::Log(_)
2744                    | Builtin::Type(_)
2745                    | Builtin::Func(_)
2746                    | Builtin::ContinualTask(_)
2747                    | Builtin::Index(_)
2748                    | Builtin::Connection(_) => continue,
2749                };
2750                let item = conn_catalog
2751                    .resolve_item(&PartialItemName {
2752                        database: None,
2753                        schema: Some(schema.to_string()),
2754                        item: name.to_string(),
2755                    })
2756                    .expect("unable to resolve item")
2757                    .at_version(RelationVersionSelector::Latest);
2758
2759                let actual_desc = item.relation_desc().expect("invalid item type");
2760                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2761                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2762                {
2763                    assert_eq!(
2764                        actual_name, expected_name,
2765                        "item {schema}.{name} column {index} name did not match its expected name"
2766                    );
2767                    assert_eq!(
2768                        actual_typ, expected_typ,
2769                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2770                    );
2771                }
2772                assert_eq!(
2773                    &*actual_desc, expected_desc,
2774                    "item {schema}.{name} did not match its expected RelationDesc"
2775                );
2776            }
2777            catalog.expire().await;
2778        })
2779        .await
2780    }
2781
2782    // Connect to a running Postgres server and verify that our builtin
2783    // types and functions match it, in addition to some other things.
2784    #[mz_ore::test(tokio::test)]
2785    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
2786    async fn test_compare_builtins_postgres() {
2787        async fn inner(catalog: Catalog) {
2788            // Verify that all builtin functions:
2789            // - have a unique OID
2790            // - if they have a postgres counterpart (same oid) then they have matching name
2791            let (client, connection) = tokio_postgres::connect(
2792                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2793                NoTls,
2794            )
2795            .await
2796            .expect("failed to connect to Postgres");
2797
2798            task::spawn(|| "compare_builtin_postgres", async move {
2799                if let Err(e) = connection.await {
2800                    panic!("connection error: {}", e);
2801                }
2802            });
2803
2804            struct PgProc {
2805                name: String,
2806                arg_oids: Vec<u32>,
2807                ret_oid: Option<u32>,
2808                ret_set: bool,
2809            }
2810
2811            struct PgType {
2812                name: String,
2813                ty: String,
2814                elem: u32,
2815                array: u32,
2816                input: u32,
2817                receive: u32,
2818            }
2819
2820            struct PgOper {
2821                oprresult: u32,
2822                name: String,
2823            }
2824
2825            let pg_proc: BTreeMap<_, _> = client
2826                .query(
2827                    "SELECT
2828                    p.oid,
2829                    proname,
2830                    proargtypes,
2831                    prorettype,
2832                    proretset
2833                FROM pg_proc p
2834                JOIN pg_namespace n ON p.pronamespace = n.oid",
2835                    &[],
2836                )
2837                .await
2838                .expect("pg query failed")
2839                .into_iter()
2840                .map(|row| {
2841                    let oid: u32 = row.get("oid");
2842                    let pg_proc = PgProc {
2843                        name: row.get("proname"),
2844                        arg_oids: row.get("proargtypes"),
2845                        ret_oid: row.get("prorettype"),
2846                        ret_set: row.get("proretset"),
2847                    };
2848                    (oid, pg_proc)
2849                })
2850                .collect();
2851
2852            let pg_type: BTreeMap<_, _> = client
2853                .query(
2854                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2855                    &[],
2856                )
2857                .await
2858                .expect("pg query failed")
2859                .into_iter()
2860                .map(|row| {
2861                    let oid: u32 = row.get("oid");
2862                    let pg_type = PgType {
2863                        name: row.get("typname"),
2864                        ty: row.get("typtype"),
2865                        elem: row.get("typelem"),
2866                        array: row.get("typarray"),
2867                        input: row.get("typinput"),
2868                        receive: row.get("typreceive"),
2869                    };
2870                    (oid, pg_type)
2871                })
2872                .collect();
2873
2874            let pg_oper: BTreeMap<_, _> = client
2875                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2876                .await
2877                .expect("pg query failed")
2878                .into_iter()
2879                .map(|row| {
2880                    let oid: u32 = row.get("oid");
2881                    let pg_oper = PgOper {
2882                        name: row.get("oprname"),
2883                        oprresult: row.get("oprresult"),
2884                    };
2885                    (oid, pg_oper)
2886                })
2887                .collect();
2888
2889            let conn_catalog = catalog.for_system_session();
2890            let resolve_type_oid = |item: &str| {
2891                conn_catalog
2892                    .resolve_type(&PartialItemName {
2893                        database: None,
2894                        // All functions we check exist in PG, so the types must, as
2895                        // well
2896                        schema: Some(PG_CATALOG_SCHEMA.into()),
2897                        item: item.to_string(),
2898                    })
2899                    .expect("unable to resolve type")
2900                    .oid()
2901            };
2902
2903            let func_oids: BTreeSet<_> = BUILTINS::funcs()
2904                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2905                .collect();
2906
2907            let mut all_oids = BTreeSet::new();
2908
2909            // A function to determine if two oids are equivalent enough for these tests. We don't
2910            // support some types, so map exceptions here.
2911            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2912                [
2913                    // We don't support NAME.
2914                    (Type::NAME, Type::TEXT),
2915                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2916                    // We don't support time with time zone.
2917                    (Type::TIME, Type::TIMETZ),
2918                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2919                ]
2920                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2921            );
2922            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2923                1619, // pg_typeof: TODO: We now have regtype and can correctly implement this.
2924            ]);
2925            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2926                if ignore_return_types.contains(&fn_oid) {
2927                    return true;
2928                }
2929                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2930                    return true;
2931                }
2932                a == b
2933            };
2934
2935            let builtins_cfg = BuiltinsConfig {
2936                include_continual_tasks: true,
2937            };
2938            for builtin in BUILTINS::iter(&builtins_cfg) {
2939                match builtin {
2940                    Builtin::Type(ty) => {
2941                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2942
2943                        if ty.oid >= FIRST_MATERIALIZE_OID {
2944                            // High OIDs are reserved in Materialize and don't have
2945                            // PostgreSQL counterparts.
2946                            continue;
2947                        }
2948
2949                        // For types that have a PostgreSQL counterpart, verify that
2950                        // the name and oid match.
2951                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2952                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2953                        });
2954                        assert_eq!(
2955                            ty.name, pg_ty.name,
2956                            "oid {} has name {} in postgres; expected {}",
2957                            ty.oid, pg_ty.name, ty.name,
2958                        );
2959
2960                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2961                            None => (0, 0),
2962                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2963                        };
2964                        assert_eq!(
2965                            typinput_oid, pg_ty.input,
2966                            "type {} has typinput OID {:?} in mz but {:?} in pg",
2967                            ty.name, typinput_oid, pg_ty.input,
2968                        );
2969                        assert_eq!(
2970                            typreceive_oid, pg_ty.receive,
2971                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
2972                            ty.name, typreceive_oid, pg_ty.receive,
2973                        );
2974                        if typinput_oid != 0 {
2975                            assert!(
2976                                func_oids.contains(&typinput_oid),
2977                                "type {} has typinput OID {} that does not exist in pg_proc",
2978                                ty.name,
2979                                typinput_oid,
2980                            );
2981                        }
2982                        if typreceive_oid != 0 {
2983                            assert!(
2984                                func_oids.contains(&typreceive_oid),
2985                                "type {} has typreceive OID {} that does not exist in pg_proc",
2986                                ty.name,
2987                                typreceive_oid,
2988                            );
2989                        }
2990
2991                        // Ensure the type matches.
2992                        match &ty.details.typ {
2993                            CatalogType::Array { element_reference } => {
2994                                let elem_ty = BUILTINS::iter(&builtins_cfg)
2995                                    .filter_map(|builtin| match builtin {
2996                                        Builtin::Type(ty @ BuiltinType { name, .. })
2997                                            if element_reference == name =>
2998                                        {
2999                                            Some(ty)
3000                                        }
3001                                        _ => None,
3002                                    })
3003                                    .next();
3004                                let elem_ty = match elem_ty {
3005                                    Some(ty) => ty,
3006                                    None => {
3007                                        panic!("{} is unexpectedly not a type", element_reference)
3008                                    }
3009                                };
3010                                assert_eq!(
3011                                    pg_ty.elem, elem_ty.oid,
3012                                    "type {} has mismatched element OIDs",
3013                                    ty.name
3014                                )
3015                            }
3016                            CatalogType::Pseudo => {
3017                                assert_eq!(
3018                                    pg_ty.ty, "p",
3019                                    "type {} is not a pseudo type as expected",
3020                                    ty.name
3021                                )
3022                            }
3023                            CatalogType::Range { .. } => {
3024                                assert_eq!(
3025                                    pg_ty.ty, "r",
3026                                    "type {} is not a range type as expected",
3027                                    ty.name
3028                                );
3029                            }
3030                            _ => {
3031                                assert_eq!(
3032                                    pg_ty.ty, "b",
3033                                    "type {} is not a base type as expected",
3034                                    ty.name
3035                                )
3036                            }
3037                        }
3038
3039                        // Ensure the array type reference is correct.
3040                        let schema = catalog
3041                            .resolve_schema_in_database(
3042                                &ResolvedDatabaseSpecifier::Ambient,
3043                                ty.schema,
3044                                &SYSTEM_CONN_ID,
3045                            )
3046                            .expect("unable to resolve schema");
3047                        let allocated_type = catalog
3048                            .resolve_type(
3049                                None,
3050                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3051                                &PartialItemName {
3052                                    database: None,
3053                                    schema: Some(schema.name().schema.clone()),
3054                                    item: ty.name.to_string(),
3055                                },
3056                                &SYSTEM_CONN_ID,
3057                            )
3058                            .expect("unable to resolve type");
3059                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3060                            ty
3061                        } else {
3062                            panic!("unexpectedly not a type")
3063                        };
3064                        match ty.details.array_id {
3065                            Some(array_id) => {
3066                                let array_ty = catalog.get_entry(&array_id);
3067                                assert_eq!(
3068                                    pg_ty.array, array_ty.oid,
3069                                    "type {} has mismatched array OIDs",
3070                                    allocated_type.name.item,
3071                                );
3072                            }
3073                            None => assert_eq!(
3074                                pg_ty.array, 0,
3075                                "type {} does not have an array type in mz but does in pg",
3076                                allocated_type.name.item,
3077                            ),
3078                        }
3079                    }
3080                    Builtin::Func(func) => {
3081                        for imp in func.inner.func_impls() {
3082                            assert!(
3083                                all_oids.insert(imp.oid),
3084                                "{} reused oid {}",
3085                                func.name,
3086                                imp.oid
3087                            );
3088
3089                            assert!(
3090                                imp.oid < FIRST_USER_OID,
3091                                "built-in function {} erroneously has OID in user space ({})",
3092                                func.name,
3093                                imp.oid,
3094                            );
3095
3096                            // For functions that have a postgres counterpart, verify that the name and
3097                            // oid match.
3098                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3099                                continue;
3100                            } else {
3101                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
3102                                    panic!(
3103                                        "pg_proc missing function {}: oid {}",
3104                                        func.name, imp.oid
3105                                    )
3106                                })
3107                            };
3108                            assert_eq!(
3109                                func.name, pg_fn.name,
3110                                "funcs with oid {} don't match names: {} in mz, {} in pg",
3111                                imp.oid, func.name, pg_fn.name
3112                            );
3113
3114                            // Complain, but don't fail, if argument oids don't match.
3115                            // TODO: make these match.
3116                            let imp_arg_oids = imp
3117                                .arg_typs
3118                                .iter()
3119                                .map(|item| resolve_type_oid(item))
3120                                .collect::<Vec<_>>();
3121
3122                            if imp_arg_oids != pg_fn.arg_oids {
3123                                println!(
3124                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3125                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3126                                );
3127                            }
3128
3129                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3130
3131                            assert!(
3132                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3133                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3134                                imp.oid,
3135                                func.name,
3136                                imp_return_oid,
3137                                pg_fn.ret_oid
3138                            );
3139
3140                            assert_eq!(
3141                                imp.return_is_set, pg_fn.ret_set,
3142                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3143                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3144                            );
3145                        }
3146                    }
3147                    _ => (),
3148                }
3149            }
3150
3151            for (op, func) in OP_IMPLS.iter() {
3152                for imp in func.func_impls() {
3153                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3154
3155                    // For operators that have a postgres counterpart, verify that the name and oid match.
3156                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3157                        continue;
3158                    } else {
3159                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
3160                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3161                        })
3162                    };
3163
3164                    assert_eq!(*op, pg_op.name);
3165
3166                    let imp_return_oid =
3167                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
3168                    if imp_return_oid != pg_op.oprresult {
3169                        panic!(
3170                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3171                            imp.oid, op, imp_return_oid, pg_op.oprresult
3172                        );
3173                    }
3174                }
3175            }
3176            catalog.expire().await;
3177        }
3178
3179        Catalog::with_debug(inner).await
3180    }
3181
3182    // Execute all builtin functions with all combinations of arguments from interesting datums.
3183    #[mz_ore::test(tokio::test)]
3184    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3185    async fn test_smoketest_all_builtins() {
3186        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3187            let catalog = Arc::new(catalog);
3188            let conn_catalog = catalog.for_system_session();
3189
3190            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3191            let mut handles = Vec::new();
3192
3193            // Extracted during planning; always panics when executed.
3194            let ignore_names = BTreeSet::from([
3195                "avg",
3196                "avg_internal_v1",
3197                "bool_and",
3198                "bool_or",
3199                "has_table_privilege", // > 3 s each
3200                "has_type_privilege",  // > 3 s each
3201                "mod",
3202                "mz_panic",
3203                "mz_sleep",
3204                "pow",
3205                "stddev_pop",
3206                "stddev_samp",
3207                "stddev",
3208                "var_pop",
3209                "var_samp",
3210                "variance",
3211            ]);
3212
3213            let fns = BUILTINS::funcs()
3214                .map(|func| (&func.name, func.inner))
3215                .chain(OP_IMPLS.iter());
3216
3217            for (name, func) in fns {
3218                if ignore_names.contains(name) {
3219                    continue;
3220                }
3221                let Func::Scalar(impls) = func else {
3222                    continue;
3223                };
3224
3225                'outer: for imp in impls {
3226                    let details = imp.details();
3227                    let mut styps = Vec::new();
3228                    for item in details.arg_typs.iter() {
3229                        let oid = resolve_type_oid(item);
3230                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3231                            continue 'outer;
3232                        };
3233                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3234                    }
3235                    let datums = styps
3236                        .iter()
3237                        .map(|styp| {
3238                            let mut datums = vec![Datum::Null];
3239                            datums.extend(styp.interesting_datums());
3240                            datums
3241                        })
3242                        .collect::<Vec<_>>();
3243                    // Skip nullary fns.
3244                    if datums.is_empty() {
3245                        continue;
3246                    }
3247
3248                    let return_oid = details
3249                        .return_typ
3250                        .map(resolve_type_oid)
3251                        .expect("must exist");
3252                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3253                        .ok()
3254                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3255
3256                    let mut idxs = vec![0; datums.len()];
3257                    while idxs[0] < datums[0].len() {
3258                        let mut args = Vec::with_capacity(idxs.len());
3259                        for i in 0..(datums.len()) {
3260                            args.push(datums[i][idxs[i]]);
3261                        }
3262
3263                        let op = &imp.op;
3264                        let scalars = args
3265                            .iter()
3266                            .enumerate()
3267                            .map(|(i, datum)| {
3268                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3269                                    datum.clone(),
3270                                    styps[i].clone(),
3271                                ))
3272                            })
3273                            .collect();
3274
3275                        let call_name = format!(
3276                            "{name}({}) (oid: {})",
3277                            args.iter()
3278                                .map(|d| d.to_string())
3279                                .collect::<Vec<_>>()
3280                                .join(", "),
3281                            imp.oid
3282                        );
3283                        let catalog = Arc::clone(&catalog);
3284                        let call_name_fn = call_name.clone();
3285                        let return_styp = return_styp.clone();
3286                        let handle = task::spawn_blocking(
3287                            || call_name,
3288                            move || {
3289                                smoketest_fn(
3290                                    name,
3291                                    call_name_fn,
3292                                    op,
3293                                    imp,
3294                                    args,
3295                                    catalog,
3296                                    scalars,
3297                                    return_styp,
3298                                )
3299                            },
3300                        );
3301                        handles.push(handle);
3302
3303                        // Advance to the next datum combination.
3304                        for i in (0..datums.len()).rev() {
3305                            idxs[i] += 1;
3306                            if idxs[i] >= datums[i].len() {
3307                                if i == 0 {
3308                                    break;
3309                                }
3310                                idxs[i] = 0;
3311                                continue;
3312                            } else {
3313                                break;
3314                            }
3315                        }
3316                    }
3317                }
3318            }
3319            handles
3320        }
3321
3322        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3323        for handle in handles {
3324            handle.await;
3325        }
3326    }
3327
3328    fn smoketest_fn(
3329        name: &&str,
3330        call_name: String,
3331        op: &Operation<HirScalarExpr>,
3332        imp: &FuncImpl<HirScalarExpr>,
3333        args: Vec<Datum<'_>>,
3334        catalog: Arc<Catalog>,
3335        scalars: Vec<CoercibleScalarExpr>,
3336        return_styp: Option<SqlScalarType>,
3337    ) {
3338        let conn_catalog = catalog.for_system_session();
3339        let pcx = PlanContext::zero();
3340        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3341        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3342        let ecx = ExprContext {
3343            qcx: &qcx,
3344            name: "smoketest",
3345            scope: &Scope::empty(),
3346            relation_type: &SqlRelationType::empty(),
3347            allow_aggregates: false,
3348            allow_subqueries: false,
3349            allow_parameters: false,
3350            allow_windows: false,
3351        };
3352        let arena = RowArena::new();
3353        let mut session = Session::<Timestamp>::dummy();
3354        session
3355            .start_transaction(to_datetime(0), None, None)
3356            .expect("must succeed");
3357        let prep_style = ExprPrepOneShot {
3358            logical_time: EvalTime::Time(Timestamp::MIN),
3359            session: &session,
3360            catalog_state: &catalog.state,
3361        };
3362
3363        // Execute the function as much as possible, ensuring no panics occur, but
3364        // otherwise ignoring eval errors. We also do various other checks.
3365        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3366        if let Ok(hir) = res {
3367            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3368                // Populate unmaterialized functions.
3369                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3370
3371                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3372                    if let Some(return_styp) = return_styp {
3373                        let mir_typ = mir.typ(&[]);
3374                        // MIR type inference should be consistent with the type
3375                        // we get from the catalog.
3376                        soft_assert_eq_or_log!(
3377                            mir_typ.scalar_type,
3378                            return_styp,
3379                            "MIR type did not match the catalog type (cast elimination/repr type error)"
3380                        );
3381                        // The following will check not just that the scalar type
3382                        // is ok, but also catches if the function returned a null
3383                        // but the MIR type inference said "non-nullable".
3384                        if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3385                            panic!(
3386                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3387                            );
3388                        }
3389                        // Check the consistency of `introduces_nulls` and
3390                        // `propagates_nulls` with `MirScalarExpr::typ`.
3391                        if let Some((introduces_nulls, propagates_nulls)) =
3392                            call_introduces_propagates_nulls(&mir)
3393                        {
3394                            if introduces_nulls {
3395                                // If the function introduces_nulls, then the return
3396                                // type should always be nullable, regardless of
3397                                // the nullability of the input types.
3398                                assert!(
3399                                    mir_typ.nullable,
3400                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3401                                    name, args, mir, mir_typ.nullable
3402                                );
3403                            } else {
3404                                let any_input_null = args.iter().any(|arg| arg.is_null());
3405                                if !any_input_null {
3406                                    assert!(
3407                                        !mir_typ.nullable,
3408                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3409                                        name, args, mir, mir_typ.nullable
3410                                    );
3411                                } else {
3412                                    assert_eq!(
3413                                        mir_typ.nullable, propagates_nulls,
3414                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3415                                        name, args, mir, mir_typ.nullable
3416                                    );
3417                                }
3418                            }
3419                        }
3420                        // Check that `MirScalarExpr::reduce` yields the same result
3421                        // as the real evaluation.
3422                        let mut reduced = mir.clone();
3423                        reduced.reduce(&[]);
3424                        match reduced {
3425                            MirScalarExpr::Literal(reduce_result, ctyp) => {
3426                                match reduce_result {
3427                                    Ok(reduce_result_row) => {
3428                                        let reduce_result_datum = reduce_result_row.unpack_first();
3429                                        assert_eq!(
3430                                            reduce_result_datum,
3431                                            eval_result_datum,
3432                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3433                                            name,
3434                                            args,
3435                                            mir,
3436                                            eval_result_datum,
3437                                            mir_typ.scalar_type,
3438                                            reduce_result_datum,
3439                                            ctyp.scalar_type
3440                                        );
3441                                        // Let's check that the types also match.
3442                                        // (We are not checking nullability here,
3443                                        // because it's ok when we know a more
3444                                        // precise nullability after actually
3445                                        // evaluating a function than before.)
3446                                        assert_eq!(
3447                                            ctyp.scalar_type,
3448                                            mir_typ.scalar_type,
3449                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3450                                            name,
3451                                            args,
3452                                            mir,
3453                                            eval_result_datum,
3454                                            mir_typ.scalar_type,
3455                                            reduce_result_datum,
3456                                            ctyp.scalar_type
3457                                        );
3458                                    }
3459                                    Err(..) => {} // It's ok, we might have given invalid args to the function
3460                                }
3461                            }
3462                            _ => unreachable!(
3463                                "all args are literals, so should have reduced to a literal"
3464                            ),
3465                        }
3466                    }
3467                }
3468            }
3469        }
3470    }
3471
3472    /// If the given MirScalarExpr
3473    ///  - is a function call, and
3474    ///  - all arguments are literals
3475    /// then it returns whether the called function (introduces_nulls, propagates_nulls).
3476    fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3477        match mir_func_call {
3478            MirScalarExpr::CallUnary { func, expr } => {
3479                if expr.is_literal() {
3480                    Some((func.introduces_nulls(), func.propagates_nulls()))
3481                } else {
3482                    None
3483                }
3484            }
3485            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3486                if expr1.is_literal() && expr2.is_literal() {
3487                    Some((func.introduces_nulls(), func.propagates_nulls()))
3488                } else {
3489                    None
3490                }
3491            }
3492            MirScalarExpr::CallVariadic { func, exprs } => {
3493                if exprs.iter().all(|arg| arg.is_literal()) {
3494                    Some((func.introduces_nulls(), func.propagates_nulls()))
3495                } else {
3496                    None
3497                }
3498            }
3499            _ => None,
3500        }
3501    }
3502
3503    // Make sure pg views don't use types that only exist in Materialize.
3504    #[mz_ore::test(tokio::test)]
3505    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3506    async fn test_pg_views_forbidden_types() {
3507        Catalog::with_debug(|catalog| async move {
3508            let conn_catalog = catalog.for_system_session();
3509
3510            for view in BUILTINS::views().filter(|view| {
3511                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3512            }) {
3513                let item = conn_catalog
3514                    .resolve_item(&PartialItemName {
3515                        database: None,
3516                        schema: Some(view.schema.to_string()),
3517                        item: view.name.to_string(),
3518                    })
3519                    .expect("unable to resolve view")
3520                    // TODO(alter_table)
3521                    .at_version(RelationVersionSelector::Latest);
3522                let full_name = conn_catalog.resolve_full_name(item.name());
3523                let desc = item.relation_desc().expect("invalid item type");
3524                for col_type in desc.iter_types() {
3525                    match &col_type.scalar_type {
3526                        typ @ SqlScalarType::UInt16
3527                        | typ @ SqlScalarType::UInt32
3528                        | typ @ SqlScalarType::UInt64
3529                        | typ @ SqlScalarType::MzTimestamp
3530                        | typ @ SqlScalarType::List { .. }
3531                        | typ @ SqlScalarType::Map { .. }
3532                        | typ @ SqlScalarType::MzAclItem => {
3533                            panic!("{typ:?} type found in {full_name}");
3534                        }
3535                        SqlScalarType::AclItem
3536                        | SqlScalarType::Bool
3537                        | SqlScalarType::Int16
3538                        | SqlScalarType::Int32
3539                        | SqlScalarType::Int64
3540                        | SqlScalarType::Float32
3541                        | SqlScalarType::Float64
3542                        | SqlScalarType::Numeric { .. }
3543                        | SqlScalarType::Date
3544                        | SqlScalarType::Time
3545                        | SqlScalarType::Timestamp { .. }
3546                        | SqlScalarType::TimestampTz { .. }
3547                        | SqlScalarType::Interval
3548                        | SqlScalarType::PgLegacyChar
3549                        | SqlScalarType::Bytes
3550                        | SqlScalarType::String
3551                        | SqlScalarType::Char { .. }
3552                        | SqlScalarType::VarChar { .. }
3553                        | SqlScalarType::Jsonb
3554                        | SqlScalarType::Uuid
3555                        | SqlScalarType::Array(_)
3556                        | SqlScalarType::Record { .. }
3557                        | SqlScalarType::Oid
3558                        | SqlScalarType::RegProc
3559                        | SqlScalarType::RegType
3560                        | SqlScalarType::RegClass
3561                        | SqlScalarType::Int2Vector
3562                        | SqlScalarType::Range { .. }
3563                        | SqlScalarType::PgLegacyName => {}
3564                    }
3565                }
3566            }
3567            catalog.expire().await;
3568        })
3569        .await
3570    }
3571
3572    // Make sure objects reside in the `mz_introspection` schema iff they depend on per-replica
3573    // introspection relations.
3574    #[mz_ore::test(tokio::test)]
3575    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
3576    async fn test_mz_introspection_builtins() {
3577        Catalog::with_debug(|catalog| async move {
3578            let conn_catalog = catalog.for_system_session();
3579
3580            let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3581            let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3582
3583            for entry in catalog.entries() {
3584                let schema_spec = entry.name().qualifiers.schema_spec;
3585                let introspection_deps = catalog.introspection_dependencies(entry.id);
3586                if introspection_deps.is_empty() {
3587                    assert!(
3588                        schema_spec != introspection_schema_spec,
3589                        "entry does not depend on introspection sources but is in \
3590                         `mz_introspection`: {}",
3591                        conn_catalog.resolve_full_name(entry.name()),
3592                    );
3593                } else {
3594                    assert!(
3595                        schema_spec == introspection_schema_spec,
3596                        "entry depends on introspection sources but is not in \
3597                         `mz_introspection`: {}",
3598                        conn_catalog.resolve_full_name(entry.name()),
3599                    );
3600                }
3601            }
3602        })
3603        .await
3604    }
3605
3606    #[mz_ore::test(tokio::test)]
3607    #[cfg_attr(miri, ignore)] //  unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
3608    async fn test_multi_subscriber_catalog() {
3609        let persist_client = PersistClient::new_for_tests().await;
3610        let bootstrap_args = test_bootstrap_args();
3611        let organization_id = Uuid::new_v4();
3612        let db_name = "DB";
3613
3614        let mut writer_catalog = Catalog::open_debug_catalog(
3615            persist_client.clone(),
3616            organization_id.clone(),
3617            &bootstrap_args,
3618        )
3619        .await
3620        .expect("open_debug_catalog");
3621        let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3622            persist_client.clone(),
3623            organization_id.clone(),
3624            &bootstrap_args,
3625        )
3626        .await
3627        .expect("open_debug_read_only_catalog");
3628        assert_err!(writer_catalog.resolve_database(db_name));
3629        assert_err!(read_only_catalog.resolve_database(db_name));
3630
3631        let commit_ts = writer_catalog.current_upper().await;
3632        writer_catalog
3633            .transact(
3634                None,
3635                commit_ts,
3636                None,
3637                vec![Op::CreateDatabase {
3638                    name: db_name.to_string(),
3639                    owner_id: MZ_SYSTEM_ROLE_ID,
3640                }],
3641            )
3642            .await
3643            .expect("failed to transact");
3644
3645        let write_db = writer_catalog
3646            .resolve_database(db_name)
3647            .expect("resolve_database");
3648        read_only_catalog
3649            .sync_to_current_updates()
3650            .await
3651            .expect("sync_to_current_updates");
3652        let read_db = read_only_catalog
3653            .resolve_database(db_name)
3654            .expect("resolve_database");
3655
3656        assert_eq!(write_db, read_db);
3657
3658        let writer_catalog_fencer =
3659            Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3660                .await
3661                .expect("open_debug_catalog for fencer");
3662        let fencer_db = writer_catalog_fencer
3663            .resolve_database(db_name)
3664            .expect("resolve_database for fencer");
3665        assert_eq!(fencer_db, read_db);
3666
3667        let write_fence_err = writer_catalog
3668            .sync_to_current_updates()
3669            .await
3670            .expect_err("sync_to_current_updates for fencer");
3671        assert!(matches!(
3672            write_fence_err,
3673            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3674        ));
3675        let read_fence_err = read_only_catalog
3676            .sync_to_current_updates()
3677            .await
3678            .expect_err("sync_to_current_updates after fencer");
3679        assert!(matches!(
3680            read_fence_err,
3681            CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3682        ));
3683
3684        writer_catalog.expire().await;
3685        read_only_catalog.expire().await;
3686        writer_catalog_fencer.expire().await;
3687    }
3688}