1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, STORAGE_USAGE_ID_ALLOC_KEY, TestCatalogStateBuilder,
39 test_bootstrap_args,
40};
41use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
42use mz_catalog::memory::error::{Error, ErrorKind};
43use mz_catalog::memory::objects::{
44 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
45 NetworkPolicy, Role, RoleAuth, Schema,
46};
47use mz_compute_types::dataflows::DataflowDescription;
48use mz_controller::clusters::ReplicaLocation;
49use mz_controller_types::{ClusterId, ReplicaId};
50use mz_expr::OptimizedMirRelationExpr;
51use mz_license_keys::ValidatedLicenseKey;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68 SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94 DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114#[derive(Debug)]
137pub struct Catalog {
138 state: CatalogState,
139 expr_cache_handle: Option<ExpressionCacheHandle>,
140 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141 transient_revision: u64,
142}
143
144impl Clone for Catalog {
147 fn clone(&self) -> Self {
148 Self {
149 state: self.state.clone(),
150 expr_cache_handle: self.expr_cache_handle.clone(),
151 storage: Arc::clone(&self.storage),
152 transient_revision: self.transient_revision,
153 }
154 }
155}
156
157impl Catalog {
158 #[mz_ore::instrument(level = "trace")]
164 pub fn set_optimized_plan(
165 &mut self,
166 id: GlobalId,
167 plan: DataflowDescription<OptimizedMirRelationExpr>,
168 ) {
169 self.state.set_optimized_plan(id, plan);
170 }
171
172 #[mz_ore::instrument(level = "trace")]
178 pub fn set_physical_plan(
179 &mut self,
180 id: GlobalId,
181 plan: DataflowDescription<mz_compute_types::plan::Plan>,
182 ) {
183 self.state.set_physical_plan(id, plan);
184 }
185
186 #[mz_ore::instrument(level = "trace")]
188 pub fn try_get_optimized_plan(
189 &self,
190 id: &GlobalId,
191 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192 let entry = self.state.try_get_entry_by_global_id(id)?;
193 entry.item().optimized_plan().map(AsRef::as_ref)
194 }
195
196 #[mz_ore::instrument(level = "trace")]
198 pub fn try_get_physical_plan(
199 &self,
200 id: &GlobalId,
201 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202 let entry = self.state.try_get_entry_by_global_id(id)?;
203 entry.item().physical_plan().map(AsRef::as_ref)
204 }
205
206 #[mz_ore::instrument(level = "trace")]
212 pub fn set_dataflow_metainfo(
213 &mut self,
214 id: GlobalId,
215 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216 ) {
217 self.state.set_dataflow_metainfo(id, metainfo);
218 }
219
220 #[mz_ore::instrument(level = "trace")]
222 pub fn try_get_dataflow_metainfo(
223 &self,
224 id: &GlobalId,
225 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226 let entry = self.state.try_get_entry_by_global_id(id)?;
227 entry.item().dataflow_metainfo()
228 }
229}
230
231#[derive(Debug)]
232pub struct ConnCatalog<'a> {
233 state: Cow<'a, CatalogState>,
234 unresolvable_ids: BTreeSet<CatalogItemId>,
244 conn_id: ConnectionId,
245 cluster: String,
246 database: Option<DatabaseId>,
247 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
248 role_id: RoleId,
249 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
250 portals: Option<&'a BTreeMap<String, Portal>>,
251 notices_tx: UnboundedSender<AdapterNotice>,
252 restrict_to_user_objects: bool,
253}
254
255impl ConnCatalog<'_> {
256 pub fn conn_id(&self) -> &ConnectionId {
257 &self.conn_id
258 }
259
260 pub fn state(&self) -> &CatalogState {
261 &*self.state
262 }
263
264 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
274 assert_eq!(
275 self.role_id, MZ_SYSTEM_ROLE_ID,
276 "only the system role can mark IDs unresolvable",
277 );
278 self.unresolvable_ids.insert(id);
279 }
280
281 pub fn effective_search_path(
287 &self,
288 include_temp_schema: bool,
289 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
290 self.state
291 .effective_search_path(&self.search_path, include_temp_schema)
292 }
293}
294
295impl ConnectionResolver for ConnCatalog<'_> {
296 fn resolve_connection(
297 &self,
298 id: CatalogItemId,
299 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
300 self.state().resolve_connection(id)
301 }
302}
303
304impl Catalog {
305 pub fn transient_revision(&self) -> u64 {
309 self.transient_revision
310 }
311
312 pub async fn with_debug<F, Fut, T>(f: F) -> T
324 where
325 F: FnOnce(Catalog) -> Fut,
326 Fut: Future<Output = T>,
327 {
328 let persist_client = PersistClient::new_for_tests().await;
329 let organization_id = Uuid::new_v4();
330 let bootstrap_args = test_bootstrap_args();
331 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
332 .await
333 .expect("can open debug catalog");
334 f(catalog).await
335 }
336
337 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
340 where
341 F: FnOnce(Catalog) -> Fut,
342 Fut: Future<Output = T>,
343 {
344 let persist_client = PersistClient::new_for_tests().await;
345 let organization_id = Uuid::new_v4();
346 let bootstrap_args = test_bootstrap_args();
347 let mut catalog =
348 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
349 .await
350 .expect("can open debug catalog");
351
352 let now = SYSTEM_TIME.clone();
354 let openable_storage = TestCatalogStateBuilder::new(persist_client)
355 .with_organization_id(organization_id)
356 .with_default_deploy_generation()
357 .build()
358 .await
359 .expect("can create durable catalog");
360 let mut storage = openable_storage
361 .open(now().into(), &bootstrap_args)
362 .await
363 .expect("can open durable catalog")
364 .0;
365 let _ = storage
367 .sync_to_current_updates()
368 .await
369 .expect("can sync to current updates");
370 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
371
372 f(catalog).await
373 }
374
375 pub async fn open_debug_catalog(
379 persist_client: PersistClient,
380 organization_id: Uuid,
381 bootstrap_args: &BootstrapArgs,
382 ) -> Result<Catalog, anyhow::Error> {
383 let now = SYSTEM_TIME.clone();
384 let environment_id = None;
385 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
386 .with_organization_id(organization_id)
387 .with_default_deploy_generation()
388 .build()
389 .await?;
390 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
391 let system_parameter_defaults = BTreeMap::default();
392 Self::open_debug_catalog_inner(
393 persist_client,
394 storage,
395 now,
396 environment_id,
397 &DUMMY_BUILD_INFO,
398 system_parameter_defaults,
399 bootstrap_args,
400 None,
401 )
402 .await
403 }
404
405 pub async fn open_debug_read_only_catalog(
410 persist_client: PersistClient,
411 organization_id: Uuid,
412 bootstrap_args: &BootstrapArgs,
413 ) -> Result<Catalog, anyhow::Error> {
414 let now = SYSTEM_TIME.clone();
415 let environment_id = None;
416 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
417 .with_organization_id(organization_id)
418 .build()
419 .await?;
420 let storage = openable_storage
421 .open_read_only(&test_bootstrap_args())
422 .await?;
423 let system_parameter_defaults = BTreeMap::default();
424 Self::open_debug_catalog_inner(
425 persist_client,
426 storage,
427 now,
428 environment_id,
429 &DUMMY_BUILD_INFO,
430 system_parameter_defaults,
431 bootstrap_args,
432 None,
433 )
434 .await
435 }
436
437 pub async fn open_debug_read_only_persist_catalog_config(
442 persist_client: PersistClient,
443 now: NowFn,
444 environment_id: EnvironmentId,
445 system_parameter_defaults: BTreeMap<String, String>,
446 build_info: &'static BuildInfo,
447 bootstrap_args: &BootstrapArgs,
448 enable_expression_cache_override: Option<bool>,
449 ) -> Result<Catalog, anyhow::Error> {
450 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
451 .with_organization_id(environment_id.organization_id())
452 .with_version(
453 build_info
454 .version
455 .parse()
456 .expect("build version is parseable"),
457 )
458 .build()
459 .await?;
460 let storage = openable_storage.open_read_only(bootstrap_args).await?;
461 Self::open_debug_catalog_inner(
462 persist_client,
463 storage,
464 now,
465 Some(environment_id),
466 build_info,
467 system_parameter_defaults,
468 bootstrap_args,
469 enable_expression_cache_override,
470 )
471 .await
472 }
473
474 async fn open_debug_catalog_inner(
475 persist_client: PersistClient,
476 storage: Box<dyn DurableCatalogState>,
477 now: NowFn,
478 environment_id: Option<EnvironmentId>,
479 build_info: &'static BuildInfo,
480 system_parameter_defaults: BTreeMap<String, String>,
481 bootstrap_args: &BootstrapArgs,
482 enable_expression_cache_override: Option<bool>,
483 ) -> Result<Catalog, anyhow::Error> {
484 let metrics_registry = &MetricsRegistry::new();
485 let secrets_reader = Arc::new(InMemorySecretsController::new());
486 let previous_ts = now().into();
489 let replica_size = &bootstrap_args.default_cluster_replica_size;
490 let read_only = false;
491
492 let OpenCatalogResult {
493 catalog,
494 migrated_storage_collections_0dt: _,
495 new_builtin_collections: _,
496 builtin_table_updates: _,
497 cached_global_exprs: _,
498 uncached_local_exprs: _,
499 } = Catalog::open(Config {
500 storage,
501 metrics_registry,
502 state: StateConfig {
503 unsafe_mode: true,
504 all_features: false,
505 build_info,
506 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
507 read_only,
508 now,
509 boot_ts: previous_ts,
510 skip_migrations: true,
511 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
512 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
513 size: replica_size.clone(),
514 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
515 },
516 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
517 size: replica_size.clone(),
518 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
519 },
520 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
521 size: replica_size.clone(),
522 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
523 },
524 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
525 size: replica_size.clone(),
526 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
527 },
528 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
529 size: replica_size.clone(),
530 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
531 },
532 system_parameter_defaults,
533 remote_system_parameters: None,
534 availability_zones: vec![],
535 egress_addresses: vec![],
536 aws_principal_context: None,
537 aws_privatelink_availability_zones: None,
538 http_host_name: None,
539 connection_context: ConnectionContext::for_tests(secrets_reader),
540 builtin_item_migration_config: BuiltinItemMigrationConfig {
541 persist_client: persist_client.clone(),
542 read_only,
543 force_migration: None,
544 },
545 persist_client,
546 enable_expression_cache_override,
547 helm_chart_version: None,
548 external_login_password_mz_system: None,
549 license_key: ValidatedLicenseKey::for_tests(),
550 },
551 })
552 .await?;
553 Ok(catalog)
554 }
555
556 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
557 self.state.for_session(session)
558 }
559
560 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
561 self.state.for_sessionless_user(role_id)
562 }
563
564 pub fn for_system_session(&self) -> ConnCatalog<'_> {
565 self.state.for_system_session()
566 }
567
568 async fn storage<'a>(
569 &'a self,
570 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
571 self.storage.lock().await
572 }
573
574 pub async fn current_upper(&self) -> mz_repr::Timestamp {
575 self.storage().await.current_upper().await
576 }
577
578 pub async fn allocate_user_id(
579 &self,
580 commit_ts: mz_repr::Timestamp,
581 ) -> Result<(CatalogItemId, GlobalId), Error> {
582 self.storage()
583 .await
584 .allocate_user_id(commit_ts)
585 .await
586 .maybe_terminate("allocating user ids")
587 .err_into()
588 }
589
590 pub async fn allocate_user_ids(
592 &self,
593 amount: u64,
594 commit_ts: mz_repr::Timestamp,
595 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
596 self.storage()
597 .await
598 .allocate_user_ids(amount, commit_ts)
599 .await
600 .maybe_terminate("allocating user ids")
601 .err_into()
602 }
603
604 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
605 let commit_ts = self.storage().await.current_upper().await;
606 self.allocate_user_id(commit_ts).await
607 }
608
609 pub async fn allocate_storage_usage_id(
617 &self,
618 commit_ts: mz_repr::Timestamp,
619 ) -> Result<u64, Error> {
620 use mz_ore::collections::CollectionExt;
621
622 self.storage()
623 .await
624 .allocate_id(STORAGE_USAGE_ID_ALLOC_KEY, 1, commit_ts)
625 .await
626 .maybe_terminate("allocating storage usage id")
627 .map(|ids| ids.into_element())
628 .err_into()
629 }
630
631 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
633 self.storage()
634 .await
635 .get_next_user_item_id()
636 .await
637 .err_into()
638 }
639
640 #[cfg(test)]
641 pub async fn allocate_system_id(
642 &self,
643 commit_ts: mz_repr::Timestamp,
644 ) -> Result<(CatalogItemId, GlobalId), Error> {
645 use mz_ore::collections::CollectionExt;
646
647 let mut storage = self.storage().await;
648 let mut txn = storage.transaction().await?;
649 let id = txn
650 .allocate_system_item_ids(1)
651 .maybe_terminate("allocating system ids")?
652 .into_element();
653 let _ = txn.get_and_commit_op_updates();
655 txn.commit(commit_ts).await?;
656 Ok(id)
657 }
658
659 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
661 self.storage()
662 .await
663 .get_next_system_item_id()
664 .await
665 .err_into()
666 }
667
668 pub async fn allocate_user_cluster_id(
669 &self,
670 commit_ts: mz_repr::Timestamp,
671 ) -> Result<ClusterId, Error> {
672 self.storage()
673 .await
674 .allocate_user_cluster_id(commit_ts)
675 .await
676 .maybe_terminate("allocating user cluster ids")
677 .err_into()
678 }
679
680 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
682 self.storage()
683 .await
684 .get_next_system_replica_id()
685 .await
686 .err_into()
687 }
688
689 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
691 self.storage()
692 .await
693 .get_next_user_replica_id()
694 .await
695 .err_into()
696 }
697
698 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
699 self.state.resolve_database(database_name)
700 }
701
702 pub fn resolve_schema(
703 &self,
704 current_database: Option<&DatabaseId>,
705 database_name: Option<&str>,
706 schema_name: &str,
707 conn_id: &ConnectionId,
708 ) -> Result<&Schema, SqlCatalogError> {
709 self.state
710 .resolve_schema(current_database, database_name, schema_name, conn_id)
711 }
712
713 pub fn resolve_schema_in_database(
714 &self,
715 database_spec: &ResolvedDatabaseSpecifier,
716 schema_name: &str,
717 conn_id: &ConnectionId,
718 ) -> Result<&Schema, SqlCatalogError> {
719 self.state
720 .resolve_schema_in_database(database_spec, schema_name, conn_id)
721 }
722
723 pub fn resolve_replica_in_cluster(
724 &self,
725 cluster_id: &ClusterId,
726 replica_name: &str,
727 ) -> Result<&ClusterReplica, SqlCatalogError> {
728 self.state
729 .resolve_replica_in_cluster(cluster_id, replica_name)
730 }
731
732 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
733 self.state.resolve_system_schema(name)
734 }
735
736 pub fn resolve_search_path(
737 &self,
738 session: &Session,
739 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
740 self.state.resolve_search_path(session)
741 }
742
743 pub fn resolve_entry(
745 &self,
746 current_database: Option<&DatabaseId>,
747 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
748 name: &PartialItemName,
749 conn_id: &ConnectionId,
750 ) -> Result<&CatalogEntry, SqlCatalogError> {
751 self.state
752 .resolve_entry(current_database, search_path, name, conn_id)
753 }
754
755 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
757 self.state.resolve_builtin_table(builtin)
758 }
759
760 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
762 self.state.resolve_builtin_log(builtin).0
763 }
764
765 pub fn resolve_builtin_storage_collection(
767 &self,
768 builtin: &'static BuiltinSource,
769 ) -> CatalogItemId {
770 self.state.resolve_builtin_source(builtin)
771 }
772
773 pub fn resolve_function(
775 &self,
776 current_database: Option<&DatabaseId>,
777 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
778 name: &PartialItemName,
779 conn_id: &ConnectionId,
780 ) -> Result<&CatalogEntry, SqlCatalogError> {
781 self.state
782 .resolve_function(current_database, search_path, name, conn_id)
783 }
784
785 pub fn resolve_type(
787 &self,
788 current_database: Option<&DatabaseId>,
789 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
790 name: &PartialItemName,
791 conn_id: &ConnectionId,
792 ) -> Result<&CatalogEntry, SqlCatalogError> {
793 self.state
794 .resolve_type(current_database, search_path, name, conn_id)
795 }
796
797 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
798 self.state.resolve_cluster(name)
799 }
800
801 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
807 self.state.resolve_builtin_cluster(cluster)
808 }
809
810 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
811 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
812 }
813
814 pub fn resolve_target_cluster(
816 &self,
817 target_cluster: TargetCluster,
818 session: &Session,
819 ) -> Result<&Cluster, AdapterError> {
820 match target_cluster {
821 TargetCluster::CatalogServer => {
822 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
823 }
824 TargetCluster::Active => self.active_cluster(session),
825 TargetCluster::Transaction(cluster_id) => self
826 .try_get_cluster(cluster_id)
827 .ok_or(AdapterError::ConcurrentClusterDrop),
828 }
829 }
830
831 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
832 if session.user().name != SYSTEM_USER.name
836 && session.user().name != SUPPORT_USER.name
837 && session.vars().cluster() == SYSTEM_USER.name
838 {
839 coord_bail!(
840 "system cluster '{}' cannot execute user queries",
841 SYSTEM_USER.name
842 );
843 }
844 let cluster = self.resolve_cluster(session.vars().cluster())?;
845 Ok(cluster)
846 }
847
848 pub fn state(&self) -> &CatalogState {
849 &self.state
850 }
851
852 pub fn resolve_full_name(
853 &self,
854 name: &QualifiedItemName,
855 conn_id: Option<&ConnectionId>,
856 ) -> FullItemName {
857 self.state.resolve_full_name(name, conn_id)
858 }
859
860 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
861 self.state.try_get_entry(id)
862 }
863
864 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
865 self.state.try_get_entry_by_global_id(id)
866 }
867
868 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
869 self.state.get_entry(id)
870 }
871
872 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
873 self.state.get_entry_by_global_id(id)
874 }
875
876 pub fn get_global_ids<'a>(
877 &'a self,
878 id: &CatalogItemId,
879 ) -> impl Iterator<Item = GlobalId> + use<'a> {
880 self.get_entry(id).global_ids()
881 }
882
883 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
884 self.get_entry_by_global_id(id).id()
885 }
886
887 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
888 let item = self.try_get_entry_by_global_id(id)?;
889 Some(item.id())
890 }
891
892 pub fn get_schema(
893 &self,
894 database_spec: &ResolvedDatabaseSpecifier,
895 schema_spec: &SchemaSpecifier,
896 conn_id: &ConnectionId,
897 ) -> &Schema {
898 self.state.get_schema(database_spec, schema_spec, conn_id)
899 }
900
901 pub fn try_get_schema(
902 &self,
903 database_spec: &ResolvedDatabaseSpecifier,
904 schema_spec: &SchemaSpecifier,
905 conn_id: &ConnectionId,
906 ) -> Option<&Schema> {
907 self.state
908 .try_get_schema(database_spec, schema_spec, conn_id)
909 }
910
911 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
912 self.state.get_mz_catalog_schema_id()
913 }
914
915 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
916 self.state.get_pg_catalog_schema_id()
917 }
918
919 pub fn get_information_schema_id(&self) -> SchemaId {
920 self.state.get_information_schema_id()
921 }
922
923 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
924 self.state.get_mz_internal_schema_id()
925 }
926
927 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
928 self.state.get_mz_introspection_schema_id()
929 }
930
931 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
932 self.state.get_mz_unsafe_schema_id()
933 }
934
935 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
936 self.state.system_schema_ids()
937 }
938
939 pub fn get_database(&self, id: &DatabaseId) -> &Database {
940 self.state.get_database(id)
941 }
942
943 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
944 self.state.try_get_role(id)
945 }
946
947 pub fn get_role(&self, id: &RoleId) -> &Role {
948 self.state.get_role(id)
949 }
950
951 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
952 self.state.try_get_role_by_name(role_name)
953 }
954
955 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
956 self.state.try_get_role_auth_by_id(id)
957 }
958
959 pub fn create_temporary_schema(
962 &mut self,
963 conn_id: &ConnectionId,
964 owner_id: RoleId,
965 ) -> Result<(), Error> {
966 self.state.create_temporary_schema(conn_id, owner_id)
967 }
968
969 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
970 self.state
972 .temporary_schemas
973 .get(conn_id)
974 .map(|schema| schema.items.contains_key(item_name))
975 .unwrap_or(false)
976 }
977
978 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
981 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
982 return Ok(());
983 };
984 if !schema.items.is_empty() {
985 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
986 }
987 Ok(())
988 }
989
990 pub(crate) fn object_dependents(
991 &self,
992 object_ids: &Vec<ObjectId>,
993 conn_id: &ConnectionId,
994 ) -> Vec<ObjectId> {
995 let mut seen = BTreeSet::new();
996 self.state.object_dependents(object_ids, conn_id, &mut seen)
997 }
998
999 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1000 FullNameV1 {
1001 database: name.database.to_string(),
1002 schema: name.schema.clone(),
1003 item: name.item.clone(),
1004 }
1005 }
1006
1007 pub fn find_available_cluster_name(&self, name: &str) -> String {
1008 let mut i = 0;
1009 let mut candidate = name.to_string();
1010 while self.state.clusters_by_name.contains_key(&candidate) {
1011 i += 1;
1012 candidate = format!("{}{}", name, i);
1013 }
1014 candidate
1015 }
1016
1017 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1018 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1019 self.cluster_replica_sizes()
1020 .enabled_allocations()
1021 .map(|a| a.0.to_owned())
1022 .collect::<Vec<_>>()
1023 } else {
1024 self.system_config().allowed_cluster_replica_sizes()
1025 }
1026 }
1027
1028 pub fn concretize_replica_location(
1029 &self,
1030 location: mz_catalog::durable::ReplicaLocation,
1031 allowed_sizes: &Vec<String>,
1032 allowed_availability_zones: Option<&[String]>,
1033 ) -> Result<ReplicaLocation, Error> {
1034 self.state
1035 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1036 }
1037
1038 pub(crate) fn ensure_valid_replica_size(
1039 &self,
1040 allowed_sizes: &[String],
1041 size: &String,
1042 ) -> Result<(), Error> {
1043 self.state.ensure_valid_replica_size(allowed_sizes, size)
1044 }
1045
1046 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1047 &self.state.cluster_replica_sizes
1048 }
1049
1050 pub fn get_privileges(
1052 &self,
1053 id: &SystemObjectId,
1054 conn_id: &ConnectionId,
1055 ) -> Option<&PrivilegeMap> {
1056 match id {
1057 SystemObjectId::Object(id) => match id {
1058 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1059 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1060 ObjectId::Schema((database_spec, schema_spec)) => Some(
1061 self.get_schema(database_spec, schema_spec, conn_id)
1062 .privileges(),
1063 ),
1064 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1065 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1066 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1067 },
1068 SystemObjectId::System => Some(&self.state.system_privileges),
1069 }
1070 }
1071
1072 #[mz_ore::instrument(level = "debug")]
1073 pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1074 Ok(self.storage().await.advance_upper(new_upper).await?)
1075 }
1076
1077 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1079 self.state.introspection_dependencies(id)
1080 }
1081
1082 pub fn dump(&self) -> Result<CatalogDump, Error> {
1088 Ok(CatalogDump::new(self.state.dump(None)?))
1089 }
1090
1091 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1095 self.state.check_consistency().map_err(|inconsistencies| {
1096 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1097 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1098 })
1099 })
1100 }
1101
1102 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1103 self.state.config()
1104 }
1105
1106 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1107 self.state.entry_by_id.values()
1108 }
1109
1110 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1111 self.entries()
1112 .filter(|entry| entry.is_connection() && entry.id().is_user())
1113 }
1114
1115 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1116 self.entries()
1117 .filter(|entry| entry.is_table() && entry.id().is_user())
1118 }
1119
1120 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1121 self.entries()
1122 .filter(|entry| entry.is_source() && entry.id().is_user())
1123 }
1124
1125 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1126 self.entries()
1127 .filter(|entry| entry.is_sink() && entry.id().is_user())
1128 }
1129
1130 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1131 self.entries()
1132 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1133 }
1134
1135 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1136 self.entries()
1137 .filter(|entry| entry.is_secret() && entry.id().is_user())
1138 }
1139
1140 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1141 self.state.get_network_policy(&network_policy_id)
1142 }
1143
1144 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1145 self.state.try_get_network_policy_by_name(name)
1146 }
1147
1148 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1149 self.state.clusters_by_id.values()
1150 }
1151
1152 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1153 self.state.get_cluster(cluster_id)
1154 }
1155
1156 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1157 self.state.try_get_cluster(cluster_id)
1158 }
1159
1160 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1161 self.clusters().filter(|cluster| cluster.id.is_user())
1162 }
1163
1164 pub fn get_cluster_replica(
1165 &self,
1166 cluster_id: ClusterId,
1167 replica_id: ReplicaId,
1168 ) -> &ClusterReplica {
1169 self.state.get_cluster_replica(cluster_id, replica_id)
1170 }
1171
1172 pub fn try_get_cluster_replica(
1173 &self,
1174 cluster_id: ClusterId,
1175 replica_id: ReplicaId,
1176 ) -> Option<&ClusterReplica> {
1177 self.state.try_get_cluster_replica(cluster_id, replica_id)
1178 }
1179
    /// Iterates over the user replicas of every user cluster.
    ///
    /// Replicas of non-user clusters are never visited because iteration starts from
    /// [`Self::user_clusters`].
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }
1184
    /// Iterates over all databases stored in the catalog state's `database_by_id` map.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1188
    /// Iterates over the roles in the catalog state for which `Role::is_user` returns true.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }
