1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_persist_client::PersistClient;
55use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
56use mz_repr::explain::ExprHumanizer;
57use mz_repr::namespaces::MZ_TEMP_SCHEMA;
58use mz_repr::network_policy_id::NetworkPolicyId;
59use mz_repr::optimize::OptimizerFeatures;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
66 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
67 SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_transform::dataflow::DataflowMetainfo;
83use mz_transform::notice::OptimizerNotice;
84use tokio::sync::MutexGuard;
85use tokio::sync::mpsc::UnboundedSender;
86use uuid::Uuid;
87
88pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
90pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
91pub use crate::catalog::state::CatalogState;
92pub use crate::catalog::transact::{
93 DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
94};
95use crate::command::CatalogDump;
96use crate::coord::TargetCluster;
97#[cfg(test)]
98use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
99use crate::session::{Portal, PreparedStatement, Session};
100use crate::util::ResultExt;
101use crate::{AdapterError, AdapterNotice, ExecuteResponse};
102
103mod builtin_table_updates;
104pub(crate) mod consistency;
105mod migrate;
106
107mod apply;
108mod open;
109mod state;
110mod timeline;
111mod transact;
112
#[derive(Debug)]
pub struct Catalog {
    /// The in-memory representation of catalog contents (entries, clusters, roles, ...).
    state: CatalogState,
    /// Handle to the optimizer expression cache, when one is enabled.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// The durable (persisted) catalog state, shared behind an async mutex.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// In-memory revision counter; NOTE(review): presumably bumped on each
    /// catalog change and not persisted — confirm against `transact`.
    transient_revision: u64,
}
142
impl Clone for Catalog {
    /// Clones the catalog. The in-memory `state` is deep-cloned while the
    /// durable `storage` handle is shared via `Arc::clone`, so all clones
    /// operate on the same underlying durable catalog.
    fn clone(&self) -> Self {
        Self {
            state: self.state.clone(),
            expr_cache_handle: self.expr_cache_handle.clone(),
            storage: Arc::clone(&self.storage),
            transient_revision: self.transient_revision,
        }
    }
}
155
impl Catalog {
    /// Stores the optimized dataflow plan for `id` in the in-memory state.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_optimized_plan(
        &mut self,
        id: GlobalId,
        plan: DataflowDescription<OptimizedMirRelationExpr>,
    ) {
        self.state.set_optimized_plan(id, plan);
    }

    /// Stores the physical (LIR) dataflow plan for `id` in the in-memory state.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_physical_plan(
        &mut self,
        id: GlobalId,
        plan: DataflowDescription<mz_compute_types::plan::Plan>,
    ) {
        self.state.set_physical_plan(id, plan);
    }

    /// Returns the optimized plan for `id`, if the entry exists and has one.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_optimized_plan(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
        let entry = self.state.try_get_entry_by_global_id(id)?;
        entry.item().optimized_plan().map(AsRef::as_ref)
    }

    /// Returns the physical plan for `id`, if the entry exists and has one.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_physical_plan(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
        let entry = self.state.try_get_entry_by_global_id(id)?;
        entry.item().physical_plan().map(AsRef::as_ref)
    }

    /// Stores the dataflow metainfo (optimizer notices etc.) for `id`.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_dataflow_metainfo(
        &mut self,
        id: GlobalId,
        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
    ) {
        self.state.set_dataflow_metainfo(id, metainfo);
    }

    /// Returns the dataflow metainfo for `id`, if the entry exists and has one.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_dataflow_metainfo(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
        let entry = self.state.try_get_entry_by_global_id(id)?;
        entry.item().dataflow_metainfo()
    }
}
229
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// Catalog state, borrowed from the owning [`Catalog`] or owned (Cow).
    state: Cow<'a, CatalogState>,
    /// Items treated as unresolvable for this connection; populated via
    /// [`ConnCatalog::mark_id_unresolvable_for_replanning`].
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// The connection this view of the catalog is scoped to.
    conn_id: ConnectionId,
    /// The connection's active cluster name.
    cluster: String,
    /// The connection's active database, if any.
    database: Option<DatabaseId>,
    /// The connection's schema search path.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// The role the connection is running as.
    role_id: RoleId,
    /// The connection's prepared statements, if a session is attached.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// The connection's portals, if a session is attached.
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Channel for delivering notices back to the session.
    notices_tx: UnboundedSender<AdapterNotice>,
}
252
impl ConnCatalog<'_> {
    /// Returns the connection ID this catalog view is scoped to.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// Returns the underlying catalog state.
    pub fn state(&self) -> &CatalogState {
        &*self.state
    }

    /// Marks `id` as unresolvable for subsequent name resolution, used during
    /// replanning. Panics unless running as the system role.
    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
        assert_eq!(
            self.role_id, MZ_SYSTEM_ROLE_ID,
            "only the system role can mark IDs unresolvable",
        );
        self.unresolvable_ids.insert(id);
    }

    /// Returns the effective search path: the configured search path expanded
    /// by the state, optionally including the connection's temporary schema.
    pub fn effective_search_path(
        &self,
        include_temp_schema: bool,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state
            .effective_search_path(&self.search_path, include_temp_schema)
    }
}
292
impl ConnectionResolver for ConnCatalog<'_> {
    /// Resolves a connection item ID into its inlined connection definition,
    /// delegating to the underlying catalog state.
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        self.state().resolve_connection(id)
    }
}
301
302impl Catalog {
    /// Returns the catalog's in-memory revision counter.
    pub fn transient_revision(&self) -> u64 {
        self.transient_revision
    }

    /// Opens a fresh debug catalog backed by a test persist client and runs
    /// `f` against it. Intended for tests only.
    pub async fn with_debug<F, Fut, T>(f: F) -> T
    where
        F: FnOnce(Catalog) -> Fut,
        Fut: Future<Output = T>,
    {
        let persist_client = PersistClient::new_for_tests().await;
        let organization_id = Uuid::new_v4();
        let bootstrap_args = test_bootstrap_args();
        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
            .await
            .expect("can open debug catalog");
        f(catalog).await
    }
334
    /// Like [`Catalog::with_debug`], but replaces the catalog's durable
    /// storage with a freshly opened one that has been synced to current
    /// updates, simulating the state a catalog has during bootstrap.
    /// Intended for tests only.
    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
    where
        F: FnOnce(Catalog) -> Fut,
        Fut: Future<Output = T>,
    {
        let persist_client = PersistClient::new_for_tests().await;
        let organization_id = Uuid::new_v4();
        let bootstrap_args = test_bootstrap_args();
        let mut catalog =
            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
                .await
                .expect("can open debug catalog");

        // Open a second durable catalog against the same organization and
        // swap it in, so `catalog.storage` reflects a just-opened handle.
        let now = SYSTEM_TIME.clone();
        let openable_storage = TestCatalogStateBuilder::new(persist_client)
            .with_organization_id(organization_id)
            .with_default_deploy_generation()
            .build()
            .await
            .expect("can create durable catalog");
        let mut storage = openable_storage
            .open(now().into(), &bootstrap_args)
            .await
            .expect("can open durable catalog")
            .0;
        // Sync so the handle is caught up; the updates themselves are discarded.
        let _ = storage
            .sync_to_current_updates()
            .await
            .expect("can sync to current updates");
        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));

        f(catalog).await
    }
372
    /// Opens a writable debug catalog for `organization_id` using test
    /// defaults (dummy build info, empty system parameter defaults).
    pub async fn open_debug_catalog(
        persist_client: PersistClient,
        organization_id: Uuid,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Catalog, anyhow::Error> {
        let now = SYSTEM_TIME.clone();
        let environment_id = None;
        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
            .with_organization_id(organization_id)
            .with_default_deploy_generation()
            .build()
            .await?;
        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
        let system_parameter_defaults = BTreeMap::default();
        Self::open_debug_catalog_inner(
            persist_client,
            storage,
            now,
            environment_id,
            &DUMMY_BUILD_INFO,
            system_parameter_defaults,
            bootstrap_args,
            None,
        )
        .await
    }
402
    /// Opens a read-only debug catalog for `organization_id`.
    pub async fn open_debug_read_only_catalog(
        persist_client: PersistClient,
        organization_id: Uuid,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Catalog, anyhow::Error> {
        let now = SYSTEM_TIME.clone();
        let environment_id = None;
        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
            .with_organization_id(organization_id)
            .build()
            .await?;
        // NOTE(review): `open_read_only` is passed `test_bootstrap_args()`
        // rather than the `bootstrap_args` parameter (which IS forwarded to
        // `open_debug_catalog_inner` below) — confirm this asymmetry is
        // intentional.
        let storage = openable_storage
            .open_read_only(&test_bootstrap_args())
            .await?;
        let system_parameter_defaults = BTreeMap::default();
        Self::open_debug_catalog_inner(
            persist_client,
            storage,
            now,
            environment_id,
            &DUMMY_BUILD_INFO,
            system_parameter_defaults,
            bootstrap_args,
            None,
        )
        .await
    }
434
    /// Opens a read-only debug catalog with caller-supplied environment,
    /// build info, and system parameter defaults, pinned to the build's
    /// catalog version.
    pub async fn open_debug_read_only_persist_catalog_config(
        persist_client: PersistClient,
        now: NowFn,
        environment_id: EnvironmentId,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
    ) -> Result<Catalog, anyhow::Error> {
        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
            .with_organization_id(environment_id.organization_id())
            .with_version(
                build_info
                    .version
                    .parse()
                    .expect("build version is parseable"),
            )
            .build()
            .await?;
        let storage = openable_storage.open_read_only(bootstrap_args).await?;
        Self::open_debug_catalog_inner(
            persist_client,
            storage,
            now,
            Some(environment_id),
            build_info,
            system_parameter_defaults,
            bootstrap_args,
            enable_expression_cache_override,
        )
        .await
    }
471
    /// Shared implementation for the `open_debug_*` constructors: assembles a
    /// full [`Config`] with test-oriented defaults (unsafe mode, skipped
    /// migrations, in-memory secrets) and opens the catalog.
    async fn open_debug_catalog_inner(
        persist_client: PersistClient,
        storage: Box<dyn DurableCatalogState>,
        now: NowFn,
        environment_id: Option<EnvironmentId>,
        build_info: &'static BuildInfo,
        system_parameter_defaults: BTreeMap<String, String>,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
    ) -> Result<Catalog, anyhow::Error> {
        let metrics_registry = &MetricsRegistry::new();
        let secrets_reader = Arc::new(InMemorySecretsController::new());
        // Boot timestamp is taken from `now`; used as `boot_ts` below.
        let previous_ts = now().into();
        let replica_size = &bootstrap_args.default_cluster_replica_size;
        // Debug catalogs are always opened writable here; read-only variants
        // differ only in how `storage` was opened by the caller.
        let read_only = false;

        let OpenCatalogResult {
            catalog,
            migrated_storage_collections_0dt: _,
            new_builtin_collections: _,
            builtin_table_updates: _,
            cached_global_exprs: _,
            uncached_local_exprs: _,
        } = Catalog::open(Config {
            storage,
            metrics_registry,
            state: StateConfig {
                unsafe_mode: true,
                all_features: false,
                build_info,
                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
                read_only,
                now,
                boot_ts: previous_ts,
                skip_migrations: true,
                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
                // All builtin clusters use the bootstrap default replica size
                // with their respective default replication factors.
                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                system_parameter_defaults,
                remote_system_parameters: None,
                availability_zones: vec![],
                egress_addresses: vec![],
                aws_principal_context: None,
                aws_privatelink_availability_zones: None,
                http_host_name: None,
                connection_context: ConnectionContext::for_tests(secrets_reader),
                builtin_item_migration_config: BuiltinItemMigrationConfig {
                    persist_client: persist_client.clone(),
                    read_only,
                    force_migration: None,
                },
                persist_client,
                enable_expression_cache_override,
                helm_chart_version: None,
                external_login_password_mz_system: None,
                license_key: ValidatedLicenseKey::for_tests(),
            },
        })
        .await?;
        Ok(catalog)
    }
553
    /// Returns a [`ConnCatalog`] scoped to `session`'s connection.
    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        self.state.for_session(session)
    }

    /// Returns a [`ConnCatalog`] for a role with no attached session.
    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        self.state.for_sessionless_user(role_id)
    }

    /// Returns a [`ConnCatalog`] acting as the system user.
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.state.for_system_session()
    }

    /// Acquires the lock on the durable catalog state.
    async fn storage<'a>(
        &'a self,
    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
        self.storage.lock().await
    }

    /// Returns the current upper timestamp of the durable catalog.
    pub async fn current_upper(&self) -> mz_repr::Timestamp {
        self.storage().await.current_upper().await
    }

    /// Allocates a new user item ID pair, committing at `commit_ts`.
    pub async fn allocate_user_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        self.storage()
            .await
            .allocate_user_id(commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }

    /// Allocates `amount` user item ID pairs, committing at `commit_ts`.
    pub async fn allocate_user_ids(
        &self,
        amount: u64,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
        self.storage()
            .await
            .allocate_user_ids(amount, commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }

    /// Test helper: allocates a user ID committing at the current upper.
    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
        let commit_ts = self.storage().await.current_upper().await;
        self.allocate_user_id(commit_ts).await
    }

    /// Returns the next user item ID without allocating it.
    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_item_id()
            .await
            .err_into()
    }
