1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_persist_client::PersistClient;
55use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
56use mz_repr::explain::ExprHumanizer;
57use mz_repr::namespaces::MZ_TEMP_SCHEMA;
58use mz_repr::network_policy_id::NetworkPolicyId;
59use mz_repr::optimize::OptimizerFeatures;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
66 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
67 SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_transform::dataflow::DataflowMetainfo;
83use mz_transform::notice::OptimizerNotice;
84use tokio::sync::MutexGuard;
85use tokio::sync::mpsc::UnboundedSender;
86use uuid::Uuid;
87
88pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
90pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
91pub use crate::catalog::state::CatalogState;
92pub use crate::catalog::transact::{
93 DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
94};
95use crate::command::CatalogDump;
96use crate::coord::TargetCluster;
97#[cfg(test)]
98use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
99use crate::session::{Portal, PreparedStatement, Session};
100use crate::util::ResultExt;
101use crate::{AdapterError, AdapterNotice, ExecuteResponse};
102
103mod builtin_table_updates;
104pub(crate) mod consistency;
105mod migrate;
106
107mod apply;
108mod open;
109mod state;
110mod timeline;
111mod transact;
112
113#[derive(Debug)]
136pub struct Catalog {
137 state: CatalogState,
138 expr_cache_handle: Option<ExpressionCacheHandle>,
139 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
140 transient_revision: u64,
141}
142
143impl Clone for Catalog {
146 fn clone(&self) -> Self {
147 Self {
148 state: self.state.clone(),
149 expr_cache_handle: self.expr_cache_handle.clone(),
150 storage: Arc::clone(&self.storage),
151 transient_revision: self.transient_revision,
152 }
153 }
154}
155
156impl Catalog {
157 #[mz_ore::instrument(level = "trace")]
163 pub fn set_optimized_plan(
164 &mut self,
165 id: GlobalId,
166 plan: DataflowDescription<OptimizedMirRelationExpr>,
167 ) {
168 self.state.set_optimized_plan(id, plan);
169 }
170
171 #[mz_ore::instrument(level = "trace")]
177 pub fn set_physical_plan(
178 &mut self,
179 id: GlobalId,
180 plan: DataflowDescription<mz_compute_types::plan::Plan>,
181 ) {
182 self.state.set_physical_plan(id, plan);
183 }
184
185 #[mz_ore::instrument(level = "trace")]
187 pub fn try_get_optimized_plan(
188 &self,
189 id: &GlobalId,
190 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
191 let entry = self.state.try_get_entry_by_global_id(id)?;
192 entry.item().optimized_plan().map(AsRef::as_ref)
193 }
194
195 #[mz_ore::instrument(level = "trace")]
197 pub fn try_get_physical_plan(
198 &self,
199 id: &GlobalId,
200 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
201 let entry = self.state.try_get_entry_by_global_id(id)?;
202 entry.item().physical_plan().map(AsRef::as_ref)
203 }
204
205 #[mz_ore::instrument(level = "trace")]
211 pub fn set_dataflow_metainfo(
212 &mut self,
213 id: GlobalId,
214 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
215 ) {
216 self.state.set_dataflow_metainfo(id, metainfo);
217 }
218
219 #[mz_ore::instrument(level = "trace")]
221 pub fn try_get_dataflow_metainfo(
222 &self,
223 id: &GlobalId,
224 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
225 let entry = self.state.try_get_entry_by_global_id(id)?;
226 entry.item().dataflow_metainfo()
227 }
228}
229
230#[derive(Debug)]
231pub struct ConnCatalog<'a> {
232 state: Cow<'a, CatalogState>,
233 unresolvable_ids: BTreeSet<CatalogItemId>,
243 conn_id: ConnectionId,
244 cluster: String,
245 database: Option<DatabaseId>,
246 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
247 role_id: RoleId,
248 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
249 portals: Option<&'a BTreeMap<String, Portal>>,
250 notices_tx: UnboundedSender<AdapterNotice>,
251}
252
253impl ConnCatalog<'_> {
254 pub fn conn_id(&self) -> &ConnectionId {
255 &self.conn_id
256 }
257
258 pub fn state(&self) -> &CatalogState {
259 &*self.state
260 }
261
262 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
272 assert_eq!(
273 self.role_id, MZ_SYSTEM_ROLE_ID,
274 "only the system role can mark IDs unresolvable",
275 );
276 self.unresolvable_ids.insert(id);
277 }
278
279 pub fn effective_search_path(
285 &self,
286 include_temp_schema: bool,
287 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
288 self.state
289 .effective_search_path(&self.search_path, include_temp_schema)
290 }
291}
292
293impl ConnectionResolver for ConnCatalog<'_> {
294 fn resolve_connection(
295 &self,
296 id: CatalogItemId,
297 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
298 self.state().resolve_connection(id)
299 }
300}
301
302impl Catalog {
303 pub fn transient_revision(&self) -> u64 {
307 self.transient_revision
308 }
309
310 pub async fn with_debug<F, Fut, T>(f: F) -> T
322 where
323 F: FnOnce(Catalog) -> Fut,
324 Fut: Future<Output = T>,
325 {
326 let persist_client = PersistClient::new_for_tests().await;
327 let organization_id = Uuid::new_v4();
328 let bootstrap_args = test_bootstrap_args();
329 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
330 .await
331 .expect("can open debug catalog");
332 f(catalog).await
333 }
334
335 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
338 where
339 F: FnOnce(Catalog) -> Fut,
340 Fut: Future<Output = T>,
341 {
342 let persist_client = PersistClient::new_for_tests().await;
343 let organization_id = Uuid::new_v4();
344 let bootstrap_args = test_bootstrap_args();
345 let mut catalog =
346 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
347 .await
348 .expect("can open debug catalog");
349
350 let now = SYSTEM_TIME.clone();
352 let openable_storage = TestCatalogStateBuilder::new(persist_client)
353 .with_organization_id(organization_id)
354 .with_default_deploy_generation()
355 .build()
356 .await
357 .expect("can create durable catalog");
358 let mut storage = openable_storage
359 .open(now().into(), &bootstrap_args)
360 .await
361 .expect("can open durable catalog")
362 .0;
363 let _ = storage
365 .sync_to_current_updates()
366 .await
367 .expect("can sync to current updates");
368 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
369
370 f(catalog).await
371 }
372
373 pub async fn open_debug_catalog(
377 persist_client: PersistClient,
378 organization_id: Uuid,
379 bootstrap_args: &BootstrapArgs,
380 ) -> Result<Catalog, anyhow::Error> {
381 let now = SYSTEM_TIME.clone();
382 let environment_id = None;
383 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
384 .with_organization_id(organization_id)
385 .with_default_deploy_generation()
386 .build()
387 .await?;
388 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
389 let system_parameter_defaults = BTreeMap::default();
390 Self::open_debug_catalog_inner(
391 persist_client,
392 storage,
393 now,
394 environment_id,
395 &DUMMY_BUILD_INFO,
396 system_parameter_defaults,
397 bootstrap_args,
398 None,
399 )
400 .await
401 }
402
403 pub async fn open_debug_read_only_catalog(
408 persist_client: PersistClient,
409 organization_id: Uuid,
410 bootstrap_args: &BootstrapArgs,
411 ) -> Result<Catalog, anyhow::Error> {
412 let now = SYSTEM_TIME.clone();
413 let environment_id = None;
414 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
415 .with_organization_id(organization_id)
416 .build()
417 .await?;
418 let storage = openable_storage
419 .open_read_only(&test_bootstrap_args())
420 .await?;
421 let system_parameter_defaults = BTreeMap::default();
422 Self::open_debug_catalog_inner(
423 persist_client,
424 storage,
425 now,
426 environment_id,
427 &DUMMY_BUILD_INFO,
428 system_parameter_defaults,
429 bootstrap_args,
430 None,
431 )
432 .await
433 }
434
435 pub async fn open_debug_read_only_persist_catalog_config(
440 persist_client: PersistClient,
441 now: NowFn,
442 environment_id: EnvironmentId,
443 system_parameter_defaults: BTreeMap<String, String>,
444 build_info: &'static BuildInfo,
445 bootstrap_args: &BootstrapArgs,
446 enable_expression_cache_override: Option<bool>,
447 ) -> Result<Catalog, anyhow::Error> {
448 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
449 .with_organization_id(environment_id.organization_id())
450 .with_version(
451 build_info
452 .version
453 .parse()
454 .expect("build version is parseable"),
455 )
456 .build()
457 .await?;
458 let storage = openable_storage.open_read_only(bootstrap_args).await?;
459 Self::open_debug_catalog_inner(
460 persist_client,
461 storage,
462 now,
463 Some(environment_id),
464 build_info,
465 system_parameter_defaults,
466 bootstrap_args,
467 enable_expression_cache_override,
468 )
469 .await
470 }
471
472 async fn open_debug_catalog_inner(
473 persist_client: PersistClient,
474 storage: Box<dyn DurableCatalogState>,
475 now: NowFn,
476 environment_id: Option<EnvironmentId>,
477 build_info: &'static BuildInfo,
478 system_parameter_defaults: BTreeMap<String, String>,
479 bootstrap_args: &BootstrapArgs,
480 enable_expression_cache_override: Option<bool>,
481 ) -> Result<Catalog, anyhow::Error> {
482 let metrics_registry = &MetricsRegistry::new();
483 let secrets_reader = Arc::new(InMemorySecretsController::new());
484 let previous_ts = now().into();
487 let replica_size = &bootstrap_args.default_cluster_replica_size;
488 let read_only = false;
489
490 let OpenCatalogResult {
491 catalog,
492 migrated_storage_collections_0dt: _,
493 new_builtin_collections: _,
494 builtin_table_updates: _,
495 cached_global_exprs: _,
496 uncached_local_exprs: _,
497 } = Catalog::open(Config {
498 storage,
499 metrics_registry,
500 state: StateConfig {
501 unsafe_mode: true,
502 all_features: false,
503 build_info,
504 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
505 read_only,
506 now,
507 boot_ts: previous_ts,
508 skip_migrations: true,
509 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
510 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
511 size: replica_size.clone(),
512 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
513 },
514 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
515 size: replica_size.clone(),
516 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
517 },
518 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
519 size: replica_size.clone(),
520 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
521 },
522 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
523 size: replica_size.clone(),
524 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
525 },
526 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
527 size: replica_size.clone(),
528 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
529 },
530 system_parameter_defaults,
531 remote_system_parameters: None,
532 availability_zones: vec![],
533 egress_addresses: vec![],
534 aws_principal_context: None,
535 aws_privatelink_availability_zones: None,
536 http_host_name: None,
537 connection_context: ConnectionContext::for_tests(secrets_reader),
538 builtin_item_migration_config: BuiltinItemMigrationConfig {
539 persist_client: persist_client.clone(),
540 read_only,
541 force_migration: None,
542 },
543 persist_client,
544 enable_expression_cache_override,
545 helm_chart_version: None,
546 external_login_password_mz_system: None,
547 license_key: ValidatedLicenseKey::for_tests(),
548 },
549 })
550 .await?;
551 Ok(catalog)
552 }
553
554 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
555 self.state.for_session(session)
556 }
557
558 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
559 self.state.for_sessionless_user(role_id)
560 }
561
562 pub fn for_system_session(&self) -> ConnCatalog<'_> {
563 self.state.for_system_session()
564 }
565
566 async fn storage<'a>(
567 &'a self,
568 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
569 self.storage.lock().await
570 }
571
572 pub async fn current_upper(&self) -> mz_repr::Timestamp {
573 self.storage().await.current_upper().await
574 }
575
576 pub async fn allocate_user_id(
577 &self,
578 commit_ts: mz_repr::Timestamp,
579 ) -> Result<(CatalogItemId, GlobalId), Error> {
580 self.storage()
581 .await
582 .allocate_user_id(commit_ts)
583 .await
584 .maybe_terminate("allocating user ids")
585 .err_into()
586 }
587
588 pub async fn allocate_user_ids(
590 &self,
591 amount: u64,
592 commit_ts: mz_repr::Timestamp,
593 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
594 self.storage()
595 .await
596 .allocate_user_ids(amount, commit_ts)
597 .await
598 .maybe_terminate("allocating user ids")
599 .err_into()
600 }
601
602 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
603 let commit_ts = self.storage().await.current_upper().await;
604 self.allocate_user_id(commit_ts).await
605 }
606
607 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
609 self.storage()
610 .await
611 .get_next_user_item_id()
612 .await
613 .err_into()
614 }
615
616 #[cfg(test)]
617 pub async fn allocate_system_id(
618 &self,
619 commit_ts: mz_repr::Timestamp,
620 ) -> Result<(CatalogItemId, GlobalId), Error> {
621 use mz_ore::collections::CollectionExt;
622
623 let mut storage = self.storage().await;
624 let mut txn = storage.transaction().await?;
625 let id = txn
626 .allocate_system_item_ids(1)
627 .maybe_terminate("allocating system ids")?
628 .into_element();
629 let _ = txn.get_and_commit_op_updates();
631 txn.commit(commit_ts).await?;
632 Ok(id)
633 }
634
635 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
637 self.storage()
638 .await
639 .get_next_system_item_id()
640 .await
641 .err_into()
642 }
643
644 pub async fn allocate_user_cluster_id(
645 &self,
646 commit_ts: mz_repr::Timestamp,
647 ) -> Result<ClusterId, Error> {
648 self.storage()
649 .await
650 .allocate_user_cluster_id(commit_ts)
651 .await
652 .maybe_terminate("allocating user cluster ids")
653 .err_into()
654 }
655
656 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
658 self.storage()
659 .await
660 .get_next_system_replica_id()
661 .await
662 .err_into()
663 }
664
665 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
667 self.storage()
668 .await
669 .get_next_user_replica_id()
670 .await
671 .err_into()
672 }
673
674 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
675 self.state.resolve_database(database_name)
676 }
677
678 pub fn resolve_schema(
679 &self,
680 current_database: Option<&DatabaseId>,
681 database_name: Option<&str>,
682 schema_name: &str,
683 conn_id: &ConnectionId,
684 ) -> Result<&Schema, SqlCatalogError> {
685 self.state
686 .resolve_schema(current_database, database_name, schema_name, conn_id)
687 }
688
689 pub fn resolve_schema_in_database(
690 &self,
691 database_spec: &ResolvedDatabaseSpecifier,
692 schema_name: &str,
693 conn_id: &ConnectionId,
694 ) -> Result<&Schema, SqlCatalogError> {
695 self.state
696 .resolve_schema_in_database(database_spec, schema_name, conn_id)
697 }
698
699 pub fn resolve_replica_in_cluster(
700 &self,
701 cluster_id: &ClusterId,
702 replica_name: &str,
703 ) -> Result<&ClusterReplica, SqlCatalogError> {
704 self.state
705 .resolve_replica_in_cluster(cluster_id, replica_name)
706 }
707
708 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
709 self.state.resolve_system_schema(name)
710 }
711
712 pub fn resolve_search_path(
713 &self,
714 session: &Session,
715 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
716 self.state.resolve_search_path(session)
717 }
718
719 pub fn resolve_entry(
721 &self,
722 current_database: Option<&DatabaseId>,
723 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
724 name: &PartialItemName,
725 conn_id: &ConnectionId,
726 ) -> Result<&CatalogEntry, SqlCatalogError> {
727 self.state
728 .resolve_entry(current_database, search_path, name, conn_id)
729 }
730
731 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
733 self.state.resolve_builtin_table(builtin)
734 }
735
736 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
738 self.state.resolve_builtin_log(builtin).0
739 }
740
741 pub fn resolve_builtin_storage_collection(
743 &self,
744 builtin: &'static BuiltinSource,
745 ) -> CatalogItemId {
746 self.state.resolve_builtin_source(builtin)
747 }
748
749 pub fn resolve_function(
751 &self,
752 current_database: Option<&DatabaseId>,
753 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
754 name: &PartialItemName,
755 conn_id: &ConnectionId,
756 ) -> Result<&CatalogEntry, SqlCatalogError> {
757 self.state
758 .resolve_function(current_database, search_path, name, conn_id)
759 }
760
761 pub fn resolve_type(
763 &self,
764 current_database: Option<&DatabaseId>,
765 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
766 name: &PartialItemName,
767 conn_id: &ConnectionId,
768 ) -> Result<&CatalogEntry, SqlCatalogError> {
769 self.state
770 .resolve_type(current_database, search_path, name, conn_id)
771 }
772
773 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
774 self.state.resolve_cluster(name)
775 }
776
777 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
783 self.state.resolve_builtin_cluster(cluster)
784 }
785
786 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
787 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
788 }
789
790 pub fn resolve_target_cluster(
792 &self,
793 target_cluster: TargetCluster,
794 session: &Session,
795 ) -> Result<&Cluster, AdapterError> {
796 match target_cluster {
797 TargetCluster::CatalogServer => {
798 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
799 }
800 TargetCluster::Active => self.active_cluster(session),
801 TargetCluster::Transaction(cluster_id) => self
802 .try_get_cluster(cluster_id)
803 .ok_or(AdapterError::ConcurrentClusterDrop),
804 }
805 }
806
807 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
808 if session.user().name != SYSTEM_USER.name
812 && session.user().name != SUPPORT_USER.name
813 && session.vars().cluster() == SYSTEM_USER.name
814 {
815 coord_bail!(
816 "system cluster '{}' cannot execute user queries",
817 SYSTEM_USER.name
818 );
819 }
820 let cluster = self.resolve_cluster(session.vars().cluster())?;
821 Ok(cluster)
822 }
823
824 pub fn state(&self) -> &CatalogState {
825 &self.state
826 }
827
828 pub fn resolve_full_name(
829 &self,
830 name: &QualifiedItemName,
831 conn_id: Option<&ConnectionId>,
832 ) -> FullItemName {
833 self.state.resolve_full_name(name, conn_id)
834 }
835
836 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
837 self.state.try_get_entry(id)
838 }
839
840 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
841 self.state.try_get_entry_by_global_id(id)
842 }
843
844 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
845 self.state.get_entry(id)
846 }
847
848 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
849 self.state.get_entry_by_global_id(id)
850 }
851
852 pub fn get_global_ids<'a>(
853 &'a self,
854 id: &CatalogItemId,
855 ) -> impl Iterator<Item = GlobalId> + use<'a> {
856 self.get_entry(id).global_ids()
857 }
858
859 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
860 self.get_entry_by_global_id(id).id()
861 }
862
863 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
864 let item = self.try_get_entry_by_global_id(id)?;
865 Some(item.id())
866 }
867
868 pub fn get_schema(
869 &self,
870 database_spec: &ResolvedDatabaseSpecifier,
871 schema_spec: &SchemaSpecifier,
872 conn_id: &ConnectionId,
873 ) -> &Schema {
874 self.state.get_schema(database_spec, schema_spec, conn_id)
875 }
876
877 pub fn try_get_schema(
878 &self,
879 database_spec: &ResolvedDatabaseSpecifier,
880 schema_spec: &SchemaSpecifier,
881 conn_id: &ConnectionId,
882 ) -> Option<&Schema> {
883 self.state
884 .try_get_schema(database_spec, schema_spec, conn_id)
885 }
886
887 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
888 self.state.get_mz_catalog_schema_id()
889 }
890
891 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
892 self.state.get_pg_catalog_schema_id()
893 }
894
895 pub fn get_information_schema_id(&self) -> SchemaId {
896 self.state.get_information_schema_id()
897 }
898
899 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
900 self.state.get_mz_internal_schema_id()
901 }
902
903 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
904 self.state.get_mz_introspection_schema_id()
905 }
906
907 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
908 self.state.get_mz_unsafe_schema_id()
909 }
910
911 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
912 self.state.system_schema_ids()
913 }
914
915 pub fn get_database(&self, id: &DatabaseId) -> &Database {
916 self.state.get_database(id)
917 }
918
919 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
920 self.state.try_get_role(id)
921 }
922
923 pub fn get_role(&self, id: &RoleId) -> &Role {
924 self.state.get_role(id)
925 }
926
927 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
928 self.state.try_get_role_by_name(role_name)
929 }
930
931 pub fn try_get_role_by_name_case_insensitive(&self, role_name: &str) -> Option<&Role> {
932 self.state.try_get_role_by_name_case_insensitive(role_name)
933 }
934
935 pub fn roles_by_lowercase_name(&self) -> BTreeMap<String, &Role> {
936 self.state.roles_by_lowercase_name()
937 }
938
939 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
940 self.state.try_get_role_auth_by_id(id)
941 }
942
943 pub fn create_temporary_schema(
946 &mut self,
947 conn_id: &ConnectionId,
948 owner_id: RoleId,
949 ) -> Result<(), Error> {
950 self.state.create_temporary_schema(conn_id, owner_id)
951 }
952
953 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
954 self.state
956 .temporary_schemas
957 .get(conn_id)
958 .map(|schema| schema.items.contains_key(item_name))
959 .unwrap_or(false)
960 }
961
962 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
965 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
966 return Ok(());
967 };
968 if !schema.items.is_empty() {
969 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
970 }
971 Ok(())
972 }
973
974 pub(crate) fn object_dependents(
975 &self,
976 object_ids: &Vec<ObjectId>,
977 conn_id: &ConnectionId,
978 ) -> Vec<ObjectId> {
979 let mut seen = BTreeSet::new();
980 self.state.object_dependents(object_ids, conn_id, &mut seen)
981 }
982
983 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
984 FullNameV1 {
985 database: name.database.to_string(),
986 schema: name.schema.clone(),
987 item: name.item.clone(),
988 }
989 }
990
991 pub fn find_available_cluster_name(&self, name: &str) -> String {
992 let mut i = 0;
993 let mut candidate = name.to_string();
994 while self.state.clusters_by_name.contains_key(&candidate) {
995 i += 1;
996 candidate = format!("{}{}", name, i);
997 }
998 candidate
999 }
1000
1001 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1002 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1003 self.cluster_replica_sizes()
1004 .enabled_allocations()
1005 .map(|a| a.0.to_owned())
1006 .collect::<Vec<_>>()
1007 } else {
1008 self.system_config().allowed_cluster_replica_sizes()
1009 }
1010 }
1011
1012 pub fn concretize_replica_location(
1013 &self,
1014 location: mz_catalog::durable::ReplicaLocation,
1015 allowed_sizes: &Vec<String>,
1016 allowed_availability_zones: Option<&[String]>,
1017 ) -> Result<ReplicaLocation, Error> {
1018 self.state
1019 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1020 }
1021
1022 pub(crate) fn ensure_valid_replica_size(
1023 &self,
1024 allowed_sizes: &[String],
1025 size: &String,
1026 ) -> Result<(), Error> {
1027 self.state.ensure_valid_replica_size(allowed_sizes, size)
1028 }
1029
1030 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1031 &self.state.cluster_replica_sizes
1032 }
1033
1034 pub fn get_privileges(
1036 &self,
1037 id: &SystemObjectId,
1038 conn_id: &ConnectionId,
1039 ) -> Option<&PrivilegeMap> {
1040 match id {
1041 SystemObjectId::Object(id) => match id {
1042 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1043 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1044 ObjectId::Schema((database_spec, schema_spec)) => Some(
1045 self.get_schema(database_spec, schema_spec, conn_id)
1046 .privileges(),
1047 ),
1048 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1049 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1050 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1051 },
1052 SystemObjectId::System => Some(&self.state.system_privileges),
1053 }
1054 }
1055
1056 #[mz_ore::instrument(level = "debug")]
1057 pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1058 Ok(self.storage().await.advance_upper(new_upper).await?)
1059 }
1060
1061 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1063 self.state.introspection_dependencies(id)
1064 }
1065
1066 pub fn dump(&self) -> Result<CatalogDump, Error> {
1072 Ok(CatalogDump::new(self.state.dump(None)?))
1073 }
1074
1075 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1079 self.state.check_consistency().map_err(|inconsistencies| {
1080 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1081 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1082 })
1083 })
1084 }
1085
1086 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1087 self.state.config()
1088 }
1089
1090 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1091 self.state.entry_by_id.values()
1092 }
1093
1094 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1095 self.entries()
1096 .filter(|entry| entry.is_connection() && entry.id().is_user())
1097 }
1098
1099 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1100 self.entries()
1101 .filter(|entry| entry.is_table() && entry.id().is_user())
1102 }
1103
1104 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1105 self.entries()
1106 .filter(|entry| entry.is_source() && entry.id().is_user())
1107 }
1108
1109 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1110 self.entries()
1111 .filter(|entry| entry.is_sink() && entry.id().is_user())
1112 }
1113
1114 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1115 self.entries()
1116 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1117 }
1118
1119 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1120 self.entries()
1121 .filter(|entry| entry.is_secret() && entry.id().is_user())
1122 }
1123
1124 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1125 self.state.get_network_policy(&network_policy_id)
1126 }
1127
1128 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1129 self.state.try_get_network_policy_by_name(name)
1130 }
1131
1132 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1133 self.state.clusters_by_id.values()
1134 }
1135
1136 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1137 self.state.get_cluster(cluster_id)
1138 }
1139
1140 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1141 self.state.try_get_cluster(cluster_id)
1142 }
1143
1144 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1145 self.clusters().filter(|cluster| cluster.id.is_user())
1146 }
1147
1148 pub fn get_cluster_replica(
1149 &self,
1150 cluster_id: ClusterId,
1151 replica_id: ReplicaId,
1152 ) -> &ClusterReplica {
1153 self.state.get_cluster_replica(cluster_id, replica_id)
1154 }
1155
1156 pub fn try_get_cluster_replica(
1157 &self,
1158 cluster_id: ClusterId,
1159 replica_id: ReplicaId,
1160 ) -> Option<&ClusterReplica> {
1161 self.state.try_get_cluster_replica(cluster_id, replica_id)
1162 }
1163
1164 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1165 self.user_clusters()
1166 .flat_map(|cluster| cluster.user_replicas())
1167 }
1168
1169 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1170 self.state.database_by_id.values()
1171 }
1172
1173 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1174 self.state
1175 .roles_by_id
1176 .values()
1177 .filter(|role| role.is_user())
1178 }
1179
1180 pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1181 self.state
1182 .network_policies_by_id
1183 .iter()
1184 .filter(|(id, _)| id.is_user())
1185 .map(|(_, policy)| policy)
1186 }
1187
1188 pub fn system_privileges(&self) -> &PrivilegeMap {
1189 &self.state.system_privileges
1190 }
1191
1192 pub fn default_privileges(
1193 &self,
1194 ) -> impl Iterator<
1195 Item = (
1196 &DefaultPrivilegeObject,
1197 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1198 ),
1199 > {
1200 self.state.default_privileges.iter()
1201 }
1202
1203 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1204 self.state
1205 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1206 }
1207
1208 pub fn pack_storage_usage_update(
1209 &self,
1210 event: VersionedStorageUsage,
1211 diff: Diff,
1212 ) -> BuiltinTableUpdate {
1213 self.state
1214 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1215 }
1216
1217 pub fn system_config(&self) -> &SystemVars {
1218 self.state.system_config()
1219 }
1220
1221 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1222 self.state.system_config_mut()
1223 }
1224
1225 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1226 self.state.ensure_not_reserved_role(role_id)
1227 }
1228
1229 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1230 self.state.ensure_grantable_role(role_id)
1231 }
1232
1233 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1234 self.state.ensure_not_system_role(role_id)
1235 }
1236
1237 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1238 self.state.ensure_not_predefined_role(role_id)
1239 }
1240
1241 pub fn ensure_not_reserved_network_policy(
1242 &self,
1243 network_policy_id: &NetworkPolicyId,
1244 ) -> Result<(), Error> {
1245 self.state
1246 .ensure_not_reserved_network_policy(network_policy_id)
1247 }
1248
1249 pub fn ensure_not_reserved_object(
1250 &self,
1251 object_id: &ObjectId,
1252 conn_id: &ConnectionId,
1253 ) -> Result<(), Error> {
1254 match object_id {
1255 ObjectId::Cluster(cluster_id) => {
1256 if cluster_id.is_system() {
1257 let cluster = self.get_cluster(*cluster_id);
1258 Err(Error::new(ErrorKind::ReadOnlyCluster(
1259 cluster.name().to_string(),
1260 )))
1261 } else {
1262 Ok(())
1263 }
1264 }
1265 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1266 if replica_id.is_system() {
1267 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1268 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1269 replica.name().to_string(),
1270 )))
1271 } else {
1272 Ok(())
1273 }
1274 }
1275 ObjectId::Database(database_id) => {
1276 if database_id.is_system() {
1277 let database = self.get_database(database_id);
1278 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1279 database.name().to_string(),
1280 )))
1281 } else {
1282 Ok(())
1283 }
1284 }
1285 ObjectId::Schema((database_spec, schema_spec)) => {
1286 if schema_spec.is_system() {
1287 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1288 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1289 schema.name().schema.clone(),
1290 )))
1291 } else {
1292 Ok(())
1293 }
1294 }
1295 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1296 ObjectId::Item(item_id) => {
1297 if item_id.is_system() {
1298 let item = self.get_entry(item_id);
1299 let name = self.resolve_full_name(item.name(), Some(conn_id));
1300 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1301 } else {
1302 Ok(())
1303 }
1304 }
1305 ObjectId::NetworkPolicy(network_policy_id) => {
1306 self.ensure_not_reserved_network_policy(network_policy_id)
1307 }
1308 }
1309 }
1310
1311 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1313 &mut self,
1314 create_sql: &str,
1315 force_if_exists_skip: bool,
1316 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1317 self.state
1318 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1319 }
1320
1321 pub(crate) fn cache_expressions(
1332 &self,
1333 id: GlobalId,
1334 local_mir: Option<OptimizedMirRelationExpr>,
1335 mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1336 mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1337 dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1338 optimizer_features: OptimizerFeatures,
1339 ) -> BoxFuture<'static, ()> {
1340 global_mir.as_of = None;
1344 global_mir.until = Default::default();
1345 physical_plan.as_of = None;
1346 physical_plan.until = Default::default();
1347
1348 let mut local_exprs = Vec::new();
1349 if let Some(local_mir) = local_mir {
1350 local_exprs.push((
1351 id,
1352 LocalExpressions {
1353 local_mir,
1354 optimizer_features: optimizer_features.clone(),
1355 },
1356 ));
1357 }
1358 let global_exprs = vec![(
1359 id,
1360 GlobalExpressions {
1361 global_mir,
1362 physical_plan,
1363 dataflow_metainfos,
1364 optimizer_features,
1365 },
1366 )];
1367 self.update_expression_cache(local_exprs, global_exprs, Default::default())
1368 }
1369
1370 pub(crate) fn update_expression_cache<'a, 'b>(
1371 &'a self,
1372 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1373 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1374 invalidate_ids: BTreeSet<GlobalId>,
1375 ) -> BoxFuture<'b, ()> {
1376 if let Some(expr_cache) = &self.expr_cache_handle {
1377 expr_cache
1378 .update(
1379 new_local_expressions,
1380 new_global_expressions,
1381 invalidate_ids,
1382 )
1383 .boxed()
1384 } else {
1385 async {}.boxed()
1386 }
1387 }
1388
1389 #[cfg(test)]
1393 async fn sync_to_current_updates(
1394 &mut self,
1395 ) -> Result<
1396 (
1397 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1398 Vec<ParsedStateUpdate>,
1399 ),
1400 CatalogError,
1401 > {
1402 let updates = self.storage().await.sync_to_current_updates().await?;
1403 let (builtin_table_updates, catalog_updates) = self
1404 .state
1405 .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1406 .await;
1407 Ok((builtin_table_updates, catalog_updates))
1408 }
1409}
1410
1411pub fn is_reserved_name(name: &str) -> bool {
1412 BUILTIN_PREFIXES
1413 .iter()
1414 .any(|prefix| name.starts_with(prefix))
1415}
1416
1417pub fn is_reserved_role_name(name: &str) -> bool {
1418 is_reserved_name(name) || is_public_role(name)
1419}
1420
1421pub fn is_public_role(name: &str) -> bool {
1422 name == &*PUBLIC_ROLE_NAME
1423}
1424
1425pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1426 object_type_to_audit_object_type(sql_type.into())
1427}
1428
1429pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1430 match id {
1431 CommentObjectId::Table(_) => ObjectType::Table,
1432 CommentObjectId::View(_) => ObjectType::View,
1433 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1434 CommentObjectId::Source(_) => ObjectType::Source,
1435 CommentObjectId::Sink(_) => ObjectType::Sink,
1436 CommentObjectId::Index(_) => ObjectType::Index,
1437 CommentObjectId::Func(_) => ObjectType::Func,
1438 CommentObjectId::Connection(_) => ObjectType::Connection,
1439 CommentObjectId::Type(_) => ObjectType::Type,
1440 CommentObjectId::Secret(_) => ObjectType::Secret,
1441 CommentObjectId::Role(_) => ObjectType::Role,
1442 CommentObjectId::Database(_) => ObjectType::Database,
1443 CommentObjectId::Schema(_) => ObjectType::Schema,
1444 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1445 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1446 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1447 }
1448}
1449
1450pub(crate) fn object_type_to_audit_object_type(
1451 object_type: mz_sql::catalog::ObjectType,
1452) -> ObjectType {
1453 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1454}
1455
1456pub(crate) fn system_object_type_to_audit_object_type(
1457 system_type: &SystemObjectType,
1458) -> ObjectType {
1459 match system_type {
1460 SystemObjectType::Object(object_type) => match object_type {
1461 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1462 mz_sql::catalog::ObjectType::View => ObjectType::View,
1463 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1464 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1465 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1466 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1467 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1468 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1469 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1470 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1471 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1472 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1473 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1474 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1475 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1476 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1477 },
1478 SystemObjectType::System => ObjectType::System,
1479 }
1480}
1481
1482#[derive(Debug, Copy, Clone)]
1483pub enum UpdatePrivilegeVariant {
1484 Grant,
1485 Revoke,
1486}
1487
1488impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1489 fn from(variant: UpdatePrivilegeVariant) -> Self {
1490 match variant {
1491 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1492 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1493 }
1494 }
1495}
1496
1497impl From<UpdatePrivilegeVariant> for EventType {
1498 fn from(variant: UpdatePrivilegeVariant) -> Self {
1499 match variant {
1500 UpdatePrivilegeVariant::Grant => EventType::Grant,
1501 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1502 }
1503 }
1504}
1505
1506impl ConnCatalog<'_> {
1507 fn resolve_item_name(
1508 &self,
1509 name: &PartialItemName,
1510 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1511 self.resolve_item(name).map(|entry| entry.name())
1512 }
1513
1514 fn resolve_function_name(
1515 &self,
1516 name: &PartialItemName,
1517 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1518 self.resolve_function(name).map(|entry| entry.name())
1519 }
1520
1521 fn resolve_type_name(
1522 &self,
1523 name: &PartialItemName,
1524 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1525 self.resolve_type(name).map(|entry| entry.name())
1526 }
1527}
1528
1529impl ExprHumanizer for ConnCatalog<'_> {
1530 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1531 let entry = self.state.try_get_entry_by_global_id(&id)?;
1532 Some(self.resolve_full_name(entry.name()).to_string())
1533 }
1534
1535 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1536 let entry = self.state.try_get_entry_by_global_id(&id)?;
1537 Some(entry.name().item.clone())
1538 }
1539
1540 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1541 let entry = self.state.try_get_entry_by_global_id(&id)?;
1542 Some(self.resolve_full_name(entry.name()).into_parts())
1543 }
1544
1545 fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1546 use SqlScalarType::*;
1547
1548 match typ {
1549 Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1550 List {
1551 custom_id: Some(item_id),
1552 ..
1553 }
1554 | Map {
1555 custom_id: Some(item_id),
1556 ..
1557 } => {
1558 let item = self.get_item(item_id);
1559 self.minimal_qualification(item.name()).to_string()
1560 }
1561 List { element_type, .. } => {
1562 format!(
1563 "{} list",
1564 self.humanize_sql_scalar_type(element_type, postgres_compat)
1565 )
1566 }
1567 Map { value_type, .. } => format!(
1568 "map[{}=>{}]",
1569 self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1570 self.humanize_sql_scalar_type(value_type, postgres_compat)
1571 ),
1572 Record {
1573 custom_id: Some(item_id),
1574 ..
1575 } => {
1576 let item = self.get_item(item_id);
1577 self.minimal_qualification(item.name()).to_string()
1578 }
1579 Record { fields, .. } => format!(
1580 "record({})",
1581 fields
1582 .iter()
1583 .map(|f| format!(
1584 "{}: {}",
1585 f.0,
1586 self.humanize_sql_column_type(&f.1, postgres_compat)
1587 ))
1588 .join(",")
1589 ),
1590 PgLegacyChar => "\"char\"".into(),
1591 Char { length } if !postgres_compat => match length {
1592 None => "char".into(),
1593 Some(length) => format!("char({})", length.into_u32()),
1594 },
1595 VarChar { max_length } if !postgres_compat => match max_length {
1596 None => "varchar".into(),
1597 Some(length) => format!("varchar({})", length.into_u32()),
1598 },
1599 UInt16 => "uint2".into(),
1600 UInt32 => "uint4".into(),
1601 UInt64 => "uint8".into(),
1602 ty => {
1603 let pgrepr_type = mz_pgrepr::Type::from(ty);
1604 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1605
1606 let res = if self
1607 .effective_search_path(true)
1608 .iter()
1609 .any(|(_, schema)| schema == &pg_catalog_schema)
1610 {
1611 pgrepr_type.name().to_string()
1612 } else {
1613 let name = QualifiedItemName {
1616 qualifiers: ItemQualifiers {
1617 database_spec: ResolvedDatabaseSpecifier::Ambient,
1618 schema_spec: pg_catalog_schema,
1619 },
1620 item: pgrepr_type.name().to_string(),
1621 };
1622 self.resolve_full_name(&name).to_string()
1623 };
1624 res
1625 }
1626 }
1627 }
1628
1629 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1630 let entry = self.state.try_get_entry_by_global_id(&id)?;
1631
1632 match entry.index() {
1633 Some(index) => {
1634 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1635 let mut on_names = on_desc
1636 .iter_names()
1637 .map(|col_name| col_name.to_string())
1638 .collect::<Vec<_>>();
1639
1640 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1641
1642 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1646 let mut ix_names = vec![String::new(); ix_arity];
1647
1648 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1650 let on_name = on_names.get_mut(on_pos).expect("on_name");
1651 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1652 std::mem::swap(on_name, ix_name);
1653 }
1654
1655 Some(ix_names) }
1657 None => {
1658 let desc = self.state.try_get_desc_by_global_id(&id)?;
1659 let column_names = desc
1660 .iter_names()
1661 .map(|col_name| col_name.to_string())
1662 .collect();
1663
1664 Some(column_names)
1665 }
1666 }
1667 }
1668
1669 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1670 let desc = self.state.try_get_desc_by_global_id(&id)?;
1671 Some(desc.get_name(column).to_string())
1672 }
1673
1674 fn id_exists(&self, id: GlobalId) -> bool {
1675 self.state.entry_by_global_id.contains_key(&id)
1676 }
1677}
1678
1679impl SessionCatalog for ConnCatalog<'_> {
1680 fn active_role_id(&self) -> &RoleId {
1681 &self.role_id
1682 }
1683
1684 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1685 self.prepared_statements
1686 .as_ref()
1687 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1688 .flatten()
1689 }
1690
1691 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1692 self.portals
1693 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1694 }
1695
1696 fn active_database(&self) -> Option<&DatabaseId> {
1697 self.database.as_ref()
1698 }
1699
1700 fn active_cluster(&self) -> &str {
1701 &self.cluster
1702 }
1703
1704 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1705 &self.search_path
1706 }
1707
1708 fn resolve_database(
1709 &self,
1710 database_name: &str,
1711 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1712 Ok(self.state.resolve_database(database_name)?)
1713 }
1714
1715 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1716 self.state
1717 .database_by_id
1718 .get(id)
1719 .expect("database doesn't exist")
1720 }
1721
1722 #[allow(clippy::as_conversions)]
1724 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1725 self.state
1726 .database_by_id
1727 .values()
1728 .map(|database| database as &dyn CatalogDatabase)
1729 .collect()
1730 }
1731
1732 fn resolve_schema(
1733 &self,
1734 database_name: Option<&str>,
1735 schema_name: &str,
1736 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1737 Ok(self.state.resolve_schema(
1738 self.database.as_ref(),
1739 database_name,
1740 schema_name,
1741 &self.conn_id,
1742 )?)
1743 }
1744
1745 fn resolve_schema_in_database(
1746 &self,
1747 database_spec: &ResolvedDatabaseSpecifier,
1748 schema_name: &str,
1749 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1750 Ok(self
1751 .state
1752 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1753 }
1754
1755 fn get_schema(
1756 &self,
1757 database_spec: &ResolvedDatabaseSpecifier,
1758 schema_spec: &SchemaSpecifier,
1759 ) -> &dyn CatalogSchema {
1760 self.state
1761 .get_schema(database_spec, schema_spec, &self.conn_id)
1762 }
1763
1764 #[allow(clippy::as_conversions)]
1766 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1767 self.get_databases()
1768 .into_iter()
1769 .flat_map(|database| database.schemas().into_iter())
1770 .chain(
1771 self.state
1772 .ambient_schemas_by_id
1773 .values()
1774 .chain(self.state.temporary_schemas.values())
1775 .map(|schema| schema as &dyn CatalogSchema),
1776 )
1777 .collect()
1778 }
1779
1780 fn get_mz_internal_schema_id(&self) -> SchemaId {
1781 self.state().get_mz_internal_schema_id()
1782 }
1783
1784 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1785 self.state().get_mz_unsafe_schema_id()
1786 }
1787
1788 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1789 self.state.is_system_schema_specifier(schema)
1790 }
1791
1792 fn resolve_role(
1793 &self,
1794 role_name: &str,
1795 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1796 match self.state.try_get_role_by_name(role_name) {
1797 Some(role) => Ok(role),
1798 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1799 }
1800 }
1801
1802 fn resolve_network_policy(
1803 &self,
1804 policy_name: &str,
1805 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1806 match self.state.try_get_network_policy_by_name(policy_name) {
1807 Some(policy) => Ok(policy),
1808 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1809 }
1810 }
1811
1812 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1813 Some(self.state.roles_by_id.get(id)?)
1814 }
1815
1816 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1817 self.state.get_role(id)
1818 }
1819
1820 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1821 #[allow(clippy::as_conversions)]
1823 self.state
1824 .roles_by_id
1825 .values()
1826 .map(|role| role as &dyn CatalogRole)
1827 .collect()
1828 }
1829
1830 fn mz_system_role_id(&self) -> RoleId {
1831 MZ_SYSTEM_ROLE_ID
1832 }
1833
1834 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1835 self.state.collect_role_membership(id)
1836 }
1837
1838 fn get_network_policy(
1839 &self,
1840 id: &NetworkPolicyId,
1841 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1842 self.state.get_network_policy(id)
1843 }
1844
1845 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1846 #[allow(clippy::as_conversions)]
1848 self.state
1849 .network_policies_by_id
1850 .values()
1851 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1852 .collect()
1853 }
1854
1855 fn resolve_cluster(
1856 &self,
1857 cluster_name: Option<&str>,
1858 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1859 Ok(self
1860 .state
1861 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1862 }
1863
1864 fn resolve_cluster_replica(
1865 &self,
1866 cluster_replica_name: &QualifiedReplica,
1867 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1868 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1869 }
1870
1871 fn resolve_item(
1872 &self,
1873 name: &PartialItemName,
1874 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1875 let r = self.state.resolve_entry(
1876 self.database.as_ref(),
1877 &self.effective_search_path(true),
1878 name,
1879 &self.conn_id,
1880 )?;
1881 if self.unresolvable_ids.contains(&r.id()) {
1882 Err(SqlCatalogError::UnknownItem(name.to_string()))
1883 } else {
1884 Ok(r)
1885 }
1886 }
1887
1888 fn resolve_function(
1889 &self,
1890 name: &PartialItemName,
1891 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1892 let r = self.state.resolve_function(
1893 self.database.as_ref(),
1894 &self.effective_search_path(false),
1895 name,
1896 &self.conn_id,
1897 )?;
1898
1899 if self.unresolvable_ids.contains(&r.id()) {
1900 Err(SqlCatalogError::UnknownFunction {
1901 name: name.to_string(),
1902 alternative: None,
1903 })
1904 } else {
1905 Ok(r)
1906 }
1907 }
1908
1909 fn resolve_type(
1910 &self,
1911 name: &PartialItemName,
1912 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1913 let r = self.state.resolve_type(
1914 self.database.as_ref(),
1915 &self.effective_search_path(false),
1916 name,
1917 &self.conn_id,
1918 )?;
1919
1920 if self.unresolvable_ids.contains(&r.id()) {
1921 Err(SqlCatalogError::UnknownType {
1922 name: name.to_string(),
1923 })
1924 } else {
1925 Ok(r)
1926 }
1927 }
1928
1929 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1930 self.state.get_system_type(name)
1931 }
1932
1933 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1934 Some(self.state.try_get_entry(id)?)
1935 }
1936
1937 fn try_get_item_by_global_id(
1938 &self,
1939 id: &GlobalId,
1940 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1941 let entry = self.state.try_get_entry_by_global_id(id)?;
1942 let entry = match &entry.item {
1943 CatalogItem::Table(table) => {
1944 let (version, _gid) = table
1945 .collections
1946 .iter()
1947 .find(|(_version, gid)| *gid == id)
1948 .expect("catalog out of sync, mismatched GlobalId");
1949 entry.at_version(RelationVersionSelector::Specific(*version))
1950 }
1951 _ => entry.at_version(RelationVersionSelector::Latest),
1952 };
1953 Some(entry)
1954 }
1955
1956 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1957 self.state.get_entry(id)
1958 }
1959
1960 fn get_item_by_global_id(
1961 &self,
1962 id: &GlobalId,
1963 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1964 let entry = self.state.get_entry_by_global_id(id);
1965 let entry = match &entry.item {
1966 CatalogItem::Table(table) => {
1967 let (version, _gid) = table
1968 .collections
1969 .iter()
1970 .find(|(_version, gid)| *gid == id)
1971 .expect("catalog out of sync, mismatched GlobalId");
1972 entry.at_version(RelationVersionSelector::Specific(*version))
1973 }
1974 _ => entry.at_version(RelationVersionSelector::Latest),
1975 };
1976 entry
1977 }
1978
1979 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
1980 self.get_schemas()
1981 .into_iter()
1982 .flat_map(|schema| schema.item_ids())
1983 .map(|id| self.get_item(&id))
1984 .collect()
1985 }
1986
1987 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1988 self.state
1989 .get_item_by_name(name, &self.conn_id)
1990 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1991 }
1992
1993 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1994 self.state
1995 .get_type_by_name(name, &self.conn_id)
1996 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1997 }
1998
1999 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2000 &self.state.clusters_by_id[&id]
2001 }
2002
2003 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2004 self.state
2005 .clusters_by_id
2006 .values()
2007 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2008 .collect()
2009 }
2010
2011 fn get_cluster_replica(
2012 &self,
2013 cluster_id: ClusterId,
2014 replica_id: ReplicaId,
2015 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2016 let cluster = self.get_cluster(cluster_id);
2017 cluster.replica(replica_id)
2018 }
2019
2020 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2021 self.get_clusters()
2022 .into_iter()
2023 .flat_map(|cluster| cluster.replicas().into_iter())
2024 .collect()
2025 }
2026
2027 fn get_system_privileges(&self) -> &PrivilegeMap {
2028 &self.state.system_privileges
2029 }
2030
2031 fn get_default_privileges(
2032 &self,
2033 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2034 self.state
2035 .default_privileges
2036 .iter()
2037 .map(|(object, acl_items)| (object, acl_items.collect()))
2038 .collect()
2039 }
2040
2041 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2042 self.state.find_available_name(name, &self.conn_id)
2043 }
2044
2045 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2046 self.state.resolve_full_name(name, Some(&self.conn_id))
2047 }
2048
2049 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2050 self.state.resolve_full_schema_name(name)
2051 }
2052
2053 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2054 self.state.get_entry_by_global_id(global_id).id()
2055 }
2056
2057 fn resolve_global_id(
2058 &self,
2059 item_id: &CatalogItemId,
2060 version: RelationVersionSelector,
2061 ) -> GlobalId {
2062 self.state
2063 .get_entry(item_id)
2064 .at_version(version)
2065 .global_id()
2066 }
2067
2068 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2069 self.state.config()
2070 }
2071
2072 fn now(&self) -> EpochMillis {
2073 (self.state.config().now)()
2074 }
2075
2076 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2077 self.state.aws_privatelink_availability_zones.clone()
2078 }
2079
2080 fn system_vars(&self) -> &SystemVars {
2081 &self.state.system_configuration
2082 }
2083
2084 fn system_vars_mut(&mut self) -> &mut SystemVars {
2085 Arc::make_mut(&mut self.state.to_mut().system_configuration)
2086 }
2087
2088 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2089 self.state().get_owner_id(id, self.conn_id())
2090 }
2091
2092 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2093 match id {
2094 SystemObjectId::System => Some(&self.state.system_privileges),
2095 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2096 Some(self.get_cluster(*id).privileges())
2097 }
2098 SystemObjectId::Object(ObjectId::Database(id)) => {
2099 Some(self.get_database(id).privileges())
2100 }
2101 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2102 self.state
2105 .try_get_schema(database_spec, schema_spec, &self.conn_id)
2106 .map(|schema| schema.privileges())
2107 }
2108 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2109 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2110 Some(self.get_network_policy(id).privileges())
2111 }
2112 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2113 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2114 }
2115 }
2116
2117 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2118 let mut seen = BTreeSet::new();
2119 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2120 }
2121
2122 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2123 let mut seen = BTreeSet::new();
2124 self.state.item_dependents(id, &mut seen)
2125 }
2126
2127 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2128 rbac::all_object_privileges(object_type)
2129 }
2130
2131 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2132 self.state.get_object_type(object_id)
2133 }
2134
2135 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2136 self.state.get_system_object_type(id)
2137 }
2138
2139 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2146 if qualified_name.qualifiers.schema_spec.is_temporary() {
2147 return qualified_name.item.clone().into();
2155 }
2156
2157 let database_id = match &qualified_name.qualifiers.database_spec {
2158 ResolvedDatabaseSpecifier::Ambient => None,
2159 ResolvedDatabaseSpecifier::Id(id)
2160 if self.database.is_some() && self.database == Some(*id) =>
2161 {
2162 None
2163 }
2164 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2165 };
2166
2167 let schema_spec = if database_id.is_none()
2168 && self.resolve_item_name(&PartialItemName {
2169 database: None,
2170 schema: None,
2171 item: qualified_name.item.clone(),
2172 }) == Ok(qualified_name)
2173 || self.resolve_function_name(&PartialItemName {
2174 database: None,
2175 schema: None,
2176 item: qualified_name.item.clone(),
2177 }) == Ok(qualified_name)
2178 || self.resolve_type_name(&PartialItemName {
2179 database: None,
2180 schema: None,
2181 item: qualified_name.item.clone(),
2182 }) == Ok(qualified_name)
2183 {
2184 None
2185 } else {
2186 Some(qualified_name.qualifiers.schema_spec.clone())
2189 };
2190
2191 let res = PartialItemName {
2192 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2193 schema: schema_spec.map(|spec| {
2194 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2195 .name()
2196 .schema
2197 .clone()
2198 }),
2199 item: qualified_name.item.clone(),
2200 };
2201 assert!(
2202 self.resolve_item_name(&res) == Ok(qualified_name)
2203 || self.resolve_function_name(&res) == Ok(qualified_name)
2204 || self.resolve_type_name(&res) == Ok(qualified_name)
2205 );
2206 res
2207 }
2208
2209 fn add_notice(&self, notice: PlanNotice) {
2210 let _ = self.notices_tx.send(notice.into());
2211 }
2212
2213 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2214 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2215 self.state.comments.get_object_comments(comment_id)
2216 }
2217
2218 fn is_cluster_size_cc(&self, size: &str) -> bool {
2219 self.state
2220 .cluster_replica_sizes
2221 .0
2222 .get(size)
2223 .map_or(false, |a| a.is_cc)
2224 }
2225}
2226
2227#[cfg(test)]
2228mod tests {
2229 use std::collections::{BTreeMap, BTreeSet};
2230 use std::sync::Arc;
2231 use std::{env, iter};
2232
2233 use itertools::Itertools;
2234 use mz_catalog::memory::objects::CatalogItem;
2235 use tokio_postgres::NoTls;
2236 use tokio_postgres::types::Type;
2237 use uuid::Uuid;
2238
2239 use mz_catalog::SYSTEM_CONN_ID;
2240 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2241 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2242 use mz_controller_types::{ClusterId, ReplicaId};
2243 use mz_expr::MirScalarExpr;
2244 use mz_ore::now::to_datetime;
2245 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2246 use mz_persist_client::PersistClient;
2247 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2248 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2249 use mz_repr::role_id::RoleId;
2250 use mz_repr::{
2251 CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2252 SqlScalarType, Timestamp,
2253 };
2254 use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2255 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2256 use mz_sql::names::{
2257 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2258 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2259 };
2260 use mz_sql::plan::{
2261 CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2262 QueryLifetime, Scope, StatementContext,
2263 };
2264 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2265 use mz_sql::session::vars::{SystemVars, VarInput};
2266
2267 use crate::catalog::state::LocalExpressionCache;
2268 use crate::catalog::{Catalog, Op};
2269 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2270 use crate::session::Session;
2271
2272 #[mz_ore::test(tokio::test)]
2279 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2281 Catalog::with_debug(|catalog| async move {
2282 struct TestCase {
2283 input: QualifiedItemName,
2284 system_output: PartialItemName,
2285 normal_output: PartialItemName,
2286 }
2287
2288 let test_cases = vec![
2289 TestCase {
2290 input: QualifiedItemName {
2291 qualifiers: ItemQualifiers {
2292 database_spec: ResolvedDatabaseSpecifier::Ambient,
2293 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2294 },
2295 item: "numeric".to_string(),
2296 },
2297 system_output: PartialItemName {
2298 database: None,
2299 schema: None,
2300 item: "numeric".to_string(),
2301 },
2302 normal_output: PartialItemName {
2303 database: None,
2304 schema: None,
2305 item: "numeric".to_string(),
2306 },
2307 },
2308 TestCase {
2309 input: QualifiedItemName {
2310 qualifiers: ItemQualifiers {
2311 database_spec: ResolvedDatabaseSpecifier::Ambient,
2312 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2313 },
2314 item: "mz_array_types".to_string(),
2315 },
2316 system_output: PartialItemName {
2317 database: None,
2318 schema: None,
2319 item: "mz_array_types".to_string(),
2320 },
2321 normal_output: PartialItemName {
2322 database: None,
2323 schema: None,
2324 item: "mz_array_types".to_string(),
2325 },
2326 },
2327 ];
2328
2329 for tc in test_cases {
2330 assert_eq!(
2331 catalog
2332 .for_system_session()
2333 .minimal_qualification(&tc.input),
2334 tc.system_output
2335 );
2336 assert_eq!(
2337 catalog
2338 .for_session(&Session::dummy())
2339 .minimal_qualification(&tc.input),
2340 tc.normal_output
2341 );
2342 }
2343 catalog.expire().await;
2344 })
2345 .await
2346 }
2347
2348 #[mz_ore::test(tokio::test)]
2349 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2351 let persist_client = PersistClient::new_for_tests().await;
2352 let organization_id = Uuid::new_v4();
2353 let bootstrap_args = test_bootstrap_args();
2354 {
2355 let mut catalog = Catalog::open_debug_catalog(
2356 persist_client.clone(),
2357 organization_id.clone(),
2358 &bootstrap_args,
2359 )
2360 .await
2361 .expect("unable to open debug catalog");
2362 assert_eq!(catalog.transient_revision(), 1);
2363 let commit_ts = catalog.current_upper().await;
2364 catalog
2365 .transact(
2366 None,
2367 commit_ts,
2368 None,
2369 vec![Op::CreateDatabase {
2370 name: "test".to_string(),
2371 owner_id: MZ_SYSTEM_ROLE_ID,
2372 }],
2373 )
2374 .await
2375 .expect("failed to transact");
2376 assert_eq!(catalog.transient_revision(), 2);
2377 catalog.expire().await;
2378 }
2379 {
2380 let catalog =
2381 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2382 .await
2383 .expect("unable to open debug catalog");
2384 assert_eq!(catalog.transient_revision(), 1);
2386 catalog.expire().await;
2387 }
2388 }
2389
2390 #[mz_ore::test(tokio::test)]
2391 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2393 Catalog::with_debug(|catalog| async move {
2394 let mz_catalog_schema = (
2395 ResolvedDatabaseSpecifier::Ambient,
2396 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2397 );
2398 let pg_catalog_schema = (
2399 ResolvedDatabaseSpecifier::Ambient,
2400 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2401 );
2402 let mz_temp_schema = (
2403 ResolvedDatabaseSpecifier::Ambient,
2404 SchemaSpecifier::Temporary,
2405 );
2406
2407 let session = Session::dummy();
2409 let conn_catalog = catalog.for_session(&session);
2410 assert_ne!(
2411 conn_catalog.effective_search_path(false),
2412 conn_catalog.search_path
2413 );
2414 assert_ne!(
2415 conn_catalog.effective_search_path(true),
2416 conn_catalog.search_path
2417 );
2418 assert_eq!(
2419 conn_catalog.effective_search_path(false),
2420 vec![
2421 mz_catalog_schema.clone(),
2422 pg_catalog_schema.clone(),
2423 conn_catalog.search_path[0].clone()
2424 ]
2425 );
2426 assert_eq!(
2427 conn_catalog.effective_search_path(true),
2428 vec![
2429 mz_temp_schema.clone(),
2430 mz_catalog_schema.clone(),
2431 pg_catalog_schema.clone(),
2432 conn_catalog.search_path[0].clone()
2433 ]
2434 );
2435
2436 let mut session = Session::dummy();
2438 session
2439 .vars_mut()
2440 .set(
2441 &SystemVars::new(),
2442 "search_path",
2443 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2444 false,
2445 )
2446 .expect("failed to set search_path");
2447 let conn_catalog = catalog.for_session(&session);
2448 assert_ne!(
2449 conn_catalog.effective_search_path(false),
2450 conn_catalog.search_path
2451 );
2452 assert_ne!(
2453 conn_catalog.effective_search_path(true),
2454 conn_catalog.search_path
2455 );
2456 assert_eq!(
2457 conn_catalog.effective_search_path(false),
2458 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2459 );
2460 assert_eq!(
2461 conn_catalog.effective_search_path(true),
2462 vec![
2463 mz_temp_schema.clone(),
2464 mz_catalog_schema.clone(),
2465 pg_catalog_schema.clone()
2466 ]
2467 );
2468
2469 let mut session = Session::dummy();
2470 session
2471 .vars_mut()
2472 .set(
2473 &SystemVars::new(),
2474 "search_path",
2475 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2476 false,
2477 )
2478 .expect("failed to set search_path");
2479 let conn_catalog = catalog.for_session(&session);
2480 assert_ne!(
2481 conn_catalog.effective_search_path(false),
2482 conn_catalog.search_path
2483 );
2484 assert_ne!(
2485 conn_catalog.effective_search_path(true),
2486 conn_catalog.search_path
2487 );
2488 assert_eq!(
2489 conn_catalog.effective_search_path(false),
2490 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2491 );
2492 assert_eq!(
2493 conn_catalog.effective_search_path(true),
2494 vec![
2495 mz_temp_schema.clone(),
2496 pg_catalog_schema.clone(),
2497 mz_catalog_schema.clone()
2498 ]
2499 );
2500
2501 let mut session = Session::dummy();
2502 session
2503 .vars_mut()
2504 .set(
2505 &SystemVars::new(),
2506 "search_path",
2507 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2508 false,
2509 )
2510 .expect("failed to set search_path");
2511 let conn_catalog = catalog.for_session(&session);
2512 assert_ne!(
2513 conn_catalog.effective_search_path(false),
2514 conn_catalog.search_path
2515 );
2516 assert_ne!(
2517 conn_catalog.effective_search_path(true),
2518 conn_catalog.search_path
2519 );
2520 assert_eq!(
2521 conn_catalog.effective_search_path(false),
2522 vec![
2523 mz_catalog_schema.clone(),
2524 pg_catalog_schema.clone(),
2525 mz_temp_schema.clone()
2526 ]
2527 );
2528 assert_eq!(
2529 conn_catalog.effective_search_path(true),
2530 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2531 );
2532 catalog.expire().await;
2533 })
2534 .await
2535 }
2536
2537 #[mz_ore::test(tokio::test)]
2538 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2540 use mz_ore::collections::CollectionExt;
2541 Catalog::with_debug(|catalog| async move {
2542 let conn_catalog = catalog.for_system_session();
2543 let scx = &mut StatementContext::new(None, &conn_catalog);
2544
2545 let parsed = mz_sql_parser::parser::parse_statements(
2546 "create view public.foo as select 1 as bar",
2547 )
2548 .expect("")
2549 .into_element()
2550 .ast;
2551
2552 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2553
2554 assert_eq!(
2556 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2557 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2558 );
2559 catalog.expire().await;
2560 })
2561 .await;
2562 }
2563
2564 #[mz_ore::test(tokio::test)]
2566 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2568 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2569 let column = ", 1";
2570 let view_def_size = view_def.bytes().count();
2571 let column_size = column.bytes().count();
2572 let column_count =
2573 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2574 let columns = iter::repeat(column).take(column_count).join("");
2575 let create_sql = format!("{view_def}{columns})");
2576 let create_sql_check = create_sql.clone();
2577 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2578 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2579 &create_sql
2580 ));
2581
2582 let persist_client = PersistClient::new_for_tests().await;
2583 let organization_id = Uuid::new_v4();
2584 let id = CatalogItemId::User(1);
2585 let gid = GlobalId::User(1);
2586 let bootstrap_args = test_bootstrap_args();
2587 {
2588 let mut catalog = Catalog::open_debug_catalog(
2589 persist_client.clone(),
2590 organization_id.clone(),
2591 &bootstrap_args,
2592 )
2593 .await
2594 .expect("unable to open debug catalog");
2595 let item = catalog
2596 .state()
2597 .deserialize_item(
2598 gid,
2599 &create_sql,
2600 &BTreeMap::new(),
2601 &mut LocalExpressionCache::Closed,
2602 None,
2603 )
2604 .expect("unable to parse view");
2605 let commit_ts = catalog.current_upper().await;
2606 catalog
2607 .transact(
2608 None,
2609 commit_ts,
2610 None,
2611 vec![Op::CreateItem {
2612 item,
2613 name: QualifiedItemName {
2614 qualifiers: ItemQualifiers {
2615 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2616 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2617 },
2618 item: "v".to_string(),
2619 },
2620 id,
2621 owner_id: MZ_SYSTEM_ROLE_ID,
2622 }],
2623 )
2624 .await
2625 .expect("failed to transact");
2626 catalog.expire().await;
2627 }
2628 {
2629 let catalog =
2630 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2631 .await
2632 .expect("unable to open debug catalog");
2633 let view = catalog.get_entry(&id);
2634 assert_eq!("v", view.name.item);
2635 match &view.item {
2636 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2637 item => panic!("expected view, got {}", item.typ()),
2638 }
2639 catalog.expire().await;
2640 }
2641 }
2642
2643 #[mz_ore::test(tokio::test)]
2644 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2646 Catalog::with_debug(|catalog| async move {
2647 let conn_catalog = catalog.for_system_session();
2648
2649 assert_eq!(
2650 mz_sql::catalog::ObjectType::ClusterReplica,
2651 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2652 ClusterId::user(1).expect("1 is a valid ID"),
2653 ReplicaId::User(1)
2654 )))
2655 );
2656 assert_eq!(
2657 mz_sql::catalog::ObjectType::Role,
2658 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2659 );
2660 catalog.expire().await;
2661 })
2662 .await;
2663 }
2664
2665 #[mz_ore::test(tokio::test)]
2666 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2668 Catalog::with_debug(|catalog| async move {
2669 let conn_catalog = catalog.for_system_session();
2670
2671 assert_eq!(
2672 None,
2673 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2674 ClusterId::user(1).expect("1 is a valid ID"),
2675 ReplicaId::User(1),
2676 ))))
2677 );
2678 assert_eq!(
2679 None,
2680 conn_catalog
2681 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2682 );
2683 catalog.expire().await;
2684 })
2685 .await;
2686 }
2687
2688 #[mz_ore::test(tokio::test)]
2689 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2691 Catalog::with_debug(|catalog| async move {
2692 let conn_catalog = catalog.for_system_session();
2693
2694 for builtin in BUILTINS::iter() {
2695 let (schema, name, expected_desc) = match builtin {
2696 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2697 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2698 Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2699 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2700 Builtin::Log(_)
2701 | Builtin::Type(_)
2702 | Builtin::Func(_)
2703 | Builtin::Index(_)
2704 | Builtin::Connection(_) => continue,
2705 };
2706 let item = conn_catalog
2707 .resolve_item(&PartialItemName {
2708 database: None,
2709 schema: Some(schema.to_string()),
2710 item: name.to_string(),
2711 })
2712 .expect("unable to resolve item")
2713 .at_version(RelationVersionSelector::Latest);
2714
2715 let actual_desc = item.relation_desc().expect("invalid item type");
2716 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2717 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2718 {
2719 assert_eq!(
2720 actual_name, expected_name,
2721 "item {schema}.{name} column {index} name did not match its expected name"
2722 );
2723 assert_eq!(
2724 actual_typ, expected_typ,
2725 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2726 );
2727 }
2728 assert_eq!(
2729 &*actual_desc, expected_desc,
2730 "item {schema}.{name} did not match its expected RelationDesc"
2731 );
2732 }
2733 catalog.expire().await;
2734 })
2735 .await
2736 }
2737
2738 #[mz_ore::test(tokio::test)]
2741 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2743 async fn inner(catalog: Catalog) {
2744 let (client, connection) = tokio_postgres::connect(
2748 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2749 NoTls,
2750 )
2751 .await
2752 .expect("failed to connect to Postgres");
2753
2754 task::spawn(|| "compare_builtin_postgres", async move {
2755 if let Err(e) = connection.await {
2756 panic!("connection error: {}", e);
2757 }
2758 });
2759
2760 struct PgProc {
2761 name: String,
2762 arg_oids: Vec<u32>,
2763 ret_oid: Option<u32>,
2764 ret_set: bool,
2765 }
2766
2767 struct PgType {
2768 name: String,
2769 ty: String,
2770 elem: u32,
2771 array: u32,
2772 input: u32,
2773 receive: u32,
2774 }
2775
2776 struct PgOper {
2777 oprresult: u32,
2778 name: String,
2779 }
2780
2781 let pg_proc: BTreeMap<_, _> = client
2782 .query(
2783 "SELECT
2784 p.oid,
2785 proname,
2786 proargtypes,
2787 prorettype,
2788 proretset
2789 FROM pg_proc p
2790 JOIN pg_namespace n ON p.pronamespace = n.oid",
2791 &[],
2792 )
2793 .await
2794 .expect("pg query failed")
2795 .into_iter()
2796 .map(|row| {
2797 let oid: u32 = row.get("oid");
2798 let pg_proc = PgProc {
2799 name: row.get("proname"),
2800 arg_oids: row.get("proargtypes"),
2801 ret_oid: row.get("prorettype"),
2802 ret_set: row.get("proretset"),
2803 };
2804 (oid, pg_proc)
2805 })
2806 .collect();
2807
2808 let pg_type: BTreeMap<_, _> = client
2809 .query(
2810 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2811 &[],
2812 )
2813 .await
2814 .expect("pg query failed")
2815 .into_iter()
2816 .map(|row| {
2817 let oid: u32 = row.get("oid");
2818 let pg_type = PgType {
2819 name: row.get("typname"),
2820 ty: row.get("typtype"),
2821 elem: row.get("typelem"),
2822 array: row.get("typarray"),
2823 input: row.get("typinput"),
2824 receive: row.get("typreceive"),
2825 };
2826 (oid, pg_type)
2827 })
2828 .collect();
2829
2830 let pg_oper: BTreeMap<_, _> = client
2831 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2832 .await
2833 .expect("pg query failed")
2834 .into_iter()
2835 .map(|row| {
2836 let oid: u32 = row.get("oid");
2837 let pg_oper = PgOper {
2838 name: row.get("oprname"),
2839 oprresult: row.get("oprresult"),
2840 };
2841 (oid, pg_oper)
2842 })
2843 .collect();
2844
2845 let conn_catalog = catalog.for_system_session();
2846 let resolve_type_oid = |item: &str| {
2847 conn_catalog
2848 .resolve_type(&PartialItemName {
2849 database: None,
2850 schema: Some(PG_CATALOG_SCHEMA.into()),
2853 item: item.to_string(),
2854 })
2855 .expect("unable to resolve type")
2856 .oid()
2857 };
2858
2859 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2860 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2861 .collect();
2862
2863 let mut all_oids = BTreeSet::new();
2864
2865 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2868 [
2869 (Type::NAME, Type::TEXT),
2871 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2872 (Type::TIME, Type::TIMETZ),
2874 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2875 ]
2876 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2877 );
2878 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2879 1619, ]);
2881 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2882 if ignore_return_types.contains(&fn_oid) {
2883 return true;
2884 }
2885 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2886 return true;
2887 }
2888 a == b
2889 };
2890
2891 for builtin in BUILTINS::iter() {
2892 match builtin {
2893 Builtin::Type(ty) => {
2894 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2895
2896 if ty.oid >= FIRST_MATERIALIZE_OID {
2897 continue;
2900 }
2901
2902 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2905 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2906 });
2907 assert_eq!(
2908 ty.name, pg_ty.name,
2909 "oid {} has name {} in postgres; expected {}",
2910 ty.oid, pg_ty.name, ty.name,
2911 );
2912
2913 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2914 None => (0, 0),
2915 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2916 };
2917 assert_eq!(
2918 typinput_oid, pg_ty.input,
2919 "type {} has typinput OID {:?} in mz but {:?} in pg",
2920 ty.name, typinput_oid, pg_ty.input,
2921 );
2922 assert_eq!(
2923 typreceive_oid, pg_ty.receive,
2924 "type {} has typreceive OID {:?} in mz but {:?} in pg",
2925 ty.name, typreceive_oid, pg_ty.receive,
2926 );
2927 if typinput_oid != 0 {
2928 assert!(
2929 func_oids.contains(&typinput_oid),
2930 "type {} has typinput OID {} that does not exist in pg_proc",
2931 ty.name,
2932 typinput_oid,
2933 );
2934 }
2935 if typreceive_oid != 0 {
2936 assert!(
2937 func_oids.contains(&typreceive_oid),
2938 "type {} has typreceive OID {} that does not exist in pg_proc",
2939 ty.name,
2940 typreceive_oid,
2941 );
2942 }
2943
2944 match &ty.details.typ {
2946 CatalogType::Array { element_reference } => {
2947 let elem_ty = BUILTINS::iter()
2948 .filter_map(|builtin| match builtin {
2949 Builtin::Type(ty @ BuiltinType { name, .. })
2950 if element_reference == name =>
2951 {
2952 Some(ty)
2953 }
2954 _ => None,
2955 })
2956 .next();
2957 let elem_ty = match elem_ty {
2958 Some(ty) => ty,
2959 None => {
2960 panic!("{} is unexpectedly not a type", element_reference)
2961 }
2962 };
2963 assert_eq!(
2964 pg_ty.elem, elem_ty.oid,
2965 "type {} has mismatched element OIDs",
2966 ty.name
2967 )
2968 }
2969 CatalogType::Pseudo => {
2970 assert_eq!(
2971 pg_ty.ty, "p",
2972 "type {} is not a pseudo type as expected",
2973 ty.name
2974 )
2975 }
2976 CatalogType::Range { .. } => {
2977 assert_eq!(
2978 pg_ty.ty, "r",
2979 "type {} is not a range type as expected",
2980 ty.name
2981 );
2982 }
2983 _ => {
2984 assert_eq!(
2985 pg_ty.ty, "b",
2986 "type {} is not a base type as expected",
2987 ty.name
2988 )
2989 }
2990 }
2991
2992 let schema = catalog
2994 .resolve_schema_in_database(
2995 &ResolvedDatabaseSpecifier::Ambient,
2996 ty.schema,
2997 &SYSTEM_CONN_ID,
2998 )
2999 .expect("unable to resolve schema");
3000 let allocated_type = catalog
3001 .resolve_type(
3002 None,
3003 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3004 &PartialItemName {
3005 database: None,
3006 schema: Some(schema.name().schema.clone()),
3007 item: ty.name.to_string(),
3008 },
3009 &SYSTEM_CONN_ID,
3010 )
3011 .expect("unable to resolve type");
3012 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3013 ty
3014 } else {
3015 panic!("unexpectedly not a type")
3016 };
3017 match ty.details.array_id {
3018 Some(array_id) => {
3019 let array_ty = catalog.get_entry(&array_id);
3020 assert_eq!(
3021 pg_ty.array, array_ty.oid,
3022 "type {} has mismatched array OIDs",
3023 allocated_type.name.item,
3024 );
3025 }
3026 None => assert_eq!(
3027 pg_ty.array, 0,
3028 "type {} does not have an array type in mz but does in pg",
3029 allocated_type.name.item,
3030 ),
3031 }
3032 }
3033 Builtin::Func(func) => {
3034 for imp in func.inner.func_impls() {
3035 assert!(
3036 all_oids.insert(imp.oid),
3037 "{} reused oid {}",
3038 func.name,
3039 imp.oid
3040 );
3041
3042 assert!(
3043 imp.oid < FIRST_USER_OID,
3044 "built-in function {} erroneously has OID in user space ({})",
3045 func.name,
3046 imp.oid,
3047 );
3048
3049 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3052 continue;
3053 } else {
3054 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3055 panic!(
3056 "pg_proc missing function {}: oid {}",
3057 func.name, imp.oid
3058 )
3059 })
3060 };
3061 assert_eq!(
3062 func.name, pg_fn.name,
3063 "funcs with oid {} don't match names: {} in mz, {} in pg",
3064 imp.oid, func.name, pg_fn.name
3065 );
3066
3067 let imp_arg_oids = imp
3070 .arg_typs
3071 .iter()
3072 .map(|item| resolve_type_oid(item))
3073 .collect::<Vec<_>>();
3074
3075 if imp_arg_oids != pg_fn.arg_oids {
3076 println!(
3077 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3078 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3079 );
3080 }
3081
3082 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3083
3084 assert!(
3085 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3086 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3087 imp.oid,
3088 func.name,
3089 imp_return_oid,
3090 pg_fn.ret_oid
3091 );
3092
3093 assert_eq!(
3094 imp.return_is_set, pg_fn.ret_set,
3095 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3096 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3097 );
3098 }
3099 }
3100 _ => (),
3101 }
3102 }
3103
3104 for (op, func) in OP_IMPLS.iter() {
3105 for imp in func.func_impls() {
3106 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3107
3108 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3110 continue;
3111 } else {
3112 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3113 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3114 })
3115 };
3116
3117 assert_eq!(*op, pg_op.name);
3118
3119 let imp_return_oid =
3120 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3121 if imp_return_oid != pg_op.oprresult {
3122 panic!(
3123 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3124 imp.oid, op, imp_return_oid, pg_op.oprresult
3125 );
3126 }
3127 }
3128 }
3129 catalog.expire().await;
3130 }
3131
3132 Catalog::with_debug(inner).await
3133 }
3134
3135 #[mz_ore::test(tokio::test)]
3137 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3139 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3140 let catalog = Arc::new(catalog);
3141 let conn_catalog = catalog.for_system_session();
3142
3143 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3144 let mut handles = Vec::new();
3145
3146 let ignore_names = BTreeSet::from([
3148 "avg",
3149 "avg_internal_v1",
3150 "bool_and",
3151 "bool_or",
3152 "has_table_privilege", "has_type_privilege", "mod",
3155 "mz_panic",
3156 "mz_sleep",
3157 "pow",
3158 "stddev_pop",
3159 "stddev_samp",
3160 "stddev",
3161 "var_pop",
3162 "var_samp",
3163 "variance",
3164 ]);
3165
3166 let fns = BUILTINS::funcs()
3167 .map(|func| (&func.name, func.inner))
3168 .chain(OP_IMPLS.iter());
3169
3170 for (name, func) in fns {
3171 if ignore_names.contains(name) {
3172 continue;
3173 }
3174 let Func::Scalar(impls) = func else {
3175 continue;
3176 };
3177
3178 'outer: for imp in impls {
3179 let details = imp.details();
3180 let mut styps = Vec::new();
3181 for item in details.arg_typs.iter() {
3182 let oid = resolve_type_oid(item);
3183 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3184 continue 'outer;
3185 };
3186 styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3187 }
3188 let datums = styps
3189 .iter()
3190 .map(|styp| {
3191 let mut datums = vec![Datum::Null];
3192 datums.extend(styp.interesting_datums());
3193 datums
3194 })
3195 .collect::<Vec<_>>();
3196 if datums.is_empty() {
3198 continue;
3199 }
3200
3201 let return_oid = details
3202 .return_typ
3203 .map(resolve_type_oid)
3204 .expect("must exist");
3205 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3206 .ok()
3207 .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3208
3209 let mut idxs = vec![0; datums.len()];
3210 while idxs[0] < datums[0].len() {
3211 let mut args = Vec::with_capacity(idxs.len());
3212 for i in 0..(datums.len()) {
3213 args.push(datums[i][idxs[i]]);
3214 }
3215
3216 let op = &imp.op;
3217 let scalars = args
3218 .iter()
3219 .enumerate()
3220 .map(|(i, datum)| {
3221 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3222 datum.clone(),
3223 styps[i].clone(),
3224 ))
3225 })
3226 .collect();
3227
3228 let call_name = format!(
3229 "{name}({}) (oid: {})",
3230 args.iter()
3231 .map(|d| d.to_string())
3232 .collect::<Vec<_>>()
3233 .join(", "),
3234 imp.oid
3235 );
3236 let catalog = Arc::clone(&catalog);
3237 let call_name_fn = call_name.clone();
3238 let return_styp = return_styp.clone();
3239 let handle = task::spawn_blocking(
3240 || call_name,
3241 move || {
3242 smoketest_fn(
3243 name,
3244 call_name_fn,
3245 op,
3246 imp,
3247 args,
3248 catalog,
3249 scalars,
3250 return_styp,
3251 )
3252 },
3253 );
3254 handles.push(handle);
3255
3256 for i in (0..datums.len()).rev() {
3258 idxs[i] += 1;
3259 if idxs[i] >= datums[i].len() {
3260 if i == 0 {
3261 break;
3262 }
3263 idxs[i] = 0;
3264 continue;
3265 } else {
3266 break;
3267 }
3268 }
3269 }
3270 }
3271 }
3272 handles
3273 }
3274
3275 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3276 for handle in handles {
3277 handle.await;
3278 }
3279 }
3280
3281 fn smoketest_fn(
3282 name: &&str,
3283 call_name: String,
3284 op: &Operation<HirScalarExpr>,
3285 imp: &FuncImpl<HirScalarExpr>,
3286 args: Vec<Datum<'_>>,
3287 catalog: Arc<Catalog>,
3288 scalars: Vec<CoercibleScalarExpr>,
3289 return_styp: Option<SqlScalarType>,
3290 ) {
3291 let conn_catalog = catalog.for_system_session();
3292 let pcx = PlanContext::zero();
3293 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3294 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3295 let ecx = ExprContext {
3296 qcx: &qcx,
3297 name: "smoketest",
3298 scope: &Scope::empty(),
3299 relation_type: &SqlRelationType::empty(),
3300 allow_aggregates: false,
3301 allow_subqueries: false,
3302 allow_parameters: false,
3303 allow_windows: false,
3304 };
3305 let arena = RowArena::new();
3306 let mut session = Session::dummy();
3307 session
3308 .start_transaction(to_datetime(0), None, None)
3309 .expect("must succeed");
3310 let prep_style = ExprPrepOneShot {
3311 logical_time: EvalTime::Time(Timestamp::MIN),
3312 session: &session,
3313 catalog_state: &catalog.state,
3314 };
3315
3316 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3319 if let Ok(hir) = res {
3320 let uneliminated_result_row = {
3321 if let HirScalarExpr::CallUnary { func, .. } = &hir
3322 && func.is_eliminable_cast()
3323 {
3324 let mut uneliminated_mir = hir
3325 .clone()
3326 .lower_uncorrelated(HirToMirConfig {
3327 enable_cast_elimination: false,
3328 ..catalog.system_config().into()
3329 })
3330 .expect("lowering eliminable cast should always succeed");
3331 prep_style
3332 .prep_scalar_expr(&mut uneliminated_mir)
3333 .expect("must succeed");
3334
3335 uneliminated_mir
3337 .eval(&[], &arena)
3338 .ok()
3339 .map(|datum| Row::pack([datum]))
3340 } else {
3341 None
3342 }
3343 };
3344
3345 if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3346 prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3348
3349 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3350 if let Some(return_styp) = return_styp {
3351 let mir_typ = mir.typ(&[]);
3352 soft_assert_eq_or_log!(
3355 mir_typ.scalar_type,
3356 (&return_styp).into(),
3357 "MIR type did not match the catalog type (cast elimination/repr type error)"
3358 );
3359 if !eval_result_datum.is_instance_of(&mir_typ) {
3363 panic!(
3364 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3365 );
3366 }
3367 if let Some(row) = uneliminated_result_row {
3369 let uneliminated_result_datum = row.unpack_first();
3370 assert_eq!(
3371 uneliminated_result_datum, eval_result_datum,
3372 "datums should not change if cast is eliminable"
3373 );
3374 }
3375 if let Some((introduces_nulls, propagates_nulls)) =
3378 call_introduces_propagates_nulls(&mir)
3379 {
3380 if introduces_nulls {
3381 assert!(
3385 mir_typ.nullable,
3386 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3387 name, args, mir, mir_typ.nullable
3388 );
3389 } else {
3390 let any_input_null = args.iter().any(|arg| arg.is_null());
3391 if !any_input_null {
3392 assert!(
3393 !mir_typ.nullable,
3394 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3395 name, args, mir, mir_typ.nullable
3396 );
3397 } else if propagates_nulls {
3398 assert!(
3401 mir_typ.nullable,
3402 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3403 name, args, mir, mir_typ.nullable
3404 );
3405 }
3406 }
3411 }
3412 let mut reduced = mir.clone();
3415 reduced.reduce(&[]);
3416 match reduced {
3417 MirScalarExpr::Literal(reduce_result, ctyp) => {
3418 match reduce_result {
3419 Ok(reduce_result_row) => {
3420 let reduce_result_datum = reduce_result_row.unpack_first();
3421 assert_eq!(
3422 reduce_result_datum,
3423 eval_result_datum,
3424 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3425 name,
3426 args,
3427 mir,
3428 eval_result_datum,
3429 mir_typ.scalar_type,
3430 reduce_result_datum,
3431 ctyp.scalar_type
3432 );
3433 assert_eq!(
3439 ctyp.scalar_type,
3440 mir_typ.scalar_type,
3441 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3442 name,
3443 args,
3444 mir,
3445 eval_result_datum,
3446 mir_typ.scalar_type,
3447 reduce_result_datum,
3448 ctyp.scalar_type
3449 );
3450 }
3451 Err(..) => {} }
3453 }
3454 _ => unreachable!(
3455 "all args are literals, so should have reduced to a literal"
3456 ),
3457 }
3458 }
3459 }
3460 }
3461 }
3462 }
3463
3464 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3469 match mir_func_call {
3470 MirScalarExpr::CallUnary { func, expr } => {
3471 if expr.is_literal() {
3472 Some((func.introduces_nulls(), func.propagates_nulls()))
3473 } else {
3474 None
3475 }
3476 }
3477 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3478 if expr1.is_literal() && expr2.is_literal() {
3479 Some((func.introduces_nulls(), func.propagates_nulls()))
3480 } else {
3481 None
3482 }
3483 }
3484 MirScalarExpr::CallVariadic { func, exprs } => {
3485 if exprs.iter().all(|arg| arg.is_literal()) {
3486 Some((func.introduces_nulls(), func.propagates_nulls()))
3487 } else {
3488 None
3489 }
3490 }
3491 _ => None,
3492 }
3493 }
3494
3495 #[mz_ore::test(tokio::test)]
3497 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3499 Catalog::with_debug(|catalog| async move {
3500 let conn_catalog = catalog.for_system_session();
3501
3502 for view in BUILTINS::views().filter(|view| {
3503 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3504 }) {
3505 let item = conn_catalog
3506 .resolve_item(&PartialItemName {
3507 database: None,
3508 schema: Some(view.schema.to_string()),
3509 item: view.name.to_string(),
3510 })
3511 .expect("unable to resolve view")
3512 .at_version(RelationVersionSelector::Latest);
3514 let full_name = conn_catalog.resolve_full_name(item.name());
3515 let desc = item.relation_desc().expect("invalid item type");
3516 for col_type in desc.iter_types() {
3517 match &col_type.scalar_type {
3518 typ @ SqlScalarType::UInt16
3519 | typ @ SqlScalarType::UInt32
3520 | typ @ SqlScalarType::UInt64
3521 | typ @ SqlScalarType::MzTimestamp
3522 | typ @ SqlScalarType::List { .. }
3523 | typ @ SqlScalarType::Map { .. }
3524 | typ @ SqlScalarType::MzAclItem => {
3525 panic!("{typ:?} type found in {full_name}");
3526 }
3527 SqlScalarType::AclItem
3528 | SqlScalarType::Bool
3529 | SqlScalarType::Int16
3530 | SqlScalarType::Int32
3531 | SqlScalarType::Int64
3532 | SqlScalarType::Float32
3533 | SqlScalarType::Float64
3534 | SqlScalarType::Numeric { .. }
3535 | SqlScalarType::Date
3536 | SqlScalarType::Time
3537 | SqlScalarType::Timestamp { .. }
3538 | SqlScalarType::TimestampTz { .. }
3539 | SqlScalarType::Interval
3540 | SqlScalarType::PgLegacyChar
3541 | SqlScalarType::Bytes
3542 | SqlScalarType::String
3543 | SqlScalarType::Char { .. }
3544 | SqlScalarType::VarChar { .. }
3545 | SqlScalarType::Jsonb
3546 | SqlScalarType::Uuid
3547 | SqlScalarType::Array(_)
3548 | SqlScalarType::Record { .. }
3549 | SqlScalarType::Oid
3550 | SqlScalarType::RegProc
3551 | SqlScalarType::RegType
3552 | SqlScalarType::RegClass
3553 | SqlScalarType::Int2Vector
3554 | SqlScalarType::Range { .. }
3555 | SqlScalarType::PgLegacyName => {}
3556 }
3557 }
3558 }
3559 catalog.expire().await;
3560 })
3561 .await
3562 }
3563
3564 #[mz_ore::test(tokio::test)]
3567 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3569 Catalog::with_debug(|catalog| async move {
3570 let conn_catalog = catalog.for_system_session();
3571
3572 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3573 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3574
3575 for entry in catalog.entries() {
3576 let schema_spec = entry.name().qualifiers.schema_spec;
3577 let introspection_deps = catalog.introspection_dependencies(entry.id);
3578 if introspection_deps.is_empty() {
3579 assert!(
3580 schema_spec != introspection_schema_spec,
3581 "entry does not depend on introspection sources but is in \
3582 `mz_introspection`: {}",
3583 conn_catalog.resolve_full_name(entry.name()),
3584 );
3585 } else {
3586 assert!(
3587 schema_spec == introspection_schema_spec,
3588 "entry depends on introspection sources but is not in \
3589 `mz_introspection`: {}",
3590 conn_catalog.resolve_full_name(entry.name()),
3591 );
3592 }
3593 }
3594 })
3595 .await
3596 }
3597
3598 #[mz_ore::test(tokio::test)]
3599 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3601 let persist_client = PersistClient::new_for_tests().await;
3602 let bootstrap_args = test_bootstrap_args();
3603 let organization_id = Uuid::new_v4();
3604 let db_name = "DB";
3605
3606 let mut writer_catalog = Catalog::open_debug_catalog(
3607 persist_client.clone(),
3608 organization_id.clone(),
3609 &bootstrap_args,
3610 )
3611 .await
3612 .expect("open_debug_catalog");
3613 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3614 persist_client.clone(),
3615 organization_id.clone(),
3616 &bootstrap_args,
3617 )
3618 .await
3619 .expect("open_debug_read_only_catalog");
3620 assert_err!(writer_catalog.resolve_database(db_name));
3621 assert_err!(read_only_catalog.resolve_database(db_name));
3622
3623 let commit_ts = writer_catalog.current_upper().await;
3624 writer_catalog
3625 .transact(
3626 None,
3627 commit_ts,
3628 None,
3629 vec![Op::CreateDatabase {
3630 name: db_name.to_string(),
3631 owner_id: MZ_SYSTEM_ROLE_ID,
3632 }],
3633 )
3634 .await
3635 .expect("failed to transact");
3636
3637 let write_db = writer_catalog
3638 .resolve_database(db_name)
3639 .expect("resolve_database");
3640 read_only_catalog
3641 .sync_to_current_updates()
3642 .await
3643 .expect("sync_to_current_updates");
3644 let read_db = read_only_catalog
3645 .resolve_database(db_name)
3646 .expect("resolve_database");
3647
3648 assert_eq!(write_db, read_db);
3649
3650 let writer_catalog_fencer =
3651 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3652 .await
3653 .expect("open_debug_catalog for fencer");
3654 let fencer_db = writer_catalog_fencer
3655 .resolve_database(db_name)
3656 .expect("resolve_database for fencer");
3657 assert_eq!(fencer_db, read_db);
3658
3659 let write_fence_err = writer_catalog
3660 .sync_to_current_updates()
3661 .await
3662 .expect_err("sync_to_current_updates for fencer");
3663 assert!(matches!(
3664 write_fence_err,
3665 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3666 ));
3667 let read_fence_err = read_only_catalog
3668 .sync_to_current_updates()
3669 .await
3670 .expect_err("sync_to_current_updates after fencer");
3671 assert!(matches!(
3672 read_fence_err,
3673 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3674 ));
3675
3676 writer_catalog.expire().await;
3677 read_only_catalog.expire().await;
3678 writer_catalog_fencer.expire().await;
3679 }
3680}