1195
1196 pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1197 self.state
1198 .network_policies_by_id
1199 .iter()
1200 .filter(|(id, _)| id.is_user())
1201 .map(|(_, policy)| policy)
1202 }
1203
    /// Returns the system-level privilege map held by the catalog state.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1207
    /// Iterates over the default privileges held by the catalog state.
    ///
    /// Each item pairs a [`DefaultPrivilegeObject`] with an iterator over the
    /// [`DefaultPrivilegeAclItem`]s associated with it.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1218
1219 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1220 self.state
1221 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1222 }
1223
1224 pub fn pack_storage_usage_update(
1225 &self,
1226 event: VersionedStorageUsage,
1227 diff: Diff,
1228 ) -> BuiltinTableUpdate {
1229 self.state
1230 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1231 }
1232
    /// Returns a shared reference to the system configuration variables.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1236
    /// Returns a mutable reference to the system configuration variables.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1240
    /// Delegates to the catalog state's check of the same name for `role_id`.
    ///
    /// NOTE(review): presumably errors when the role is reserved — confirm against the state
    /// implementation.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1244
    /// Delegates to the catalog state's check of the same name for `role_id`.
    ///
    /// NOTE(review): presumably errors when the role may not be granted — confirm against the
    /// state implementation.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1248
    /// Delegates to the catalog state's check of the same name for `role_id`.
    ///
    /// NOTE(review): presumably errors when the role is a system role — confirm against the
    /// state implementation.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1252
    /// Delegates to the catalog state's check of the same name for `role_id`.
    ///
    /// NOTE(review): presumably errors when the role is predefined — confirm against the state
    /// implementation.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1256
    /// Delegates to the catalog state's check of the same name for `network_policy_id`.
    ///
    /// NOTE(review): presumably errors when the network policy is reserved — confirm against the
    /// state implementation.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1264