615
    /// Test helper: allocates a single system item ID via a storage
    /// transaction committed at `commit_ts`.
    #[cfg(test)]
    pub async fn allocate_system_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        use mz_ore::collections::CollectionExt;

        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;
        let id = txn
            .allocate_system_item_ids(1)
            .maybe_terminate("allocating system ids")?
            .into_element();
        // Drain the op updates; this test helper doesn't apply them.
        let _ = txn.get_and_commit_op_updates();
        txn.commit(commit_ts).await?;
        Ok(id)
    }

    /// Returns the next system item ID without allocating it.
    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_item_id()
            .await
            .err_into()
    }

    /// Allocates a new user cluster ID, committing at `commit_ts`.
    pub async fn allocate_user_cluster_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<ClusterId, Error> {
        self.storage()
            .await
            .allocate_user_cluster_id(commit_ts)
            .await
            .maybe_terminate("allocating user cluster ids")
            .err_into()
    }

    /// Returns the next system replica ID without allocating it.
    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_replica_id()
            .await
            .err_into()
    }

    /// Returns the next user replica ID without allocating it.
    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_replica_id()
            .await
            .err_into()
    }
673
    /// Resolves `database_name` to a [`Database`].
    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        self.state.resolve_database(database_name)
    }

    /// Resolves a schema by (optional) database name and schema name within
    /// the given connection.
    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema(current_database, database_name, schema_name, conn_id)
    }

    /// Resolves `schema_name` inside an already-resolved database.
    pub fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema_in_database(database_spec, schema_name, conn_id)
    }

    /// Resolves `replica_name` within the given cluster.
    pub fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        self.state
            .resolve_replica_in_cluster(cluster_id, replica_name)
    }

    /// Resolves the ID of a built-in system schema by name.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.state.resolve_system_schema(name)
    }

    /// Resolves the session's search path to concrete schema specifiers.
    pub fn resolve_search_path(
        &self,
        session: &Session,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state.resolve_search_path(session)
    }

    /// Resolves a partial item name to a catalog entry using the search path.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_entry(current_database, search_path, name, conn_id)
    }

    /// Returns the item ID of a built-in table.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.state.resolve_builtin_table(builtin)
    }

    /// Returns the item ID of a built-in log (discarding the second element
    /// of the resolved pair).
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
        self.state.resolve_builtin_log(builtin).0
    }

    /// Returns the item ID of a built-in storage collection (source).
    pub fn resolve_builtin_storage_collection(
        &self,
        builtin: &'static BuiltinSource,
    ) -> CatalogItemId {
        self.state.resolve_builtin_source(builtin)
    }

    /// Resolves a partial name to a function entry using the search path.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_function(current_database, search_path, name, conn_id)
    }

    /// Resolves a partial name to a type entry using the search path.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_type(current_database, search_path, name, conn_id)
    }

    /// Resolves a cluster by name.
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        self.state.resolve_cluster(name)
    }

    /// Resolves a built-in cluster to its in-memory representation.
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        self.state.resolve_builtin_cluster(cluster)
    }

    /// Returns the ID of the `mz_catalog_server` built-in cluster.
    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
    }
789
    /// Resolves a [`TargetCluster`] to a concrete cluster: the catalog server
    /// cluster, the session's active cluster, or the cluster pinned by an
    /// in-progress transaction (erroring if that cluster was dropped).
    pub fn resolve_target_cluster(
        &self,
        target_cluster: TargetCluster,
        session: &Session,
    ) -> Result<&Cluster, AdapterError> {
        match target_cluster {
            TargetCluster::CatalogServer => {
                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
            }
            TargetCluster::Active => self.active_cluster(session),
            TargetCluster::Transaction(cluster_id) => self
                .try_get_cluster(cluster_id)
                .ok_or(AdapterError::ConcurrentClusterDrop),
        }
    }

    /// Returns the session's active cluster. Rejects non-system, non-support
    /// users whose session cluster is set to the system cluster.
    pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
        if session.user().name != SYSTEM_USER.name
            && session.user().name != SUPPORT_USER.name
            && session.vars().cluster() == SYSTEM_USER.name
        {
            coord_bail!(
                "system cluster '{}' cannot execute user queries",
                SYSTEM_USER.name
            );
        }
        let cluster = self.resolve_cluster(session.vars().cluster())?;
        Ok(cluster)
    }
823
    /// Returns the in-memory catalog state.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }

    /// Resolves a qualified item name to its full, printable name.
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    /// Returns the entry for `id`, if it exists.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }

    /// Returns the entry owning global ID `id`, if it exists.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }

    /// Returns the entry for `id`; panics if absent (see state impl).
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    /// Returns the collection entry owning global ID `id`.
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    /// Returns all global IDs associated with the item `id`.
    pub fn get_global_ids<'a>(
        &'a self,
        id: &CatalogItemId,
    ) -> impl Iterator<Item = GlobalId> + use<'a> {
        self.get_entry(id).global_ids()
    }

    /// Maps a global ID back to its owning item ID.
    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
        self.get_entry_by_global_id(id).id()
    }

    /// Maps a global ID back to its owning item ID, if the entry exists.
    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
        let item = self.try_get_entry_by_global_id(id)?;
        Some(item.id())
    }

    /// Returns the schema for the given specifiers; panics if absent.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }

    /// Returns the schema for the given specifiers, if it exists.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }
886
    /// Returns the ID of the `mz_catalog` system schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }

    /// Returns the ID of the `pg_catalog` system schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }

    /// Returns the ID of the `information_schema` system schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }

    /// Returns the ID of the `mz_internal` system schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }

    /// Returns the ID of the `mz_introspection` system schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }

    /// Returns the ID of the `mz_unsafe` system schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }

    /// Iterates over all system schema IDs.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }

    /// Returns the database for `id`; panics if absent.
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }

    /// Returns the role for `id`, if it exists.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }

    /// Returns the role for `id`; panics if absent.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }

    /// Returns the role named `role_name`, if it exists.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }

    /// Returns the role's authentication info, if present.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }

    /// Creates the temporary schema for `conn_id`, owned by `owner_id`.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
944
945 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
946 self.state
948 .temporary_schemas
949 .get(conn_id)
950 .map(|schema| schema.items.contains_key(item_name))
951 .unwrap_or(false)
952 }
953
954 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
957 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
958 return Ok(());
959 };
960 if !schema.items.is_empty() {
961 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
962 }
963 Ok(())
964 }
965
    /// Returns the transitive dependents of `object_ids`, as computed by the
    /// state with a fresh `seen` set.
    pub(crate) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
    ) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.object_dependents(object_ids, conn_id, &mut seen)
    }
974
975 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
976 FullNameV1 {
977 database: name.database.to_string(),
978 schema: name.schema.clone(),
979 item: name.item.clone(),
980 }
981 }
982
983 pub fn find_available_cluster_name(&self, name: &str) -> String {
984 let mut i = 0;
985 let mut candidate = name.to_string();
986 while self.state.clusters_by_name.contains_key(&candidate) {
987 i += 1;
988 candidate = format!("{}{}", name, i);
989 }
990 candidate
991 }
992
993 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
994 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
995 self.cluster_replica_sizes()
996 .enabled_allocations()
997 .map(|a| a.0.to_owned())
998 .collect::<Vec<_>>()
999 } else {
1000 self.system_config().allowed_cluster_replica_sizes()
1001 }
1002 }
1003
    /// Concretizes a durable replica location into a controller
    /// [`ReplicaLocation`], validating size and availability zone.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }

    /// Validates that `size` is one of the allowed replica sizes.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }

    /// Returns the configured replica size map.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }

    /// Returns the privileges attached to the given object, or `None` for
    /// object kinds that carry no privileges (replicas, roles).
    pub fn get_privileges(
        &self,
        id: &SystemObjectId,
        conn_id: &ConnectionId,
    ) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::Object(id) => match id {
                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
                ObjectId::Schema((database_spec, schema_spec)) => Some(
                    self.get_schema(database_spec, schema_spec, conn_id)
                        .privileges(),
                ),
                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
            },
            SystemObjectId::System => Some(&self.state.system_privileges),
        }
    }
