1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, STORAGE_USAGE_ID_ALLOC_KEY, TestCatalogStateBuilder,
39 test_bootstrap_args,
40};
41use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
42use mz_catalog::memory::error::{Error, ErrorKind};
43use mz_catalog::memory::objects::{
44 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
45 NetworkPolicy, Role, RoleAuth, Schema,
46};
47use mz_compute_types::dataflows::DataflowDescription;
48use mz_controller::clusters::ReplicaLocation;
49use mz_controller_types::{ClusterId, ReplicaId};
50use mz_expr::OptimizedMirRelationExpr;
51use mz_license_keys::ValidatedLicenseKey;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68 SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94 DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114#[derive(Debug)]
137pub struct Catalog {
138 state: CatalogState,
139 expr_cache_handle: Option<ExpressionCacheHandle>,
140 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141 transient_revision: u64,
142}
143
144impl Clone for Catalog {
147 fn clone(&self) -> Self {
148 Self {
149 state: self.state.clone(),
150 expr_cache_handle: self.expr_cache_handle.clone(),
151 storage: Arc::clone(&self.storage),
152 transient_revision: self.transient_revision,
153 }
154 }
155}
156
157impl Catalog {
158 #[mz_ore::instrument(level = "trace")]
164 pub fn set_optimized_plan(
165 &mut self,
166 id: GlobalId,
167 plan: DataflowDescription<OptimizedMirRelationExpr>,
168 ) {
169 self.state.set_optimized_plan(id, plan);
170 }
171
172 #[mz_ore::instrument(level = "trace")]
178 pub fn set_physical_plan(
179 &mut self,
180 id: GlobalId,
181 plan: DataflowDescription<mz_compute_types::plan::Plan>,
182 ) {
183 self.state.set_physical_plan(id, plan);
184 }
185
186 #[mz_ore::instrument(level = "trace")]
188 pub fn try_get_optimized_plan(
189 &self,
190 id: &GlobalId,
191 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192 let entry = self.state.try_get_entry_by_global_id(id)?;
193 entry.item().optimized_plan().map(AsRef::as_ref)
194 }
195
196 #[mz_ore::instrument(level = "trace")]
198 pub fn try_get_physical_plan(
199 &self,
200 id: &GlobalId,
201 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202 let entry = self.state.try_get_entry_by_global_id(id)?;
203 entry.item().physical_plan().map(AsRef::as_ref)
204 }
205
206 #[mz_ore::instrument(level = "trace")]
212 pub fn set_dataflow_metainfo(
213 &mut self,
214 id: GlobalId,
215 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216 ) {
217 self.state.set_dataflow_metainfo(id, metainfo);
218 }
219
220 #[mz_ore::instrument(level = "trace")]
222 pub fn try_get_dataflow_metainfo(
223 &self,
224 id: &GlobalId,
225 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226 let entry = self.state.try_get_entry_by_global_id(id)?;
227 entry.item().dataflow_metainfo()
228 }
229}
230
231#[derive(Debug)]
232pub struct ConnCatalog<'a> {
233 state: Cow<'a, CatalogState>,
234 unresolvable_ids: BTreeSet<CatalogItemId>,
244 conn_id: ConnectionId,
245 cluster: String,
246 database: Option<DatabaseId>,
247 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
248 role_id: RoleId,
249 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
250 portals: Option<&'a BTreeMap<String, Portal>>,
251 notices_tx: UnboundedSender<AdapterNotice>,
252 restrict_to_user_objects: bool,
253}
254
255impl ConnCatalog<'_> {
256 pub fn conn_id(&self) -> &ConnectionId {
257 &self.conn_id
258 }
259
260 pub fn state(&self) -> &CatalogState {
261 &*self.state
262 }
263
264 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
274 assert_eq!(
275 self.role_id, MZ_SYSTEM_ROLE_ID,
276 "only the system role can mark IDs unresolvable",
277 );
278 self.unresolvable_ids.insert(id);
279 }
280
281 pub fn effective_search_path(
287 &self,
288 include_temp_schema: bool,
289 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
290 self.state
291 .effective_search_path(&self.search_path, include_temp_schema)
292 }
293}
294
295impl ConnectionResolver for ConnCatalog<'_> {
296 fn resolve_connection(
297 &self,
298 id: CatalogItemId,
299 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
300 self.state().resolve_connection(id)
301 }
302}
303
304impl Catalog {
305 pub fn transient_revision(&self) -> u64 {
309 self.transient_revision
310 }
311
312 pub async fn with_debug<F, Fut, T>(f: F) -> T
324 where
325 F: FnOnce(Catalog) -> Fut,
326 Fut: Future<Output = T>,
327 {
328 let persist_client = PersistClient::new_for_tests().await;
329 let organization_id = Uuid::new_v4();
330 let bootstrap_args = test_bootstrap_args();
331 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
332 .await
333 .expect("can open debug catalog");
334 f(catalog).await
335 }
336
337 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
340 where
341 F: FnOnce(Catalog) -> Fut,
342 Fut: Future<Output = T>,
343 {
344 let persist_client = PersistClient::new_for_tests().await;
345 let organization_id = Uuid::new_v4();
346 let bootstrap_args = test_bootstrap_args();
347 let mut catalog =
348 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
349 .await
350 .expect("can open debug catalog");
351
352 let now = SYSTEM_TIME.clone();
354 let openable_storage = TestCatalogStateBuilder::new(persist_client)
355 .with_organization_id(organization_id)
356 .with_default_deploy_generation()
357 .build()
358 .await
359 .expect("can create durable catalog");
360 let mut storage = openable_storage
361 .open(now().into(), &bootstrap_args)
362 .await
363 .expect("can open durable catalog")
364 .0;
365 let _ = storage
367 .sync_to_current_updates()
368 .await
369 .expect("can sync to current updates");
370 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
371
372 f(catalog).await
373 }
374
375 pub async fn open_debug_catalog(
379 persist_client: PersistClient,
380 organization_id: Uuid,
381 bootstrap_args: &BootstrapArgs,
382 ) -> Result<Catalog, anyhow::Error> {
383 let now = SYSTEM_TIME.clone();
384 let environment_id = None;
385 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
386 .with_organization_id(organization_id)
387 .with_default_deploy_generation()
388 .build()
389 .await?;
390 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
391 let system_parameter_defaults = BTreeMap::default();
392 Self::open_debug_catalog_inner(
393 persist_client,
394 storage,
395 now,
396 environment_id,
397 &DUMMY_BUILD_INFO,
398 system_parameter_defaults,
399 bootstrap_args,
400 None,
401 )
402 .await
403 }
404
405 pub async fn open_debug_read_only_catalog(
410 persist_client: PersistClient,
411 organization_id: Uuid,
412 bootstrap_args: &BootstrapArgs,
413 ) -> Result<Catalog, anyhow::Error> {
414 let now = SYSTEM_TIME.clone();
415 let environment_id = None;
416 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
417 .with_organization_id(organization_id)
418 .build()
419 .await?;
420 let storage = openable_storage
421 .open_read_only(&test_bootstrap_args())
422 .await?;
423 let system_parameter_defaults = BTreeMap::default();
424 Self::open_debug_catalog_inner(
425 persist_client,
426 storage,
427 now,
428 environment_id,
429 &DUMMY_BUILD_INFO,
430 system_parameter_defaults,
431 bootstrap_args,
432 None,
433 )
434 .await
435 }
436
437 pub async fn open_debug_read_only_persist_catalog_config(
442 persist_client: PersistClient,
443 now: NowFn,
444 environment_id: EnvironmentId,
445 system_parameter_defaults: BTreeMap<String, String>,
446 build_info: &'static BuildInfo,
447 bootstrap_args: &BootstrapArgs,
448 enable_expression_cache_override: Option<bool>,
449 ) -> Result<Catalog, anyhow::Error> {
450 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
451 .with_organization_id(environment_id.organization_id())
452 .with_version(
453 build_info
454 .version
455 .parse()
456 .expect("build version is parseable"),
457 )
458 .build()
459 .await?;
460 let storage = openable_storage.open_read_only(bootstrap_args).await?;
461 Self::open_debug_catalog_inner(
462 persist_client,
463 storage,
464 now,
465 Some(environment_id),
466 build_info,
467 system_parameter_defaults,
468 bootstrap_args,
469 enable_expression_cache_override,
470 )
471 .await
472 }
473
474 async fn open_debug_catalog_inner(
475 persist_client: PersistClient,
476 storage: Box<dyn DurableCatalogState>,
477 now: NowFn,
478 environment_id: Option<EnvironmentId>,
479 build_info: &'static BuildInfo,
480 system_parameter_defaults: BTreeMap<String, String>,
481 bootstrap_args: &BootstrapArgs,
482 enable_expression_cache_override: Option<bool>,
483 ) -> Result<Catalog, anyhow::Error> {
484 let metrics_registry = &MetricsRegistry::new();
485 let secrets_reader = Arc::new(InMemorySecretsController::new());
486 let previous_ts = now().into();
489 let replica_size = &bootstrap_args.default_cluster_replica_size;
490 let read_only = false;
491
492 let OpenCatalogResult {
493 catalog,
494 migrated_storage_collections_0dt: _,
495 new_builtin_collections: _,
496 builtin_table_updates: _,
497 cached_global_exprs: _,
498 uncached_local_exprs: _,
499 } = Catalog::open(Config {
500 storage,
501 metrics_registry,
502 state: StateConfig {
503 unsafe_mode: true,
504 all_features: false,
505 build_info,
506 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
507 read_only,
508 now,
509 boot_ts: previous_ts,
510 skip_migrations: true,
511 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
512 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
513 size: replica_size.clone(),
514 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
515 },
516 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
517 size: replica_size.clone(),
518 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
519 },
520 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
521 size: replica_size.clone(),
522 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
523 },
524 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
525 size: replica_size.clone(),
526 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
527 },
528 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
529 size: replica_size.clone(),
530 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
531 },
532 system_parameter_defaults,
533 remote_system_parameters: None,
534 availability_zones: vec![],
535 egress_addresses: vec![],
536 aws_principal_context: None,
537 aws_privatelink_availability_zones: None,
538 http_host_name: None,
539 connection_context: ConnectionContext::for_tests(secrets_reader),
540 builtin_item_migration_config: BuiltinItemMigrationConfig {
541 persist_client: persist_client.clone(),
542 read_only,
543 force_migration: None,
544 },
545 persist_client,
546 enable_expression_cache_override,
547 helm_chart_version: None,
548 external_login_password_mz_system: None,
549 license_key: ValidatedLicenseKey::for_tests(),
550 },
551 })
552 .await?;
553 Ok(catalog)
554 }
555
556 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
557 self.state.for_session(session)
558 }
559
560 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
561 self.state.for_sessionless_user(role_id)
562 }
563
564 pub fn for_system_session(&self) -> ConnCatalog<'_> {
565 self.state.for_system_session()
566 }
567
568 async fn storage<'a>(
569 &'a self,
570 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
571 self.storage.lock().await
572 }
573
574 pub async fn current_upper(&self) -> mz_repr::Timestamp {
575 self.storage().await.current_upper().await
576 }
577
578 pub async fn allocate_user_id(
579 &self,
580 commit_ts: mz_repr::Timestamp,
581 ) -> Result<(CatalogItemId, GlobalId), Error> {
582 self.storage()
583 .await
584 .allocate_user_id(commit_ts)
585 .await
586 .maybe_terminate("allocating user ids")
587 .err_into()
588 }
589
590 pub async fn allocate_user_ids(
592 &self,
593 amount: u64,
594 commit_ts: mz_repr::Timestamp,
595 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
596 self.storage()
597 .await
598 .allocate_user_ids(amount, commit_ts)
599 .await
600 .maybe_terminate("allocating user ids")
601 .err_into()
602 }
603
604 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
605 let commit_ts = self.storage().await.current_upper().await;
606 self.allocate_user_id(commit_ts).await
607 }
608
609 pub async fn allocate_storage_usage_id(
617 &self,
618 commit_ts: mz_repr::Timestamp,
619 ) -> Result<u64, Error> {
620 use mz_ore::collections::CollectionExt;
621
622 self.storage()
623 .await
624 .allocate_id(STORAGE_USAGE_ID_ALLOC_KEY, 1, commit_ts)
625 .await
626 .maybe_terminate("allocating storage usage id")
627 .map(|ids| ids.into_element())
628 .err_into()
629 }
630
631 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
633 self.storage()
634 .await
635 .get_next_user_item_id()
636 .await
637 .err_into()
638 }
639
640 #[cfg(test)]
641 pub async fn allocate_system_id(
642 &self,
643 commit_ts: mz_repr::Timestamp,
644 ) -> Result<(CatalogItemId, GlobalId), Error> {
645 use mz_ore::collections::CollectionExt;
646
647 let mut storage = self.storage().await;
648 let mut txn = storage.transaction().await?;
649 let id = txn
650 .allocate_system_item_ids(1)
651 .maybe_terminate("allocating system ids")?
652 .into_element();
653 let _ = txn.get_and_commit_op_updates();
655 txn.commit(commit_ts).await?;
656 Ok(id)
657 }
658
659 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
661 self.storage()
662 .await
663 .get_next_system_item_id()
664 .await
665 .err_into()
666 }
667
668 pub async fn allocate_user_cluster_id(
669 &self,
670 commit_ts: mz_repr::Timestamp,
671 ) -> Result<ClusterId, Error> {
672 self.storage()
673 .await
674 .allocate_user_cluster_id(commit_ts)
675 .await
676 .maybe_terminate("allocating user cluster ids")
677 .err_into()
678 }
679
680 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
682 self.storage()
683 .await
684 .get_next_system_replica_id()
685 .await
686 .err_into()
687 }
688
689 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
691 self.storage()
692 .await
693 .get_next_user_replica_id()
694 .await
695 .err_into()
696 }
697
698 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
699 self.state.resolve_database(database_name)
700 }
701
702 pub fn resolve_schema(
703 &self,
704 current_database: Option<&DatabaseId>,
705 database_name: Option<&str>,
706 schema_name: &str,
707 conn_id: &ConnectionId,
708 ) -> Result<&Schema, SqlCatalogError> {
709 self.state
710 .resolve_schema(current_database, database_name, schema_name, conn_id)
711 }
712
713 pub fn resolve_schema_in_database(
714 &self,
715 database_spec: &ResolvedDatabaseSpecifier,
716 schema_name: &str,
717 conn_id: &ConnectionId,
718 ) -> Result<&Schema, SqlCatalogError> {
719 self.state
720 .resolve_schema_in_database(database_spec, schema_name, conn_id)
721 }
722
723 pub fn resolve_replica_in_cluster(
724 &self,
725 cluster_id: &ClusterId,
726 replica_name: &str,
727 ) -> Result<&ClusterReplica, SqlCatalogError> {
728 self.state
729 .resolve_replica_in_cluster(cluster_id, replica_name)
730 }
731
732 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
733 self.state.resolve_system_schema(name)
734 }
735
736 pub fn resolve_search_path(
737 &self,
738 session: &Session,
739 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
740 self.state.resolve_search_path(session)
741 }
742
743 pub fn resolve_entry(
745 &self,
746 current_database: Option<&DatabaseId>,
747 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
748 name: &PartialItemName,
749 conn_id: &ConnectionId,
750 ) -> Result<&CatalogEntry, SqlCatalogError> {
751 self.state
752 .resolve_entry(current_database, search_path, name, conn_id)
753 }
754
755 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
757 self.state.resolve_builtin_table(builtin)
758 }
759
760 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
762 self.state.resolve_builtin_log(builtin).0
763 }
764
765 pub fn resolve_builtin_storage_collection(
767 &self,
768 builtin: &'static BuiltinSource,
769 ) -> CatalogItemId {
770 self.state.resolve_builtin_source(builtin)
771 }
772
773 pub fn resolve_function(
775 &self,
776 current_database: Option<&DatabaseId>,
777 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
778 name: &PartialItemName,
779 conn_id: &ConnectionId,
780 ) -> Result<&CatalogEntry, SqlCatalogError> {
781 self.state
782 .resolve_function(current_database, search_path, name, conn_id)
783 }
784
785 pub fn resolve_type(
787 &self,
788 current_database: Option<&DatabaseId>,
789 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
790 name: &PartialItemName,
791 conn_id: &ConnectionId,
792 ) -> Result<&CatalogEntry, SqlCatalogError> {
793 self.state
794 .resolve_type(current_database, search_path, name, conn_id)
795 }
796
797 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
798 self.state.resolve_cluster(name)
799 }
800
801 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
807 self.state.resolve_builtin_cluster(cluster)
808 }
809
810 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
811 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
812 }
813
814 pub fn resolve_target_cluster(
816 &self,
817 target_cluster: TargetCluster,
818 session: &Session,
819 ) -> Result<&Cluster, AdapterError> {
820 match target_cluster {
821 TargetCluster::CatalogServer => {
822 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
823 }
824 TargetCluster::Active => self.active_cluster(session),
825 TargetCluster::Transaction(cluster_id) => self
826 .try_get_cluster(cluster_id)
827 .ok_or(AdapterError::ConcurrentClusterDrop),
828 }
829 }
830
831 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
832 if session.user().name != SYSTEM_USER.name
836 && session.user().name != SUPPORT_USER.name
837 && session.vars().cluster() == SYSTEM_USER.name
838 {
839 coord_bail!(
840 "system cluster '{}' cannot execute user queries",
841 SYSTEM_USER.name
842 );
843 }
844 let cluster = self.resolve_cluster(session.vars().cluster())?;
845 Ok(cluster)
846 }
847
848 pub fn state(&self) -> &CatalogState {
849 &self.state
850 }
851
852 pub fn resolve_full_name(
853 &self,
854 name: &QualifiedItemName,
855 conn_id: Option<&ConnectionId>,
856 ) -> FullItemName {
857 self.state.resolve_full_name(name, conn_id)
858 }
859
860 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
861 self.state.try_get_entry(id)
862 }
863
864 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
865 self.state.try_get_entry_by_global_id(id)
866 }
867
868 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
869 self.state.get_entry(id)
870 }
871
872 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
873 self.state.get_entry_by_global_id(id)
874 }
875
876 pub fn get_global_ids<'a>(
877 &'a self,
878 id: &CatalogItemId,
879 ) -> impl Iterator<Item = GlobalId> + use<'a> {
880 self.get_entry(id).global_ids()
881 }
882
883 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
884 self.get_entry_by_global_id(id).id()
885 }
886
887 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
888 let item = self.try_get_entry_by_global_id(id)?;
889 Some(item.id())
890 }
891
892 pub fn get_schema(
893 &self,
894 database_spec: &ResolvedDatabaseSpecifier,
895 schema_spec: &SchemaSpecifier,
896 conn_id: &ConnectionId,
897 ) -> &Schema {
898 self.state.get_schema(database_spec, schema_spec, conn_id)
899 }
900
901 pub fn try_get_schema(
902 &self,
903 database_spec: &ResolvedDatabaseSpecifier,
904 schema_spec: &SchemaSpecifier,
905 conn_id: &ConnectionId,
906 ) -> Option<&Schema> {
907 self.state
908 .try_get_schema(database_spec, schema_spec, conn_id)
909 }
910
911 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
912 self.state.get_mz_catalog_schema_id()
913 }
914
915 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
916 self.state.get_pg_catalog_schema_id()
917 }
918
919 pub fn get_information_schema_id(&self) -> SchemaId {
920 self.state.get_information_schema_id()
921 }
922
923 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
924 self.state.get_mz_internal_schema_id()
925 }
926
927 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
928 self.state.get_mz_introspection_schema_id()
929 }
930
931 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
932 self.state.get_mz_unsafe_schema_id()
933 }
934
935 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
936 self.state.system_schema_ids()
937 }
938
939 pub fn get_database(&self, id: &DatabaseId) -> &Database {
940 self.state.get_database(id)
941 }
942
943 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
944 self.state.try_get_role(id)
945 }
946
947 pub fn get_role(&self, id: &RoleId) -> &Role {
948 self.state.get_role(id)
949 }
950
951 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
952 self.state.try_get_role_by_name(role_name)
953 }
954
955 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
956 self.state.try_get_role_auth_by_id(id)
957 }
958
959 pub fn create_temporary_schema(
962 &mut self,
963 conn_id: &ConnectionId,
964 owner_id: RoleId,
965 ) -> Result<(), Error> {
966 self.state.create_temporary_schema(conn_id, owner_id)
967 }
968
969 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
970 self.state
972 .temporary_schemas
973 .get(conn_id)
974 .map(|schema| schema.items.contains_key(item_name))
975 .unwrap_or(false)
976 }
977
978 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
981 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
982 return Ok(());
983 };
984 if !schema.items.is_empty() {
985 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
986 }
987 Ok(())
988 }
989
990 pub(crate) fn object_dependents(
991 &self,
992 object_ids: &Vec<ObjectId>,
993 conn_id: &ConnectionId,
994 ) -> Vec<ObjectId> {
995 let mut seen = BTreeSet::new();
996 self.state.object_dependents(object_ids, conn_id, &mut seen)
997 }
998
999 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1000 FullNameV1 {
1001 database: name.database.to_string(),
1002 schema: name.schema.clone(),
1003 item: name.item.clone(),
1004 }
1005 }
1006
1007 pub fn find_available_cluster_name(&self, name: &str) -> String {
1008 let mut i = 0;
1009 let mut candidate = name.to_string();
1010 while self.state.clusters_by_name.contains_key(&candidate) {
1011 i += 1;
1012 candidate = format!("{}{}", name, i);
1013 }
1014 candidate
1015 }
1016
1017 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1018 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1019 self.cluster_replica_sizes()
1020 .enabled_allocations()
1021 .map(|a| a.0.to_owned())
1022 .collect::<Vec<_>>()
1023 } else {
1024 self.system_config().allowed_cluster_replica_sizes()
1025 }
1026 }
1027
1028 pub fn concretize_replica_location(
1029 &self,
1030 location: mz_catalog::durable::ReplicaLocation,
1031 allowed_sizes: &Vec<String>,
1032 allowed_availability_zones: Option<&[String]>,
1033 allow_disabled: bool,
1034 ) -> Result<ReplicaLocation, Error> {
1035 self.state.concretize_replica_location(
1036 location,
1037 allowed_sizes,
1038 allowed_availability_zones,
1039 allow_disabled,
1040 )
1041 }
1042
1043 pub(crate) fn ensure_valid_replica_size(
1044 &self,
1045 allowed_sizes: &[String],
1046 size: &String,
1047 allow_disabled: bool,
1048 ) -> Result<(), Error> {
1049 self.state
1050 .ensure_valid_replica_size(allowed_sizes, size, allow_disabled)
1051 }
1052
1053 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1054 &self.state.cluster_replica_sizes
1055 }
1056
1057 pub fn get_privileges(
1059 &self,
1060 id: &SystemObjectId,
1061 conn_id: &ConnectionId,
1062 ) -> Option<&PrivilegeMap> {
1063 match id {
1064 SystemObjectId::Object(id) => match id {
1065 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1066 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1067 ObjectId::Schema((database_spec, schema_spec)) => Some(
1068 self.get_schema(database_spec, schema_spec, conn_id)
1069 .privileges(),
1070 ),
1071 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1072 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1073 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1074 },
1075 SystemObjectId::System => Some(&self.state.system_privileges),
1076 }
1077 }
1078
1079 #[mz_ore::instrument(level = "debug")]
1080 pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1081 Ok(self.storage().await.advance_upper(new_upper).await?)
1082 }
1083
1084 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1086 self.state.introspection_dependencies(id)
1087 }
1088
1089 pub fn dump(&self) -> Result<CatalogDump, Error> {
1095 Ok(CatalogDump::new(self.state.dump(None)?))
1096 }
1097
1098 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1102 self.state.check_consistency().map_err(|inconsistencies| {
1103 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1104 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1105 })
1106 })
1107 }
1108
1109 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1110 self.state.config()
1111 }
1112
1113 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1114 self.state.entry_by_id.values()
1115 }
1116
1117 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1118 self.entries()
1119 .filter(|entry| entry.is_connection() && entry.id().is_user())
1120 }
1121
1122 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1123 self.entries()
1124 .filter(|entry| entry.is_table() && entry.id().is_user())
1125 }
1126
1127 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1128 self.entries()
1129 .filter(|entry| entry.is_source() && entry.id().is_user())
1130 }
1131
1132 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1133 self.entries()
1134 .filter(|entry| entry.is_sink() && entry.id().is_user())
1135 }
1136
1137 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1138 self.entries()
1139 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1140 }
1141
1142 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1143 self.entries()
1144 .filter(|entry| entry.is_secret() && entry.id().is_user())
1145 }
1146
1147 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1148 self.state.get_network_policy(&network_policy_id)
1149 }
1150
1151 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1152 self.state.try_get_network_policy_by_name(name)
1153 }
1154
1155 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1156 self.state.clusters_by_id.values()
1157 }
1158
1159 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1160 self.state.get_cluster(cluster_id)
1161 }
1162
1163 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1164 self.state.try_get_cluster(cluster_id)
1165 }
1166
1167 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1168 self.clusters().filter(|cluster| cluster.id.is_user())
1169 }
1170
1171 pub fn get_cluster_replica(
1172 &self,
1173 cluster_id: ClusterId,
1174 replica_id: ReplicaId,
1175 ) -> &ClusterReplica {
1176 self.state.get_cluster_replica(cluster_id, replica_id)
1177 }
1178
1179 pub fn try_get_cluster_replica(
1180 &self,
1181 cluster_id: ClusterId,
1182 replica_id: ReplicaId,
1183 ) -> Option<&ClusterReplica> {
1184 self.state.try_get_cluster_replica(cluster_id, replica_id)
1185 }
1186
1187 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1188 self.user_clusters()
1189 .flat_map(|cluster| cluster.user_replicas())
1190 }
1191
1192 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1193 self.state.database_by_id.values()
1194 }
1195
1196 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1197 self.state
1198 .roles_by_id
1199 .values()
1200 .filter(|role| role.is_user())
1201 }
1202
1203 pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1204 self.state
1205 .network_policies_by_id
1206 .iter()
1207 .filter(|(id, _)| id.is_user())
1208 .map(|(_, policy)| policy)
1209 }
1210
1211 pub fn system_privileges(&self) -> &PrivilegeMap {
1212 &self.state.system_privileges
1213 }
1214
1215 pub fn default_privileges(
1216 &self,
1217 ) -> impl Iterator<
1218 Item = (
1219 &DefaultPrivilegeObject,
1220 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1221 ),
1222 > {
1223 self.state.default_privileges.iter()
1224 }
1225
1226 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1227 self.state
1228 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1229 }
1230
1231 pub fn pack_storage_usage_update(
1232 &self,
1233 event: VersionedStorageUsage,
1234 diff: Diff,
1235 ) -> BuiltinTableUpdate {
1236 self.state
1237 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1238 }
1239
1240 pub fn system_config(&self) -> &SystemVars {
1241 self.state.system_config()
1242 }
1243
1244 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1245 self.state.system_config_mut()
1246 }
1247
1248 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1249 self.state.ensure_not_reserved_role(role_id)
1250 }
1251
1252 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1253 self.state.ensure_grantable_role(role_id)
1254 }
1255
1256 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1257 self.state.ensure_not_system_role(role_id)
1258 }
1259
1260 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1261 self.state.ensure_not_predefined_role(role_id)
1262 }
1263
1264 pub fn ensure_not_reserved_network_policy(
1265 &self,
1266 network_policy_id: &NetworkPolicyId,
1267 ) -> Result<(), Error> {
1268 self.state
1269 .ensure_not_reserved_network_policy(network_policy_id)
1270 }
1271
1272 pub fn ensure_not_reserved_object(
1273 &self,
1274 object_id: &ObjectId,
1275 conn_id: &ConnectionId,
1276 ) -> Result<(), Error> {
1277 match object_id {
1278 ObjectId::Cluster(cluster_id) => {
1279 if cluster_id.is_system() {
1280 let cluster = self.get_cluster(*cluster_id);
1281 Err(Error::new(ErrorKind::ReadOnlyCluster(
1282 cluster.name().to_string(),
1283 )))
1284 } else {
1285 Ok(())
1286 }
1287 }
1288 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1289 if replica_id.is_system() {
1290 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1291 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1292 replica.name().to_string(),
1293 )))
1294 } else {
1295 Ok(())
1296 }
1297 }
1298 ObjectId::Database(database_id) => {
1299 if database_id.is_system() {
1300 let database = self.get_database(database_id);
1301 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1302 database.name().to_string(),
1303 )))
1304 } else {
1305 Ok(())
1306 }
1307 }
1308 ObjectId::Schema((database_spec, schema_spec)) => {
1309 if schema_spec.is_system() {
1310 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1311 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1312 schema.name().schema.clone(),
1313 )))
1314 } else {
1315 Ok(())
1316 }
1317 }
1318 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1319 ObjectId::Item(item_id) => {
1320 if item_id.is_system() {
1321 let item = self.get_entry(item_id);
1322 let name = self.resolve_full_name(item.name(), Some(conn_id));
1323 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1324 } else {
1325 Ok(())
1326 }
1327 }
1328 ObjectId::NetworkPolicy(network_policy_id) => {
1329 self.ensure_not_reserved_network_policy(network_policy_id)
1330 }
1331 }
1332 }
1333
1334 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1336 &mut self,
1337 create_sql: &str,
1338 force_if_exists_skip: bool,
1339 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1340 self.state
1341 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1342 }
1343
1344 pub(crate) fn cache_expressions(
1355 &self,
1356 id: GlobalId,
1357 local_mir: Option<OptimizedMirRelationExpr>,
1358 mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1359 mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1360 dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1361 optimizer_features: OptimizerFeatures,
1362 ) -> BoxFuture<'static, ()> {
1363 global_mir.as_of = None;
1367 global_mir.until = Default::default();
1368 physical_plan.as_of = None;
1369 physical_plan.until = Default::default();
1370
1371 let mut local_exprs = Vec::new();
1372 if let Some(local_mir) = local_mir {
1373 local_exprs.push((
1374 id,
1375 LocalExpressions {
1376 local_mir,
1377 optimizer_features: optimizer_features.clone(),
1378 },
1379 ));
1380 }
1381 let global_exprs = vec![(
1382 id,
1383 GlobalExpressions {
1384 global_mir,
1385 physical_plan,
1386 dataflow_metainfos,
1387 optimizer_features,
1388 },
1389 )];
1390 self.update_expression_cache(local_exprs, global_exprs, Default::default())
1391 }
1392
1393 pub(crate) fn update_expression_cache<'a, 'b>(
1394 &'a self,
1395 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1396 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1397 invalidate_ids: BTreeSet<GlobalId>,
1398 ) -> BoxFuture<'b, ()> {
1399 if let Some(expr_cache) = &self.expr_cache_handle {
1400 expr_cache
1401 .update(
1402 new_local_expressions,
1403 new_global_expressions,
1404 invalidate_ids,
1405 )
1406 .boxed()
1407 } else {
1408 async {}.boxed()
1409 }
1410 }
1411
1412 #[cfg(test)]
1416 async fn sync_to_current_updates(
1417 &mut self,
1418 ) -> Result<
1419 (
1420 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1421 Vec<ParsedStateUpdate>,
1422 ),
1423 CatalogError,
1424 > {
1425 let updates = self.storage().await.sync_to_current_updates().await?;
1426 let (builtin_table_updates, catalog_updates) = self
1427 .state
1428 .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1429 .await;
1430 Ok((builtin_table_updates, catalog_updates))
1431 }
1432}
1433
1434pub fn is_reserved_name(name: &str) -> bool {
1435 BUILTIN_PREFIXES
1436 .iter()
1437 .any(|prefix| name.starts_with(prefix))
1438}
1439
1440pub fn is_reserved_role_name(name: &str) -> bool {
1441 is_reserved_name(name) || is_public_role(name)
1442}
1443
1444pub fn is_public_role(name: &str) -> bool {
1445 name == &*PUBLIC_ROLE_NAME
1446}
1447
1448pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1449 object_type_to_audit_object_type(sql_type.into())
1450}
1451
1452pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1453 match id {
1454 CommentObjectId::Table(_) => ObjectType::Table,
1455 CommentObjectId::View(_) => ObjectType::View,
1456 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1457 CommentObjectId::Source(_) => ObjectType::Source,
1458 CommentObjectId::Sink(_) => ObjectType::Sink,
1459 CommentObjectId::Index(_) => ObjectType::Index,
1460 CommentObjectId::Func(_) => ObjectType::Func,
1461 CommentObjectId::Connection(_) => ObjectType::Connection,
1462 CommentObjectId::Type(_) => ObjectType::Type,
1463 CommentObjectId::Secret(_) => ObjectType::Secret,
1464 CommentObjectId::Role(_) => ObjectType::Role,
1465 CommentObjectId::Database(_) => ObjectType::Database,
1466 CommentObjectId::Schema(_) => ObjectType::Schema,
1467 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1468 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1469 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1470 }
1471}
1472
1473pub(crate) fn object_type_to_audit_object_type(
1474 object_type: mz_sql::catalog::ObjectType,
1475) -> ObjectType {
1476 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1477}
1478
1479pub(crate) fn system_object_type_to_audit_object_type(
1480 system_type: &SystemObjectType,
1481) -> ObjectType {
1482 match system_type {
1483 SystemObjectType::Object(object_type) => match object_type {
1484 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1485 mz_sql::catalog::ObjectType::View => ObjectType::View,
1486 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1487 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1488 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1489 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1490 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1491 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1492 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1493 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1494 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1495 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1496 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1497 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1498 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1499 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1500 },
1501 SystemObjectType::System => ObjectType::System,
1502 }
1503}
1504
1505#[derive(Debug, Copy, Clone)]
1506pub enum UpdatePrivilegeVariant {
1507 Grant,
1508 Revoke,
1509}
1510
1511impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1512 fn from(variant: UpdatePrivilegeVariant) -> Self {
1513 match variant {
1514 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1515 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1516 }
1517 }
1518}
1519
1520impl From<UpdatePrivilegeVariant> for EventType {
1521 fn from(variant: UpdatePrivilegeVariant) -> Self {
1522 match variant {
1523 UpdatePrivilegeVariant::Grant => EventType::Grant,
1524 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1525 }
1526 }
1527}
1528
1529impl ConnCatalog<'_> {
1530 fn resolve_item_name(
1531 &self,
1532 name: &PartialItemName,
1533 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1534 self.resolve_item(name).map(|entry| entry.name())
1535 }
1536
1537 fn resolve_function_name(
1538 &self,
1539 name: &PartialItemName,
1540 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1541 self.resolve_function(name).map(|entry| entry.name())
1542 }
1543
1544 fn resolve_type_name(
1545 &self,
1546 name: &PartialItemName,
1547 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1548 self.resolve_type(name).map(|entry| entry.name())
1549 }
1550}
1551
1552impl ExprHumanizer for ConnCatalog<'_> {
1553 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1554 let entry = self.state.try_get_entry_by_global_id(&id)?;
1555 Some(self.resolve_full_name(entry.name()).to_string())
1556 }
1557
1558 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1559 let entry = self.state.try_get_entry_by_global_id(&id)?;
1560 Some(entry.name().item.clone())
1561 }
1562
1563 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1564 let entry = self.state.try_get_entry_by_global_id(&id)?;
1565 Some(self.resolve_full_name(entry.name()).into_parts())
1566 }
1567
1568 fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1569 use SqlScalarType::*;
1570
1571 match typ {
1572 Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1573 List {
1574 custom_id: Some(item_id),
1575 ..
1576 }
1577 | Map {
1578 custom_id: Some(item_id),
1579 ..
1580 } => {
1581 let item = self.get_item(item_id);
1582 self.minimal_qualification(item.name()).to_string()
1583 }
1584 List { element_type, .. } => {
1585 format!(
1586 "{} list",
1587 self.humanize_sql_scalar_type(element_type, postgres_compat)
1588 )
1589 }
1590 Map { value_type, .. } => format!(
1591 "map[{}=>{}]",
1592 self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1593 self.humanize_sql_scalar_type(value_type, postgres_compat)
1594 ),
1595 Record {
1596 custom_id: Some(item_id),
1597 ..
1598 } => {
1599 let item = self.get_item(item_id);
1600 self.minimal_qualification(item.name()).to_string()
1601 }
1602 Record { fields, .. } => format!(
1603 "record({})",
1604 fields
1605 .iter()
1606 .map(|f| format!(
1607 "{}: {}",
1608 f.0,
1609 self.humanize_sql_column_type(&f.1, postgres_compat)
1610 ))
1611 .join(",")
1612 ),
1613 PgLegacyChar => "\"char\"".into(),
1614 Char { length } if !postgres_compat => match length {
1615 None => "char".into(),
1616 Some(length) => format!("char({})", length.into_u32()),
1617 },
1618 VarChar { max_length } if !postgres_compat => match max_length {
1619 None => "varchar".into(),
1620 Some(length) => format!("varchar({})", length.into_u32()),
1621 },
1622 UInt16 => "uint2".into(),
1623 UInt32 => "uint4".into(),
1624 UInt64 => "uint8".into(),
1625 ty => {
1626 let pgrepr_type = mz_pgrepr::Type::from(ty);
1627 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1628
1629 let res = if self
1630 .effective_search_path(true)
1631 .iter()
1632 .any(|(_, schema)| schema == &pg_catalog_schema)
1633 {
1634 pgrepr_type.name().to_string()
1635 } else {
1636 let name = QualifiedItemName {
1639 qualifiers: ItemQualifiers {
1640 database_spec: ResolvedDatabaseSpecifier::Ambient,
1641 schema_spec: pg_catalog_schema,
1642 },
1643 item: pgrepr_type.name().to_string(),
1644 };
1645 self.resolve_full_name(&name).to_string()
1646 };
1647 res
1648 }
1649 }
1650 }
1651
1652 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1653 let entry = self.state.try_get_entry_by_global_id(&id)?;
1654
1655 match entry.index() {
1656 Some(index) => {
1657 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1658 let mut on_names = on_desc
1659 .iter_names()
1660 .map(|col_name| col_name.to_string())
1661 .collect::<Vec<_>>();
1662
1663 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1664
1665 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1669 let mut ix_names = vec![String::new(); ix_arity];
1670
1671 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1673 let on_name = on_names.get_mut(on_pos).expect("on_name");
1674 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1675 std::mem::swap(on_name, ix_name);
1676 }
1677
1678 Some(ix_names) }
1680 None => {
1681 let desc = self.state.try_get_desc_by_global_id(&id)?;
1682 let column_names = desc
1683 .iter_names()
1684 .map(|col_name| col_name.to_string())
1685 .collect();
1686
1687 Some(column_names)
1688 }
1689 }
1690 }
1691
1692 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1693 let desc = self.state.try_get_desc_by_global_id(&id)?;
1694 Some(desc.get_name(column).to_string())
1695 }
1696
1697 fn id_exists(&self, id: GlobalId) -> bool {
1698 self.state.entry_by_global_id.contains_key(&id)
1699 }
1700}
1701
1702impl SessionCatalog for ConnCatalog<'_> {
1703 fn active_role_id(&self) -> &RoleId {
1704 &self.role_id
1705 }
1706
1707 fn restrict_to_user_objects(&self) -> bool {
1708 self.restrict_to_user_objects
1709 }
1710
1711 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1712 self.prepared_statements
1713 .as_ref()
1714 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1715 .flatten()
1716 }
1717
1718 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1719 self.portals
1720 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1721 }
1722
1723 fn active_database(&self) -> Option<&DatabaseId> {
1724 self.database.as_ref()
1725 }
1726
1727 fn active_cluster(&self) -> &str {
1728 &self.cluster
1729 }
1730
1731 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1732 &self.search_path
1733 }
1734
1735 fn resolve_database(
1736 &self,
1737 database_name: &str,
1738 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1739 Ok(self.state.resolve_database(database_name)?)
1740 }
1741
1742 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1743 self.state
1744 .database_by_id
1745 .get(id)
1746 .expect("database doesn't exist")
1747 }
1748
1749 #[allow(clippy::as_conversions)]
1751 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1752 self.state
1753 .database_by_id
1754 .values()
1755 .map(|database| database as &dyn CatalogDatabase)
1756 .collect()
1757 }
1758
1759 fn resolve_schema(
1760 &self,
1761 database_name: Option<&str>,
1762 schema_name: &str,
1763 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1764 Ok(self.state.resolve_schema(
1765 self.database.as_ref(),
1766 database_name,
1767 schema_name,
1768 &self.conn_id,
1769 )?)
1770 }
1771
1772 fn resolve_schema_in_database(
1773 &self,
1774 database_spec: &ResolvedDatabaseSpecifier,
1775 schema_name: &str,
1776 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1777 Ok(self
1778 .state
1779 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1780 }
1781
1782 fn get_schema(
1783 &self,
1784 database_spec: &ResolvedDatabaseSpecifier,
1785 schema_spec: &SchemaSpecifier,
1786 ) -> &dyn CatalogSchema {
1787 self.state
1788 .get_schema(database_spec, schema_spec, &self.conn_id)
1789 }
1790
1791 #[allow(clippy::as_conversions)]
1793 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1794 self.get_databases()
1795 .into_iter()
1796 .flat_map(|database| database.schemas().into_iter())
1797 .chain(
1798 self.state
1799 .ambient_schemas_by_id
1800 .values()
1801 .chain(self.state.temporary_schemas.values())
1802 .map(|schema| schema as &dyn CatalogSchema),
1803 )
1804 .collect()
1805 }
1806
1807 fn get_mz_internal_schema_id(&self) -> SchemaId {
1808 self.state().get_mz_internal_schema_id()
1809 }
1810
1811 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1812 self.state().get_mz_unsafe_schema_id()
1813 }
1814
1815 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1816 self.state.is_system_schema_specifier(schema)
1817 }
1818
1819 fn resolve_role(
1820 &self,
1821 role_name: &str,
1822 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1823 match self.state.try_get_role_by_name(role_name) {
1824 Some(role) => Ok(role),
1825 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1826 }
1827 }
1828
1829 fn resolve_network_policy(
1830 &self,
1831 policy_name: &str,
1832 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1833 match self.state.try_get_network_policy_by_name(policy_name) {
1834 Some(policy) => Ok(policy),
1835 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1836 }
1837 }
1838
1839 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1840 Some(self.state.roles_by_id.get(id)?)
1841 }
1842
1843 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1844 self.state.get_role(id)
1845 }
1846
1847 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1848 #[allow(clippy::as_conversions)]
1850 self.state
1851 .roles_by_id
1852 .values()
1853 .map(|role| role as &dyn CatalogRole)
1854 .collect()
1855 }
1856
1857 fn mz_system_role_id(&self) -> RoleId {
1858 MZ_SYSTEM_ROLE_ID
1859 }
1860
1861 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1862 self.state.collect_role_membership(id)
1863 }
1864
1865 fn get_network_policy(
1866 &self,
1867 id: &NetworkPolicyId,
1868 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1869 self.state.get_network_policy(id)
1870 }
1871
1872 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1873 #[allow(clippy::as_conversions)]
1875 self.state
1876 .network_policies_by_id
1877 .values()
1878 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1879 .collect()
1880 }
1881
1882 fn resolve_cluster(
1883 &self,
1884 cluster_name: Option<&str>,
1885 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1886 Ok(self
1887 .state
1888 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1889 }
1890
1891 fn resolve_cluster_replica(
1892 &self,
1893 cluster_replica_name: &QualifiedReplica,
1894 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1895 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1896 }
1897
1898 fn resolve_item(
1899 &self,
1900 name: &PartialItemName,
1901 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1902 let r = self.state.resolve_entry(
1903 self.database.as_ref(),
1904 &self.effective_search_path(true),
1905 name,
1906 &self.conn_id,
1907 )?;
1908 if self.unresolvable_ids.contains(&r.id()) {
1909 Err(SqlCatalogError::UnknownItem(name.to_string()))
1910 } else {
1911 Ok(r)
1912 }
1913 }
1914
1915 fn resolve_function(
1916 &self,
1917 name: &PartialItemName,
1918 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1919 let r = self.state.resolve_function(
1920 self.database.as_ref(),
1921 &self.effective_search_path(false),
1922 name,
1923 &self.conn_id,
1924 )?;
1925
1926 if self.unresolvable_ids.contains(&r.id()) {
1927 Err(SqlCatalogError::UnknownFunction {
1928 name: name.to_string(),
1929 alternative: None,
1930 })
1931 } else {
1932 Ok(r)
1933 }
1934 }
1935
1936 fn resolve_type(
1937 &self,
1938 name: &PartialItemName,
1939 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1940 let r = self.state.resolve_type(
1941 self.database.as_ref(),
1942 &self.effective_search_path(false),
1943 name,
1944 &self.conn_id,
1945 )?;
1946
1947 if self.unresolvable_ids.contains(&r.id()) {
1948 Err(SqlCatalogError::UnknownType {
1949 name: name.to_string(),
1950 })
1951 } else {
1952 Ok(r)
1953 }
1954 }
1955
1956 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1957 self.state.get_system_type(name)
1958 }
1959
1960 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1961 Some(self.state.try_get_entry(id)?)
1962 }
1963
1964 fn try_get_item_by_global_id(
1965 &self,
1966 id: &GlobalId,
1967 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1968 let entry = self.state.try_get_entry_by_global_id(id)?;
1969 let entry = match &entry.item {
1970 CatalogItem::Table(table) => {
1971 let (version, _gid) = table
1972 .collections
1973 .iter()
1974 .find(|(_version, gid)| *gid == id)
1975 .expect("catalog out of sync, mismatched GlobalId");
1976 entry.at_version(RelationVersionSelector::Specific(*version))
1977 }
1978 _ => entry.at_version(RelationVersionSelector::Latest),
1979 };
1980 Some(entry)
1981 }
1982
1983 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1984 self.state.get_entry(id)
1985 }
1986
1987 fn get_item_by_global_id(
1988 &self,
1989 id: &GlobalId,
1990 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1991 let entry = self.state.get_entry_by_global_id(id);
1992 let entry = match &entry.item {
1993 CatalogItem::Table(table) => {
1994 let (version, _gid) = table
1995 .collections
1996 .iter()
1997 .find(|(_version, gid)| *gid == id)
1998 .expect("catalog out of sync, mismatched GlobalId");
1999 entry.at_version(RelationVersionSelector::Specific(*version))
2000 }
2001 _ => entry.at_version(RelationVersionSelector::Latest),
2002 };
2003 entry
2004 }
2005
2006 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2007 self.get_schemas()
2008 .into_iter()
2009 .flat_map(|schema| schema.item_ids())
2010 .map(|id| self.get_item(&id))
2011 .collect()
2012 }
2013
2014 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2015 self.state
2016 .get_item_by_name(name, &self.conn_id)
2017 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2018 }
2019
2020 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2021 self.state
2022 .get_type_by_name(name, &self.conn_id)
2023 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2024 }
2025
2026 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2027 &self.state.clusters_by_id[&id]
2028 }
2029
2030 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2031 self.state
2032 .clusters_by_id
2033 .values()
2034 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2035 .collect()
2036 }
2037
2038 fn get_cluster_replica(
2039 &self,
2040 cluster_id: ClusterId,
2041 replica_id: ReplicaId,
2042 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2043 let cluster = self.get_cluster(cluster_id);
2044 cluster.replica(replica_id)
2045 }
2046
2047 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2048 self.get_clusters()
2049 .into_iter()
2050 .flat_map(|cluster| cluster.replicas().into_iter())
2051 .collect()
2052 }
2053
2054 fn get_system_privileges(&self) -> &PrivilegeMap {
2055 &self.state.system_privileges
2056 }
2057
2058 fn get_default_privileges(
2059 &self,
2060 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2061 self.state
2062 .default_privileges
2063 .iter()
2064 .map(|(object, acl_items)| (object, acl_items.collect()))
2065 .collect()
2066 }
2067
2068 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2069 self.state.find_available_name(name, &self.conn_id)
2070 }
2071
2072 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2073 self.state.resolve_full_name(name, Some(&self.conn_id))
2074 }
2075
2076 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2077 self.state.resolve_full_schema_name(name)
2078 }
2079
2080 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2081 self.state.get_entry_by_global_id(global_id).id()
2082 }
2083
2084 fn resolve_global_id(
2085 &self,
2086 item_id: &CatalogItemId,
2087 version: RelationVersionSelector,
2088 ) -> GlobalId {
2089 self.state
2090 .get_entry(item_id)
2091 .at_version(version)
2092 .global_id()
2093 }
2094
2095 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2096 self.state.config()
2097 }
2098
2099 fn now(&self) -> EpochMillis {
2100 (self.state.config().now)()
2101 }
2102
2103 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2104 self.state.aws_privatelink_availability_zones.clone()
2105 }
2106
2107 fn system_vars(&self) -> &SystemVars {
2108 &self.state.system_configuration
2109 }
2110
2111 fn system_vars_mut(&mut self) -> &mut SystemVars {
2112 Arc::make_mut(&mut self.state.to_mut().system_configuration)
2113 }
2114
2115 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2116 self.state().get_owner_id(id, self.conn_id())
2117 }
2118
2119 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2120 match id {
2121 SystemObjectId::System => Some(&self.state.system_privileges),
2122 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2123 Some(self.get_cluster(*id).privileges())
2124 }
2125 SystemObjectId::Object(ObjectId::Database(id)) => {
2126 Some(self.get_database(id).privileges())
2127 }
2128 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2129 self.state
2132 .try_get_schema(database_spec, schema_spec, &self.conn_id)
2133 .map(|schema| schema.privileges())
2134 }
2135 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2136 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2137 Some(self.get_network_policy(id).privileges())
2138 }
2139 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2140 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2141 }
2142 }
2143
2144 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2145 let mut seen = BTreeSet::new();
2146 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2147 }
2148
2149 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2150 let mut seen = BTreeSet::new();
2151 self.state.item_dependents(id, &mut seen)
2152 }
2153
2154 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2155 rbac::all_object_privileges(object_type)
2156 }
2157
2158 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2159 self.state.get_object_type(object_id)
2160 }
2161
2162 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2163 self.state.get_system_object_type(id)
2164 }
2165
2166 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2173 if qualified_name.qualifiers.schema_spec.is_temporary() {
2174 return qualified_name.item.clone().into();
2182 }
2183
2184 let database_id = match &qualified_name.qualifiers.database_spec {
2185 ResolvedDatabaseSpecifier::Ambient => None,
2186 ResolvedDatabaseSpecifier::Id(id)
2187 if self.database.is_some() && self.database == Some(*id) =>
2188 {
2189 None
2190 }
2191 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2192 };
2193
2194 let schema_spec = if database_id.is_none()
2195 && self.resolve_item_name(&PartialItemName {
2196 database: None,
2197 schema: None,
2198 item: qualified_name.item.clone(),
2199 }) == Ok(qualified_name)
2200 || self.resolve_function_name(&PartialItemName {
2201 database: None,
2202 schema: None,
2203 item: qualified_name.item.clone(),
2204 }) == Ok(qualified_name)
2205 || self.resolve_type_name(&PartialItemName {
2206 database: None,
2207 schema: None,
2208 item: qualified_name.item.clone(),
2209 }) == Ok(qualified_name)
2210 {
2211 None
2212 } else {
2213 Some(qualified_name.qualifiers.schema_spec.clone())
2216 };
2217
2218 let res = PartialItemName {
2219 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2220 schema: schema_spec.map(|spec| {
2221 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2222 .name()
2223 .schema
2224 .clone()
2225 }),
2226 item: qualified_name.item.clone(),
2227 };
2228 assert!(
2229 self.resolve_item_name(&res) == Ok(qualified_name)
2230 || self.resolve_function_name(&res) == Ok(qualified_name)
2231 || self.resolve_type_name(&res) == Ok(qualified_name)
2232 );
2233 res
2234 }
2235
2236 fn add_notice(&self, notice: PlanNotice) {
2237 let _ = self.notices_tx.send(notice.into());
2238 }
2239
2240 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2241 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2242 self.state.comments.get_object_comments(comment_id)
2243 }
2244
2245 fn is_cluster_size_cc(&self, size: &str) -> bool {
2246 self.state
2247 .cluster_replica_sizes
2248 .0
2249 .get(size)
2250 .map_or(false, |a| a.is_cc)
2251 }
2252}
2253
2254#[cfg(test)]
2255mod tests {
2256 use std::collections::{BTreeMap, BTreeSet};
2257 use std::sync::Arc;
2258 use std::{env, iter};
2259
2260 use itertools::Itertools;
2261 use mz_catalog::memory::objects::CatalogItem;
2262 use mz_postgres_util::{query, sql};
2263 use tokio_postgres::NoTls;
2264 use tokio_postgres::types::Type;
2265 use uuid::Uuid;
2266
2267 use mz_catalog::SYSTEM_CONN_ID;
2268 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2269 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2270 use mz_controller_types::{ClusterId, ReplicaId};
2271 use mz_expr::{Eval, MirScalarExpr};
2272 use mz_ore::now::to_datetime;
2273 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2274 use mz_persist_client::PersistClient;
2275 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2276 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2277 use mz_repr::role_id::RoleId;
2278 use mz_repr::{
2279 CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2280 SqlScalarType, Timestamp,
2281 };
2282 use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2283 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2284 use mz_sql::names::{
2285 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2286 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2287 };
2288 use mz_sql::plan::{
2289 CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2290 QueryLifetime, Scope, StatementContext,
2291 };
2292 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2293 use mz_sql::session::vars::{SystemVars, VarInput};
2294
2295 use crate::catalog::state::LocalExpressionCache;
2296 use crate::catalog::{Catalog, Op};
2297 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2298 use crate::session::Session;
2299
2300 #[mz_ore::test(tokio::test)]
2307 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2309 Catalog::with_debug(|catalog| async move {
2310 struct TestCase {
2311 input: QualifiedItemName,
2312 system_output: PartialItemName,
2313 normal_output: PartialItemName,
2314 }
2315
2316 let test_cases = vec![
2317 TestCase {
2318 input: QualifiedItemName {
2319 qualifiers: ItemQualifiers {
2320 database_spec: ResolvedDatabaseSpecifier::Ambient,
2321 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2322 },
2323 item: "numeric".to_string(),
2324 },
2325 system_output: PartialItemName {
2326 database: None,
2327 schema: None,
2328 item: "numeric".to_string(),
2329 },
2330 normal_output: PartialItemName {
2331 database: None,
2332 schema: None,
2333 item: "numeric".to_string(),
2334 },
2335 },
2336 TestCase {
2337 input: QualifiedItemName {
2338 qualifiers: ItemQualifiers {
2339 database_spec: ResolvedDatabaseSpecifier::Ambient,
2340 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2341 },
2342 item: "mz_array_types".to_string(),
2343 },
2344 system_output: PartialItemName {
2345 database: None,
2346 schema: None,
2347 item: "mz_array_types".to_string(),
2348 },
2349 normal_output: PartialItemName {
2350 database: None,
2351 schema: None,
2352 item: "mz_array_types".to_string(),
2353 },
2354 },
2355 ];
2356
2357 for tc in test_cases {
2358 assert_eq!(
2359 catalog
2360 .for_system_session()
2361 .minimal_qualification(&tc.input),
2362 tc.system_output
2363 );
2364 assert_eq!(
2365 catalog
2366 .for_session(&Session::dummy())
2367 .minimal_qualification(&tc.input),
2368 tc.normal_output
2369 );
2370 }
2371 catalog.expire().await;
2372 })
2373 .await
2374 }
2375
2376 #[mz_ore::test(tokio::test)]
2377 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2379 let persist_client = PersistClient::new_for_tests().await;
2380 let organization_id = Uuid::new_v4();
2381 let bootstrap_args = test_bootstrap_args();
2382 {
2383 let mut catalog = Catalog::open_debug_catalog(
2384 persist_client.clone(),
2385 organization_id.clone(),
2386 &bootstrap_args,
2387 )
2388 .await
2389 .expect("unable to open debug catalog");
2390 assert_eq!(catalog.transient_revision(), 1);
2391 let commit_ts = catalog.current_upper().await;
2392 catalog
2393 .transact(
2394 None,
2395 commit_ts,
2396 None,
2397 vec![Op::CreateDatabase {
2398 name: "test".to_string(),
2399 owner_id: MZ_SYSTEM_ROLE_ID,
2400 }],
2401 )
2402 .await
2403 .expect("failed to transact");
2404 assert_eq!(catalog.transient_revision(), 2);
2405 catalog.expire().await;
2406 }
2407 {
2408 let catalog =
2409 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2410 .await
2411 .expect("unable to open debug catalog");
2412 assert_eq!(catalog.transient_revision(), 1);
2414 catalog.expire().await;
2415 }
2416 }
2417
2418 #[mz_ore::test(tokio::test)]
2419 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2421 Catalog::with_debug(|catalog| async move {
2422 let mz_catalog_schema = (
2423 ResolvedDatabaseSpecifier::Ambient,
2424 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2425 );
2426 let pg_catalog_schema = (
2427 ResolvedDatabaseSpecifier::Ambient,
2428 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2429 );
2430 let mz_temp_schema = (
2431 ResolvedDatabaseSpecifier::Ambient,
2432 SchemaSpecifier::Temporary,
2433 );
2434
2435 let session = Session::dummy();
2437 let conn_catalog = catalog.for_session(&session);
2438 assert_ne!(
2439 conn_catalog.effective_search_path(false),
2440 conn_catalog.search_path
2441 );
2442 assert_ne!(
2443 conn_catalog.effective_search_path(true),
2444 conn_catalog.search_path
2445 );
2446 assert_eq!(
2447 conn_catalog.effective_search_path(false),
2448 vec![
2449 mz_catalog_schema.clone(),
2450 pg_catalog_schema.clone(),
2451 conn_catalog.search_path[0].clone()
2452 ]
2453 );
2454 assert_eq!(
2455 conn_catalog.effective_search_path(true),
2456 vec![
2457 mz_temp_schema.clone(),
2458 mz_catalog_schema.clone(),
2459 pg_catalog_schema.clone(),
2460 conn_catalog.search_path[0].clone()
2461 ]
2462 );
2463
2464 let mut session = Session::dummy();
2466 session
2467 .vars_mut()
2468 .set(
2469 &SystemVars::new(),
2470 "search_path",
2471 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2472 false,
2473 )
2474 .expect("failed to set search_path");
2475 let conn_catalog = catalog.for_session(&session);
2476 assert_ne!(
2477 conn_catalog.effective_search_path(false),
2478 conn_catalog.search_path
2479 );
2480 assert_ne!(
2481 conn_catalog.effective_search_path(true),
2482 conn_catalog.search_path
2483 );
2484 assert_eq!(
2485 conn_catalog.effective_search_path(false),
2486 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2487 );
2488 assert_eq!(
2489 conn_catalog.effective_search_path(true),
2490 vec![
2491 mz_temp_schema.clone(),
2492 mz_catalog_schema.clone(),
2493 pg_catalog_schema.clone()
2494 ]
2495 );
2496
2497 let mut session = Session::dummy();
2498 session
2499 .vars_mut()
2500 .set(
2501 &SystemVars::new(),
2502 "search_path",
2503 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2504 false,
2505 )
2506 .expect("failed to set search_path");
2507 let conn_catalog = catalog.for_session(&session);
2508 assert_ne!(
2509 conn_catalog.effective_search_path(false),
2510 conn_catalog.search_path
2511 );
2512 assert_ne!(
2513 conn_catalog.effective_search_path(true),
2514 conn_catalog.search_path
2515 );
2516 assert_eq!(
2517 conn_catalog.effective_search_path(false),
2518 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2519 );
2520 assert_eq!(
2521 conn_catalog.effective_search_path(true),
2522 vec![
2523 mz_temp_schema.clone(),
2524 pg_catalog_schema.clone(),
2525 mz_catalog_schema.clone()
2526 ]
2527 );
2528
2529 let mut session = Session::dummy();
2530 session
2531 .vars_mut()
2532 .set(
2533 &SystemVars::new(),
2534 "search_path",
2535 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2536 false,
2537 )
2538 .expect("failed to set search_path");
2539 let conn_catalog = catalog.for_session(&session);
2540 assert_ne!(
2541 conn_catalog.effective_search_path(false),
2542 conn_catalog.search_path
2543 );
2544 assert_ne!(
2545 conn_catalog.effective_search_path(true),
2546 conn_catalog.search_path
2547 );
2548 assert_eq!(
2549 conn_catalog.effective_search_path(false),
2550 vec![
2551 mz_catalog_schema.clone(),
2552 pg_catalog_schema.clone(),
2553 mz_temp_schema.clone()
2554 ]
2555 );
2556 assert_eq!(
2557 conn_catalog.effective_search_path(true),
2558 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2559 );
2560 catalog.expire().await;
2561 })
2562 .await
2563 }
2564
2565 #[mz_ore::test(tokio::test)]
2566 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2568 use mz_ore::collections::CollectionExt;
2569 Catalog::with_debug(|catalog| async move {
2570 let conn_catalog = catalog.for_system_session();
2571 let scx = &mut StatementContext::new(None, &conn_catalog);
2572
2573 let parsed = mz_sql_parser::parser::parse_statements(
2574 "create view public.foo as select 1 as bar",
2575 )
2576 .expect("")
2577 .into_element()
2578 .ast;
2579
2580 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2581
2582 assert_eq!(
2584 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2585 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2586 );
2587 catalog.expire().await;
2588 })
2589 .await;
2590 }
2591
2592 #[mz_ore::test(tokio::test)]
2594 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2596 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2597 let column = ", 1";
2598 let view_def_size = view_def.bytes().count();
2599 let column_size = column.bytes().count();
2600 let column_count =
2601 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2602 let columns = iter::repeat(column).take(column_count).join("");
2603 let create_sql = format!("{view_def}{columns})");
2604 let create_sql_check = create_sql.clone();
2605 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2606 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2607 &create_sql
2608 ));
2609
2610 let persist_client = PersistClient::new_for_tests().await;
2611 let organization_id = Uuid::new_v4();
2612 let id = CatalogItemId::User(1);
2613 let gid = GlobalId::User(1);
2614 let bootstrap_args = test_bootstrap_args();
2615 {
2616 let mut catalog = Catalog::open_debug_catalog(
2617 persist_client.clone(),
2618 organization_id.clone(),
2619 &bootstrap_args,
2620 )
2621 .await
2622 .expect("unable to open debug catalog");
2623 let item = catalog
2624 .state()
2625 .deserialize_item(
2626 gid,
2627 &create_sql,
2628 &BTreeMap::new(),
2629 &mut LocalExpressionCache::Closed,
2630 None,
2631 )
2632 .expect("unable to parse view");
2633 let commit_ts = catalog.current_upper().await;
2634 catalog
2635 .transact(
2636 None,
2637 commit_ts,
2638 None,
2639 vec![Op::CreateItem {
2640 item,
2641 name: QualifiedItemName {
2642 qualifiers: ItemQualifiers {
2643 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2644 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2645 },
2646 item: "v".to_string(),
2647 },
2648 id,
2649 owner_id: MZ_SYSTEM_ROLE_ID,
2650 }],
2651 )
2652 .await
2653 .expect("failed to transact");
2654 catalog.expire().await;
2655 }
2656 {
2657 let catalog =
2658 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2659 .await
2660 .expect("unable to open debug catalog");
2661 let view = catalog.get_entry(&id);
2662 assert_eq!("v", view.name.item);
2663 match &view.item {
2664 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2665 item => panic!("expected view, got {}", item.typ()),
2666 }
2667 catalog.expire().await;
2668 }
2669 }
2670
2671 #[mz_ore::test(tokio::test)]
2672 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2674 Catalog::with_debug(|catalog| async move {
2675 let conn_catalog = catalog.for_system_session();
2676
2677 assert_eq!(
2678 mz_sql::catalog::ObjectType::ClusterReplica,
2679 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2680 ClusterId::user(1).expect("1 is a valid ID"),
2681 ReplicaId::User(1)
2682 )))
2683 );
2684 assert_eq!(
2685 mz_sql::catalog::ObjectType::Role,
2686 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2687 );
2688 catalog.expire().await;
2689 })
2690 .await;
2691 }
2692
2693 #[mz_ore::test(tokio::test)]
2694 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2696 Catalog::with_debug(|catalog| async move {
2697 let conn_catalog = catalog.for_system_session();
2698
2699 assert_eq!(
2700 None,
2701 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2702 ClusterId::user(1).expect("1 is a valid ID"),
2703 ReplicaId::User(1),
2704 ))))
2705 );
2706 assert_eq!(
2707 None,
2708 conn_catalog
2709 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2710 );
2711 catalog.expire().await;
2712 })
2713 .await;
2714 }
2715
2716 #[mz_ore::test(tokio::test)]
2717 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2719 Catalog::with_debug(|catalog| async move {
2720 let conn_catalog = catalog.for_system_session();
2721
2722 for builtin in BUILTINS::iter() {
2723 let (schema, name, expected_desc) = match builtin {
2724 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2725 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2726 Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2727 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2728 Builtin::Log(_)
2729 | Builtin::Type(_)
2730 | Builtin::Func(_)
2731 | Builtin::Index(_)
2732 | Builtin::Connection(_) => continue,
2733 };
2734 let item = conn_catalog
2735 .resolve_item(&PartialItemName {
2736 database: None,
2737 schema: Some(schema.to_string()),
2738 item: name.to_string(),
2739 })
2740 .expect("unable to resolve item")
2741 .at_version(RelationVersionSelector::Latest);
2742
2743 let actual_desc = item.relation_desc().expect("invalid item type");
2744 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2745 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2746 {
2747 assert_eq!(
2748 actual_name, expected_name,
2749 "item {schema}.{name} column {index} name did not match its expected name"
2750 );
2751 assert_eq!(
2752 actual_typ, expected_typ,
2753 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2754 );
2755 }
2756 assert_eq!(
2757 &*actual_desc, expected_desc,
2758 "item {schema}.{name} did not match its expected RelationDesc"
2759 );
2760 }
2761 catalog.expire().await;
2762 })
2763 .await
2764 }
2765
2766 #[mz_ore::test(tokio::test)]
2769 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2771 async fn inner(catalog: Catalog) {
2772 let (client, connection) = tokio_postgres::connect(
2776 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2777 NoTls,
2778 )
2779 .await
2780 .expect("failed to connect to Postgres");
2781
2782 task::spawn(|| "compare_builtin_postgres", async move {
2783 if let Err(e) = connection.await {
2784 panic!("connection error: {}", e);
2785 }
2786 });
2787
2788 struct PgProc {
2789 name: String,
2790 arg_oids: Vec<u32>,
2791 ret_oid: Option<u32>,
2792 ret_set: bool,
2793 }
2794
2795 struct PgType {
2796 name: String,
2797 ty: String,
2798 elem: u32,
2799 array: u32,
2800 input: u32,
2801 receive: u32,
2802 }
2803
2804 struct PgOper {
2805 oprresult: u32,
2806 name: String,
2807 }
2808
2809 let pg_proc: BTreeMap<_, _> = query(
2810 &client,
2811 sql!(
2812 "SELECT
2813 p.oid,
2814 proname,
2815 proargtypes,
2816 prorettype,
2817 proretset
2818 FROM pg_proc p
2819 JOIN pg_namespace n ON p.pronamespace = n.oid"
2820 ),
2821 &[],
2822 )
2823 .await
2824 .expect("pg query failed")
2825 .into_iter()
2826 .map(|row| {
2827 let oid: u32 = row.get("oid");
2828 let pg_proc = PgProc {
2829 name: row.get("proname"),
2830 arg_oids: row.get("proargtypes"),
2831 ret_oid: row.get("prorettype"),
2832 ret_set: row.get("proretset"),
2833 };
2834 (oid, pg_proc)
2835 })
2836 .collect();
2837
2838 let pg_type: BTreeMap<_, _> = query(
2839 &client,
2840 sql!(
2841 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type"
2842 ),
2843 &[],
2844 )
2845 .await
2846 .expect("pg query failed")
2847 .into_iter()
2848 .map(|row| {
2849 let oid: u32 = row.get("oid");
2850 let pg_type = PgType {
2851 name: row.get("typname"),
2852 ty: row.get("typtype"),
2853 elem: row.get("typelem"),
2854 array: row.get("typarray"),
2855 input: row.get("typinput"),
2856 receive: row.get("typreceive"),
2857 };
2858 (oid, pg_type)
2859 })
2860 .collect();
2861
2862 let pg_oper: BTreeMap<_, _> = query(
2863 &client,
2864 sql!("SELECT oid, oprname, oprresult FROM pg_operator"),
2865 &[],
2866 )
2867 .await
2868 .expect("pg query failed")
2869 .into_iter()
2870 .map(|row| {
2871 let oid: u32 = row.get("oid");
2872 let pg_oper = PgOper {
2873 name: row.get("oprname"),
2874 oprresult: row.get("oprresult"),
2875 };
2876 (oid, pg_oper)
2877 })
2878 .collect();
2879
2880 let conn_catalog = catalog.for_system_session();
2881 let resolve_type_oid = |item: &str| {
2882 conn_catalog
2883 .resolve_type(&PartialItemName {
2884 database: None,
2885 schema: Some(PG_CATALOG_SCHEMA.into()),
2888 item: item.to_string(),
2889 })
2890 .expect("unable to resolve type")
2891 .oid()
2892 };
2893
2894 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2895 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2896 .collect();
2897
2898 let mut all_oids = BTreeSet::new();
2899
2900 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2903 [
2904 (Type::NAME, Type::TEXT),
2906 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2907 (Type::TIME, Type::TIMETZ),
2909 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2910 ]
2911 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2912 );
2913 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2914 1619, ]);
2916 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2917 if ignore_return_types.contains(&fn_oid) {
2918 return true;
2919 }
2920 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2921 return true;
2922 }
2923 a == b
2924 };
2925
2926 for builtin in BUILTINS::iter() {
2927 match builtin {
2928 Builtin::Type(ty) => {
2929 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2930
2931 if ty.oid >= FIRST_MATERIALIZE_OID {
2932 continue;
2935 }
2936
2937 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2940 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2941 });
2942 assert_eq!(
2943 ty.name, pg_ty.name,
2944 "oid {} has name {} in postgres; expected {}",
2945 ty.oid, pg_ty.name, ty.name,
2946 );
2947
2948 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2949 None => (0, 0),
2950 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2951 };
2952 assert_eq!(
2953 typinput_oid, pg_ty.input,
2954 "type {} has typinput OID {:?} in mz but {:?} in pg",
2955 ty.name, typinput_oid, pg_ty.input,
2956 );
2957 assert_eq!(
2958 typreceive_oid, pg_ty.receive,
2959 "type {} has typreceive OID {:?} in mz but {:?} in pg",
2960 ty.name, typreceive_oid, pg_ty.receive,
2961 );
2962 if typinput_oid != 0 {
2963 assert!(
2964 func_oids.contains(&typinput_oid),
2965 "type {} has typinput OID {} that does not exist in pg_proc",
2966 ty.name,
2967 typinput_oid,
2968 );
2969 }
2970 if typreceive_oid != 0 {
2971 assert!(
2972 func_oids.contains(&typreceive_oid),
2973 "type {} has typreceive OID {} that does not exist in pg_proc",
2974 ty.name,
2975 typreceive_oid,
2976 );
2977 }
2978
2979 match &ty.details.typ {
2981 CatalogType::Array { element_reference } => {
2982 let elem_ty = BUILTINS::iter()
2983 .filter_map(|builtin| match builtin {
2984 Builtin::Type(ty @ BuiltinType { name, .. })
2985 if element_reference == name =>
2986 {
2987 Some(ty)
2988 }
2989 _ => None,
2990 })
2991 .next();
2992 let elem_ty = match elem_ty {
2993 Some(ty) => ty,
2994 None => {
2995 panic!("{} is unexpectedly not a type", element_reference)
2996 }
2997 };
2998 assert_eq!(
2999 pg_ty.elem, elem_ty.oid,
3000 "type {} has mismatched element OIDs",
3001 ty.name
3002 )
3003 }
3004 CatalogType::Pseudo => {
3005 assert_eq!(
3006 pg_ty.ty, "p",
3007 "type {} is not a pseudo type as expected",
3008 ty.name
3009 )
3010 }
3011 CatalogType::Range { .. } => {
3012 assert_eq!(
3013 pg_ty.ty, "r",
3014 "type {} is not a range type as expected",
3015 ty.name
3016 );
3017 }
3018 _ => {
3019 assert_eq!(
3020 pg_ty.ty, "b",
3021 "type {} is not a base type as expected",
3022 ty.name
3023 )
3024 }
3025 }
3026
3027 let schema = catalog
3029 .resolve_schema_in_database(
3030 &ResolvedDatabaseSpecifier::Ambient,
3031 ty.schema,
3032 &SYSTEM_CONN_ID,
3033 )
3034 .expect("unable to resolve schema");
3035 let allocated_type = catalog
3036 .resolve_type(
3037 None,
3038 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3039 &PartialItemName {
3040 database: None,
3041 schema: Some(schema.name().schema.clone()),
3042 item: ty.name.to_string(),
3043 },
3044 &SYSTEM_CONN_ID,
3045 )
3046 .expect("unable to resolve type");
3047 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3048 ty
3049 } else {
3050 panic!("unexpectedly not a type")
3051 };
3052 match ty.details.array_id {
3053 Some(array_id) => {
3054 let array_ty = catalog.get_entry(&array_id);
3055 assert_eq!(
3056 pg_ty.array, array_ty.oid,
3057 "type {} has mismatched array OIDs",
3058 allocated_type.name.item,
3059 );
3060 }
3061 None => assert_eq!(
3062 pg_ty.array, 0,
3063 "type {} does not have an array type in mz but does in pg",
3064 allocated_type.name.item,
3065 ),
3066 }
3067 }
3068 Builtin::Func(func) => {
3069 for imp in func.inner.func_impls() {
3070 assert!(
3071 all_oids.insert(imp.oid),
3072 "{} reused oid {}",
3073 func.name,
3074 imp.oid
3075 );
3076
3077 assert!(
3078 imp.oid < FIRST_USER_OID,
3079 "built-in function {} erroneously has OID in user space ({})",
3080 func.name,
3081 imp.oid,
3082 );
3083
3084 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3087 continue;
3088 } else {
3089 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3090 panic!(
3091 "pg_proc missing function {}: oid {}",
3092 func.name, imp.oid
3093 )
3094 })
3095 };
3096 assert_eq!(
3097 func.name, pg_fn.name,
3098 "funcs with oid {} don't match names: {} in mz, {} in pg",
3099 imp.oid, func.name, pg_fn.name
3100 );
3101
3102 let imp_arg_oids = imp
3105 .arg_typs
3106 .iter()
3107 .map(|item| resolve_type_oid(item))
3108 .collect::<Vec<_>>();
3109
3110 if imp_arg_oids != pg_fn.arg_oids {
3111 println!(
3112 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3113 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3114 );
3115 }
3116
3117 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3118
3119 assert!(
3120 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3121 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3122 imp.oid,
3123 func.name,
3124 imp_return_oid,
3125 pg_fn.ret_oid
3126 );
3127
3128 assert_eq!(
3129 imp.return_is_set, pg_fn.ret_set,
3130 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3131 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3132 );
3133 }
3134 }
3135 _ => (),
3136 }
3137 }
3138
3139 for (op, func) in OP_IMPLS.iter() {
3140 for imp in func.func_impls() {
3141 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3142
3143 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3145 continue;
3146 } else {
3147 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3148 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3149 })
3150 };
3151
3152 assert_eq!(*op, pg_op.name);
3153
3154 let imp_return_oid =
3155 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3156 if imp_return_oid != pg_op.oprresult {
3157 panic!(
3158 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3159 imp.oid, op, imp_return_oid, pg_op.oprresult
3160 );
3161 }
3162 }
3163 }
3164 catalog.expire().await;
3165 }
3166
3167 Catalog::with_debug(inner).await
3168 }
3169
3170 #[mz_ore::test(tokio::test)]
3172 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3174 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3175 let catalog = Arc::new(catalog);
3176 let conn_catalog = catalog.for_system_session();
3177
3178 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3179 let mut handles = Vec::new();
3180
3181 let ignore_names = BTreeSet::from([
3183 "avg",
3184 "avg_internal_v1",
3185 "bool_and",
3186 "bool_or",
3187 "has_table_privilege", "has_type_privilege", "mod",
3190 "mz_panic",
3191 "mz_sleep",
3192 "pow",
3193 "stddev_pop",
3194 "stddev_samp",
3195 "stddev",
3196 "var_pop",
3197 "var_samp",
3198 "variance",
3199 ]);
3200
3201 let fns = BUILTINS::funcs()
3202 .map(|func| (&func.name, func.inner))
3203 .chain(OP_IMPLS.iter());
3204
3205 for (name, func) in fns {
3206 if ignore_names.contains(name) {
3207 continue;
3208 }
3209 let Func::Scalar(impls) = func else {
3210 continue;
3211 };
3212
3213 'outer: for imp in impls {
3214 let details = imp.details();
3215 let mut styps = Vec::new();
3216 for item in details.arg_typs.iter() {
3217 let oid = resolve_type_oid(item);
3218 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3219 continue 'outer;
3220 };
3221 styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3222 }
3223 let datums = styps
3224 .iter()
3225 .map(|styp| {
3226 let mut datums = vec![Datum::Null];
3227 datums.extend(styp.interesting_datums());
3228 datums
3229 })
3230 .collect::<Vec<_>>();
3231 if datums.is_empty() {
3233 continue;
3234 }
3235
3236 let return_oid = details
3237 .return_typ
3238 .map(resolve_type_oid)
3239 .expect("must exist");
3240 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3241 .ok()
3242 .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3243
3244 let mut idxs = vec![0; datums.len()];
3245 while idxs[0] < datums[0].len() {
3246 let mut args = Vec::with_capacity(idxs.len());
3247 for i in 0..(datums.len()) {
3248 args.push(datums[i][idxs[i]]);
3249 }
3250
3251 let op = &imp.op;
3252 let scalars = args
3253 .iter()
3254 .enumerate()
3255 .map(|(i, datum)| {
3256 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3257 datum.clone(),
3258 styps[i].clone(),
3259 ))
3260 })
3261 .collect();
3262
3263 let call_name = format!(
3264 "{name}({}) (oid: {})",
3265 args.iter()
3266 .map(|d| d.to_string())
3267 .collect::<Vec<_>>()
3268 .join(", "),
3269 imp.oid
3270 );
3271 let catalog = Arc::clone(&catalog);
3272 let call_name_fn = call_name.clone();
3273 let return_styp = return_styp.clone();
3274 let handle = task::spawn_blocking(
3275 || call_name,
3276 move || {
3277 smoketest_fn(
3278 name,
3279 call_name_fn,
3280 op,
3281 imp,
3282 args,
3283 catalog,
3284 scalars,
3285 return_styp,
3286 )
3287 },
3288 );
3289 handles.push(handle);
3290
3291 for i in (0..datums.len()).rev() {
3293 idxs[i] += 1;
3294 if idxs[i] >= datums[i].len() {
3295 if i == 0 {
3296 break;
3297 }
3298 idxs[i] = 0;
3299 continue;
3300 } else {
3301 break;
3302 }
3303 }
3304 }
3305 }
3306 }
3307 handles
3308 }
3309
3310 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3311 for handle in handles {
3312 handle.await;
3313 }
3314 }
3315
3316 fn smoketest_fn(
3317 name: &&str,
3318 call_name: String,
3319 op: &Operation<HirScalarExpr>,
3320 imp: &FuncImpl<HirScalarExpr>,
3321 args: Vec<Datum<'_>>,
3322 catalog: Arc<Catalog>,
3323 scalars: Vec<CoercibleScalarExpr>,
3324 return_styp: Option<SqlScalarType>,
3325 ) {
3326 let conn_catalog = catalog.for_system_session();
3327 let pcx = PlanContext::zero();
3328 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3329 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3330 let ecx = ExprContext {
3331 qcx: &qcx,
3332 name: "smoketest",
3333 scope: &Scope::empty(),
3334 relation_type: &SqlRelationType::empty(),
3335 allow_aggregates: false,
3336 allow_subqueries: false,
3337 allow_parameters: false,
3338 allow_windows: false,
3339 };
3340 let arena = RowArena::new();
3341 let mut session = Session::dummy();
3342 session
3343 .start_transaction(to_datetime(0), None, None)
3344 .expect("must succeed");
3345 let prep_style = ExprPrepOneShot {
3346 logical_time: EvalTime::Time(Timestamp::MIN),
3347 session: &session,
3348 catalog_state: &catalog.state,
3349 };
3350
3351 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3354 if let Ok(hir) = res {
3355 let uneliminated_result_row = {
3356 if let HirScalarExpr::CallUnary { func, .. } = &hir
3357 && func.is_eliminable_cast()
3358 {
3359 let mut uneliminated_mir = hir
3360 .clone()
3361 .lower_uncorrelated(HirToMirConfig {
3362 enable_cast_elimination: false,
3363 ..catalog.system_config().into()
3364 })
3365 .expect("lowering eliminable cast should always succeed");
3366 prep_style
3367 .prep_scalar_expr(&mut uneliminated_mir)
3368 .expect("must succeed");
3369
3370 uneliminated_mir
3372 .eval(&[], &arena)
3373 .ok()
3374 .map(|datum| Row::pack([datum]))
3375 } else {
3376 None
3377 }
3378 };
3379
3380 if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3381 prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3383
3384 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3385 if let Some(return_styp) = return_styp {
3386 let mir_typ = mir.typ(&[]);
3387 soft_assert_eq_or_log!(
3390 mir_typ.scalar_type,
3391 (&return_styp).into(),
3392 "MIR type did not match the catalog type (cast elimination/repr type error)"
3393 );
3394 if !eval_result_datum.is_instance_of(&mir_typ) {
3398 panic!(
3399 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3400 );
3401 }
3402 if let Some(row) = uneliminated_result_row {
3404 let uneliminated_result_datum = row.unpack_first();
3405 assert_eq!(
3406 uneliminated_result_datum, eval_result_datum,
3407 "datums should not change if cast is eliminable"
3408 );
3409 }
3410 if let Some((introduces_nulls, propagates_nulls)) =
3413 call_introduces_propagates_nulls(&mir)
3414 {
3415 if introduces_nulls {
3416 assert!(
3420 mir_typ.nullable,
3421 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3422 name, args, mir, mir_typ.nullable
3423 );
3424 } else {
3425 let any_input_null = args.iter().any(|arg| arg.is_null());
3426 if !any_input_null {
3427 assert!(
3428 !mir_typ.nullable,
3429 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3430 name, args, mir, mir_typ.nullable
3431 );
3432 } else if propagates_nulls {
3433 assert!(
3436 mir_typ.nullable,
3437 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3438 name, args, mir, mir_typ.nullable
3439 );
3440 }
3441 }
3446 }
3447 let mut reduced = mir.clone();
3450 reduced.reduce(&[]);
3451 match reduced {
3452 MirScalarExpr::Literal(reduce_result, ctyp) => {
3453 match reduce_result {
3454 Ok(reduce_result_row) => {
3455 let reduce_result_datum = reduce_result_row.unpack_first();
3456 assert_eq!(
3457 reduce_result_datum,
3458 eval_result_datum,
3459 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3460 name,
3461 args,
3462 mir,
3463 eval_result_datum,
3464 mir_typ.scalar_type,
3465 reduce_result_datum,
3466 ctyp.scalar_type
3467 );
3468 assert_eq!(
3474 ctyp.scalar_type,
3475 mir_typ.scalar_type,
3476 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3477 name,
3478 args,
3479 mir,
3480 eval_result_datum,
3481 mir_typ.scalar_type,
3482 reduce_result_datum,
3483 ctyp.scalar_type
3484 );
3485 }
3486 Err(..) => {} }
3488 }
3489 _ => unreachable!(
3490 "all args are literals, so should have reduced to a literal"
3491 ),
3492 }
3493 }
3494 }
3495 }
3496 }
3497 }
3498
3499 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3504 match mir_func_call {
3505 MirScalarExpr::CallUnary { func, expr } => {
3506 if expr.is_literal() {
3507 Some((func.introduces_nulls(), func.propagates_nulls()))
3508 } else {
3509 None
3510 }
3511 }
3512 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3513 if expr1.is_literal() && expr2.is_literal() {
3514 Some((func.introduces_nulls(), func.propagates_nulls()))
3515 } else {
3516 None
3517 }
3518 }
3519 MirScalarExpr::CallVariadic { func, exprs } => {
3520 if exprs.iter().all(|arg| arg.is_literal()) {
3521 Some((func.introduces_nulls(), func.propagates_nulls()))
3522 } else {
3523 None
3524 }
3525 }
3526 _ => None,
3527 }
3528 }
3529
3530 #[mz_ore::test(tokio::test)]
3532 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3534 Catalog::with_debug(|catalog| async move {
3535 let conn_catalog = catalog.for_system_session();
3536
3537 for view in BUILTINS::views().filter(|view| {
3538 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3539 }) {
3540 let item = conn_catalog
3541 .resolve_item(&PartialItemName {
3542 database: None,
3543 schema: Some(view.schema.to_string()),
3544 item: view.name.to_string(),
3545 })
3546 .expect("unable to resolve view")
3547 .at_version(RelationVersionSelector::Latest);
3549 let full_name = conn_catalog.resolve_full_name(item.name());
3550 let desc = item.relation_desc().expect("invalid item type");
3551 for col_type in desc.iter_types() {
3552 match &col_type.scalar_type {
3553 typ @ SqlScalarType::UInt16
3554 | typ @ SqlScalarType::UInt32
3555 | typ @ SqlScalarType::UInt64
3556 | typ @ SqlScalarType::MzTimestamp
3557 | typ @ SqlScalarType::List { .. }
3558 | typ @ SqlScalarType::Map { .. }
3559 | typ @ SqlScalarType::MzAclItem => {
3560 panic!("{typ:?} type found in {full_name}");
3561 }
3562 SqlScalarType::AclItem
3563 | SqlScalarType::Bool
3564 | SqlScalarType::Int16
3565 | SqlScalarType::Int32
3566 | SqlScalarType::Int64
3567 | SqlScalarType::Float32
3568 | SqlScalarType::Float64
3569 | SqlScalarType::Numeric { .. }
3570 | SqlScalarType::Date
3571 | SqlScalarType::Time
3572 | SqlScalarType::Timestamp { .. }
3573 | SqlScalarType::TimestampTz { .. }
3574 | SqlScalarType::Interval
3575 | SqlScalarType::PgLegacyChar
3576 | SqlScalarType::Bytes
3577 | SqlScalarType::String
3578 | SqlScalarType::Char { .. }
3579 | SqlScalarType::VarChar { .. }
3580 | SqlScalarType::Jsonb
3581 | SqlScalarType::Uuid
3582 | SqlScalarType::Array(_)
3583 | SqlScalarType::Record { .. }
3584 | SqlScalarType::Oid
3585 | SqlScalarType::RegProc
3586 | SqlScalarType::RegType
3587 | SqlScalarType::RegClass
3588 | SqlScalarType::Int2Vector
3589 | SqlScalarType::Range { .. }
3590 | SqlScalarType::PgLegacyName => {}
3591 }
3592 }
3593 }
3594 catalog.expire().await;
3595 })
3596 .await
3597 }
3598
3599 #[mz_ore::test(tokio::test)]
3602 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3604 Catalog::with_debug(|catalog| async move {
3605 let conn_catalog = catalog.for_system_session();
3606
3607 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3608 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3609
3610 for entry in catalog.entries() {
3611 let schema_spec = entry.name().qualifiers.schema_spec;
3612 let introspection_deps = catalog.introspection_dependencies(entry.id);
3613 if introspection_deps.is_empty() {
3614 assert!(
3615 schema_spec != introspection_schema_spec,
3616 "entry does not depend on introspection sources but is in \
3617 `mz_introspection`: {}",
3618 conn_catalog.resolve_full_name(entry.name()),
3619 );
3620 } else {
3621 assert!(
3622 schema_spec == introspection_schema_spec,
3623 "entry depends on introspection sources but is not in \
3624 `mz_introspection`: {}",
3625 conn_catalog.resolve_full_name(entry.name()),
3626 );
3627 }
3628 }
3629 })
3630 .await
3631 }
3632
3633 #[mz_ore::test(tokio::test)]
3634 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3636 let persist_client = PersistClient::new_for_tests().await;
3637 let bootstrap_args = test_bootstrap_args();
3638 let organization_id = Uuid::new_v4();
3639 let db_name = "DB";
3640
3641 let mut writer_catalog = Catalog::open_debug_catalog(
3642 persist_client.clone(),
3643 organization_id.clone(),
3644 &bootstrap_args,
3645 )
3646 .await
3647 .expect("open_debug_catalog");
3648 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3649 persist_client.clone(),
3650 organization_id.clone(),
3651 &bootstrap_args,
3652 )
3653 .await
3654 .expect("open_debug_read_only_catalog");
3655 assert_err!(writer_catalog.resolve_database(db_name));
3656 assert_err!(read_only_catalog.resolve_database(db_name));
3657
3658 let commit_ts = writer_catalog.current_upper().await;
3659 writer_catalog
3660 .transact(
3661 None,
3662 commit_ts,
3663 None,
3664 vec![Op::CreateDatabase {
3665 name: db_name.to_string(),
3666 owner_id: MZ_SYSTEM_ROLE_ID,
3667 }],
3668 )
3669 .await
3670 .expect("failed to transact");
3671
3672 let write_db = writer_catalog
3673 .resolve_database(db_name)
3674 .expect("resolve_database");
3675 read_only_catalog
3676 .sync_to_current_updates()
3677 .await
3678 .expect("sync_to_current_updates");
3679 let read_db = read_only_catalog
3680 .resolve_database(db_name)
3681 .expect("resolve_database");
3682
3683 assert_eq!(write_db, read_db);
3684
3685 let writer_catalog_fencer =
3686 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3687 .await
3688 .expect("open_debug_catalog for fencer");
3689 let fencer_db = writer_catalog_fencer
3690 .resolve_database(db_name)
3691 .expect("resolve_database for fencer");
3692 assert_eq!(fencer_db, read_db);
3693
3694 let write_fence_err = writer_catalog
3695 .sync_to_current_updates()
3696 .await
3697 .expect_err("sync_to_current_updates for fencer");
3698 assert!(matches!(
3699 write_fence_err,
3700 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3701 ));
3702 let read_fence_err = read_only_catalog
3703 .sync_to_current_updates()
3704 .await
3705 .expect_err("sync_to_current_updates after fencer");
3706 assert!(matches!(
3707 read_fence_err,
3708 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3709 ));
3710
3711 writer_catalog.expire().await;
3712 read_only_catalog.expire().await;
3713 writer_catalog_fencer.expire().await;
3714 }
3715}