1265 pub fn ensure_not_reserved_object(
1266 &self,
1267 object_id: &ObjectId,
1268 conn_id: &ConnectionId,
1269 ) -> Result<(), Error> {
1270 match object_id {
1271 ObjectId::Cluster(cluster_id) => {
1272 if cluster_id.is_system() {
1273 let cluster = self.get_cluster(*cluster_id);
1274 Err(Error::new(ErrorKind::ReadOnlyCluster(
1275 cluster.name().to_string(),
1276 )))
1277 } else {
1278 Ok(())
1279 }
1280 }
1281 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1282 if replica_id.is_system() {
1283 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1284 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1285 replica.name().to_string(),
1286 )))
1287 } else {
1288 Ok(())
1289 }
1290 }
1291 ObjectId::Database(database_id) => {
1292 if database_id.is_system() {
1293 let database = self.get_database(database_id);
1294 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1295 database.name().to_string(),
1296 )))
1297 } else {
1298 Ok(())
1299 }
1300 }
1301 ObjectId::Schema((database_spec, schema_spec)) => {
1302 if schema_spec.is_system() {
1303 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1304 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1305 schema.name().schema.clone(),
1306 )))
1307 } else {
1308 Ok(())
1309 }
1310 }
1311 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1312 ObjectId::Item(item_id) => {
1313 if item_id.is_system() {
1314 let item = self.get_entry(item_id);
1315 let name = self.resolve_full_name(item.name(), Some(conn_id));
1316 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1317 } else {
1318 Ok(())
1319 }
1320 }
1321 ObjectId::NetworkPolicy(network_policy_id) => {
1322 self.ensure_not_reserved_network_policy(network_policy_id)
1323 }
1324 }
1325 }
1326
    /// Turns `create_sql` into a [`Plan`] together with the [`ResolvedIds`] it references by
    /// delegating to the catalog state's method of the same name.
    ///
    /// `force_if_exists_skip` is forwarded unchanged.
    ///
    /// NOTE(review): the name suggests that feature flags needed for item parsing are enabled
    /// while planning, which would explain the `&mut self` receiver — confirm against the state
    /// implementation.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1336
1337 pub(crate) fn cache_expressions(
1348 &self,
1349 id: GlobalId,
1350 local_mir: Option<OptimizedMirRelationExpr>,
1351 mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1352 mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1353 dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1354 optimizer_features: OptimizerFeatures,
1355 ) -> BoxFuture<'static, ()> {
1356 global_mir.as_of = None;
1360 global_mir.until = Default::default();
1361 physical_plan.as_of = None;
1362 physical_plan.until = Default::default();
1363
1364 let mut local_exprs = Vec::new();
1365 if let Some(local_mir) = local_mir {
1366 local_exprs.push((
1367 id,
1368 LocalExpressions {
1369 local_mir,
1370 optimizer_features: optimizer_features.clone(),
1371 },
1372 ));
1373 }
1374 let global_exprs = vec![(
1375 id,
1376 GlobalExpressions {
1377 global_mir,
1378 physical_plan,
1379 dataflow_metainfos,
1380 optimizer_features,
1381 },
1382 )];
1383 self.update_expression_cache(local_exprs, global_exprs, Default::default())
1384 }
1385
1386 pub(crate) fn update_expression_cache<'a, 'b>(
1387 &'a self,
1388 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1389 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1390 invalidate_ids: BTreeSet<GlobalId>,
1391 ) -> BoxFuture<'b, ()> {
1392 if let Some(expr_cache) = &self.expr_cache_handle {
1393 expr_cache
1394 .update(
1395 new_local_expressions,
1396 new_global_expressions,
1397 invalidate_ids,
1398 )
1399 .boxed()
1400 } else {
1401 async {}.boxed()
1402 }
1403 }
1404
1405 #[cfg(test)]
1409 async fn sync_to_current_updates(
1410 &mut self,
1411 ) -> Result<
1412 (
1413 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1414 Vec<ParsedStateUpdate>,
1415 ),
1416 CatalogError,
1417 > {
1418 let updates = self.storage().await.sync_to_current_updates().await?;
1419 let (builtin_table_updates, catalog_updates) = self
1420 .state
1421 .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1422 .await;
1423 Ok((builtin_table_updates, catalog_updates))
1424 }
1425}
1426
1427pub fn is_reserved_name(name: &str) -> bool {
1428 BUILTIN_PREFIXES
1429 .iter()
1430 .any(|prefix| name.starts_with(prefix))
1431}
1432
/// Returns true if `name` is a reserved name (see [`is_reserved_name`]) or the name of the
/// public role (see [`is_public_role`]).
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1436
/// Returns true if `name` is exactly equal to `PUBLIC_ROLE_NAME`.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1440
/// Maps a SQL catalog item type to the audit log's [`ObjectType`].
///
/// The item type is first converted into a SQL object type and then mapped by
/// [`object_type_to_audit_object_type`].
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1444
/// Maps the kind of object a comment is attached to onto the audit log's [`ObjectType`].
///
/// Every `CommentObjectId` variant maps to the `ObjectType` variant of the same name; the ID
/// carried by the variant is ignored.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1465
/// Maps a SQL object type to the audit log's [`ObjectType`].
///
/// Wraps `object_type` in `SystemObjectType::Object` and defers to
/// [`system_object_type_to_audit_object_type`].
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1471
1472pub(crate) fn system_object_type_to_audit_object_type(
1473 system_type: &SystemObjectType,
1474) -> ObjectType {
1475 match system_type {
1476 SystemObjectType::Object(object_type) => match object_type {
1477 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1478 mz_sql::catalog::ObjectType::View => ObjectType::View,
1479 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1480 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1481 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1482 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1483 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1484 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1485 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1486 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1487 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1488 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1489 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1490 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1491 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1492 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1493 },
1494 SystemObjectType::System => ObjectType::System,
1495 }
1496}
1497
/// The direction of a privilege update: granting or revoking.
///
/// Converts into the matching [`ExecuteResponse`] and audit [`EventType`] via `From`.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1503
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    /// Maps `Grant` to `GrantedPrivilege` and `Revoke` to `RevokedPrivilege`.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}
1512
impl From<UpdatePrivilegeVariant> for EventType {
    /// Maps each variant to the audit event type of the same name.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1521
1522impl ConnCatalog<'_> {
1523 fn resolve_item_name(
1524 &self,
1525 name: &PartialItemName,
1526 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1527 self.resolve_item(name).map(|entry| entry.name())
1528 }
1529
1530 fn resolve_function_name(
1531 &self,
1532 name: &PartialItemName,
1533 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1534 self.resolve_function(name).map(|entry| entry.name())
1535 }
1536
1537 fn resolve_type_name(
1538 &self,
1539 name: &PartialItemName,
1540 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1541 self.resolve_type(name).map(|entry| entry.name())
1542 }
1543}
1544
1545impl ExprHumanizer for ConnCatalog<'_> {
1546 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1547 let entry = self.state.try_get_entry_by_global_id(&id)?;
1548 Some(self.resolve_full_name(entry.name()).to_string())
1549 }
1550
1551 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1552 let entry = self.state.try_get_entry_by_global_id(&id)?;
1553 Some(entry.name().item.clone())
1554 }
1555
1556 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1557 let entry = self.state.try_get_entry_by_global_id(&id)?;
1558 Some(self.resolve_full_name(entry.name()).into_parts())
1559 }
1560
1561 fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1562 use SqlScalarType::*;
1563
1564 match typ {
1565 Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1566 List {
1567 custom_id: Some(item_id),
1568 ..
1569 }
1570 | Map {
1571 custom_id: Some(item_id),
1572 ..
1573 } => {
1574 let item = self.get_item(item_id);
1575 self.minimal_qualification(item.name()).to_string()
1576 }
1577 List { element_type, .. } => {
1578 format!(
1579 "{} list",
1580 self.humanize_sql_scalar_type(element_type, postgres_compat)
1581 )
1582 }
1583 Map { value_type, .. } => format!(
1584 "map[{}=>{}]",
1585 self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1586 self.humanize_sql_scalar_type(value_type, postgres_compat)
1587 ),
1588 Record {
1589 custom_id: Some(item_id),
1590 ..
1591 } => {
1592 let item = self.get_item(item_id);
1593 self.minimal_qualification(item.name()).to_string()
1594 }
1595 Record { fields, .. } => format!(
1596 "record({})",
1597 fields
1598 .iter()
1599 .map(|f| format!(
1600 "{}: {}",
1601 f.0,
1602 self.humanize_sql_column_type(&f.1, postgres_compat)
1603 ))
1604 .join(",")
1605 ),
1606 PgLegacyChar => "\"char\"".into(),
1607 Char { length } if !postgres_compat => match length {
1608 None => "char".into(),
1609 Some(length) => format!("char({})", length.into_u32()),
1610 },
1611 VarChar { max_length } if !postgres_compat => match max_length {
1612 None => "varchar".into(),
1613 Some(length) => format!("varchar({})", length.into_u32()),
1614 },
1615 UInt16 => "uint2".into(),
1616 UInt32 => "uint4".into(),
1617 UInt64 => "uint8".into(),
1618 ty => {
1619 let pgrepr_type = mz_pgrepr::Type::from(ty);
1620 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1621
1622 let res = if self
1623 .effective_search_path(true)
1624 .iter()
1625 .any(|(_, schema)| schema == &pg_catalog_schema)
1626 {
1627 pgrepr_type.name().to_string()
1628 } else {
1629 let name = QualifiedItemName {
1632 qualifiers: ItemQualifiers {
1633 database_spec: ResolvedDatabaseSpecifier::Ambient,
1634 schema_spec: pg_catalog_schema,
1635 },
1636 item: pgrepr_type.name().to_string(),
1637 };
1638 self.resolve_full_name(&name).to_string()
1639 };
1640 res
1641 }
1642 }
1643 }
1644
1645 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1646 let entry = self.state.try_get_entry_by_global_id(&id)?;
1647
1648 match entry.index() {
1649 Some(index) => {
1650 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1651 let mut on_names = on_desc
1652 .iter_names()
1653 .map(|col_name| col_name.to_string())
1654 .collect::<Vec<_>>();
1655
1656 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1657
1658 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1662 let mut ix_names = vec![String::new(); ix_arity];
1663
1664 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1666 let on_name = on_names.get_mut(on_pos).expect("on_name");
1667 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1668 std::mem::swap(on_name, ix_name);
1669 }
1670
1671 Some(ix_names) }
1673 None => {
1674 let desc = self.state.try_get_desc_by_global_id(&id)?;
1675 let column_names = desc
1676 .iter_names()
1677 .map(|col_name| col_name.to_string())
1678 .collect();
1679
1680 Some(column_names)
1681 }
1682 }
1683 }
1684
1685 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1686 let desc = self.state.try_get_desc_by_global_id(&id)?;
1687 Some(desc.get_name(column).to_string())
1688 }
1689
    /// Returns true if the state's `entry_by_global_id` map contains `id`.
    fn id_exists(&self, id: GlobalId) -> bool {
        self.state.entry_by_global_id.contains_key(&id)
    }