1047
    /// Advances the durable catalog's upper to `new_upper`.
    #[mz_ore::instrument(level = "debug")]
    pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
        Ok(self.storage().await.advance_upper(new_upper).await?)
    }

    /// Returns the introspection items that `id` depends on.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }

    /// Serializes the catalog state into a [`CatalogDump`].
    pub fn dump(&self) -> Result<CatalogDump, Error> {
        Ok(CatalogDump::new(self.state.dump(None)?))
    }

    /// Checks internal consistency, returning the inconsistencies serialized
    /// as JSON on failure.
    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
        self.state.check_consistency().map_err(|inconsistencies| {
            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
                serde_json::Value::String("failed to serialize inconsistencies".to_string())
            })
        })
    }

    /// Returns the catalog configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }

    /// Iterates over every catalog entry.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }

    /// Iterates over user-created connections.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }

    /// Iterates over user-created tables.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }

    /// Iterates over user-created sources.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }

    /// Iterates over user-created sinks.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }

    /// Iterates over user-created materialized views.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }

    /// Iterates over user-created secrets.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1115
    /// Returns the network policy for the given ID; panics if absent.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }

    /// Returns the network policy named `name`, if it exists.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }

    /// Iterates over all clusters.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }

    /// Returns the cluster for `cluster_id`; panics if absent.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }

    /// Returns the cluster for `cluster_id`, if it exists.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }

    /// Iterates over user-created clusters.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }

    /// Returns the given replica of the given cluster; panics if absent.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }

    /// Returns the given replica of the given cluster, if it exists.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }

    /// Iterates over user replicas of user clusters.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }

    /// Iterates over all databases.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1164
1165 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1166 self.state
1167 .roles_by_id
1168 .values()
1169 .filter(|role| role.is_user())
1170 }
1171
1172 pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1173 self.state
1174 .network_policies_by_id
1175 .iter()
1176 .filter(|(id, _)| id.is_user())
1177 .map(|(_, policy)| policy)
1178 }
1179
    /// Returns the set of system-level privileges (privileges granted on the
    /// system as a whole rather than on an individual object).
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1183
    /// Returns an iterator over all default-privilege definitions: for each
    /// default-privilege target object, the ACL items that will be applied to
    /// new objects created under it.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1194
    /// Generates the builtin-table updates (with the given `diff`) describing
    /// the catalog item `id`, resolved to concrete table references.
    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
        self.state
            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
    }