1693}
1694
1695impl SessionCatalog for ConnCatalog<'_> {
    /// Returns the role ID stored on this connection catalog.
    fn active_role_id(&self) -> &RoleId {
        &self.role_id
    }
1699
    /// Returns the `restrict_to_user_objects` flag stored on this connection catalog.
    fn restrict_to_user_objects(&self) -> bool {
        self.restrict_to_user_objects
    }
1703
1704 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1705 self.prepared_statements
1706 .as_ref()
1707 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1708 .flatten()
1709 }
1710
    /// Returns the description of the portal called `portal_name`.
    ///
    /// Returns `None` when this catalog carries no portals or when none of them is called
    /// `portal_name`.
    ///
    /// NOTE(review): "unverified" presumably means no validity check is performed on the portal
    /// before its description is handed out — confirm against the trait documentation.
    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
        self.portals
            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
    }
1715
    /// Returns the ID of the database stored on this connection catalog, if any.
    fn active_database(&self) -> Option<&DatabaseId> {
        self.database.as_ref()
    }
1719
    /// Returns the name of the cluster stored on this connection catalog.
    fn active_cluster(&self) -> &str {
        &self.cluster
    }
1723
    /// Returns the search path stored on this connection catalog as (database, schema) pairs.
    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
        &self.search_path
    }
1727
    /// Resolves `database_name` to a database via the catalog state.
    ///
    /// The state's error is converted into a [`SqlCatalogError`] by the `?` operator.
    fn resolve_database(
        &self,
        database_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
        Ok(self.state.resolve_database(database_name)?)
    }
1734
    /// Returns the database with the given `id`.
    ///
    /// # Panics
    ///
    /// Panics if the state's `database_by_id` map has no entry for `id`.
    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
        self.state
            .database_by_id
            .get(id)
            .expect("database doesn't exist")
    }
1741
    /// Returns every database in the state's `database_by_id` map as a trait object.
    // The `as` cast below converts each concrete database into `&dyn CatalogDatabase`, which is
    // why the `as_conversions` lint is silenced here.
    #[allow(clippy::as_conversions)]
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
        self.state
            .database_by_id
            .values()
            .map(|database| database as &dyn CatalogDatabase)
            .collect()
    }
1751
    /// Resolves `schema_name`, optionally qualified by `database_name`, via the catalog state.
    ///
    /// This catalog's database and connection ID are passed along to the state lookup. The
    /// state's error is converted into a [`SqlCatalogError`] by the `?` operator.
    fn resolve_schema(
        &self,
        database_name: Option<&str>,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self.state.resolve_schema(
            self.database.as_ref(),
            database_name,
            schema_name,
            &self.conn_id,
        )?)
    }
1764
    /// Resolves `schema_name` within the database given by `database_spec` via the catalog
    /// state, passing along this catalog's connection ID.
    ///
    /// The state's error is converted into a [`SqlCatalogError`] by the `?` operator.
    fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self
            .state
            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
    }
1774
    /// Returns the schema identified by `database_spec` and `schema_spec` from the catalog state,
    /// passing along this catalog's connection ID.
    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema {
        self.state
            .get_schema(database_spec, schema_spec, &self.conn_id)
    }
1783
    /// Returns all schemas known to the catalog as trait objects.
    ///
    /// The result contains the schemas of every database, followed by the state's ambient
    /// schemas and then its temporary schemas.
    // The `as` cast below converts each concrete schema into `&dyn CatalogSchema`, which is why
    // the `as_conversions` lint is silenced here.
    #[allow(clippy::as_conversions)]
    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.get_databases()
            .into_iter()
            .flat_map(|database| database.schemas().into_iter())
            .chain(
                self.state
                    .ambient_schemas_by_id
                    .values()
                    .chain(self.state.temporary_schemas.values())
                    .map(|schema| schema as &dyn CatalogSchema),
            )
            .collect()
    }
1799
    /// Returns the schema ID reported by the state's `get_mz_internal_schema_id`.
    fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state().get_mz_internal_schema_id()
    }
1803
    /// Returns the schema ID reported by the state's `get_mz_unsafe_schema_id`.
    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state().get_mz_unsafe_schema_id()
    }
1807
    /// Reports whether `schema` is a system schema, as decided by the catalog state.
    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
        self.state.is_system_schema_specifier(schema)
    }
1811
1812 fn resolve_role(
1813 &self,
1814 role_name: &str,
1815 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1816 match self.state.try_get_role_by_name(role_name) {
1817 Some(role) => Ok(role),
1818 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1819 }
1820 }
1821
1822 fn resolve_network_policy(
1823 &self,
1824 policy_name: &str,
1825 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1826 match self.state.try_get_network_policy_by_name(policy_name) {
1827 Some(policy) => Ok(policy),
1828 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1829 }
1830 }
1831
    /// Returns the role with the given `id`, or `None` if the state's `roles_by_id` map has no
    /// entry for it.
    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
        // `Some(..?)` rather than returning the lookup directly lets the concrete role reference
        // coerce to the trait object.
        Some(self.state.roles_by_id.get(id)?)
    }
1835
    /// Returns the role with the given `id` from the catalog state.
    ///
    /// See [`Self::try_get_role`] for the variant that returns an `Option`.
    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
        self.state.get_role(id)
    }
1839
    /// Returns every role in the state's `roles_by_id` map as a trait object.
    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
        // The `as` cast converts each concrete role into `&dyn CatalogRole`.
        #[allow(clippy::as_conversions)]
        self.state
            .roles_by_id
            .values()
            .map(|role| role as &dyn CatalogRole)
            .collect()
    }
1849
    /// Returns the constant `MZ_SYSTEM_ROLE_ID`.
    fn mz_system_role_id(&self) -> RoleId {
        MZ_SYSTEM_ROLE_ID
    }
1853
    /// Returns the set of role IDs computed by the state's `collect_role_membership` for `id`.
    ///
    /// NOTE(review): presumably the transitive set of roles that `id` is a member of — confirm
    /// against the state implementation.
    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        self.state.collect_role_membership(id)
    }
1857
    /// Returns the network policy with the given `id` from the catalog state.
    fn get_network_policy(
        &self,
        id: &NetworkPolicyId,
    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
        self.state.get_network_policy(id)
    }
1864
    /// Returns every network policy in the state's `network_policies_by_id` map as a trait
    /// object.
    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
        // The `as` cast converts each concrete policy into `&dyn CatalogNetworkPolicy`.
        #[allow(clippy::as_conversions)]
        self.state
            .network_policies_by_id
            .values()
            .map(|policy| policy as &dyn CatalogNetworkPolicy)
            .collect()
    }
1874
    /// Resolves `cluster_name` to a cluster via the catalog state.
    ///
    /// When `cluster_name` is `None`, the name returned by [`Self::active_cluster`] is resolved
    /// instead. The state's error is converted into a [`SqlCatalogError`] by the `?` operator.
    fn resolve_cluster(
        &self,
        cluster_name: Option<&str>,
    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
        Ok(self
            .state
            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
    }
1883
    /// Resolves the qualified replica name `cluster_replica_name` via the catalog state.
    ///
    /// The state's error is converted into a [`SqlCatalogError`] by the `?` operator.
    fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
    }
1890
1891 fn resolve_item(
1892 &self,
1893 name: &PartialItemName,
1894 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1895 let r = self.state.resolve_entry(
1896 self.database.as_ref(),
1897 &self.effective_search_path(true),
1898 name,
1899 &self.conn_id,
1900 )?;
1901 if self.unresolvable_ids.contains(&r.id()) {
1902 Err(SqlCatalogError::UnknownItem(name.to_string()))
1903 } else {
1904 Ok(r)
1905 }
1906 }
1907
1908 fn resolve_function(
1909 &self,
1910 name: &PartialItemName,
1911 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1912 let r = self.state.resolve_function(
1913 self.database.as_ref(),
1914 &self.effective_search_path(false),
1915 name,
1916 &self.conn_id,
1917 )?;
1918
1919 if self.unresolvable_ids.contains(&r.id()) {
1920 Err(SqlCatalogError::UnknownFunction {
1921 name: name.to_string(),
1922 alternative: None,
1923 })
1924 } else {
1925 Ok(r)
1926 }
1927 }
1928
1929 fn resolve_type(
1930 &self,
1931 name: &PartialItemName,
1932 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1933 let r = self.state.resolve_type(
1934 self.database.as_ref(),
1935 &self.effective_search_path(false),
1936 name,
1937 &self.conn_id,
1938 )?;
1939
1940 if self.unresolvable_ids.contains(&r.id()) {
1941 Err(SqlCatalogError::UnknownType {
1942 name: name.to_string(),
1943 })
1944 } else {
1945 Ok(r)
1946 }
1947 }
1948
    /// Returns the system type called `name` from the catalog state.
    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_system_type(name)
    }