1199
    /// Generates the builtin-table update (with the given `diff`) describing a
    /// single storage-usage audit event, resolved to a concrete table reference.
    pub fn pack_storage_usage_update(
        &self,
        event: VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate {
        self.state
            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
    }
1208
    /// Returns a reference to the system configuration variables.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1212
    /// Returns a mutable reference to the system configuration variables.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1216
    /// Returns an error if `role_id` refers to a reserved role that must not
    /// be modified or dropped.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1220
    /// Returns an error if `role_id` refers to a role that cannot be granted
    /// to or revoked from other roles.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1224
    /// Returns an error if `role_id` refers to a system role.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1228
    /// Returns an error if `role_id` refers to a predefined role.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1232
    /// Returns an error if `network_policy_id` refers to a reserved network
    /// policy that must not be modified or dropped.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1240
    /// Returns a "read-only" error if `object_id` refers to a system-owned
    /// (reserved) object of any kind; returns `Ok(())` for user objects.
    ///
    /// `conn_id` is used to resolve temporary schemas and to qualify item
    /// names in error messages. Each arm mirrors the same pattern: check the
    /// ID's `is_system()` flag, and if set, look the object up purely to
    /// produce a readable name for the error.
    pub fn ensure_not_reserved_object(
        &self,
        object_id: &ObjectId,
        conn_id: &ConnectionId,
    ) -> Result<(), Error> {
        match object_id {
            ObjectId::Cluster(cluster_id) => {
                if cluster_id.is_system() {
                    let cluster = self.get_cluster(*cluster_id);
                    Err(Error::new(ErrorKind::ReadOnlyCluster(
                        cluster.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::ClusterReplica((cluster_id, replica_id)) => {
                // Note: reserved-ness is decided by the *replica* ID; a system
                // replica of a user cluster is still protected.
                if replica_id.is_system() {
                    let replica = self.get_cluster_replica(*cluster_id, *replica_id);
                    Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
                        replica.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::Database(database_id) => {
                if database_id.is_system() {
                    let database = self.get_database(database_id);
                    Err(Error::new(ErrorKind::ReadOnlyDatabase(
                        database.name().to_string(),
                    )))
                } else {
                    Ok(())
                }
            }
            ObjectId::Schema((database_spec, schema_spec)) => {
                if schema_spec.is_system() {
                    let schema = self.get_schema(database_spec, schema_spec, conn_id);
                    Err(Error::new(ErrorKind::ReadOnlySystemSchema(
                        schema.name().schema.clone(),
                    )))
                } else {
                    Ok(())
                }
            }
            // Roles and network policies have dedicated checks that encode
            // their own notions of "reserved".
            ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
            ObjectId::Item(item_id) => {
                if item_id.is_system() {
                    let item = self.get_entry(item_id);
                    let name = self.resolve_full_name(item.name(), Some(conn_id));
                    Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
                } else {
                    Ok(())
                }
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                self.ensure_not_reserved_network_policy(network_policy_id)
            }
        }
    }
1302
    /// Parses and plans `create_sql` with the feature flags that are enabled
    /// specifically for re-parsing persisted catalog items.
    ///
    /// `force_if_exists_skip` controls IF EXISTS handling during planning
    /// (forwarded verbatim to state).
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1312
    /// Inserts the optimized expressions for `id` into the expression cache.
    ///
    /// Returns a future that completes once the cache update has been issued;
    /// if no cache handle is configured the future is a no-op (see
    /// [`Self::update_expression_cache`]).
    pub(crate) fn cache_expressions(
        &self,
        id: GlobalId,
        local_mir: Option<OptimizedMirRelationExpr>,
        mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
        mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
        dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
        optimizer_features: OptimizerFeatures,
    ) -> BoxFuture<'static, ()> {
        // Clear the timestamp-dependent fields (`as_of`/`until`) before
        // caching, so cached plans are not tied to the specific time at which
        // they were originally optimized.
        global_mir.as_of = None;
        global_mir.until = Default::default();
        physical_plan.as_of = None;
        physical_plan.until = Default::default();

        // The local (fast-path) expression is optional; the global expression
        // bundle is always cached.
        let mut local_exprs = Vec::new();
        if let Some(local_mir) = local_mir {
            local_exprs.push((
                id,
                LocalExpressions {
                    local_mir,
                    // Cloned because the same features are also recorded in
                    // the global entry below.
                    optimizer_features: optimizer_features.clone(),
                },
            ));
        }
        let global_exprs = vec![(
            id,
            GlobalExpressions {
                global_mir,
                physical_plan,
                dataflow_metainfos,
                optimizer_features,
            },
        )];
        self.update_expression_cache(local_exprs, global_exprs, Default::default())
    }
1361
1362 pub(crate) fn update_expression_cache<'a, 'b>(
1363 &'a self,
1364 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1365 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1366 invalidate_ids: BTreeSet<GlobalId>,
1367 ) -> BoxFuture<'b, ()> {
1368 if let Some(expr_cache) = &self.expr_cache_handle {
1369 expr_cache
1370 .update(
1371 new_local_expressions,
1372 new_global_expressions,
1373 invalidate_ids,
1374 )
1375 .boxed()
1376 } else {
1377 async {}.boxed()
1378 }
1379 }
1380
    /// Test-only helper: pulls all pending updates from the durable catalog
    /// and applies them to the in-memory state, returning both the resulting
    /// builtin-table updates and the parsed state updates.
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        let updates = self.storage().await.sync_to_current_updates().await?;
        // Expression caching is bypassed (`Closed`) since tests do not
        // exercise the expression cache here.
        let (builtin_table_updates, catalog_updates) = self
            .state
            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
            .await;
        Ok((builtin_table_updates, catalog_updates))
    }
1401}
1402
1403pub fn is_reserved_name(name: &str) -> bool {
1404 BUILTIN_PREFIXES
1405 .iter()
1406 .any(|prefix| name.starts_with(prefix))
1407}
1408
/// Reports whether `name` is reserved for roles: either a builtin-prefixed
/// name or the special PUBLIC pseudo-role.
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1412
/// Reports whether `name` is the PUBLIC pseudo-role's name.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1416
/// Converts a SQL catalog item type into the audit-log object type, going via
/// the generic object-type conversion.
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1420
/// Maps a comment's target-object ID onto the audit-log object type.
///
/// Exhaustive on purpose: adding a `CommentObjectId` variant must force a
/// decision about its audit representation.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1441
/// Converts a SQL object type into the audit-log object type by wrapping it in
/// `SystemObjectType::Object` and delegating.
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1447
/// Maps a (possibly system-wide) SQL object type onto the audit-log object
/// type. Exhaustive so new SQL object types cannot be silently left unmapped.
pub(crate) fn system_object_type_to_audit_object_type(
    system_type: &SystemObjectType,
) -> ObjectType {
    match system_type {
        SystemObjectType::Object(object_type) => match object_type {
            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
            mz_sql::catalog::ObjectType::View => ObjectType::View,
            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
        },
        SystemObjectType::System => ObjectType::System,
    }
}
1473
/// Whether a privilege update grants or revokes a privilege. Used to share
/// code between GRANT and REVOKE handling.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    Grant,
    Revoke,
}
1479
/// Maps a privilege-update variant to the execute response sent to clients.
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}
1488
/// Maps a privilege-update variant to the audit-log event type.
impl From<UpdatePrivilegeVariant> for EventType {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1497
/// Private name-resolution helpers used by `minimal_qualification` to test
/// whether a partial name resolves back to a given qualified name in each of
/// the three namespaces (items, functions, types).
impl ConnCatalog<'_> {
    /// Resolves `name` in the item namespace and returns the qualified name
    /// of the entry it resolves to.
    fn resolve_item_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_item(name).map(|entry| entry.name())
    }

    /// Resolves `name` in the function namespace and returns the qualified
    /// name of the entry it resolves to.
    fn resolve_function_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_function(name).map(|entry| entry.name())
    }

    /// Resolves `name` in the type namespace and returns the qualified name
    /// of the entry it resolves to.
    fn resolve_type_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_type(name).map(|entry| entry.name())
    }
}
1520
/// Humanizes IDs, types, and column names for EXPLAIN output and error
/// messages, using this connection's search path and database context.
impl ExprHumanizer for ConnCatalog<'_> {
    fn humanize_id(&self, id: GlobalId) -> Option<String> {
        // Fully qualified name (database.schema.item) for the collection.
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).to_string())
    }

    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
        // Just the bare item name, no schema/database qualification.
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(entry.name().item.clone())
    }

    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
        // The full name split into its components, outermost first.
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).into_parts())
    }

    /// Renders a scalar type as SQL-ish text. With `postgres_compat` set,
    /// char/varchar length suffixes are suppressed to match Postgres naming.
    fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        use SqlScalarType::*;

        match typ {
            Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
            // Custom (user-named) list/map types print their catalog name
            // rather than their structural form.
            List {
                custom_id: Some(item_id),
                ..
            }
            | Map {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            List { element_type, .. } => {
                format!(
                    "{} list",
                    self.humanize_sql_scalar_type(element_type, postgres_compat)
                )
            }
            // Map keys are always strings; only the value type varies.
            Map { value_type, .. } => format!(
                "map[{}=>{}]",
                self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
                self.humanize_sql_scalar_type(value_type, postgres_compat)
            ),
            // Custom record types likewise print their catalog name.
            Record {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            Record { fields, .. } => format!(
                "record({})",
                fields
                    .iter()
                    .map(|f| format!(
                        "{}: {}",
                        f.0,
                        self.humanize_sql_column_type(&f.1, postgres_compat)
                    ))
                    .join(",")
            ),
            PgLegacyChar => "\"char\"".into(),
            // Length-annotated spellings only when not in Postgres-compat mode.
            Char { length } if !postgres_compat => match length {
                None => "char".into(),
                Some(length) => format!("char({})", length.into_u32()),
            },
            VarChar { max_length } if !postgres_compat => match max_length {
                None => "varchar".into(),
                Some(length) => format!("varchar({})", length.into_u32()),
            },
            UInt16 => "uint2".into(),
            UInt32 => "uint4".into(),
            UInt64 => "uint8".into(),
            // Everything else defers to the pgrepr name, qualified with
            // pg_catalog unless pg_catalog is already on the search path.
            ty => {
                let pgrepr_type = mz_pgrepr::Type::from(ty);
                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());

                let res = if self
                    .effective_search_path(true)
                    .iter()
                    .any(|(_, schema)| schema == &pg_catalog_schema)
                {
                    pgrepr_type.name().to_string()
                } else {
                    // Qualify the type name as `pg_catalog.<name>`.
                    let name = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: pg_catalog_schema,
                        },
                        item: pgrepr_type.name().to_string(),
                    };
                    self.resolve_full_name(&name).to_string()
                };
                res
            }
        }
    }

    /// Returns the column names of the collection behind `id`. For an index,
    /// the names are the underlying collection's names permuted into the
    /// index's arrangement order; otherwise they come straight from the desc.
    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;

        match entry.index() {
            Some(index) => {
                // Start from the names of the indexed collection.
                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
                let mut on_names = on_desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect::<Vec<_>>();

                // `p[on_pos] = ix_pos`: where each source column lands in the
                // arrangement.
                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());

                // The arrangement's arity is one past the largest target
                // position (0 if the permutation is empty).
                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
                let mut ix_names = vec![String::new(); ix_arity];

                // Move each source name into its arranged position. `swap`
                // avoids cloning; the vacated slots keep empty strings.
                for (on_pos, ix_pos) in p.into_iter().enumerate() {
                    let on_name = on_names.get_mut(on_pos).expect("on_name");
                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
                    std::mem::swap(on_name, ix_name);
                }

                Some(ix_names) }
            None => {
                let desc = self.state.try_get_desc_by_global_id(&id)?;
                let column_names = desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect();

                Some(column_names)
            }
        }
    }

    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
        // NOTE(review): `get_name` presumably panics on an out-of-range
        // column index — confirm against RelationDesc before relying on it.
        let desc = self.state.try_get_desc_by_global_id(&id)?;
        Some(desc.get_name(column).to_string())
    }

    fn id_exists(&self, id: GlobalId) -> bool {
        self.state.entry_by_global_id.contains_key(&id)
    }
}
1670
/// The session-scoped catalog interface: resolution and lookups here honor the
/// connection's active database, search path, temporary schema, and the set of
/// IDs marked unresolvable for this connection.
impl SessionCatalog for ConnCatalog<'_> {
    fn active_role_id(&self) -> &RoleId {
        &self.role_id
    }

    fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
        // `prepared_statements` is itself optional; flatten the nested lookup.
        self.prepared_statements
            .as_ref()
            .map(|ps| ps.get(name).map(|ps| ps.desc()))
            .flatten()
    }

    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
        self.portals
            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
    }

    fn active_database(&self) -> Option<&DatabaseId> {
        self.database.as_ref()
    }

    fn active_cluster(&self) -> &str {
        &self.cluster
    }

    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
        &self.search_path
    }

    fn resolve_database(
        &self,
        database_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
        Ok(self.state.resolve_database(database_name)?)
    }

    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
        self.state
            .database_by_id
            .get(id)
            .expect("database doesn't exist")
    }

    // `as` is used only for the unsizing coercion to the trait object.
    #[allow(clippy::as_conversions)]
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
        self.state
            .database_by_id
            .values()
            .map(|database| database as &dyn CatalogDatabase)
            .collect()
    }

    fn resolve_schema(
        &self,
        database_name: Option<&str>,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        // Falls back to the active database when `database_name` is `None`.
        Ok(self.state.resolve_schema(
            self.database.as_ref(),
            database_name,
            schema_name,
            &self.conn_id,
        )?)
    }

    fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self
            .state
            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
    }

    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema {
        self.state
            .get_schema(database_spec, schema_spec, &self.conn_id)
    }

    // Returns all schemas visible to this session: database schemas, ambient
    // schemas, and temporary schemas.
    #[allow(clippy::as_conversions)]
    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.get_databases()
            .into_iter()
            .flat_map(|database| database.schemas().into_iter())
            .chain(
                self.state
                    .ambient_schemas_by_id
                    .values()
                    .chain(self.state.temporary_schemas.values())
                    .map(|schema| schema as &dyn CatalogSchema),
            )
            .collect()
    }

    fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state().get_mz_internal_schema_id()
    }

    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state().get_mz_unsafe_schema_id()
    }

    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
        self.state.is_system_schema_specifier(schema)
    }

    fn resolve_role(
        &self,
        role_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
        match self.state.try_get_role_by_name(role_name) {
            Some(role) => Ok(role),
            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
        }
    }

    fn resolve_network_policy(
        &self,
        policy_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
        match self.state.try_get_network_policy_by_name(policy_name) {
            Some(policy) => Ok(policy),
            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
        }
    }

    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
        Some(self.state.roles_by_id.get(id)?)
    }

    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
        self.state.get_role(id)
    }

    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
        // `as` is used only for the unsizing coercion to the trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .roles_by_id
            .values()
            .map(|role| role as &dyn CatalogRole)
            .collect()
    }

    fn mz_system_role_id(&self) -> RoleId {
        MZ_SYSTEM_ROLE_ID
    }

    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        self.state.collect_role_membership(id)
    }

    fn get_network_policy(
        &self,
        id: &NetworkPolicyId,
    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
        self.state.get_network_policy(id)
    }

    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
        // `as` is used only for the unsizing coercion to the trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .network_policies_by_id
            .values()
            .map(|policy| policy as &dyn CatalogNetworkPolicy)
            .collect()
    }

    fn resolve_cluster(
        &self,
        cluster_name: Option<&str>,
    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
        // `None` means "the session's active cluster".
        Ok(self
            .state
            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
    }

    fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
    }

    fn resolve_item(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
        let r = self.state.resolve_entry(
            self.database.as_ref(),
            &self.effective_search_path(true),
            name,
            &self.conn_id,
        )?;
        // Items hidden from this connection resolve as if they don't exist.
        if self.unresolvable_ids.contains(&r.id()) {
            Err(SqlCatalogError::UnknownItem(name.to_string()))
        } else {
            Ok(r)
        }
    }

    fn resolve_function(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
        let r = self.state.resolve_function(
            self.database.as_ref(),
            &self.effective_search_path(false),
            name,
            &self.conn_id,
        )?;

        // Items hidden from this connection resolve as if they don't exist.
        if self.unresolvable_ids.contains(&r.id()) {
            Err(SqlCatalogError::UnknownFunction {
                name: name.to_string(),
                alternative: None,
            })
        } else {
            Ok(r)
        }
    }

    fn resolve_type(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
        let r = self.state.resolve_type(
            self.database.as_ref(),
            &self.effective_search_path(false),
            name,
            &self.conn_id,
        )?;

        // Items hidden from this connection resolve as if they don't exist.
        if self.unresolvable_ids.contains(&r.id()) {
            Err(SqlCatalogError::UnknownType {
                name: name.to_string(),
            })
        } else {
            Ok(r)
        }
    }

    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_system_type(name)
    }

    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
        Some(self.state.try_get_entry(id)?)
    }

    fn try_get_item_by_global_id(
        &self,
        id: &GlobalId,
    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
        let entry = self.state.try_get_entry_by_global_id(id)?;
        let entry = match &entry.item {
            // A table's GlobalId identifies one specific relation version;
            // pin the returned item to exactly that version.
            CatalogItem::Table(table) => {
                let (version, _gid) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("catalog out of sync, mismatched GlobalId");
                entry.at_version(RelationVersionSelector::Specific(*version))
            }
            // Non-table items have a single version.
            _ => entry.at_version(RelationVersionSelector::Latest),
        };
        Some(entry)
    }

    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_entry(id)
    }

    fn get_item_by_global_id(
        &self,
        id: &GlobalId,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        // Same version-pinning logic as `try_get_item_by_global_id`, but
        // panics if the GlobalId is unknown.
        let entry = self.state.get_entry_by_global_id(id);
        let entry = match &entry.item {
            CatalogItem::Table(table) => {
                let (version, _gid) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("catalog out of sync, mismatched GlobalId");
                entry.at_version(RelationVersionSelector::Specific(*version))
            }
            _ => entry.at_version(RelationVersionSelector::Latest),
        };
        entry
    }

    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
        // Every item in every schema visible to this session.
        self.get_schemas()
            .into_iter()
            .flat_map(|schema| schema.item_ids())
            .map(|id| self.get_item(&id))
            .collect()
    }

    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        // `convert::identity` just performs the unsize coercion to the trait
        // object inside `map`.
        self.state
            .get_item_by_name(name, &self.conn_id)
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }

    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_type_by_name(name, &self.conn_id)
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }

    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
        // Panics if the cluster does not exist.
        &self.state.clusters_by_id[&id]
    }

    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
        self.state
            .clusters_by_id
            .values()
            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
            .collect()
    }

    fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
        let cluster = self.get_cluster(cluster_id);
        cluster.replica(replica_id)
    }

    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
        self.get_clusters()
            .into_iter()
            .flat_map(|cluster| cluster.replicas().into_iter())
            .collect()
    }

    fn get_system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }

    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
        self.state
            .default_privileges
            .iter()
            .map(|(object, acl_items)| (object, acl_items.collect()))
            .collect()
    }

    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
        self.state.find_available_name(name, &self.conn_id)
    }

    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
        self.state.resolve_full_name(name, Some(&self.conn_id))
    }

    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        self.state.resolve_full_schema_name(name)
    }

    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
        self.state.get_entry_by_global_id(global_id).id()
    }

    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId {
        self.state
            .get_entry(item_id)
            .at_version(version)
            .global_id()
    }

    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }

    fn now(&self) -> EpochMillis {
        // Invokes the catalog's configured `now` clock.
        (self.state.config().now)()
    }

    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
        self.state.aws_privatelink_availability_zones.clone()
    }

    fn system_vars(&self) -> &SystemVars {
        &self.state.system_configuration
    }

    fn system_vars_mut(&mut self) -> &mut SystemVars {
        // `state` is a Cow: `to_mut` clones on first write, then `make_mut`
        // gives mutable access to the shared `SystemVars`.
        Arc::make_mut(&mut self.state.to_mut().system_configuration)
    }

    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
        self.state().get_owner_id(id, self.conn_id())
    }

    /// Returns the privileges on the given object, or `None` for object kinds
    /// that carry no privileges (cluster replicas and roles).
    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::System => Some(&self.state.system_privileges),
            SystemObjectId::Object(ObjectId::Cluster(id)) => {
                Some(self.get_cluster(*id).privileges())
            }
            SystemObjectId::Object(ObjectId::Database(id)) => {
                Some(self.get_database(id).privileges())
            }
            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
                self.state
                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
                    .map(|schema| schema.privileges())
            }
            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
                Some(self.get_network_policy(id).privileges())
            }
            SystemObjectId::Object(ObjectId::ClusterReplica(_))
            | SystemObjectId::Object(ObjectId::Role(_)) => None,
        }
    }

    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.object_dependents(ids, &self.conn_id, &mut seen)
    }

    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.item_dependents(id, &mut seen)
    }

    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
        rbac::all_object_privileges(object_type)
    }

    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        self.state.get_object_type(object_id)
    }

    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
        self.state.get_system_object_type(id)
    }

    /// Computes the shortest partial name that still resolves to
    /// `qualified_name` under this session's search path: drops the database
    /// when it is the active one, and drops the schema when an unqualified
    /// lookup already lands on the same entry.
    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
        // Temporary items are always referred to by their bare name.
        if qualified_name.qualifiers.schema_spec.is_temporary() {
            return qualified_name.item.clone().into();
        }

        let database_id = match &qualified_name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => None,
            ResolvedDatabaseSpecifier::Id(id)
                if self.database.is_some() && self.database == Some(*id) =>
            {
                None
            }
            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
        };

        // Probe all three namespaces with the bare item name; if any resolves
        // back to the same qualified name, the schema can be omitted.
        //
        // NOTE(review): due to `&&` binding tighter than `||`, the
        // `database_id.is_none()` guard applies only to the item-namespace
        // probe, not the function/type probes — confirm this is intentional.
        let schema_spec = if database_id.is_none()
            && self.resolve_item_name(&PartialItemName {
                database: None,
                schema: None,
                item: qualified_name.item.clone(),
            }) == Ok(qualified_name)
            || self.resolve_function_name(&PartialItemName {
                database: None,
                schema: None,
                item: qualified_name.item.clone(),
            }) == Ok(qualified_name)
            || self.resolve_type_name(&PartialItemName {
                database: None,
                schema: None,
                item: qualified_name.item.clone(),
            }) == Ok(qualified_name)
        {
            None
        } else {
            Some(qualified_name.qualifiers.schema_spec.clone())
        };

        let res = PartialItemName {
            database: database_id.map(|id| self.get_database(&id).name().to_string()),
            schema: schema_spec.map(|spec| {
                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
                    .name()
                    .schema
                    .clone()
            }),
            item: qualified_name.item.clone(),
        };
        // Sanity check: the minimized name must still resolve to the input.
        assert!(
            self.resolve_item_name(&res) == Ok(qualified_name)
                || self.resolve_function_name(&res) == Ok(qualified_name)
                || self.resolve_type_name(&res) == Ok(qualified_name)
        );
        res
    }

    fn add_notice(&self, notice: PlanNotice) {
        // Best-effort: a dropped receiver silently discards the notice.
        let _ = self.notices_tx.send(notice.into());
    }

    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
        // Keyed by column position (`Some(i)`) or `None` for the object itself.
        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
        self.state.comments.get_object_comments(comment_id)
    }

    fn is_cluster_size_cc(&self, size: &str) -> bool {
        // Unknown sizes are reported as not-cc.
        self.state
            .cluster_replica_sizes
            .0
            .get(size)
            .map_or(false, |a| a.is_cc)
    }
}
2218
2219#[cfg(test)]
2220mod tests {
2221 use std::collections::{BTreeMap, BTreeSet};
2222 use std::sync::Arc;
2223 use std::{env, iter};
2224
2225 use itertools::Itertools;
2226 use mz_catalog::memory::objects::CatalogItem;
2227 use tokio_postgres::NoTls;
2228 use tokio_postgres::types::Type;
2229 use uuid::Uuid;
2230
2231 use mz_catalog::SYSTEM_CONN_ID;
2232 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2233 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2234 use mz_controller_types::{ClusterId, ReplicaId};
2235 use mz_expr::MirScalarExpr;
2236 use mz_ore::now::to_datetime;
2237 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2238 use mz_persist_client::PersistClient;
2239 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2240 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2241 use mz_repr::role_id::RoleId;
2242 use mz_repr::{
2243 CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2244 SqlScalarType, Timestamp,
2245 };
2246 use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2247 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2248 use mz_sql::names::{
2249 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2250 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2251 };
2252 use mz_sql::plan::{
2253 CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2254 QueryLifetime, Scope, StatementContext,
2255 };
2256 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2257 use mz_sql::session::vars::{SystemVars, VarInput};
2258
2259 use crate::catalog::state::LocalExpressionCache;
2260 use crate::catalog::{Catalog, Op};
2261 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2262 use crate::session::Session;
2263
2264 #[mz_ore::test(tokio::test)]
2271 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2273 Catalog::with_debug(|catalog| async move {
2274 struct TestCase {
2275 input: QualifiedItemName,
2276 system_output: PartialItemName,
2277 normal_output: PartialItemName,
2278 }
2279
2280 let test_cases = vec![
2281 TestCase {
2282 input: QualifiedItemName {
2283 qualifiers: ItemQualifiers {
2284 database_spec: ResolvedDatabaseSpecifier::Ambient,
2285 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2286 },
2287 item: "numeric".to_string(),
2288 },
2289 system_output: PartialItemName {
2290 database: None,
2291 schema: None,
2292 item: "numeric".to_string(),
2293 },
2294 normal_output: PartialItemName {
2295 database: None,
2296 schema: None,
2297 item: "numeric".to_string(),
2298 },
2299 },
2300 TestCase {
2301 input: QualifiedItemName {
2302 qualifiers: ItemQualifiers {
2303 database_spec: ResolvedDatabaseSpecifier::Ambient,
2304 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2305 },
2306 item: "mz_array_types".to_string(),
2307 },
2308 system_output: PartialItemName {
2309 database: None,
2310 schema: None,
2311 item: "mz_array_types".to_string(),
2312 },
2313 normal_output: PartialItemName {
2314 database: None,
2315 schema: None,
2316 item: "mz_array_types".to_string(),
2317 },
2318 },
2319 ];
2320
2321 for tc in test_cases {
2322 assert_eq!(
2323 catalog
2324 .for_system_session()
2325 .minimal_qualification(&tc.input),
2326 tc.system_output
2327 );
2328 assert_eq!(
2329 catalog
2330 .for_session(&Session::dummy())
2331 .minimal_qualification(&tc.input),
2332 tc.normal_output
2333 );
2334 }
2335 catalog.expire().await;
2336 })
2337 .await
2338 }
2339
2340 #[mz_ore::test(tokio::test)]
2341 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2343 let persist_client = PersistClient::new_for_tests().await;
2344 let organization_id = Uuid::new_v4();
2345 let bootstrap_args = test_bootstrap_args();
2346 {
2347 let mut catalog = Catalog::open_debug_catalog(
2348 persist_client.clone(),
2349 organization_id.clone(),
2350 &bootstrap_args,
2351 )
2352 .await
2353 .expect("unable to open debug catalog");
2354 assert_eq!(catalog.transient_revision(), 1);
2355 let commit_ts = catalog.current_upper().await;
2356 catalog
2357 .transact(
2358 None,
2359 commit_ts,
2360 None,
2361 vec![Op::CreateDatabase {
2362 name: "test".to_string(),
2363 owner_id: MZ_SYSTEM_ROLE_ID,
2364 }],
2365 )
2366 .await
2367 .expect("failed to transact");
2368 assert_eq!(catalog.transient_revision(), 2);
2369 catalog.expire().await;
2370 }
2371 {
2372 let catalog =
2373 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2374 .await
2375 .expect("unable to open debug catalog");
2376 assert_eq!(catalog.transient_revision(), 1);
2378 catalog.expire().await;
2379 }
2380 }
2381
    // Verifies `effective_search_path`: the ambient mz_catalog and pg_catalog
    // schemas (and, when `include_temp` is true, the temporary schema) are
    // implicitly prepended to the session's search path, but schemas already
    // present in the search path are not duplicated.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_effective_search_path() {
        Catalog::with_debug(|catalog| async move {
            let mz_catalog_schema = (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
            );
            let pg_catalog_schema = (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
            );
            let mz_temp_schema = (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Temporary,
            );

            // Default session: the search path holds only the default schema,
            // so both system schemas (and optionally the temp schema) are
            // prepended in front of it.
            let session = Session::dummy();
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone(),
                    conn_catalog.search_path[0].clone()
                ]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![
                    mz_temp_schema.clone(),
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone(),
                    conn_catalog.search_path[0].clone()
                ]
            );

            // search_path = pg_catalog: pg_catalog is not duplicated;
            // mz_catalog is still prepended ahead of it.
            let mut session = Session::dummy();
            session
                .vars_mut()
                .set(
                    &SystemVars::new(),
                    "search_path",
                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
                    false,
                )
                .expect("failed to set search_path");
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![
                    mz_temp_schema.clone(),
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone()
                ]
            );

            // search_path = mz_catalog: mz_catalog keeps its user-specified
            // position; only pg_catalog is prepended.
            let mut session = Session::dummy();
            session
                .vars_mut()
                .set(
                    &SystemVars::new(),
                    "search_path",
                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
                    false,
                )
                .expect("failed to set search_path");
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![
                    mz_temp_schema.clone(),
                    pg_catalog_schema.clone(),
                    mz_catalog_schema.clone()
                ]
            );

            // search_path = mz_temp: the temp schema keeps its user-specified
            // (trailing) position and is not re-prepended even when
            // `include_temp` is true.
            let mut session = Session::dummy();
            session
                .vars_mut()
                .set(
                    &SystemVars::new(),
                    "search_path",
                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
                    false,
                )
                .expect("failed to set search_path");
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone(),
                    mz_temp_schema.clone()
                ]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
            );
            catalog.expire().await;
        })
        .await
    }
2528
2529 #[mz_ore::test(tokio::test)]
2530 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2532 use mz_ore::collections::CollectionExt;
2533 Catalog::with_debug(|catalog| async move {
2534 let conn_catalog = catalog.for_system_session();
2535 let scx = &mut StatementContext::new(None, &conn_catalog);
2536
2537 let parsed = mz_sql_parser::parser::parse_statements(
2538 "create view public.foo as select 1 as bar",
2539 )
2540 .expect("")
2541 .into_element()
2542 .ast;
2543
2544 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2545
2546 assert_eq!(
2548 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2549 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2550 );
2551 catalog.expire().await;
2552 })
2553 .await;
2554 }
2555
2556 #[mz_ore::test(tokio::test)]
2558 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2560 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2561 let column = ", 1";
2562 let view_def_size = view_def.bytes().count();
2563 let column_size = column.bytes().count();
2564 let column_count =
2565 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2566 let columns = iter::repeat(column).take(column_count).join("");
2567 let create_sql = format!("{view_def}{columns})");
2568 let create_sql_check = create_sql.clone();
2569 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2570 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2571 &create_sql
2572 ));
2573
2574 let persist_client = PersistClient::new_for_tests().await;
2575 let organization_id = Uuid::new_v4();
2576 let id = CatalogItemId::User(1);
2577 let gid = GlobalId::User(1);
2578 let bootstrap_args = test_bootstrap_args();
2579 {
2580 let mut catalog = Catalog::open_debug_catalog(
2581 persist_client.clone(),
2582 organization_id.clone(),
2583 &bootstrap_args,
2584 )
2585 .await
2586 .expect("unable to open debug catalog");
2587 let item = catalog
2588 .state()
2589 .deserialize_item(
2590 gid,
2591 &create_sql,
2592 &BTreeMap::new(),
2593 &mut LocalExpressionCache::Closed,
2594 None,
2595 )
2596 .expect("unable to parse view");
2597 let commit_ts = catalog.current_upper().await;
2598 catalog
2599 .transact(
2600 None,
2601 commit_ts,
2602 None,
2603 vec![Op::CreateItem {
2604 item,
2605 name: QualifiedItemName {
2606 qualifiers: ItemQualifiers {
2607 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2608 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2609 },
2610 item: "v".to_string(),
2611 },
2612 id,
2613 owner_id: MZ_SYSTEM_ROLE_ID,
2614 }],
2615 )
2616 .await
2617 .expect("failed to transact");
2618 catalog.expire().await;
2619 }
2620 {
2621 let catalog =
2622 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2623 .await
2624 .expect("unable to open debug catalog");
2625 let view = catalog.get_entry(&id);
2626 assert_eq!("v", view.name.item);
2627 match &view.item {
2628 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2629 item => panic!("expected view, got {}", item.typ()),
2630 }
2631 catalog.expire().await;
2632 }
2633 }
2634
2635 #[mz_ore::test(tokio::test)]
2636 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2638 Catalog::with_debug(|catalog| async move {
2639 let conn_catalog = catalog.for_system_session();
2640
2641 assert_eq!(
2642 mz_sql::catalog::ObjectType::ClusterReplica,
2643 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2644 ClusterId::user(1).expect("1 is a valid ID"),
2645 ReplicaId::User(1)
2646 )))
2647 );
2648 assert_eq!(
2649 mz_sql::catalog::ObjectType::Role,
2650 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2651 );
2652 catalog.expire().await;
2653 })
2654 .await;
2655 }
2656
2657 #[mz_ore::test(tokio::test)]
2658 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2660 Catalog::with_debug(|catalog| async move {
2661 let conn_catalog = catalog.for_system_session();
2662
2663 assert_eq!(
2664 None,
2665 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2666 ClusterId::user(1).expect("1 is a valid ID"),
2667 ReplicaId::User(1),
2668 ))))
2669 );
2670 assert_eq!(
2671 None,
2672 conn_catalog
2673 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2674 );
2675 catalog.expire().await;
2676 })
2677 .await;
2678 }
2679
    // Checks that the hardcoded `desc` of every builtin relation (tables,
    // views, materialized views, sources) matches the `RelationDesc` the
    // catalog actually resolves for it.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn verify_builtin_descs() {
        Catalog::with_debug(|catalog| async move {
            let conn_catalog = catalog.for_system_session();

            for builtin in BUILTINS::iter() {
                // Only relation-like builtins carry a hardcoded description.
                let (schema, name, expected_desc) = match builtin {
                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
                    Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
                    Builtin::Log(_)
                    | Builtin::Type(_)
                    | Builtin::Func(_)
                    | Builtin::Index(_)
                    | Builtin::Connection(_) => continue,
                };
                let item = conn_catalog
                    .resolve_item(&PartialItemName {
                        database: None,
                        schema: Some(schema.to_string()),
                        item: name.to_string(),
                    })
                    .expect("unable to resolve item")
                    .at_version(RelationVersionSelector::Latest);

                let actual_desc = item.relation_desc().expect("invalid item type");
                // Compare column-by-column first so a failure pinpoints the
                // offending column instead of dumping both full descs.
                // `zip_eq` also panics if the column counts differ.
                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
                {
                    assert_eq!(
                        actual_name, expected_name,
                        "item {schema}.{name} column {index} name did not match its expected name"
                    );
                    assert_eq!(
                        actual_typ, expected_typ,
                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
                    );
                }
                // Then compare the full descs to catch any difference not
                // covered by the per-column loop.
                assert_eq!(
                    &*actual_desc, expected_desc,
                    "item {schema}.{name} did not match its expected RelationDesc"
                );
            }
            catalog.expire().await;
        })
        .await
    }