1952
    /// Returns the item with the given `id`, or `None` if the state has no entry for it.
    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
        // `Some(..?)` rather than returning the lookup directly lets the concrete entry reference
        // coerce to the trait object.
        Some(self.state.try_get_entry(id)?)
    }
1956
1957 fn try_get_item_by_global_id(
1958 &self,
1959 id: &GlobalId,
1960 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1961 let entry = self.state.try_get_entry_by_global_id(id)?;
1962 let entry = match &entry.item {
1963 CatalogItem::Table(table) => {
1964 let (version, _gid) = table
1965 .collections
1966 .iter()
1967 .find(|(_version, gid)| *gid == id)
1968 .expect("catalog out of sync, mismatched GlobalId");
1969 entry.at_version(RelationVersionSelector::Specific(*version))
1970 }
1971 _ => entry.at_version(RelationVersionSelector::Latest),
1972 };
1973 Some(entry)
1974 }
1975
    /// Returns the item with the given `id` from the catalog state.
    ///
    /// See [`Self::try_get_item`] for the variant that returns an `Option`.
    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_entry(id)
    }
1979
1980 fn get_item_by_global_id(
1981 &self,
1982 id: &GlobalId,
1983 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1984 let entry = self.state.get_entry_by_global_id(id);
1985 let entry = match &entry.item {
1986 CatalogItem::Table(table) => {
1987 let (version, _gid) = table
1988 .collections
1989 .iter()
1990 .find(|(_version, gid)| *gid == id)
1991 .expect("catalog out of sync, mismatched GlobalId");
1992 entry.at_version(RelationVersionSelector::Specific(*version))
1993 }
1994 _ => entry.at_version(RelationVersionSelector::Latest),
1995 };
1996 entry
1997 }
1998
    /// Returns every item of every schema returned by [`Self::get_schemas`].
    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
        self.get_schemas()
            .into_iter()
            .flat_map(|schema| schema.item_ids())
            .map(|id| self.get_item(&id))
            .collect()
    }
2006
    /// Looks up an item by its qualified `name` via the catalog state, passing along this
    /// catalog's connection ID.
    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_item_by_name(name, &self.conn_id)
            // `identity` with an explicit type coerces the concrete reference to the trait object
            // without an `as` cast.
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }
2012
    /// Looks up a type by its qualified `name` via the catalog state, passing along this
    /// catalog's connection ID.
    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_type_by_name(name, &self.conn_id)
            // `identity` with an explicit type coerces the concrete reference to the trait object
            // without an `as` cast.
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }
2018
    /// Returns the cluster with the given `id` by indexing the state's `clusters_by_id` map.
    ///
    /// NOTE(review): indexing a standard map panics on a missing key, so this presumably panics
    /// for unknown IDs — confirm the map type.
    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
        &self.state.clusters_by_id[&id]
    }
2022
    /// Returns every cluster in the state's `clusters_by_id` map as a trait object.
    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
        self.state
            .clusters_by_id
            .values()
            // `identity` with an explicit type coerces the concrete reference to the trait object
            // without an `as` cast.
            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
            .collect()
    }
2030
    /// Returns the replica `replica_id` of the cluster `cluster_id`.
    ///
    /// The cluster is fetched with [`Self::get_cluster`] and then asked for the replica.
    fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
        let cluster = self.get_cluster(cluster_id);
        cluster.replica(replica_id)
    }
2039
    /// Returns the replicas of every cluster returned by [`Self::get_clusters`].
    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
        self.get_clusters()
            .into_iter()
            .flat_map(|cluster| cluster.replicas().into_iter())
            .collect()
    }
2046
    /// Returns the system-level privilege map held by the catalog state.
    fn get_system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
2050
    /// Returns the default privileges held by the catalog state.
    ///
    /// Each element pairs a [`DefaultPrivilegeObject`] with the collected list of
    /// [`DefaultPrivilegeAclItem`]s associated with it.
    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
        self.state
            .default_privileges
            .iter()
            .map(|(object, acl_items)| (object, acl_items.collect()))
            .collect()
    }
2060
    /// Delegates to the state's `find_available_name` for `name`, passing along this catalog's
    /// connection ID.
    ///
    /// NOTE(review): presumably returns a variation of `name` that is not yet taken — confirm
    /// against the state implementation.
    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
        self.state.find_available_name(name, &self.conn_id)
    }
2064
    /// Converts the qualified `name` into a [`FullItemName`] via the catalog state, passing
    /// along this catalog's connection ID.
    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
        self.state.resolve_full_name(name, Some(&self.conn_id))
    }
2068
    /// Converts the qualified schema `name` into a [`FullSchemaName`] via the catalog state.
    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        self.state.resolve_full_schema_name(name)
    }
2072
    /// Returns the catalog item ID of the entry that owns the collection `global_id`.
    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
        self.state.get_entry_by_global_id(global_id).id()
    }
2076
    /// Returns the global ID of the item `item_id` at the relation version chosen by `version`.
    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId {
        self.state
            .get_entry(item_id)
            .at_version(version)
            .global_id()
    }
2087
    /// Returns the catalog configuration held by the catalog state.
    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
2091
    /// Returns the current time by calling the `now` function stored in the catalog
    /// configuration.
    fn now(&self) -> EpochMillis {
        (self.state.config().now)()
    }
2095
    /// Returns a clone of the state's `aws_privatelink_availability_zones`, if set.
    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
        self.state.aws_privatelink_availability_zones.clone()
    }
2099
    /// Returns a shared reference to the state's system configuration variables.
    fn system_vars(&self) -> &SystemVars {
        &self.state.system_configuration
    }
2103
    /// Returns a mutable reference to the state's system configuration variables.
    ///
    /// `Arc::make_mut` clones the variables first if the `Arc` is shared, so other holders of
    /// the `Arc` do not observe the mutation.
    ///
    /// NOTE(review): `to_mut` presumably comes from `Cow`, in which case a borrowed state is
    /// cloned into an owned one on first use — confirm the type of `state`.
    fn system_vars_mut(&mut self) -> &mut SystemVars {
        Arc::make_mut(&mut self.state.to_mut().system_configuration)
    }
2107
    /// Returns the owner of the object `id` as reported by the catalog state, passing along this
    /// catalog's connection ID.
    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
        self.state().get_owner_id(id, self.conn_id())
    }
2111
    /// Returns the privilege map of the object identified by `id`.
    ///
    /// Returns `None` for cluster replicas and roles, and for schemas that the state's
    /// `try_get_schema` cannot find. All other objects are fetched with the infallible getters.
    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::System => Some(&self.state.system_privileges),
            SystemObjectId::Object(ObjectId::Cluster(id)) => {
                Some(self.get_cluster(*id).privileges())
            }
            SystemObjectId::Object(ObjectId::Database(id)) => {
                Some(self.get_database(id).privileges())
            }
            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
                // Schemas are the only kind looked up fallibly here.
                // NOTE(review): presumably because a schema (e.g. a temporary one) may not exist
                // for this connection — confirm.
                self.state
                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
                    .map(|schema| schema.privileges())
            }
            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
                Some(self.get_network_policy(id).privileges())
            }
            SystemObjectId::Object(ObjectId::ClusterReplica(_))
            | SystemObjectId::Object(ObjectId::Role(_)) => None,
        }
    }
2136
    /// Returns the dependents of the objects in `ids` as computed by the catalog state.
    ///
    /// A fresh, empty `seen` set is handed to the state for each call.
    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.object_dependents(ids, &self.conn_id, &mut seen)
    }
2141
    /// Returns the dependents of the item `id` as computed by the catalog state.
    ///
    /// A fresh, empty `seen` set is handed to the state for each call.
    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.item_dependents(id, &mut seen)
    }
2146
    /// Returns the result of `rbac::all_object_privileges` for `object_type`.
    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
        rbac::all_object_privileges(object_type)
    }
2150
    /// Returns the object type of `object_id` as reported by the catalog state.
    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        self.state.get_object_type(object_id)
    }
2154
    /// Returns the system object type of `id` as reported by the catalog state.
    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
        self.state.get_system_object_type(id)
    }
2158
2159 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2166 if qualified_name.qualifiers.schema_spec.is_temporary() {
2167 return qualified_name.item.clone().into();
2175 }
2176
2177 let database_id = match &qualified_name.qualifiers.database_spec {
2178 ResolvedDatabaseSpecifier::Ambient => None,
2179 ResolvedDatabaseSpecifier::Id(id)
2180 if self.database.is_some() && self.database == Some(*id) =>
2181 {
2182 None
2183 }
2184 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2185 };
2186
2187 let schema_spec = if database_id.is_none()
2188 && self.resolve_item_name(&PartialItemName {
2189 database: None,
2190 schema: None,
2191 item: qualified_name.item.clone(),
2192 }) == Ok(qualified_name)
2193 || self.resolve_function_name(&PartialItemName {
2194 database: None,
2195 schema: None,
2196 item: qualified_name.item.clone(),
2197 }) == Ok(qualified_name)
2198 || self.resolve_type_name(&PartialItemName {
2199 database: None,
2200 schema: None,
2201 item: qualified_name.item.clone(),
2202 }) == Ok(qualified_name)
2203 {
2204 None
2205 } else {
2206 Some(qualified_name.qualifiers.schema_spec.clone())
2209 };
2210
2211 let res = PartialItemName {
2212 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2213 schema: schema_spec.map(|spec| {
2214 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2215 .name()
2216 .schema
2217 .clone()
2218 }),
2219 item: qualified_name.item.clone(),
2220 };
2221 assert!(
2222 self.resolve_item_name(&res) == Ok(qualified_name)
2223 || self.resolve_function_name(&res) == Ok(qualified_name)
2224 || self.resolve_type_name(&res) == Ok(qualified_name)
2225 );
2226 res
2227 }
2228
    /// Sends `notice` over this catalog's `notices_tx` channel after converting it with `into`.
    ///
    /// The result of the send is deliberately discarded, so a failed send is silently ignored.
    fn add_notice(&self, notice: PlanNotice) {
        let _ = self.notices_tx.send(notice.into());
    }
2232
    /// Returns the comments stored for the item `id`, if any.
    ///
    /// NOTE(review): the `Option<usize>` map key presumably distinguishes a comment on the item
    /// itself (`None`) from comments on individual columns (`Some(position)`) — confirm.
    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
        self.state.comments.get_object_comments(comment_id)
    }