2729
2730 #[mz_ore::test(tokio::test)]
2733 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2735 async fn inner(catalog: Catalog) {
2736 let (client, connection) = tokio_postgres::connect(
2740 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2741 NoTls,
2742 )
2743 .await
2744 .expect("failed to connect to Postgres");
2745
2746 task::spawn(|| "compare_builtin_postgres", async move {
2747 if let Err(e) = connection.await {
2748 panic!("connection error: {}", e);
2749 }
2750 });
2751
2752 struct PgProc {
2753 name: String,
2754 arg_oids: Vec<u32>,
2755 ret_oid: Option<u32>,
2756 ret_set: bool,
2757 }
2758
2759 struct PgType {
2760 name: String,
2761 ty: String,
2762 elem: u32,
2763 array: u32,
2764 input: u32,
2765 receive: u32,
2766 }
2767
2768 struct PgOper {
2769 oprresult: u32,
2770 name: String,
2771 }
2772
2773 let pg_proc: BTreeMap<_, _> = client
2774 .query(
2775 "SELECT
2776 p.oid,
2777 proname,
2778 proargtypes,
2779 prorettype,
2780 proretset
2781 FROM pg_proc p
2782 JOIN pg_namespace n ON p.pronamespace = n.oid",
2783 &[],
2784 )
2785 .await
2786 .expect("pg query failed")
2787 .into_iter()
2788 .map(|row| {
2789 let oid: u32 = row.get("oid");
2790 let pg_proc = PgProc {
2791 name: row.get("proname"),
2792 arg_oids: row.get("proargtypes"),
2793 ret_oid: row.get("prorettype"),
2794 ret_set: row.get("proretset"),
2795 };
2796 (oid, pg_proc)
2797 })
2798 .collect();
2799
2800 let pg_type: BTreeMap<_, _> = client
2801 .query(
2802 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2803 &[],
2804 )
2805 .await
2806 .expect("pg query failed")
2807 .into_iter()
2808 .map(|row| {
2809 let oid: u32 = row.get("oid");
2810 let pg_type = PgType {
2811 name: row.get("typname"),
2812 ty: row.get("typtype"),
2813 elem: row.get("typelem"),
2814 array: row.get("typarray"),
2815 input: row.get("typinput"),
2816 receive: row.get("typreceive"),
2817 };
2818 (oid, pg_type)
2819 })
2820 .collect();
2821
2822 let pg_oper: BTreeMap<_, _> = client
2823 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2824 .await
2825 .expect("pg query failed")
2826 .into_iter()
2827 .map(|row| {
2828 let oid: u32 = row.get("oid");
2829 let pg_oper = PgOper {
2830 name: row.get("oprname"),
2831 oprresult: row.get("oprresult"),
2832 };
2833 (oid, pg_oper)
2834 })
2835 .collect();
2836
2837 let conn_catalog = catalog.for_system_session();
2838 let resolve_type_oid = |item: &str| {
2839 conn_catalog
2840 .resolve_type(&PartialItemName {
2841 database: None,
2842 schema: Some(PG_CATALOG_SCHEMA.into()),
2845 item: item.to_string(),
2846 })
2847 .expect("unable to resolve type")
2848 .oid()
2849 };
2850
2851 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2852 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2853 .collect();
2854
2855 let mut all_oids = BTreeSet::new();
2856
2857 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2860 [
2861 (Type::NAME, Type::TEXT),
2863 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2864 (Type::TIME, Type::TIMETZ),
2866 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2867 ]
2868 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2869 );
2870 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2871 1619, ]);
2873 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2874 if ignore_return_types.contains(&fn_oid) {
2875 return true;
2876 }
2877 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2878 return true;
2879 }
2880 a == b
2881 };
2882
2883 for builtin in BUILTINS::iter() {
2884 match builtin {
2885 Builtin::Type(ty) => {
2886 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2887
2888 if ty.oid >= FIRST_MATERIALIZE_OID {
2889 continue;
2892 }
2893
2894 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2897 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2898 });
2899 assert_eq!(
2900 ty.name, pg_ty.name,
2901 "oid {} has name {} in postgres; expected {}",
2902 ty.oid, pg_ty.name, ty.name,
2903 );
2904
2905 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2906 None => (0, 0),
2907 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2908 };
2909 assert_eq!(
2910 typinput_oid, pg_ty.input,
2911 "type {} has typinput OID {:?} in mz but {:?} in pg",
2912 ty.name, typinput_oid, pg_ty.input,
2913 );
2914 assert_eq!(
2915 typreceive_oid, pg_ty.receive,
2916 "type {} has typreceive OID {:?} in mz but {:?} in pg",
2917 ty.name, typreceive_oid, pg_ty.receive,
2918 );
2919 if typinput_oid != 0 {
2920 assert!(
2921 func_oids.contains(&typinput_oid),
2922 "type {} has typinput OID {} that does not exist in pg_proc",
2923 ty.name,
2924 typinput_oid,
2925 );
2926 }
2927 if typreceive_oid != 0 {
2928 assert!(
2929 func_oids.contains(&typreceive_oid),
2930 "type {} has typreceive OID {} that does not exist in pg_proc",
2931 ty.name,
2932 typreceive_oid,
2933 );
2934 }
2935
2936 match &ty.details.typ {
2938 CatalogType::Array { element_reference } => {
2939 let elem_ty = BUILTINS::iter()
2940 .filter_map(|builtin| match builtin {
2941 Builtin::Type(ty @ BuiltinType { name, .. })
2942 if element_reference == name =>
2943 {
2944 Some(ty)
2945 }
2946 _ => None,
2947 })
2948 .next();
2949 let elem_ty = match elem_ty {
2950 Some(ty) => ty,
2951 None => {
2952 panic!("{} is unexpectedly not a type", element_reference)
2953 }
2954 };
2955 assert_eq!(
2956 pg_ty.elem, elem_ty.oid,
2957 "type {} has mismatched element OIDs",
2958 ty.name
2959 )
2960 }
2961 CatalogType::Pseudo => {
2962 assert_eq!(
2963 pg_ty.ty, "p",
2964 "type {} is not a pseudo type as expected",
2965 ty.name
2966 )
2967 }
2968 CatalogType::Range { .. } => {
2969 assert_eq!(
2970 pg_ty.ty, "r",
2971 "type {} is not a range type as expected",
2972 ty.name
2973 );
2974 }
2975 _ => {
2976 assert_eq!(
2977 pg_ty.ty, "b",
2978 "type {} is not a base type as expected",
2979 ty.name
2980 )
2981 }
2982 }
2983
2984 let schema = catalog
2986 .resolve_schema_in_database(
2987 &ResolvedDatabaseSpecifier::Ambient,
2988 ty.schema,
2989 &SYSTEM_CONN_ID,
2990 )
2991 .expect("unable to resolve schema");
2992 let allocated_type = catalog
2993 .resolve_type(
2994 None,
2995 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
2996 &PartialItemName {
2997 database: None,
2998 schema: Some(schema.name().schema.clone()),
2999 item: ty.name.to_string(),
3000 },
3001 &SYSTEM_CONN_ID,
3002 )
3003 .expect("unable to resolve type");
3004 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3005 ty
3006 } else {
3007 panic!("unexpectedly not a type")
3008 };
3009 match ty.details.array_id {
3010 Some(array_id) => {
3011 let array_ty = catalog.get_entry(&array_id);
3012 assert_eq!(
3013 pg_ty.array, array_ty.oid,
3014 "type {} has mismatched array OIDs",
3015 allocated_type.name.item,
3016 );
3017 }
3018 None => assert_eq!(
3019 pg_ty.array, 0,
3020 "type {} does not have an array type in mz but does in pg",
3021 allocated_type.name.item,
3022 ),
3023 }
3024 }
3025 Builtin::Func(func) => {
3026 for imp in func.inner.func_impls() {
3027 assert!(
3028 all_oids.insert(imp.oid),
3029 "{} reused oid {}",
3030 func.name,
3031 imp.oid
3032 );
3033
3034 assert!(
3035 imp.oid < FIRST_USER_OID,
3036 "built-in function {} erroneously has OID in user space ({})",
3037 func.name,
3038 imp.oid,
3039 );
3040
3041 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3044 continue;
3045 } else {
3046 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3047 panic!(
3048 "pg_proc missing function {}: oid {}",
3049 func.name, imp.oid
3050 )
3051 })
3052 };
3053 assert_eq!(
3054 func.name, pg_fn.name,
3055 "funcs with oid {} don't match names: {} in mz, {} in pg",
3056 imp.oid, func.name, pg_fn.name
3057 );
3058
3059 let imp_arg_oids = imp
3062 .arg_typs
3063 .iter()
3064 .map(|item| resolve_type_oid(item))
3065 .collect::<Vec<_>>();
3066
3067 if imp_arg_oids != pg_fn.arg_oids {
3068 println!(
3069 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3070 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3071 );
3072 }
3073
3074 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3075
3076 assert!(
3077 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3078 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3079 imp.oid,
3080 func.name,
3081 imp_return_oid,
3082 pg_fn.ret_oid
3083 );
3084
3085 assert_eq!(
3086 imp.return_is_set, pg_fn.ret_set,
3087 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3088 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3089 );
3090 }
3091 }
3092 _ => (),
3093 }
3094 }
3095
3096 for (op, func) in OP_IMPLS.iter() {
3097 for imp in func.func_impls() {
3098 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3099
3100 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3102 continue;
3103 } else {
3104 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3105 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3106 })
3107 };
3108
3109 assert_eq!(*op, pg_op.name);
3110
3111 let imp_return_oid =
3112 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3113 if imp_return_oid != pg_op.oprresult {
3114 panic!(
3115 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3116 imp.oid, op, imp_return_oid, pg_op.oprresult
3117 );
3118 }
3119 }
3120 }
3121 catalog.expire().await;
3122 }
3123
3124 Catalog::with_debug(inner).await
3125 }
3126
    // Smoke-tests every builtin scalar function and operator by invoking each
    // implementation over the cartesian product of "interesting" datums for
    // its argument types; `smoketest_fn` (run on blocking tasks) performs the
    // actual eval/reduce cross-checks.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_smoketest_all_builtins() {
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Functions excluded from the smoke test (e.g. mz_panic/mz_sleep
            // have side effects; the others are presumably skipped for
            // documented reasons elsewhere — confirm before editing the list).
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege",
                "has_type_privilege",
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are smoke-tested.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                'outer: for imp in impls {
                    let details = imp.details();
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        // Skip impls whose argument types have no pgrepr
                        // representation.
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // Candidate datums per argument: NULL plus the type's
                    // "interesting" values.
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    // Zero-argument impls have no combinations to try.
                    if datums.is_empty() {
                        continue;
                    }

                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // Odometer-style iteration over the cartesian product of
                    // candidate datums across argument positions.
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        // Assemble the argument tuple selected by `idxs`.
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable call description, used as the task
                        // name and in failure messages.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance the odometer: bump the last position,
                        // carrying into earlier positions on overflow; the
                        // loop condition above terminates once position 0
                        // overflows.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                if i == 0 {
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        // Spawn all checks while the debug catalog is alive, then await them.
        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        for handle in handles {
            handle.await;
        }
    }
3272
    /// Invokes a single scalar function implementation on concrete literal
    /// arguments and cross-checks the planner/optimizer/evaluator against each
    /// other:
    /// - the evaluated datum is an instance of the computed MIR type,
    /// - the MIR type matches the catalog-declared return type (soft assert),
    /// - nullability metadata agrees with `introduces_nulls`/`propagates_nulls`,
    /// - `reduce` (constant folding) agrees with direct `eval`,
    /// - eliminable casts yield the same datum with and without cast
    ///   elimination.
    fn smoketest_fn(
        name: &&str,
        call_name: String,
        op: &Operation<HirScalarExpr>,
        imp: &FuncImpl<HirScalarExpr>,
        args: Vec<Datum<'_>>,
        catalog: Arc<Catalog>,
        scalars: Vec<CoercibleScalarExpr>,
        return_styp: Option<SqlScalarType>,
    ) {
        // Build a minimal one-shot planning context around the catalog.
        let conn_catalog = catalog.for_system_session();
        let pcx = PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
        let ecx = ExprContext {
            qcx: &qcx,
            name: "smoketest",
            scope: &Scope::empty(),
            relation_type: &SqlRelationType::empty(),
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let arena = RowArena::new();
        let mut session = Session::dummy();
        session
            .start_transaction(to_datetime(0), None, None)
            .expect("must succeed");
        let prep_style = ExprPrepOneShot {
            logical_time: EvalTime::Time(Timestamp::MIN),
            session: &session,
            catalog_state: &catalog.state,
        };

        // Plan the call over the literal arguments; planning may legitimately
        // fail (e.g. invalid argument combinations), in which case there is
        // nothing to check.
        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
        if let Ok(hir) = res {
            // If the call is an eliminable cast, also lower and evaluate it
            // with cast elimination disabled, so we can verify below that
            // elimination does not change the result.
            let uneliminated_result_row = {
                if let HirScalarExpr::CallUnary { func, .. } = &hir
                    && func.is_eliminable_cast()
                {
                    let mut uneliminated_mir = hir
                        .clone()
                        .lower_uncorrelated(HirToMirConfig {
                            enable_cast_elimination: false,
                            ..catalog.system_config().into()
                        })
                        .expect("lowering eliminable cast should always succeed");
                    prep_style
                        .prep_scalar_expr(&mut uneliminated_mir)
                        .expect("must succeed");

                    uneliminated_mir
                        .eval(&[], &arena)
                        .ok()
                        .map(|datum| Row::pack([datum]))
                } else {
                    None
                }
            };

            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");

                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
                    if let Some(return_styp) = return_styp {
                        let mir_typ = mir.typ(&[]);
                        // Soft assert: the lowered MIR scalar type should
                        // match the catalog-declared return type.
                        soft_assert_eq_or_log!(
                            mir_typ.scalar_type,
                            (&return_styp).into(),
                            "MIR type did not match the catalog type (cast elimination/repr type error)"
                        );
                        if !eval_result_datum.is_instance_of(&mir_typ) {
                            panic!(
                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
                            );
                        }
                        if let Some(row) = uneliminated_result_row {
                            let uneliminated_result_datum = row.unpack_first();
                            assert_eq!(
                                uneliminated_result_datum, eval_result_datum,
                                "datums should not change if cast is eliminable"
                            );
                        }
                        // Nullability checks apply only when the outermost
                        // call's arguments are all literals.
                        if let Some((introduces_nulls, propagates_nulls)) =
                            call_introduces_propagates_nulls(&mir)
                        {
                            if introduces_nulls {
                                // A null-introducing call must be typed
                                // nullable regardless of its inputs.
                                assert!(
                                    mir_typ.nullable,
                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                    name, args, mir, mir_typ.nullable
                                );
                            } else {
                                let any_input_null = args.iter().any(|arg| arg.is_null());
                                if !any_input_null {
                                    // No nulls in and none introduced: must be
                                    // typed non-nullable.
                                    assert!(
                                        !mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                } else if propagates_nulls {
                                    // A null input to a null-propagating call
                                    // forces a nullable type.
                                    assert!(
                                        mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                }
                            }
                        }
                        // All arguments are literals, so `reduce` must
                        // constant-fold to a literal whose datum and type
                        // agree with direct evaluation.
                        let mut reduced = mir.clone();
                        reduced.reduce(&[]);
                        match reduced {
                            MirScalarExpr::Literal(reduce_result, ctyp) => {
                                match reduce_result {
                                    Ok(reduce_result_row) => {
                                        let reduce_result_datum = reduce_result_row.unpack_first();
                                        assert_eq!(
                                            reduce_result_datum,
                                            eval_result_datum,
                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                        assert_eq!(
                                            ctyp.scalar_type,
                                            mir_typ.scalar_type,
                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                    }
                                    // Reduction to an error literal is not
                                    // compared (eval succeeded above, so only
                                    // the Ok path is meaningful here).
                                    Err(..) => {}
                                }
                            }
                            _ => unreachable!(
                                "all args are literals, so should have reduced to a literal"
                            ),
                        }
                    }
                }
            }
        }
    }
3455
3456 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3461 match mir_func_call {
3462 MirScalarExpr::CallUnary { func, expr } => {
3463 if expr.is_literal() {
3464 Some((func.introduces_nulls(), func.propagates_nulls()))
3465 } else {
3466 None
3467 }
3468 }
3469 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3470 if expr1.is_literal() && expr2.is_literal() {
3471 Some((func.introduces_nulls(), func.propagates_nulls()))
3472 } else {
3473 None
3474 }
3475 }
3476 MirScalarExpr::CallVariadic { func, exprs } => {
3477 if exprs.iter().all(|arg| arg.is_literal()) {
3478 Some((func.introduces_nulls(), func.propagates_nulls()))
3479 } else {
3480 None
3481 }
3482 }
3483 _ => None,
3484 }
3485 }
3486
    // Ensures pg_catalog and information_schema views expose only types that
    // have a Postgres counterpart: mz-specific types (unsigned ints,
    // mz_timestamp, list, map, mz_aclitem) must not leak into these
    // compatibility schemas.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_pg_views_forbidden_types() {
        Catalog::with_debug(|catalog| async move {
            let conn_catalog = catalog.for_system_session();

            for view in BUILTINS::views().filter(|view| {
                view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
            }) {
                let item = conn_catalog
                    .resolve_item(&PartialItemName {
                        database: None,
                        schema: Some(view.schema.to_string()),
                        item: view.name.to_string(),
                    })
                    .expect("unable to resolve view")
                    .at_version(RelationVersionSelector::Latest);
                let full_name = conn_catalog.resolve_full_name(item.name());
                let desc = item.relation_desc().expect("invalid item type");
                for col_type in desc.iter_types() {
                    // Exhaustive match: adding a new scalar type forces an
                    // explicit decision about whether it may appear here.
                    match &col_type.scalar_type {
                        // Forbidden: no Postgres equivalent.
                        typ @ SqlScalarType::UInt16
                        | typ @ SqlScalarType::UInt32
                        | typ @ SqlScalarType::UInt64
                        | typ @ SqlScalarType::MzTimestamp
                        | typ @ SqlScalarType::List { .. }
                        | typ @ SqlScalarType::Map { .. }
                        | typ @ SqlScalarType::MzAclItem => {
                            panic!("{typ:?} type found in {full_name}");
                        }
                        // Allowed: these all map onto Postgres types.
                        SqlScalarType::AclItem
                        | SqlScalarType::Bool
                        | SqlScalarType::Int16
                        | SqlScalarType::Int32
                        | SqlScalarType::Int64
                        | SqlScalarType::Float32
                        | SqlScalarType::Float64
                        | SqlScalarType::Numeric { .. }
                        | SqlScalarType::Date
                        | SqlScalarType::Time
                        | SqlScalarType::Timestamp { .. }
                        | SqlScalarType::TimestampTz { .. }
                        | SqlScalarType::Interval
                        | SqlScalarType::PgLegacyChar
                        | SqlScalarType::Bytes
                        | SqlScalarType::String
                        | SqlScalarType::Char { .. }
                        | SqlScalarType::VarChar { .. }
                        | SqlScalarType::Jsonb
                        | SqlScalarType::Uuid
                        | SqlScalarType::Array(_)
                        | SqlScalarType::Record { .. }
                        | SqlScalarType::Oid
                        | SqlScalarType::RegProc
                        | SqlScalarType::RegType
                        | SqlScalarType::RegClass
                        | SqlScalarType::Int2Vector
                        | SqlScalarType::Range { .. }
                        | SqlScalarType::PgLegacyName => {}
                    }
                }
            }
            catalog.expire().await;
        })
        .await
    }