2237
2238 fn is_cluster_size_cc(&self, size: &str) -> bool {
2239 self.state
2240 .cluster_replica_sizes
2241 .0
2242 .get(size)
2243 .map_or(false, |a| a.is_cc)
2244 }
2245}
2246
2247#[cfg(test)]
2248mod tests {
2249 use std::collections::{BTreeMap, BTreeSet};
2250 use std::sync::Arc;
2251 use std::{env, iter};
2252
2253 use itertools::Itertools;
2254 use mz_catalog::memory::objects::CatalogItem;
2255 use tokio_postgres::NoTls;
2256 use tokio_postgres::types::Type;
2257 use uuid::Uuid;
2258
2259 use mz_catalog::SYSTEM_CONN_ID;
2260 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2261 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2262 use mz_controller_types::{ClusterId, ReplicaId};
2263 use mz_expr::{Eval, MirScalarExpr};
2264 use mz_ore::now::to_datetime;
2265 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2266 use mz_persist_client::PersistClient;
2267 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2268 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2269 use mz_repr::role_id::RoleId;
2270 use mz_repr::{
2271 CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2272 SqlScalarType, Timestamp,
2273 };
2274 use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2275 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2276 use mz_sql::names::{
2277 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2278 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2279 };
2280 use mz_sql::plan::{
2281 CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2282 QueryLifetime, Scope, StatementContext,
2283 };
2284 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2285 use mz_sql::session::vars::{SystemVars, VarInput};
2286
2287 use crate::catalog::state::LocalExpressionCache;
2288 use crate::catalog::{Catalog, Op};
2289 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2290 use crate::session::Session;
2291
2292 #[mz_ore::test(tokio::test)]
2299 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2301 Catalog::with_debug(|catalog| async move {
2302 struct TestCase {
2303 input: QualifiedItemName,
2304 system_output: PartialItemName,
2305 normal_output: PartialItemName,
2306 }
2307
2308 let test_cases = vec![
2309 TestCase {
2310 input: QualifiedItemName {
2311 qualifiers: ItemQualifiers {
2312 database_spec: ResolvedDatabaseSpecifier::Ambient,
2313 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2314 },
2315 item: "numeric".to_string(),
2316 },
2317 system_output: PartialItemName {
2318 database: None,
2319 schema: None,
2320 item: "numeric".to_string(),
2321 },
2322 normal_output: PartialItemName {
2323 database: None,
2324 schema: None,
2325 item: "numeric".to_string(),
2326 },
2327 },
2328 TestCase {
2329 input: QualifiedItemName {
2330 qualifiers: ItemQualifiers {
2331 database_spec: ResolvedDatabaseSpecifier::Ambient,
2332 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2333 },
2334 item: "mz_array_types".to_string(),
2335 },
2336 system_output: PartialItemName {
2337 database: None,
2338 schema: None,
2339 item: "mz_array_types".to_string(),
2340 },
2341 normal_output: PartialItemName {
2342 database: None,
2343 schema: None,
2344 item: "mz_array_types".to_string(),
2345 },
2346 },
2347 ];
2348
2349 for tc in test_cases {
2350 assert_eq!(
2351 catalog
2352 .for_system_session()
2353 .minimal_qualification(&tc.input),
2354 tc.system_output
2355 );
2356 assert_eq!(
2357 catalog
2358 .for_session(&Session::dummy())
2359 .minimal_qualification(&tc.input),
2360 tc.normal_output
2361 );
2362 }
2363 catalog.expire().await;
2364 })
2365 .await
2366 }
2367
2368 #[mz_ore::test(tokio::test)]
2369 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2371 let persist_client = PersistClient::new_for_tests().await;
2372 let organization_id = Uuid::new_v4();
2373 let bootstrap_args = test_bootstrap_args();
2374 {
2375 let mut catalog = Catalog::open_debug_catalog(
2376 persist_client.clone(),
2377 organization_id.clone(),
2378 &bootstrap_args,
2379 )
2380 .await
2381 .expect("unable to open debug catalog");
2382 assert_eq!(catalog.transient_revision(), 1);
2383 let commit_ts = catalog.current_upper().await;
2384 catalog
2385 .transact(
2386 None,
2387 commit_ts,
2388 None,
2389 vec![Op::CreateDatabase {
2390 name: "test".to_string(),
2391 owner_id: MZ_SYSTEM_ROLE_ID,
2392 }],
2393 )
2394 .await
2395 .expect("failed to transact");
2396 assert_eq!(catalog.transient_revision(), 2);
2397 catalog.expire().await;
2398 }
2399 {
2400 let catalog =
2401 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2402 .await
2403 .expect("unable to open debug catalog");
2404 assert_eq!(catalog.transient_revision(), 1);
2406 catalog.expire().await;
2407 }
2408 }
2409
2410 #[mz_ore::test(tokio::test)]
2411 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2413 Catalog::with_debug(|catalog| async move {
2414 let mz_catalog_schema = (
2415 ResolvedDatabaseSpecifier::Ambient,
2416 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2417 );
2418 let pg_catalog_schema = (
2419 ResolvedDatabaseSpecifier::Ambient,
2420 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2421 );
2422 let mz_temp_schema = (
2423 ResolvedDatabaseSpecifier::Ambient,
2424 SchemaSpecifier::Temporary,
2425 );
2426
2427 let session = Session::dummy();
2429 let conn_catalog = catalog.for_session(&session);
2430 assert_ne!(
2431 conn_catalog.effective_search_path(false),
2432 conn_catalog.search_path
2433 );
2434 assert_ne!(
2435 conn_catalog.effective_search_path(true),
2436 conn_catalog.search_path
2437 );
2438 assert_eq!(
2439 conn_catalog.effective_search_path(false),
2440 vec![
2441 mz_catalog_schema.clone(),
2442 pg_catalog_schema.clone(),
2443 conn_catalog.search_path[0].clone()
2444 ]
2445 );
2446 assert_eq!(
2447 conn_catalog.effective_search_path(true),
2448 vec![
2449 mz_temp_schema.clone(),
2450 mz_catalog_schema.clone(),
2451 pg_catalog_schema.clone(),
2452 conn_catalog.search_path[0].clone()
2453 ]
2454 );
2455
2456 let mut session = Session::dummy();
2458 session
2459 .vars_mut()
2460 .set(
2461 &SystemVars::new(),
2462 "search_path",
2463 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2464 false,
2465 )
2466 .expect("failed to set search_path");
2467 let conn_catalog = catalog.for_session(&session);
2468 assert_ne!(
2469 conn_catalog.effective_search_path(false),
2470 conn_catalog.search_path
2471 );
2472 assert_ne!(
2473 conn_catalog.effective_search_path(true),
2474 conn_catalog.search_path
2475 );
2476 assert_eq!(
2477 conn_catalog.effective_search_path(false),
2478 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2479 );
2480 assert_eq!(
2481 conn_catalog.effective_search_path(true),
2482 vec![
2483 mz_temp_schema.clone(),
2484 mz_catalog_schema.clone(),
2485 pg_catalog_schema.clone()
2486 ]
2487 );
2488
2489 let mut session = Session::dummy();
2490 session
2491 .vars_mut()
2492 .set(
2493 &SystemVars::new(),
2494 "search_path",
2495 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2496 false,
2497 )
2498 .expect("failed to set search_path");
2499 let conn_catalog = catalog.for_session(&session);
2500 assert_ne!(
2501 conn_catalog.effective_search_path(false),
2502 conn_catalog.search_path
2503 );
2504 assert_ne!(
2505 conn_catalog.effective_search_path(true),
2506 conn_catalog.search_path
2507 );
2508 assert_eq!(
2509 conn_catalog.effective_search_path(false),
2510 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2511 );
2512 assert_eq!(
2513 conn_catalog.effective_search_path(true),
2514 vec![
2515 mz_temp_schema.clone(),
2516 pg_catalog_schema.clone(),
2517 mz_catalog_schema.clone()
2518 ]
2519 );
2520
2521 let mut session = Session::dummy();
2522 session
2523 .vars_mut()
2524 .set(
2525 &SystemVars::new(),
2526 "search_path",
2527 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2528 false,
2529 )
2530 .expect("failed to set search_path");
2531 let conn_catalog = catalog.for_session(&session);
2532 assert_ne!(
2533 conn_catalog.effective_search_path(false),
2534 conn_catalog.search_path
2535 );
2536 assert_ne!(
2537 conn_catalog.effective_search_path(true),
2538 conn_catalog.search_path
2539 );
2540 assert_eq!(
2541 conn_catalog.effective_search_path(false),
2542 vec![
2543 mz_catalog_schema.clone(),
2544 pg_catalog_schema.clone(),
2545 mz_temp_schema.clone()
2546 ]
2547 );
2548 assert_eq!(
2549 conn_catalog.effective_search_path(true),
2550 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2551 );
2552 catalog.expire().await;
2553 })
2554 .await
2555 }
2556
2557 #[mz_ore::test(tokio::test)]
2558 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2560 use mz_ore::collections::CollectionExt;
2561 Catalog::with_debug(|catalog| async move {
2562 let conn_catalog = catalog.for_system_session();
2563 let scx = &mut StatementContext::new(None, &conn_catalog);
2564
2565 let parsed = mz_sql_parser::parser::parse_statements(
2566 "create view public.foo as select 1 as bar",
2567 )
2568 .expect("")
2569 .into_element()
2570 .ast;
2571
2572 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2573
2574 assert_eq!(
2576 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2577 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2578 );
2579 catalog.expire().await;
2580 })
2581 .await;
2582 }
2583
2584 #[mz_ore::test(tokio::test)]
2586 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2588 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2589 let column = ", 1";
2590 let view_def_size = view_def.bytes().count();
2591 let column_size = column.bytes().count();
2592 let column_count =
2593 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2594 let columns = iter::repeat(column).take(column_count).join("");
2595 let create_sql = format!("{view_def}{columns})");
2596 let create_sql_check = create_sql.clone();
2597 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2598 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2599 &create_sql
2600 ));
2601
2602 let persist_client = PersistClient::new_for_tests().await;
2603 let organization_id = Uuid::new_v4();
2604 let id = CatalogItemId::User(1);
2605 let gid = GlobalId::User(1);
2606 let bootstrap_args = test_bootstrap_args();
2607 {
2608 let mut catalog = Catalog::open_debug_catalog(
2609 persist_client.clone(),
2610 organization_id.clone(),
2611 &bootstrap_args,
2612 )
2613 .await
2614 .expect("unable to open debug catalog");
2615 let item = catalog
2616 .state()
2617 .deserialize_item(
2618 gid,
2619 &create_sql,
2620 &BTreeMap::new(),
2621 &mut LocalExpressionCache::Closed,
2622 None,
2623 )
2624 .expect("unable to parse view");
2625 let commit_ts = catalog.current_upper().await;
2626 catalog
2627 .transact(
2628 None,
2629 commit_ts,
2630 None,
2631 vec![Op::CreateItem {
2632 item,
2633 name: QualifiedItemName {
2634 qualifiers: ItemQualifiers {
2635 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2636 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2637 },
2638 item: "v".to_string(),
2639 },
2640 id,
2641 owner_id: MZ_SYSTEM_ROLE_ID,
2642 }],
2643 )
2644 .await
2645 .expect("failed to transact");
2646 catalog.expire().await;
2647 }
2648 {
2649 let catalog =
2650 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2651 .await
2652 .expect("unable to open debug catalog");
2653 let view = catalog.get_entry(&id);
2654 assert_eq!("v", view.name.item);
2655 match &view.item {
2656 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2657 item => panic!("expected view, got {}", item.typ()),
2658 }
2659 catalog.expire().await;
2660 }
2661 }
2662
2663 #[mz_ore::test(tokio::test)]
2664 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2666 Catalog::with_debug(|catalog| async move {
2667 let conn_catalog = catalog.for_system_session();
2668
2669 assert_eq!(
2670 mz_sql::catalog::ObjectType::ClusterReplica,
2671 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2672 ClusterId::user(1).expect("1 is a valid ID"),
2673 ReplicaId::User(1)
2674 )))
2675 );
2676 assert_eq!(
2677 mz_sql::catalog::ObjectType::Role,
2678 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2679 );
2680 catalog.expire().await;
2681 })
2682 .await;
2683 }
2684
2685 #[mz_ore::test(tokio::test)]
2686 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2688 Catalog::with_debug(|catalog| async move {
2689 let conn_catalog = catalog.for_system_session();
2690
2691 assert_eq!(
2692 None,
2693 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2694 ClusterId::user(1).expect("1 is a valid ID"),
2695 ReplicaId::User(1),
2696 ))))
2697 );
2698 assert_eq!(
2699 None,
2700 conn_catalog
2701 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2702 );
2703 catalog.expire().await;
2704 })
2705 .await;
2706 }
2707
2708 #[mz_ore::test(tokio::test)]
2709 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2711 Catalog::with_debug(|catalog| async move {
2712 let conn_catalog = catalog.for_system_session();
2713
2714 for builtin in BUILTINS::iter() {
2715 let (schema, name, expected_desc) = match builtin {
2716 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2717 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2718 Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2719 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2720 Builtin::Log(_)
2721 | Builtin::Type(_)
2722 | Builtin::Func(_)
2723 | Builtin::Index(_)
2724 | Builtin::Connection(_) => continue,
2725 };
2726 let item = conn_catalog
2727 .resolve_item(&PartialItemName {
2728 database: None,
2729 schema: Some(schema.to_string()),
2730 item: name.to_string(),
2731 })
2732 .expect("unable to resolve item")
2733 .at_version(RelationVersionSelector::Latest);
2734
2735 let actual_desc = item.relation_desc().expect("invalid item type");
2736 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2737 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2738 {
2739 assert_eq!(
2740 actual_name, expected_name,
2741 "item {schema}.{name} column {index} name did not match its expected name"
2742 );
2743 assert_eq!(
2744 actual_typ, expected_typ,
2745 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2746 );
2747 }
2748 assert_eq!(
2749 &*actual_desc, expected_desc,
2750 "item {schema}.{name} did not match its expected RelationDesc"
2751 );
2752 }
2753 catalog.expire().await;
2754 })
2755 .await
2756 }
2757
2758 #[mz_ore::test(tokio::test)]
2761 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2763 async fn inner(catalog: Catalog) {
2764 let (client, connection) = tokio_postgres::connect(
2768 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2769 NoTls,
2770 )
2771 .await
2772 .expect("failed to connect to Postgres");
2773
2774 task::spawn(|| "compare_builtin_postgres", async move {
2775 if let Err(e) = connection.await {
2776 panic!("connection error: {}", e);
2777 }
2778 });
2779
2780 struct PgProc {
2781 name: String,
2782 arg_oids: Vec<u32>,
2783 ret_oid: Option<u32>,
2784 ret_set: bool,
2785 }
2786
2787 struct PgType {
2788 name: String,
2789 ty: String,
2790 elem: u32,
2791 array: u32,
2792 input: u32,
2793 receive: u32,
2794 }
2795
2796 struct PgOper {
2797 oprresult: u32,
2798 name: String,
2799 }
2800
2801 let pg_proc: BTreeMap<_, _> = client
2802 .query(
2803 "SELECT
2804 p.oid,
2805 proname,
2806 proargtypes,
2807 prorettype,
2808 proretset
2809 FROM pg_proc p
2810 JOIN pg_namespace n ON p.pronamespace = n.oid",
2811 &[],
2812 )
2813 .await
2814 .expect("pg query failed")
2815 .into_iter()
2816 .map(|row| {
2817 let oid: u32 = row.get("oid");
2818 let pg_proc = PgProc {
2819 name: row.get("proname"),
2820 arg_oids: row.get("proargtypes"),
2821 ret_oid: row.get("prorettype"),
2822 ret_set: row.get("proretset"),
2823 };
2824 (oid, pg_proc)
2825 })
2826 .collect();
2827
2828 let pg_type: BTreeMap<_, _> = client
2829 .query(
2830 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2831 &[],
2832 )
2833 .await
2834 .expect("pg query failed")
2835 .into_iter()
2836 .map(|row| {
2837 let oid: u32 = row.get("oid");
2838 let pg_type = PgType {
2839 name: row.get("typname"),
2840 ty: row.get("typtype"),
2841 elem: row.get("typelem"),
2842 array: row.get("typarray"),
2843 input: row.get("typinput"),
2844 receive: row.get("typreceive"),
2845 };
2846 (oid, pg_type)
2847 })
2848 .collect();
2849
2850 let pg_oper: BTreeMap<_, _> = client
2851 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2852 .await
2853 .expect("pg query failed")
2854 .into_iter()
2855 .map(|row| {
2856 let oid: u32 = row.get("oid");
2857 let pg_oper = PgOper {
2858 name: row.get("oprname"),
2859 oprresult: row.get("oprresult"),
2860 };
2861 (oid, pg_oper)
2862 })
2863 .collect();
2864
2865 let conn_catalog = catalog.for_system_session();
2866 let resolve_type_oid = |item: &str| {
2867 conn_catalog
2868 .resolve_type(&PartialItemName {
2869 database: None,
2870 schema: Some(PG_CATALOG_SCHEMA.into()),
2873 item: item.to_string(),
2874 })
2875 .expect("unable to resolve type")
2876 .oid()
2877 };
2878
2879 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2880 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2881 .collect();
2882
2883 let mut all_oids = BTreeSet::new();
2884
2885 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2888 [
2889 (Type::NAME, Type::TEXT),
2891 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2892 (Type::TIME, Type::TIMETZ),
2894 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2895 ]
2896 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2897 );
2898 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2899 1619, ]);
2901 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2902 if ignore_return_types.contains(&fn_oid) {
2903 return true;
2904 }
2905 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2906 return true;
2907 }
2908 a == b
2909 };
2910
2911 for builtin in BUILTINS::iter() {
2912 match builtin {
2913 Builtin::Type(ty) => {
2914 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2915
2916 if ty.oid >= FIRST_MATERIALIZE_OID {
2917 continue;
2920 }
2921
2922 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2925 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2926 });
2927 assert_eq!(
2928 ty.name, pg_ty.name,
2929 "oid {} has name {} in postgres; expected {}",
2930 ty.oid, pg_ty.name, ty.name,
2931 );
2932
2933 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2934 None => (0, 0),
2935 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2936 };
2937 assert_eq!(
2938 typinput_oid, pg_ty.input,
2939 "type {} has typinput OID {:?} in mz but {:?} in pg",
2940 ty.name, typinput_oid, pg_ty.input,
2941 );
2942 assert_eq!(
2943 typreceive_oid, pg_ty.receive,
2944 "type {} has typreceive OID {:?} in mz but {:?} in pg",
2945 ty.name, typreceive_oid, pg_ty.receive,
2946 );
2947 if typinput_oid != 0 {
2948 assert!(
2949 func_oids.contains(&typinput_oid),
2950 "type {} has typinput OID {} that does not exist in pg_proc",
2951 ty.name,
2952 typinput_oid,
2953 );
2954 }
2955 if typreceive_oid != 0 {
2956 assert!(
2957 func_oids.contains(&typreceive_oid),
2958 "type {} has typreceive OID {} that does not exist in pg_proc",
2959 ty.name,
2960 typreceive_oid,
2961 );
2962 }
2963
2964 match &ty.details.typ {
2966 CatalogType::Array { element_reference } => {
2967 let elem_ty = BUILTINS::iter()
2968 .filter_map(|builtin| match builtin {
2969 Builtin::Type(ty @ BuiltinType { name, .. })
2970 if element_reference == name =>
2971 {
2972 Some(ty)
2973 }
2974 _ => None,
2975 })
2976 .next();
2977 let elem_ty = match elem_ty {
2978 Some(ty) => ty,
2979 None => {
2980 panic!("{} is unexpectedly not a type", element_reference)
2981 }
2982 };
2983 assert_eq!(
2984 pg_ty.elem, elem_ty.oid,
2985 "type {} has mismatched element OIDs",
2986 ty.name
2987 )
2988 }
2989 CatalogType::Pseudo => {
2990 assert_eq!(
2991 pg_ty.ty, "p",
2992 "type {} is not a pseudo type as expected",
2993 ty.name
2994 )
2995 }
2996 CatalogType::Range { .. } => {
2997 assert_eq!(
2998 pg_ty.ty, "r",
2999 "type {} is not a range type as expected",
3000 ty.name
3001 );
3002 }
3003 _ => {
3004 assert_eq!(
3005 pg_ty.ty, "b",
3006 "type {} is not a base type as expected",
3007 ty.name
3008 )
3009 }
3010 }
3011
3012 let schema = catalog
3014 .resolve_schema_in_database(
3015 &ResolvedDatabaseSpecifier::Ambient,
3016 ty.schema,
3017 &SYSTEM_CONN_ID,
3018 )
3019 .expect("unable to resolve schema");
3020 let allocated_type = catalog
3021 .resolve_type(
3022 None,
3023 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3024 &PartialItemName {
3025 database: None,
3026 schema: Some(schema.name().schema.clone()),
3027 item: ty.name.to_string(),
3028 },
3029 &SYSTEM_CONN_ID,
3030 )
3031 .expect("unable to resolve type");
3032 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3033 ty
3034 } else {
3035 panic!("unexpectedly not a type")
3036 };
3037 match ty.details.array_id {
3038 Some(array_id) => {
3039 let array_ty = catalog.get_entry(&array_id);
3040 assert_eq!(
3041 pg_ty.array, array_ty.oid,
3042 "type {} has mismatched array OIDs",
3043 allocated_type.name.item,
3044 );
3045 }
3046 None => assert_eq!(
3047 pg_ty.array, 0,
3048 "type {} does not have an array type in mz but does in pg",
3049 allocated_type.name.item,
3050 ),
3051 }
3052 }
3053 Builtin::Func(func) => {
3054 for imp in func.inner.func_impls() {
3055 assert!(
3056 all_oids.insert(imp.oid),
3057 "{} reused oid {}",
3058 func.name,
3059 imp.oid
3060 );
3061
3062 assert!(
3063 imp.oid < FIRST_USER_OID,
3064 "built-in function {} erroneously has OID in user space ({})",
3065 func.name,
3066 imp.oid,
3067 );
3068
3069 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3072 continue;
3073 } else {
3074 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3075 panic!(
3076 "pg_proc missing function {}: oid {}",
3077 func.name, imp.oid
3078 )
3079 })
3080 };
3081 assert_eq!(
3082 func.name, pg_fn.name,
3083 "funcs with oid {} don't match names: {} in mz, {} in pg",
3084 imp.oid, func.name, pg_fn.name
3085 );
3086
3087 let imp_arg_oids = imp
3090 .arg_typs
3091 .iter()
3092 .map(|item| resolve_type_oid(item))
3093 .collect::<Vec<_>>();
3094
3095 if imp_arg_oids != pg_fn.arg_oids {
3096 println!(
3097 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3098 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3099 );
3100 }
3101
3102 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3103
3104 assert!(
3105 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3106 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3107 imp.oid,
3108 func.name,
3109 imp_return_oid,
3110 pg_fn.ret_oid
3111 );
3112
3113 assert_eq!(
3114 imp.return_is_set, pg_fn.ret_set,
3115 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3116 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3117 );
3118 }
3119 }
3120 _ => (),
3121 }
3122 }
3123
3124 for (op, func) in OP_IMPLS.iter() {
3125 for imp in func.func_impls() {
3126 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3127
3128 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3130 continue;
3131 } else {
3132 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3133 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3134 })
3135 };
3136
3137 assert_eq!(*op, pg_op.name);
3138
3139 let imp_return_oid =
3140 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3141 if imp_return_oid != pg_op.oprresult {
3142 panic!(
3143 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3144 imp.oid, op, imp_return_oid, pg_op.oprresult
3145 );
3146 }
3147 }
3148 }
3149 catalog.expire().await;
3150 }
3151
3152 Catalog::with_debug(inner).await
3153 }
3154
    /// Smoke-tests scalar builtin functions and operators by calling each
    /// implementation on every combination of `NULL` and the "interesting"
    /// datums of its argument types.
    ///
    /// Each individual call runs in its own blocking task via
    /// [`smoketest_fn`]; all task handles are awaited before the test ends.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_smoketest_all_builtins() {
        // Spawns one blocking task per (implementation, argument combination)
        // and returns the handles.
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            // Maps a system type name to its OID.
            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Functions and operators excluded from the smoketest by name.
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege",
                "has_type_privilege",
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            // All builtin functions followed by all operator implementations.
            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are exercised.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                'outer: for imp in impls {
                    let details = imp.details();
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        // Skip the whole implementation if any argument type
                        // has no `mz_pgrepr::Type` for its OID.
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // Candidate values per argument: NULL plus the type's
                    // interesting datums.
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    // Implementations without arguments are skipped.
                    if datums.is_empty() {
                        continue;
                    }

                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    // `None` when the return OID has no `mz_pgrepr::Type`.
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // `idxs[i]` selects the current candidate for argument
                    // `i`; together they enumerate the cartesian product.
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable description of this call, used as
                        // the task name and in failure messages.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance to the next combination: increment the
                        // last index and carry leftwards on overflow. Index 0
                        // is left past its end when exhausted, which
                        // terminates the enclosing `while`.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                if i == 0 {
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        for handle in handles {
            handle.await;
        }
    }