3555
3556 #[mz_ore::test(tokio::test)]
3559 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3561 Catalog::with_debug(|catalog| async move {
3562 let conn_catalog = catalog.for_system_session();
3563
3564 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3565 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3566
3567 for entry in catalog.entries() {
3568 let schema_spec = entry.name().qualifiers.schema_spec;
3569 let introspection_deps = catalog.introspection_dependencies(entry.id);
3570 if introspection_deps.is_empty() {
3571 assert!(
3572 schema_spec != introspection_schema_spec,
3573 "entry does not depend on introspection sources but is in \
3574 `mz_introspection`: {}",
3575 conn_catalog.resolve_full_name(entry.name()),
3576 );
3577 } else {
3578 assert!(
3579 schema_spec == introspection_schema_spec,
3580 "entry depends on introspection sources but is not in \
3581 `mz_introspection`: {}",
3582 conn_catalog.resolve_full_name(entry.name()),
3583 );
3584 }
3585 }
3586 })
3587 .await
3588 }
3589
3590 #[mz_ore::test(tokio::test)]
3591 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3593 let persist_client = PersistClient::new_for_tests().await;
3594 let bootstrap_args = test_bootstrap_args();
3595 let organization_id = Uuid::new_v4();
3596 let db_name = "DB";
3597
3598 let mut writer_catalog = Catalog::open_debug_catalog(
3599 persist_client.clone(),
3600 organization_id.clone(),
3601 &bootstrap_args,
3602 )
3603 .await
3604 .expect("open_debug_catalog");
3605 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3606 persist_client.clone(),
3607 organization_id.clone(),
3608 &bootstrap_args,
3609 )
3610 .await
3611 .expect("open_debug_read_only_catalog");
3612 assert_err!(writer_catalog.resolve_database(db_name));
3613 assert_err!(read_only_catalog.resolve_database(db_name));
3614
3615 let commit_ts = writer_catalog.current_upper().await;
3616 writer_catalog
3617 .transact(
3618 None,
3619 commit_ts,
3620 None,
3621 vec![Op::CreateDatabase {
3622 name: db_name.to_string(),
3623 owner_id: MZ_SYSTEM_ROLE_ID,
3624 }],
3625 )
3626 .await
3627 .expect("failed to transact");
3628
3629 let write_db = writer_catalog
3630 .resolve_database(db_name)
3631 .expect("resolve_database");
3632 read_only_catalog
3633 .sync_to_current_updates()
3634 .await
3635 .expect("sync_to_current_updates");
3636 let read_db = read_only_catalog
3637 .resolve_database(db_name)
3638 .expect("resolve_database");
3639
3640 assert_eq!(write_db, read_db);
3641
3642 let writer_catalog_fencer =
3643 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3644 .await
3645 .expect("open_debug_catalog for fencer");
3646 let fencer_db = writer_catalog_fencer
3647 .resolve_database(db_name)
3648 .expect("resolve_database for fencer");
3649 assert_eq!(fencer_db, read_db);
3650
3651 let write_fence_err = writer_catalog
3652 .sync_to_current_updates()
3653 .await
3654 .expect_err("sync_to_current_updates for fencer");
3655 assert!(matches!(
3656 write_fence_err,
3657 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3658 ));
3659 let read_fence_err = read_only_catalog
3660 .sync_to_current_updates()
3661 .await
3662 .expect_err("sync_to_current_updates after fencer");
3663 assert!(matches!(
3664 read_fence_err,
3665 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3666 ));
3667
3668 writer_catalog.expire().await;
3669 read_only_catalog.expire().await;
3670 writer_catalog_fencer.expire().await;
3671 }
3672}