3300
    /// Plans a single builtin implementation against literal arguments,
    /// lowers it to MIR, evaluates it, and checks the result for consistency.
    ///
    /// Planning, lowering, and evaluation errors end the check silently.
    /// When evaluation succeeds and `return_styp` is known, the following are
    /// verified:
    ///
    /// * the MIR scalar type matches the catalog return type (soft assertion),
    /// * the evaluated datum is an instance of the MIR type,
    /// * for an eliminable cast, evaluation with cast elimination disabled
    ///   yields the same datum,
    /// * the MIR type's nullability agrees with the function's
    ///   `introduces_nulls` / `propagates_nulls` flags,
    /// * reducing the expression yields a literal with the same datum and
    ///   scalar type as evaluating it.
    ///
    /// `name`, `call_name`, and `args` are used only for diagnostics and for
    /// the null-input check.
    fn smoketest_fn(
        name: &&str,
        call_name: String,
        op: &Operation<HirScalarExpr>,
        imp: &FuncImpl<HirScalarExpr>,
        args: Vec<Datum<'_>>,
        catalog: Arc<Catalog>,
        scalars: Vec<CoercibleScalarExpr>,
        return_styp: Option<SqlScalarType>,
    ) {
        let conn_catalog = catalog.for_system_session();
        let pcx = PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
        let ecx = ExprContext {
            qcx: &qcx,
            name: "smoketest",
            scope: &Scope::empty(),
            relation_type: &SqlRelationType::empty(),
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let arena = RowArena::new();
        let mut session = Session::dummy();
        session
            .start_transaction(to_datetime(0), None, None)
            .expect("must succeed");
        let prep_style = ExprPrepOneShot {
            logical_time: EvalTime::Time(Timestamp::MIN),
            session: &session,
            catalog_state: &catalog.state,
        };

        // Plan the call; a planning error ends the check.
        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
        if let Ok(hir) = res {
            // For an eliminable cast, also evaluate with cast elimination
            // disabled so the two results can be compared below.
            let uneliminated_result_row = {
                if let HirScalarExpr::CallUnary { func, .. } = &hir
                    && func.is_eliminable_cast()
                {
                    let mut uneliminated_mir = hir
                        .clone()
                        .lower_uncorrelated(HirToMirConfig {
                            enable_cast_elimination: false,
                            ..catalog.system_config().into()
                        })
                        .expect("lowering eliminable cast should always succeed");
                    prep_style
                        .prep_scalar_expr(&mut uneliminated_mir)
                        .expect("must succeed");

                    uneliminated_mir
                        .eval(&[], &arena)
                        .ok()
                        .map(|datum| Row::pack([datum]))
                } else {
                    None
                }
            };

            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");

                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
                    if let Some(return_styp) = return_styp {
                        let mir_typ = mir.typ(&[]);
                        soft_assert_eq_or_log!(
                            mir_typ.scalar_type,
                            (&return_styp).into(),
                            "MIR type did not match the catalog type (cast elimination/repr type error)"
                        );
                        if !eval_result_datum.is_instance_of(&mir_typ) {
                            panic!(
                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
                            );
                        }
                        if let Some(row) = uneliminated_result_row {
                            let uneliminated_result_datum = row.unpack_first();
                            assert_eq!(
                                uneliminated_result_datum, eval_result_datum,
                                "datums should not change if cast is eliminable"
                            );
                        }
                        // Nullability checks apply only when the lowered
                        // expression is a direct call on literal arguments.
                        if let Some((introduces_nulls, propagates_nulls)) =
                            call_introduces_propagates_nulls(&mir)
                        {
                            if introduces_nulls {
                                // A function that can introduce nulls must
                                // have a nullable output type.
                                assert!(
                                    mir_typ.nullable,
                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                    name, args, mir, mir_typ.nullable
                                );
                            } else {
                                let any_input_null = args.iter().any(|arg| arg.is_null());
                                if !any_input_null {
                                    // No null inputs and no introduced nulls:
                                    // the output type must be non-nullable.
                                    assert!(
                                        !mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                } else if propagates_nulls {
                                    // A null input to a null-propagating
                                    // function: the output type must be
                                    // nullable.
                                    assert!(
                                        mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                }
                            }
                        }
                        // Reducing must agree with evaluating, in both value
                        // and scalar type.
                        let mut reduced = mir.clone();
                        reduced.reduce(&[]);
                        match reduced {
                            MirScalarExpr::Literal(reduce_result, ctyp) => {
                                match reduce_result {
                                    Ok(reduce_result_row) => {
                                        let reduce_result_datum = reduce_result_row.unpack_first();
                                        assert_eq!(
                                            reduce_result_datum,
                                            eval_result_datum,
                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                        assert_eq!(
                                            ctyp.scalar_type,
                                            mir_typ.scalar_type,
                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                    }
                                    // A reduction that ends in an error
                                    // literal is not compared.
                                    Err(..) => {}
                                }
                            }
                            _ => unreachable!(
                                "all args are literals, so should have reduced to a literal"
                            ),
                        }
                    }
                }
            }
        }
    }
3483
3484 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3489 match mir_func_call {
3490 MirScalarExpr::CallUnary { func, expr } => {
3491 if expr.is_literal() {
3492 Some((func.introduces_nulls(), func.propagates_nulls()))
3493 } else {
3494 None
3495 }
3496 }
3497 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3498 if expr1.is_literal() && expr2.is_literal() {
3499 Some((func.introduces_nulls(), func.propagates_nulls()))
3500 } else {
3501 None
3502 }
3503 }
3504 MirScalarExpr::CallVariadic { func, exprs } => {
3505 if exprs.iter().all(|arg| arg.is_literal()) {
3506 Some((func.introduces_nulls(), func.propagates_nulls()))
3507 } else {
3508 None
3509 }
3510 }
3511 _ => None,
3512 }
3513 }
3514
3515 #[mz_ore::test(tokio::test)]
3517 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3519 Catalog::with_debug(|catalog| async move {
3520 let conn_catalog = catalog.for_system_session();
3521
3522 for view in BUILTINS::views().filter(|view| {
3523 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3524 }) {
3525 let item = conn_catalog
3526 .resolve_item(&PartialItemName {
3527 database: None,
3528 schema: Some(view.schema.to_string()),
3529 item: view.name.to_string(),
3530 })
3531 .expect("unable to resolve view")
3532 .at_version(RelationVersionSelector::Latest);
3534 let full_name = conn_catalog.resolve_full_name(item.name());
3535 let desc = item.relation_desc().expect("invalid item type");
3536 for col_type in desc.iter_types() {
3537 match &col_type.scalar_type {
3538 typ @ SqlScalarType::UInt16
3539 | typ @ SqlScalarType::UInt32
3540 | typ @ SqlScalarType::UInt64
3541 | typ @ SqlScalarType::MzTimestamp
3542 | typ @ SqlScalarType::List { .. }
3543 | typ @ SqlScalarType::Map { .. }
3544 | typ @ SqlScalarType::MzAclItem => {
3545 panic!("{typ:?} type found in {full_name}");
3546 }
3547 SqlScalarType::AclItem
3548 | SqlScalarType::Bool
3549 | SqlScalarType::Int16
3550 | SqlScalarType::Int32
3551 | SqlScalarType::Int64
3552 | SqlScalarType::Float32
3553 | SqlScalarType::Float64
3554 | SqlScalarType::Numeric { .. }
3555 | SqlScalarType::Date
3556 | SqlScalarType::Time
3557 | SqlScalarType::Timestamp { .. }
3558 | SqlScalarType::TimestampTz { .. }
3559 | SqlScalarType::Interval
3560 | SqlScalarType::PgLegacyChar
3561 | SqlScalarType::Bytes
3562 | SqlScalarType::String
3563 | SqlScalarType::Char { .. }
3564 | SqlScalarType::VarChar { .. }
3565 | SqlScalarType::Jsonb
3566 | SqlScalarType::Uuid
3567 | SqlScalarType::Array(_)
3568 | SqlScalarType::Record { .. }
3569 | SqlScalarType::Oid
3570 | SqlScalarType::RegProc
3571 | SqlScalarType::RegType
3572 | SqlScalarType::RegClass
3573 | SqlScalarType::Int2Vector
3574 | SqlScalarType::Range { .. }
3575 | SqlScalarType::PgLegacyName => {}
3576 }
3577 }
3578 }
3579 catalog.expire().await;
3580 })
3581 .await
3582 }
3583
3584 #[mz_ore::test(tokio::test)]
3587 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3589 Catalog::with_debug(|catalog| async move {
3590 let conn_catalog = catalog.for_system_session();
3591
3592 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3593 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3594
3595 for entry in catalog.entries() {
3596 let schema_spec = entry.name().qualifiers.schema_spec;
3597 let introspection_deps = catalog.introspection_dependencies(entry.id);
3598 if introspection_deps.is_empty() {
3599 assert!(
3600 schema_spec != introspection_schema_spec,
3601 "entry does not depend on introspection sources but is in \
3602 `mz_introspection`: {}",
3603 conn_catalog.resolve_full_name(entry.name()),
3604 );
3605 } else {
3606 assert!(
3607 schema_spec == introspection_schema_spec,
3608 "entry depends on introspection sources but is not in \
3609 `mz_introspection`: {}",
3610 conn_catalog.resolve_full_name(entry.name()),
3611 );
3612 }
3613 }
3614 })
3615 .await
3616 }
3617
3618 #[mz_ore::test(tokio::test)]
3619 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3621 let persist_client = PersistClient::new_for_tests().await;
3622 let bootstrap_args = test_bootstrap_args();
3623 let organization_id = Uuid::new_v4();
3624 let db_name = "DB";
3625
3626 let mut writer_catalog = Catalog::open_debug_catalog(
3627 persist_client.clone(),
3628 organization_id.clone(),
3629 &bootstrap_args,
3630 )
3631 .await
3632 .expect("open_debug_catalog");
3633 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3634 persist_client.clone(),
3635 organization_id.clone(),
3636 &bootstrap_args,
3637 )
3638 .await
3639 .expect("open_debug_read_only_catalog");
3640 assert_err!(writer_catalog.resolve_database(db_name));
3641 assert_err!(read_only_catalog.resolve_database(db_name));
3642
3643 let commit_ts = writer_catalog.current_upper().await;
3644 writer_catalog
3645 .transact(
3646 None,
3647 commit_ts,
3648 None,
3649 vec![Op::CreateDatabase {
3650 name: db_name.to_string(),
3651 owner_id: MZ_SYSTEM_ROLE_ID,
3652 }],
3653 )
3654 .await
3655 .expect("failed to transact");
3656
3657 let write_db = writer_catalog
3658 .resolve_database(db_name)
3659 .expect("resolve_database");
3660 read_only_catalog
3661 .sync_to_current_updates()
3662 .await
3663 .expect("sync_to_current_updates");
3664 let read_db = read_only_catalog
3665 .resolve_database(db_name)
3666 .expect("resolve_database");
3667
3668 assert_eq!(write_db, read_db);
3669
3670 let writer_catalog_fencer =
3671 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3672 .await
3673 .expect("open_debug_catalog for fencer");
3674 let fencer_db = writer_catalog_fencer
3675 .resolve_database(db_name)
3676 .expect("resolve_database for fencer");
3677 assert_eq!(fencer_db, read_db);
3678
3679 let write_fence_err = writer_catalog
3680 .sync_to_current_updates()
3681 .await
3682 .expect_err("sync_to_current_updates for fencer");
3683 assert!(matches!(
3684 write_fence_err,
3685 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3686 ));
3687 let read_fence_err = read_only_catalog
3688 .sync_to_current_updates()
3689 .await
3690 .expect_err("sync_to_current_updates after fencer");
3691 assert!(matches!(
3692 read_fence_err,
3693 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3694 ));
3695
3696 writer_catalog.expire().await;
3697 read_only_catalog.expire().await;
3698 writer_catalog_fencer.expire().await;
3699 }
3700}