1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_ore::soft_panic_or_log;
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68 SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94 DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114#[derive(Debug)]
137pub struct Catalog {
138 state: CatalogState,
139 expr_cache_handle: Option<ExpressionCacheHandle>,
140 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141 transient_revision: u64,
142}
143
144impl Clone for Catalog {
147 fn clone(&self) -> Self {
148 Self {
149 state: self.state.clone(),
150 expr_cache_handle: self.expr_cache_handle.clone(),
151 storage: Arc::clone(&self.storage),
152 transient_revision: self.transient_revision,
153 }
154 }
155}
156
157impl Catalog {
158 #[mz_ore::instrument(level = "trace")]
164 pub fn set_optimized_plan(
165 &mut self,
166 id: GlobalId,
167 plan: DataflowDescription<OptimizedMirRelationExpr>,
168 ) {
169 self.state.set_optimized_plan(id, plan);
170 }
171
172 #[mz_ore::instrument(level = "trace")]
178 pub fn set_physical_plan(
179 &mut self,
180 id: GlobalId,
181 plan: DataflowDescription<mz_compute_types::plan::Plan>,
182 ) {
183 self.state.set_physical_plan(id, plan);
184 }
185
186 #[mz_ore::instrument(level = "trace")]
188 pub fn try_get_optimized_plan(
189 &self,
190 id: &GlobalId,
191 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192 let entry = self.state.try_get_entry_by_global_id(id)?;
193 entry.item().optimized_plan().map(AsRef::as_ref)
194 }
195
196 #[mz_ore::instrument(level = "trace")]
198 pub fn try_get_physical_plan(
199 &self,
200 id: &GlobalId,
201 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202 let entry = self.state.try_get_entry_by_global_id(id)?;
203 entry.item().physical_plan().map(AsRef::as_ref)
204 }
205
206 #[mz_ore::instrument(level = "trace")]
212 pub fn set_dataflow_metainfo(
213 &mut self,
214 id: GlobalId,
215 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216 ) {
217 self.state.set_dataflow_metainfo(id, metainfo);
218 }
219
220 #[mz_ore::instrument(level = "trace")]
222 pub fn try_get_dataflow_metainfo(
223 &self,
224 id: &GlobalId,
225 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226 let entry = self.state.try_get_entry_by_global_id(id)?;
227 entry.item().dataflow_metainfo()
228 }
229}
230
/// A view of the catalog scoped to a single connection (or to a sessionless
/// role), built by the `for_session` / `for_sessionless_user` /
/// `for_system_session` constructors on `Catalog`.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// Catalog state backing this view, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Item IDs registered through `mark_id_unresolvable_for_replanning`.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// ID of the connection this view belongs to.
    conn_id: ConnectionId,
    /// Name of a cluster — presumably the connection's active cluster; confirm
    /// against the constructors in `CatalogState`.
    cluster: String,
    /// Database ID, if any — presumably the connection's current database;
    /// confirm against the constructors in `CatalogState`.
    database: Option<DatabaseId>,
    /// Search path that `effective_search_path` expands.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// Role this view acts as; checked by `mark_id_unresolvable_for_replanning`.
    role_id: RoleId,
    /// Prepared statements borrowed from the session, when one is attached.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Portals borrowed from the session, when one is attached.
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Channel used to send `AdapterNotice`s.
    notices_tx: UnboundedSender<AdapterNotice>,
}
253
254impl ConnCatalog<'_> {
255 pub fn conn_id(&self) -> &ConnectionId {
256 &self.conn_id
257 }
258
259 pub fn state(&self) -> &CatalogState {
260 &*self.state
261 }
262
263 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
273 assert_eq!(
274 self.role_id, MZ_SYSTEM_ROLE_ID,
275 "only the system role can mark IDs unresolvable",
276 );
277 self.unresolvable_ids.insert(id);
278 }
279
280 pub fn effective_search_path(
286 &self,
287 include_temp_schema: bool,
288 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
289 self.state
290 .effective_search_path(&self.search_path, include_temp_schema)
291 }
292}
293
294impl ConnectionResolver for ConnCatalog<'_> {
295 fn resolve_connection(
296 &self,
297 id: CatalogItemId,
298 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
299 self.state().resolve_connection(id)
300 }
301}
302
303impl Catalog {
304 pub fn transient_revision(&self) -> u64 {
308 self.transient_revision
309 }
310
311 pub async fn with_debug<F, Fut, T>(f: F) -> T
323 where
324 F: FnOnce(Catalog) -> Fut,
325 Fut: Future<Output = T>,
326 {
327 let persist_client = PersistClient::new_for_tests().await;
328 let organization_id = Uuid::new_v4();
329 let bootstrap_args = test_bootstrap_args();
330 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
331 .await
332 .expect("can open debug catalog");
333 f(catalog).await
334 }
335
336 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
339 where
340 F: FnOnce(Catalog) -> Fut,
341 Fut: Future<Output = T>,
342 {
343 let persist_client = PersistClient::new_for_tests().await;
344 let organization_id = Uuid::new_v4();
345 let bootstrap_args = test_bootstrap_args();
346 let mut catalog =
347 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
348 .await
349 .expect("can open debug catalog");
350
351 let now = SYSTEM_TIME.clone();
353 let openable_storage = TestCatalogStateBuilder::new(persist_client)
354 .with_organization_id(organization_id)
355 .with_default_deploy_generation()
356 .build()
357 .await
358 .expect("can create durable catalog");
359 let mut storage = openable_storage
360 .open(now().into(), &bootstrap_args)
361 .await
362 .expect("can open durable catalog")
363 .0;
364 let _ = storage
366 .sync_to_current_updates()
367 .await
368 .expect("can sync to current updates");
369 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
370
371 f(catalog).await
372 }
373
374 pub async fn open_debug_catalog(
378 persist_client: PersistClient,
379 organization_id: Uuid,
380 bootstrap_args: &BootstrapArgs,
381 ) -> Result<Catalog, anyhow::Error> {
382 let now = SYSTEM_TIME.clone();
383 let environment_id = None;
384 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
385 .with_organization_id(organization_id)
386 .with_default_deploy_generation()
387 .build()
388 .await?;
389 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
390 let system_parameter_defaults = BTreeMap::default();
391 Self::open_debug_catalog_inner(
392 persist_client,
393 storage,
394 now,
395 environment_id,
396 &DUMMY_BUILD_INFO,
397 system_parameter_defaults,
398 bootstrap_args,
399 None,
400 )
401 .await
402 }
403
404 pub async fn open_debug_read_only_catalog(
409 persist_client: PersistClient,
410 organization_id: Uuid,
411 bootstrap_args: &BootstrapArgs,
412 ) -> Result<Catalog, anyhow::Error> {
413 let now = SYSTEM_TIME.clone();
414 let environment_id = None;
415 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
416 .with_organization_id(organization_id)
417 .build()
418 .await?;
419 let storage = openable_storage
420 .open_read_only(&test_bootstrap_args())
421 .await?;
422 let system_parameter_defaults = BTreeMap::default();
423 Self::open_debug_catalog_inner(
424 persist_client,
425 storage,
426 now,
427 environment_id,
428 &DUMMY_BUILD_INFO,
429 system_parameter_defaults,
430 bootstrap_args,
431 None,
432 )
433 .await
434 }
435
436 pub async fn open_debug_read_only_persist_catalog_config(
441 persist_client: PersistClient,
442 now: NowFn,
443 environment_id: EnvironmentId,
444 system_parameter_defaults: BTreeMap<String, String>,
445 build_info: &'static BuildInfo,
446 bootstrap_args: &BootstrapArgs,
447 enable_expression_cache_override: Option<bool>,
448 ) -> Result<Catalog, anyhow::Error> {
449 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
450 .with_organization_id(environment_id.organization_id())
451 .with_version(
452 build_info
453 .version
454 .parse()
455 .expect("build version is parseable"),
456 )
457 .build()
458 .await?;
459 let storage = openable_storage.open_read_only(bootstrap_args).await?;
460 Self::open_debug_catalog_inner(
461 persist_client,
462 storage,
463 now,
464 Some(environment_id),
465 build_info,
466 system_parameter_defaults,
467 bootstrap_args,
468 enable_expression_cache_override,
469 )
470 .await
471 }
472
473 async fn open_debug_catalog_inner(
474 persist_client: PersistClient,
475 storage: Box<dyn DurableCatalogState>,
476 now: NowFn,
477 environment_id: Option<EnvironmentId>,
478 build_info: &'static BuildInfo,
479 system_parameter_defaults: BTreeMap<String, String>,
480 bootstrap_args: &BootstrapArgs,
481 enable_expression_cache_override: Option<bool>,
482 ) -> Result<Catalog, anyhow::Error> {
483 let metrics_registry = &MetricsRegistry::new();
484 let secrets_reader = Arc::new(InMemorySecretsController::new());
485 let previous_ts = now().into();
488 let replica_size = &bootstrap_args.default_cluster_replica_size;
489 let read_only = false;
490
491 let OpenCatalogResult {
492 catalog,
493 migrated_storage_collections_0dt: _,
494 new_builtin_collections: _,
495 builtin_table_updates: _,
496 cached_global_exprs: _,
497 uncached_local_exprs: _,
498 } = Catalog::open(Config {
499 storage,
500 metrics_registry,
501 state: StateConfig {
502 unsafe_mode: true,
503 all_features: false,
504 build_info,
505 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
506 read_only,
507 now,
508 boot_ts: previous_ts,
509 skip_migrations: true,
510 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
511 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
512 size: replica_size.clone(),
513 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
514 },
515 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
516 size: replica_size.clone(),
517 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
518 },
519 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
520 size: replica_size.clone(),
521 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
522 },
523 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
524 size: replica_size.clone(),
525 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
526 },
527 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
528 size: replica_size.clone(),
529 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
530 },
531 system_parameter_defaults,
532 remote_system_parameters: None,
533 availability_zones: vec![],
534 egress_addresses: vec![],
535 aws_principal_context: None,
536 aws_privatelink_availability_zones: None,
537 http_host_name: None,
538 connection_context: ConnectionContext::for_tests(secrets_reader),
539 builtin_item_migration_config: BuiltinItemMigrationConfig {
540 persist_client: persist_client.clone(),
541 read_only,
542 force_migration: None,
543 },
544 persist_client,
545 enable_expression_cache_override,
546 helm_chart_version: None,
547 external_login_password_mz_system: None,
548 license_key: ValidatedLicenseKey::for_tests(),
549 },
550 })
551 .await?;
552 Ok(catalog)
553 }
554
555 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
556 self.state.for_session(session)
557 }
558
559 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
560 self.state.for_sessionless_user(role_id)
561 }
562
563 pub fn for_system_session(&self) -> ConnCatalog<'_> {
564 self.state.for_system_session()
565 }
566
567 async fn storage<'a>(
568 &'a self,
569 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
570 self.storage.lock().await
571 }
572
573 pub async fn current_upper(&self) -> mz_repr::Timestamp {
574 self.storage().await.current_upper().await
575 }
576
577 pub async fn allocate_user_id(
578 &self,
579 commit_ts: mz_repr::Timestamp,
580 ) -> Result<(CatalogItemId, GlobalId), Error> {
581 self.storage()
582 .await
583 .allocate_user_id(commit_ts)
584 .await
585 .maybe_terminate("allocating user ids")
586 .err_into()
587 }
588
589 pub async fn allocate_user_ids(
591 &self,
592 amount: u64,
593 commit_ts: mz_repr::Timestamp,
594 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
595 self.storage()
596 .await
597 .allocate_user_ids(amount, commit_ts)
598 .await
599 .maybe_terminate("allocating user ids")
600 .err_into()
601 }
602
603 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
604 let commit_ts = self.storage().await.current_upper().await;
605 self.allocate_user_id(commit_ts).await
606 }
607
608 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
610 self.storage()
611 .await
612 .get_next_user_item_id()
613 .await
614 .err_into()
615 }
616
/// Test-only: allocates a single system item ID pair in its own durable
/// transaction and commits that transaction at `commit_ts`.
#[cfg(test)]
pub async fn allocate_system_id(
    &self,
    commit_ts: mz_repr::Timestamp,
) -> Result<(CatalogItemId, GlobalId), Error> {
    use mz_ore::collections::CollectionExt;

    let mut durable = self.storage().await;
    let mut txn = durable.transaction().await?;
    let allocated = txn
        .allocate_system_item_ids(1)
        .maybe_terminate("allocating system ids")?;
    // Exactly one ID was requested, so unwrap the single element.
    let id = allocated.into_element();
    // The op updates are fetched and deliberately discarded before commit.
    let _ = txn.get_and_commit_op_updates();
    txn.commit(commit_ts).await?;
    Ok(id)
}
635
636 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
638 self.storage()
639 .await
640 .get_next_system_item_id()
641 .await
642 .err_into()
643 }
644
645 pub async fn allocate_user_cluster_id(
646 &self,
647 commit_ts: mz_repr::Timestamp,
648 ) -> Result<ClusterId, Error> {
649 self.storage()
650 .await
651 .allocate_user_cluster_id(commit_ts)
652 .await
653 .maybe_terminate("allocating user cluster ids")
654 .err_into()
655 }
656
657 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
659 self.storage()
660 .await
661 .get_next_system_replica_id()
662 .await
663 .err_into()
664 }
665
666 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
668 self.storage()
669 .await
670 .get_next_user_replica_id()
671 .await
672 .err_into()
673 }
674
675 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
676 self.state.resolve_database(database_name)
677 }
678
679 pub fn resolve_schema(
680 &self,
681 current_database: Option<&DatabaseId>,
682 database_name: Option<&str>,
683 schema_name: &str,
684 conn_id: &ConnectionId,
685 ) -> Result<&Schema, SqlCatalogError> {
686 self.state
687 .resolve_schema(current_database, database_name, schema_name, conn_id)
688 }
689
690 pub fn resolve_schema_in_database(
691 &self,
692 database_spec: &ResolvedDatabaseSpecifier,
693 schema_name: &str,
694 conn_id: &ConnectionId,
695 ) -> Result<&Schema, SqlCatalogError> {
696 self.state
697 .resolve_schema_in_database(database_spec, schema_name, conn_id)
698 }
699
700 pub fn resolve_replica_in_cluster(
701 &self,
702 cluster_id: &ClusterId,
703 replica_name: &str,
704 ) -> Result<&ClusterReplica, SqlCatalogError> {
705 self.state
706 .resolve_replica_in_cluster(cluster_id, replica_name)
707 }
708
709 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
710 self.state.resolve_system_schema(name)
711 }
712
713 pub fn resolve_search_path(
714 &self,
715 session: &Session,
716 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
717 self.state.resolve_search_path(session)
718 }
719
720 pub fn resolve_entry(
722 &self,
723 current_database: Option<&DatabaseId>,
724 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
725 name: &PartialItemName,
726 conn_id: &ConnectionId,
727 ) -> Result<&CatalogEntry, SqlCatalogError> {
728 self.state
729 .resolve_entry(current_database, search_path, name, conn_id)
730 }
731
732 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
734 self.state.resolve_builtin_table(builtin)
735 }
736
737 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
739 self.state.resolve_builtin_log(builtin).0
740 }
741
742 pub fn resolve_builtin_storage_collection(
744 &self,
745 builtin: &'static BuiltinSource,
746 ) -> CatalogItemId {
747 self.state.resolve_builtin_source(builtin)
748 }
749
750 pub fn resolve_function(
752 &self,
753 current_database: Option<&DatabaseId>,
754 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
755 name: &PartialItemName,
756 conn_id: &ConnectionId,
757 ) -> Result<&CatalogEntry, SqlCatalogError> {
758 self.state
759 .resolve_function(current_database, search_path, name, conn_id)
760 }
761
762 pub fn resolve_type(
764 &self,
765 current_database: Option<&DatabaseId>,
766 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
767 name: &PartialItemName,
768 conn_id: &ConnectionId,
769 ) -> Result<&CatalogEntry, SqlCatalogError> {
770 self.state
771 .resolve_type(current_database, search_path, name, conn_id)
772 }
773
774 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
775 self.state.resolve_cluster(name)
776 }
777
778 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
784 self.state.resolve_builtin_cluster(cluster)
785 }
786
787 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
788 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
789 }
790
791 pub fn resolve_target_cluster(
793 &self,
794 target_cluster: TargetCluster,
795 session: &Session,
796 ) -> Result<&Cluster, AdapterError> {
797 match target_cluster {
798 TargetCluster::CatalogServer => {
799 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
800 }
801 TargetCluster::Active => self.active_cluster(session),
802 TargetCluster::Transaction(cluster_id) => self
803 .try_get_cluster(cluster_id)
804 .ok_or(AdapterError::ConcurrentClusterDrop),
805 }
806 }
807
808 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
809 if session.user().name != SYSTEM_USER.name
813 && session.user().name != SUPPORT_USER.name
814 && session.vars().cluster() == SYSTEM_USER.name
815 {
816 coord_bail!(
817 "system cluster '{}' cannot execute user queries",
818 SYSTEM_USER.name
819 );
820 }
821 let cluster = self.resolve_cluster(session.vars().cluster())?;
822 Ok(cluster)
823 }
824
825 pub fn state(&self) -> &CatalogState {
826 &self.state
827 }
828
829 pub fn resolve_full_name(
830 &self,
831 name: &QualifiedItemName,
832 conn_id: Option<&ConnectionId>,
833 ) -> FullItemName {
834 self.state.resolve_full_name(name, conn_id)
835 }
836
837 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
838 self.state.try_get_entry(id)
839 }
840
841 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
842 self.state.try_get_entry_by_global_id(id)
843 }
844
845 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
846 self.state.get_entry(id)
847 }
848
849 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
850 self.state.get_entry_by_global_id(id)
851 }
852
853 pub fn get_global_ids<'a>(
854 &'a self,
855 id: &CatalogItemId,
856 ) -> impl Iterator<Item = GlobalId> + use<'a> {
857 self.get_entry(id).global_ids()
858 }
859
860 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
861 self.get_entry_by_global_id(id).id()
862 }
863
864 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
865 let item = self.try_get_entry_by_global_id(id)?;
866 Some(item.id())
867 }
868
869 pub fn get_schema(
870 &self,
871 database_spec: &ResolvedDatabaseSpecifier,
872 schema_spec: &SchemaSpecifier,
873 conn_id: &ConnectionId,
874 ) -> &Schema {
875 self.state.get_schema(database_spec, schema_spec, conn_id)
876 }
877
878 pub fn try_get_schema(
879 &self,
880 database_spec: &ResolvedDatabaseSpecifier,
881 schema_spec: &SchemaSpecifier,
882 conn_id: &ConnectionId,
883 ) -> Option<&Schema> {
884 self.state
885 .try_get_schema(database_spec, schema_spec, conn_id)
886 }
887
888 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
889 self.state.get_mz_catalog_schema_id()
890 }
891
892 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
893 self.state.get_pg_catalog_schema_id()
894 }
895
896 pub fn get_information_schema_id(&self) -> SchemaId {
897 self.state.get_information_schema_id()
898 }
899
900 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
901 self.state.get_mz_internal_schema_id()
902 }
903
904 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
905 self.state.get_mz_introspection_schema_id()
906 }
907
908 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
909 self.state.get_mz_unsafe_schema_id()
910 }
911
912 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
913 self.state.system_schema_ids()
914 }
915
916 pub fn get_database(&self, id: &DatabaseId) -> &Database {
917 self.state.get_database(id)
918 }
919
920 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
921 self.state.try_get_role(id)
922 }
923
924 pub fn get_role(&self, id: &RoleId) -> &Role {
925 self.state.get_role(id)
926 }
927
928 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
929 self.state.try_get_role_by_name(role_name)
930 }
931
932 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
933 self.state.try_get_role_auth_by_id(id)
934 }
935
936 pub fn create_temporary_schema(
939 &mut self,
940 conn_id: &ConnectionId,
941 owner_id: RoleId,
942 ) -> Result<(), Error> {
943 self.state.create_temporary_schema(conn_id, owner_id)
944 }
945
946 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
947 self.state
949 .temporary_schemas
950 .get(conn_id)
951 .map(|schema| schema.items.contains_key(item_name))
952 .unwrap_or(false)
953 }
954
955 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
958 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
959 return Ok(());
960 };
961 if !schema.items.is_empty() {
962 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
963 }
964 Ok(())
965 }
966
967 pub(crate) fn object_dependents(
968 &self,
969 object_ids: &Vec<ObjectId>,
970 conn_id: &ConnectionId,
971 ) -> Vec<ObjectId> {
972 let mut seen = BTreeSet::new();
973 self.state.object_dependents(object_ids, conn_id, &mut seen)
974 }
975
976 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
977 FullNameV1 {
978 database: name.database.to_string(),
979 schema: name.schema.clone(),
980 item: name.item.clone(),
981 }
982 }
983
984 pub fn find_available_cluster_name(&self, name: &str) -> String {
985 let mut i = 0;
986 let mut candidate = name.to_string();
987 while self.state.clusters_by_name.contains_key(&candidate) {
988 i += 1;
989 candidate = format!("{}{}", name, i);
990 }
991 candidate
992 }
993
994 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
995 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
996 self.cluster_replica_sizes()
997 .enabled_allocations()
998 .map(|a| a.0.to_owned())
999 .collect::<Vec<_>>()
1000 } else {
1001 self.system_config().allowed_cluster_replica_sizes()
1002 }
1003 }
1004
1005 pub fn concretize_replica_location(
1006 &self,
1007 location: mz_catalog::durable::ReplicaLocation,
1008 allowed_sizes: &Vec<String>,
1009 allowed_availability_zones: Option<&[String]>,
1010 ) -> Result<ReplicaLocation, Error> {
1011 self.state
1012 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1013 }
1014
1015 pub(crate) fn ensure_valid_replica_size(
1016 &self,
1017 allowed_sizes: &[String],
1018 size: &String,
1019 ) -> Result<(), Error> {
1020 self.state.ensure_valid_replica_size(allowed_sizes, size)
1021 }
1022
1023 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1024 &self.state.cluster_replica_sizes
1025 }
1026
1027 pub fn get_privileges(
1029 &self,
1030 id: &SystemObjectId,
1031 conn_id: &ConnectionId,
1032 ) -> Option<&PrivilegeMap> {
1033 match id {
1034 SystemObjectId::Object(id) => match id {
1035 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1036 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1037 ObjectId::Schema((database_spec, schema_spec)) => Some(
1038 self.get_schema(database_spec, schema_spec, conn_id)
1039 .privileges(),
1040 ),
1041 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1042 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1043 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1044 },
1045 SystemObjectId::System => Some(&self.state.system_privileges),
1046 }
1047 }
1048
1049 #[mz_ore::instrument(level = "debug")]
1050 pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1051 Ok(self.storage().await.advance_upper(new_upper).await?)
1052 }
1053
1054 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1056 self.state.introspection_dependencies(id)
1057 }
1058
1059 pub fn dump(&self) -> Result<CatalogDump, Error> {
1065 Ok(CatalogDump::new(self.state.dump(None)?))
1066 }
1067
1068 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1072 self.state.check_consistency().map_err(|inconsistencies| {
1073 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1074 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1075 })
1076 })
1077 }
1078
1079 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1080 self.state.config()
1081 }
1082
1083 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1084 self.state.entry_by_id.values()
1085 }
1086
1087 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1088 self.entries()
1089 .filter(|entry| entry.is_connection() && entry.id().is_user())
1090 }
1091
1092 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1093 self.entries()
1094 .filter(|entry| entry.is_table() && entry.id().is_user())
1095 }
1096
1097 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1098 self.entries()
1099 .filter(|entry| entry.is_source() && entry.id().is_user())
1100 }
1101
1102 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1103 self.entries()
1104 .filter(|entry| entry.is_sink() && entry.id().is_user())
1105 }
1106
1107 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1108 self.entries()
1109 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1110 }
1111
1112 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1113 self.entries()
1114 .filter(|entry| entry.is_secret() && entry.id().is_user())
1115 }
1116
1117 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1118 self.state.get_network_policy(&network_policy_id)
1119 }
1120
1121 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1122 self.state.try_get_network_policy_by_name(name)
1123 }
1124
1125 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1126 self.state.clusters_by_id.values()
1127 }
1128
1129 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1130 self.state.get_cluster(cluster_id)
1131 }
1132
1133 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1134 self.state.try_get_cluster(cluster_id)
1135 }
1136
1137 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1138 self.clusters().filter(|cluster| cluster.id.is_user())
1139 }
1140
1141 pub fn get_cluster_replica(
1142 &self,
1143 cluster_id: ClusterId,
1144 replica_id: ReplicaId,
1145 ) -> &ClusterReplica {
1146 self.state.get_cluster_replica(cluster_id, replica_id)
1147 }
1148
1149 pub fn try_get_cluster_replica(
1150 &self,
1151 cluster_id: ClusterId,
1152 replica_id: ReplicaId,
1153 ) -> Option<&ClusterReplica> {
1154 self.state.try_get_cluster_replica(cluster_id, replica_id)
1155 }
1156
1157 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1158 self.user_clusters()
1159 .flat_map(|cluster| cluster.user_replicas())
1160 }
1161
1162 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1163 self.state.database_by_id.values()
1164 }
1165
1166 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1167 self.state
1168 .roles_by_id
1169 .values()
1170 .filter(|role| role.is_user())
1171 }
1172
1173 pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1174 self.entries()
1175 .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1176 }
1177
1178 pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1179 self.state
1180 .network_policies_by_id
1181 .iter()
1182 .filter(|(id, _)| id.is_user())
1183 .map(|(_, policy)| policy)
1184 }
1185
1186 pub fn system_privileges(&self) -> &PrivilegeMap {
1187 &self.state.system_privileges
1188 }
1189
1190 pub fn default_privileges(
1191 &self,
1192 ) -> impl Iterator<
1193 Item = (
1194 &DefaultPrivilegeObject,
1195 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1196 ),
1197 > {
1198 self.state.default_privileges.iter()
1199 }
1200
1201 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1202 self.state
1203 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1204 }
1205
1206 pub fn pack_storage_usage_update(
1207 &self,
1208 event: VersionedStorageUsage,
1209 diff: Diff,
1210 ) -> BuiltinTableUpdate {
1211 self.state
1212 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1213 }
1214
1215 pub fn system_config(&self) -> &SystemVars {
1216 self.state.system_config()
1217 }
1218
1219 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1220 self.state.system_config_mut()
1221 }
1222
1223 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1224 self.state.ensure_not_reserved_role(role_id)
1225 }
1226
1227 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1228 self.state.ensure_grantable_role(role_id)
1229 }
1230
1231 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1232 self.state.ensure_not_system_role(role_id)
1233 }
1234
1235 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1236 self.state.ensure_not_predefined_role(role_id)
1237 }
1238
1239 pub fn ensure_not_reserved_network_policy(
1240 &self,
1241 network_policy_id: &NetworkPolicyId,
1242 ) -> Result<(), Error> {
1243 self.state
1244 .ensure_not_reserved_network_policy(network_policy_id)
1245 }
1246
1247 pub fn ensure_not_reserved_object(
1248 &self,
1249 object_id: &ObjectId,
1250 conn_id: &ConnectionId,
1251 ) -> Result<(), Error> {
1252 match object_id {
1253 ObjectId::Cluster(cluster_id) => {
1254 if cluster_id.is_system() {
1255 let cluster = self.get_cluster(*cluster_id);
1256 Err(Error::new(ErrorKind::ReadOnlyCluster(
1257 cluster.name().to_string(),
1258 )))
1259 } else {
1260 Ok(())
1261 }
1262 }
1263 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1264 if replica_id.is_system() {
1265 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1266 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1267 replica.name().to_string(),
1268 )))
1269 } else {
1270 Ok(())
1271 }
1272 }
1273 ObjectId::Database(database_id) => {
1274 if database_id.is_system() {
1275 let database = self.get_database(database_id);
1276 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1277 database.name().to_string(),
1278 )))
1279 } else {
1280 Ok(())
1281 }
1282 }
1283 ObjectId::Schema((database_spec, schema_spec)) => {
1284 if schema_spec.is_system() {
1285 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1286 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1287 schema.name().schema.clone(),
1288 )))
1289 } else {
1290 Ok(())
1291 }
1292 }
1293 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1294 ObjectId::Item(item_id) => {
1295 if item_id.is_system() {
1296 let item = self.get_entry(item_id);
1297 let name = self.resolve_full_name(item.name(), Some(conn_id));
1298 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1299 } else {
1300 Ok(())
1301 }
1302 }
1303 ObjectId::NetworkPolicy(network_policy_id) => {
1304 self.ensure_not_reserved_network_policy(network_policy_id)
1305 }
1306 }
1307 }
1308
1309 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1311 &mut self,
1312 create_sql: &str,
1313 force_if_exists_skip: bool,
1314 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1315 self.state
1316 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1317 }
1318
1319 pub(crate) fn cache_expressions(
1325 &self,
1326 id: GlobalId,
1327 local_mir: Option<OptimizedMirRelationExpr>,
1328 optimizer_features: OptimizerFeatures,
1329 ) {
1330 let Some(mut global_mir) = self.try_get_optimized_plan(&id).cloned() else {
1331 soft_panic_or_log!("optimized plan missing for ID {id}");
1332 return;
1333 };
1334 let Some(mut physical_plan) = self.try_get_physical_plan(&id).cloned() else {
1335 soft_panic_or_log!("physical plan missing for ID {id}");
1336 return;
1337 };
1338 let Some(dataflow_metainfos) = self.try_get_dataflow_metainfo(&id).cloned() else {
1339 soft_panic_or_log!("dataflow metainfo missing for ID {id}");
1340 return;
1341 };
1342
1343 global_mir.as_of = None;
1346 global_mir.until = Default::default();
1347 physical_plan.as_of = None;
1348 physical_plan.until = Default::default();
1349
1350 let mut local_exprs = Vec::new();
1351 if let Some(local_mir) = local_mir {
1352 local_exprs.push((
1353 id,
1354 LocalExpressions {
1355 local_mir,
1356 optimizer_features: optimizer_features.clone(),
1357 },
1358 ));
1359 }
1360 let global_exprs = vec![(
1361 id,
1362 GlobalExpressions {
1363 global_mir,
1364 physical_plan,
1365 dataflow_metainfos,
1366 optimizer_features,
1367 },
1368 )];
1369 let _fut = self.update_expression_cache(local_exprs, global_exprs, Default::default());
1370 }
1371
1372 pub(crate) fn update_expression_cache<'a, 'b>(
1373 &'a self,
1374 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1375 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1376 invalidate_ids: BTreeSet<GlobalId>,
1377 ) -> BoxFuture<'b, ()> {
1378 if let Some(expr_cache) = &self.expr_cache_handle {
1379 expr_cache
1380 .update(
1381 new_local_expressions,
1382 new_global_expressions,
1383 invalidate_ids,
1384 )
1385 .boxed()
1386 } else {
1387 async {}.boxed()
1388 }
1389 }
1390
1391 #[cfg(test)]
1395 async fn sync_to_current_updates(
1396 &mut self,
1397 ) -> Result<
1398 (
1399 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1400 Vec<ParsedStateUpdate>,
1401 ),
1402 CatalogError,
1403 > {
1404 let updates = self.storage().await.sync_to_current_updates().await?;
1405 let (builtin_table_updates, catalog_updates) = self
1406 .state
1407 .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1408 .await;
1409 Ok((builtin_table_updates, catalog_updates))
1410 }
1411}
1412
1413pub fn is_reserved_name(name: &str) -> bool {
1414 BUILTIN_PREFIXES
1415 .iter()
1416 .any(|prefix| name.starts_with(prefix))
1417}
1418
1419pub fn is_reserved_role_name(name: &str) -> bool {
1420 is_reserved_name(name) || is_public_role(name)
1421}
1422
1423pub fn is_public_role(name: &str) -> bool {
1424 name == &*PUBLIC_ROLE_NAME
1425}
1426
1427pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1428 object_type_to_audit_object_type(sql_type.into())
1429}
1430
1431pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1432 match id {
1433 CommentObjectId::Table(_) => ObjectType::Table,
1434 CommentObjectId::View(_) => ObjectType::View,
1435 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1436 CommentObjectId::Source(_) => ObjectType::Source,
1437 CommentObjectId::Sink(_) => ObjectType::Sink,
1438 CommentObjectId::Index(_) => ObjectType::Index,
1439 CommentObjectId::Func(_) => ObjectType::Func,
1440 CommentObjectId::Connection(_) => ObjectType::Connection,
1441 CommentObjectId::Type(_) => ObjectType::Type,
1442 CommentObjectId::Secret(_) => ObjectType::Secret,
1443 CommentObjectId::Role(_) => ObjectType::Role,
1444 CommentObjectId::Database(_) => ObjectType::Database,
1445 CommentObjectId::Schema(_) => ObjectType::Schema,
1446 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1447 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1448 CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1449 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1450 }
1451}
1452
1453pub(crate) fn object_type_to_audit_object_type(
1454 object_type: mz_sql::catalog::ObjectType,
1455) -> ObjectType {
1456 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1457}
1458
1459pub(crate) fn system_object_type_to_audit_object_type(
1460 system_type: &SystemObjectType,
1461) -> ObjectType {
1462 match system_type {
1463 SystemObjectType::Object(object_type) => match object_type {
1464 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1465 mz_sql::catalog::ObjectType::View => ObjectType::View,
1466 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1467 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1468 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1469 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1470 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1471 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1472 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1473 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1474 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1475 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1476 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1477 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1478 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1479 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1480 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1481 },
1482 SystemObjectType::System => ObjectType::System,
1483 }
1484}
1485
/// Direction of a privilege update: either granting or revoking.
#[derive(Clone, Copy, Debug)]
pub enum UpdatePrivilegeVariant {
    /// The privilege is being granted.
    Grant,
    /// The privilege is being revoked.
    Revoke,
}
1491
1492impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1493 fn from(variant: UpdatePrivilegeVariant) -> Self {
1494 match variant {
1495 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1496 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1497 }
1498 }
1499}
1500
1501impl From<UpdatePrivilegeVariant> for EventType {
1502 fn from(variant: UpdatePrivilegeVariant) -> Self {
1503 match variant {
1504 UpdatePrivilegeVariant::Grant => EventType::Grant,
1505 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1506 }
1507 }
1508}
1509
1510impl ConnCatalog<'_> {
1511 fn resolve_item_name(
1512 &self,
1513 name: &PartialItemName,
1514 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1515 self.resolve_item(name).map(|entry| entry.name())
1516 }
1517
1518 fn resolve_function_name(
1519 &self,
1520 name: &PartialItemName,
1521 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1522 self.resolve_function(name).map(|entry| entry.name())
1523 }
1524
1525 fn resolve_type_name(
1526 &self,
1527 name: &PartialItemName,
1528 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1529 self.resolve_type(name).map(|entry| entry.name())
1530 }
1531}
1532
1533impl ExprHumanizer for ConnCatalog<'_> {
1534 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1535 let entry = self.state.try_get_entry_by_global_id(&id)?;
1536 Some(self.resolve_full_name(entry.name()).to_string())
1537 }
1538
1539 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1540 let entry = self.state.try_get_entry_by_global_id(&id)?;
1541 Some(entry.name().item.clone())
1542 }
1543
1544 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1545 let entry = self.state.try_get_entry_by_global_id(&id)?;
1546 Some(self.resolve_full_name(entry.name()).into_parts())
1547 }
1548
1549 fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1550 use SqlScalarType::*;
1551
1552 match typ {
1553 Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1554 List {
1555 custom_id: Some(item_id),
1556 ..
1557 }
1558 | Map {
1559 custom_id: Some(item_id),
1560 ..
1561 } => {
1562 let item = self.get_item(item_id);
1563 self.minimal_qualification(item.name()).to_string()
1564 }
1565 List { element_type, .. } => {
1566 format!(
1567 "{} list",
1568 self.humanize_sql_scalar_type(element_type, postgres_compat)
1569 )
1570 }
1571 Map { value_type, .. } => format!(
1572 "map[{}=>{}]",
1573 self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1574 self.humanize_sql_scalar_type(value_type, postgres_compat)
1575 ),
1576 Record {
1577 custom_id: Some(item_id),
1578 ..
1579 } => {
1580 let item = self.get_item(item_id);
1581 self.minimal_qualification(item.name()).to_string()
1582 }
1583 Record { fields, .. } => format!(
1584 "record({})",
1585 fields
1586 .iter()
1587 .map(|f| format!(
1588 "{}: {}",
1589 f.0,
1590 self.humanize_sql_column_type(&f.1, postgres_compat)
1591 ))
1592 .join(",")
1593 ),
1594 PgLegacyChar => "\"char\"".into(),
1595 Char { length } if !postgres_compat => match length {
1596 None => "char".into(),
1597 Some(length) => format!("char({})", length.into_u32()),
1598 },
1599 VarChar { max_length } if !postgres_compat => match max_length {
1600 None => "varchar".into(),
1601 Some(length) => format!("varchar({})", length.into_u32()),
1602 },
1603 UInt16 => "uint2".into(),
1604 UInt32 => "uint4".into(),
1605 UInt64 => "uint8".into(),
1606 ty => {
1607 let pgrepr_type = mz_pgrepr::Type::from(ty);
1608 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1609
1610 let res = if self
1611 .effective_search_path(true)
1612 .iter()
1613 .any(|(_, schema)| schema == &pg_catalog_schema)
1614 {
1615 pgrepr_type.name().to_string()
1616 } else {
1617 let name = QualifiedItemName {
1620 qualifiers: ItemQualifiers {
1621 database_spec: ResolvedDatabaseSpecifier::Ambient,
1622 schema_spec: pg_catalog_schema,
1623 },
1624 item: pgrepr_type.name().to_string(),
1625 };
1626 self.resolve_full_name(&name).to_string()
1627 };
1628 res
1629 }
1630 }
1631 }
1632
1633 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1634 let entry = self.state.try_get_entry_by_global_id(&id)?;
1635
1636 match entry.index() {
1637 Some(index) => {
1638 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1639 let mut on_names = on_desc
1640 .iter_names()
1641 .map(|col_name| col_name.to_string())
1642 .collect::<Vec<_>>();
1643
1644 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1645
1646 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1650 let mut ix_names = vec![String::new(); ix_arity];
1651
1652 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1654 let on_name = on_names.get_mut(on_pos).expect("on_name");
1655 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1656 std::mem::swap(on_name, ix_name);
1657 }
1658
1659 Some(ix_names) }
1661 None => {
1662 let desc = self.state.try_get_desc_by_global_id(&id)?;
1663 let column_names = desc
1664 .iter_names()
1665 .map(|col_name| col_name.to_string())
1666 .collect();
1667
1668 Some(column_names)
1669 }
1670 }
1671 }
1672
1673 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1674 let desc = self.state.try_get_desc_by_global_id(&id)?;
1675 Some(desc.get_name(column).to_string())
1676 }
1677
1678 fn id_exists(&self, id: GlobalId) -> bool {
1679 self.state.entry_by_global_id.contains_key(&id)
1680 }
1681}
1682
1683impl SessionCatalog for ConnCatalog<'_> {
1684 fn active_role_id(&self) -> &RoleId {
1685 &self.role_id
1686 }
1687
1688 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1689 self.prepared_statements
1690 .as_ref()
1691 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1692 .flatten()
1693 }
1694
1695 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1696 self.portals
1697 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1698 }
1699
1700 fn active_database(&self) -> Option<&DatabaseId> {
1701 self.database.as_ref()
1702 }
1703
1704 fn active_cluster(&self) -> &str {
1705 &self.cluster
1706 }
1707
1708 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1709 &self.search_path
1710 }
1711
1712 fn resolve_database(
1713 &self,
1714 database_name: &str,
1715 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1716 Ok(self.state.resolve_database(database_name)?)
1717 }
1718
1719 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1720 self.state
1721 .database_by_id
1722 .get(id)
1723 .expect("database doesn't exist")
1724 }
1725
1726 #[allow(clippy::as_conversions)]
1728 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1729 self.state
1730 .database_by_id
1731 .values()
1732 .map(|database| database as &dyn CatalogDatabase)
1733 .collect()
1734 }
1735
1736 fn resolve_schema(
1737 &self,
1738 database_name: Option<&str>,
1739 schema_name: &str,
1740 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1741 Ok(self.state.resolve_schema(
1742 self.database.as_ref(),
1743 database_name,
1744 schema_name,
1745 &self.conn_id,
1746 )?)
1747 }
1748
1749 fn resolve_schema_in_database(
1750 &self,
1751 database_spec: &ResolvedDatabaseSpecifier,
1752 schema_name: &str,
1753 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1754 Ok(self
1755 .state
1756 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1757 }
1758
1759 fn get_schema(
1760 &self,
1761 database_spec: &ResolvedDatabaseSpecifier,
1762 schema_spec: &SchemaSpecifier,
1763 ) -> &dyn CatalogSchema {
1764 self.state
1765 .get_schema(database_spec, schema_spec, &self.conn_id)
1766 }
1767
1768 #[allow(clippy::as_conversions)]
1770 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1771 self.get_databases()
1772 .into_iter()
1773 .flat_map(|database| database.schemas().into_iter())
1774 .chain(
1775 self.state
1776 .ambient_schemas_by_id
1777 .values()
1778 .chain(self.state.temporary_schemas.values())
1779 .map(|schema| schema as &dyn CatalogSchema),
1780 )
1781 .collect()
1782 }
1783
1784 fn get_mz_internal_schema_id(&self) -> SchemaId {
1785 self.state().get_mz_internal_schema_id()
1786 }
1787
1788 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1789 self.state().get_mz_unsafe_schema_id()
1790 }
1791
1792 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1793 self.state.is_system_schema_specifier(schema)
1794 }
1795
1796 fn resolve_role(
1797 &self,
1798 role_name: &str,
1799 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1800 match self.state.try_get_role_by_name(role_name) {
1801 Some(role) => Ok(role),
1802 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1803 }
1804 }
1805
1806 fn resolve_network_policy(
1807 &self,
1808 policy_name: &str,
1809 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1810 match self.state.try_get_network_policy_by_name(policy_name) {
1811 Some(policy) => Ok(policy),
1812 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1813 }
1814 }
1815
1816 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1817 Some(self.state.roles_by_id.get(id)?)
1818 }
1819
1820 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1821 self.state.get_role(id)
1822 }
1823
1824 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1825 #[allow(clippy::as_conversions)]
1827 self.state
1828 .roles_by_id
1829 .values()
1830 .map(|role| role as &dyn CatalogRole)
1831 .collect()
1832 }
1833
1834 fn mz_system_role_id(&self) -> RoleId {
1835 MZ_SYSTEM_ROLE_ID
1836 }
1837
1838 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1839 self.state.collect_role_membership(id)
1840 }
1841
1842 fn get_network_policy(
1843 &self,
1844 id: &NetworkPolicyId,
1845 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1846 self.state.get_network_policy(id)
1847 }
1848
1849 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1850 #[allow(clippy::as_conversions)]
1852 self.state
1853 .network_policies_by_id
1854 .values()
1855 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1856 .collect()
1857 }
1858
1859 fn resolve_cluster(
1860 &self,
1861 cluster_name: Option<&str>,
1862 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1863 Ok(self
1864 .state
1865 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1866 }
1867
1868 fn resolve_cluster_replica(
1869 &self,
1870 cluster_replica_name: &QualifiedReplica,
1871 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1872 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1873 }
1874
1875 fn resolve_item(
1876 &self,
1877 name: &PartialItemName,
1878 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1879 let r = self.state.resolve_entry(
1880 self.database.as_ref(),
1881 &self.effective_search_path(true),
1882 name,
1883 &self.conn_id,
1884 )?;
1885 if self.unresolvable_ids.contains(&r.id()) {
1886 Err(SqlCatalogError::UnknownItem(name.to_string()))
1887 } else {
1888 Ok(r)
1889 }
1890 }
1891
1892 fn resolve_function(
1893 &self,
1894 name: &PartialItemName,
1895 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1896 let r = self.state.resolve_function(
1897 self.database.as_ref(),
1898 &self.effective_search_path(false),
1899 name,
1900 &self.conn_id,
1901 )?;
1902
1903 if self.unresolvable_ids.contains(&r.id()) {
1904 Err(SqlCatalogError::UnknownFunction {
1905 name: name.to_string(),
1906 alternative: None,
1907 })
1908 } else {
1909 Ok(r)
1910 }
1911 }
1912
1913 fn resolve_type(
1914 &self,
1915 name: &PartialItemName,
1916 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1917 let r = self.state.resolve_type(
1918 self.database.as_ref(),
1919 &self.effective_search_path(false),
1920 name,
1921 &self.conn_id,
1922 )?;
1923
1924 if self.unresolvable_ids.contains(&r.id()) {
1925 Err(SqlCatalogError::UnknownType {
1926 name: name.to_string(),
1927 })
1928 } else {
1929 Ok(r)
1930 }
1931 }
1932
1933 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
1934 self.state.get_system_type(name)
1935 }
1936
1937 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
1938 Some(self.state.try_get_entry(id)?)
1939 }
1940
1941 fn try_get_item_by_global_id(
1942 &self,
1943 id: &GlobalId,
1944 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
1945 let entry = self.state.try_get_entry_by_global_id(id)?;
1946 let entry = match &entry.item {
1947 CatalogItem::Table(table) => {
1948 let (version, _gid) = table
1949 .collections
1950 .iter()
1951 .find(|(_version, gid)| *gid == id)
1952 .expect("catalog out of sync, mismatched GlobalId");
1953 entry.at_version(RelationVersionSelector::Specific(*version))
1954 }
1955 _ => entry.at_version(RelationVersionSelector::Latest),
1956 };
1957 Some(entry)
1958 }
1959
1960 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
1961 self.state.get_entry(id)
1962 }
1963
1964 fn get_item_by_global_id(
1965 &self,
1966 id: &GlobalId,
1967 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
1968 let entry = self.state.get_entry_by_global_id(id);
1969 let entry = match &entry.item {
1970 CatalogItem::Table(table) => {
1971 let (version, _gid) = table
1972 .collections
1973 .iter()
1974 .find(|(_version, gid)| *gid == id)
1975 .expect("catalog out of sync, mismatched GlobalId");
1976 entry.at_version(RelationVersionSelector::Specific(*version))
1977 }
1978 _ => entry.at_version(RelationVersionSelector::Latest),
1979 };
1980 entry
1981 }
1982
1983 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
1984 self.get_schemas()
1985 .into_iter()
1986 .flat_map(|schema| schema.item_ids())
1987 .map(|id| self.get_item(&id))
1988 .collect()
1989 }
1990
1991 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1992 self.state
1993 .get_item_by_name(name, &self.conn_id)
1994 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
1995 }
1996
1997 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
1998 self.state
1999 .get_type_by_name(name, &self.conn_id)
2000 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2001 }
2002
2003 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2004 &self.state.clusters_by_id[&id]
2005 }
2006
2007 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2008 self.state
2009 .clusters_by_id
2010 .values()
2011 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2012 .collect()
2013 }
2014
2015 fn get_cluster_replica(
2016 &self,
2017 cluster_id: ClusterId,
2018 replica_id: ReplicaId,
2019 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2020 let cluster = self.get_cluster(cluster_id);
2021 cluster.replica(replica_id)
2022 }
2023
2024 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2025 self.get_clusters()
2026 .into_iter()
2027 .flat_map(|cluster| cluster.replicas().into_iter())
2028 .collect()
2029 }
2030
2031 fn get_system_privileges(&self) -> &PrivilegeMap {
2032 &self.state.system_privileges
2033 }
2034
2035 fn get_default_privileges(
2036 &self,
2037 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2038 self.state
2039 .default_privileges
2040 .iter()
2041 .map(|(object, acl_items)| (object, acl_items.collect()))
2042 .collect()
2043 }
2044
2045 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2046 self.state.find_available_name(name, &self.conn_id)
2047 }
2048
2049 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2050 self.state.resolve_full_name(name, Some(&self.conn_id))
2051 }
2052
2053 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2054 self.state.resolve_full_schema_name(name)
2055 }
2056
2057 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2058 self.state.get_entry_by_global_id(global_id).id()
2059 }
2060
2061 fn resolve_global_id(
2062 &self,
2063 item_id: &CatalogItemId,
2064 version: RelationVersionSelector,
2065 ) -> GlobalId {
2066 self.state
2067 .get_entry(item_id)
2068 .at_version(version)
2069 .global_id()
2070 }
2071
2072 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2073 self.state.config()
2074 }
2075
2076 fn now(&self) -> EpochMillis {
2077 (self.state.config().now)()
2078 }
2079
2080 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2081 self.state.aws_privatelink_availability_zones.clone()
2082 }
2083
2084 fn system_vars(&self) -> &SystemVars {
2085 &self.state.system_configuration
2086 }
2087
2088 fn system_vars_mut(&mut self) -> &mut SystemVars {
2089 Arc::make_mut(&mut self.state.to_mut().system_configuration)
2090 }
2091
2092 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2093 self.state().get_owner_id(id, self.conn_id())
2094 }
2095
2096 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2097 match id {
2098 SystemObjectId::System => Some(&self.state.system_privileges),
2099 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2100 Some(self.get_cluster(*id).privileges())
2101 }
2102 SystemObjectId::Object(ObjectId::Database(id)) => {
2103 Some(self.get_database(id).privileges())
2104 }
2105 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2106 self.state
2109 .try_get_schema(database_spec, schema_spec, &self.conn_id)
2110 .map(|schema| schema.privileges())
2111 }
2112 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2113 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2114 Some(self.get_network_policy(id).privileges())
2115 }
2116 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2117 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2118 }
2119 }
2120
2121 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2122 let mut seen = BTreeSet::new();
2123 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2124 }
2125
2126 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2127 let mut seen = BTreeSet::new();
2128 self.state.item_dependents(id, &mut seen)
2129 }
2130
2131 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2132 rbac::all_object_privileges(object_type)
2133 }
2134
2135 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2136 self.state.get_object_type(object_id)
2137 }
2138
2139 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2140 self.state.get_system_object_type(id)
2141 }
2142
2143 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2150 if qualified_name.qualifiers.schema_spec.is_temporary() {
2151 return qualified_name.item.clone().into();
2159 }
2160
2161 let database_id = match &qualified_name.qualifiers.database_spec {
2162 ResolvedDatabaseSpecifier::Ambient => None,
2163 ResolvedDatabaseSpecifier::Id(id)
2164 if self.database.is_some() && self.database == Some(*id) =>
2165 {
2166 None
2167 }
2168 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2169 };
2170
2171 let schema_spec = if database_id.is_none()
2172 && self.resolve_item_name(&PartialItemName {
2173 database: None,
2174 schema: None,
2175 item: qualified_name.item.clone(),
2176 }) == Ok(qualified_name)
2177 || self.resolve_function_name(&PartialItemName {
2178 database: None,
2179 schema: None,
2180 item: qualified_name.item.clone(),
2181 }) == Ok(qualified_name)
2182 || self.resolve_type_name(&PartialItemName {
2183 database: None,
2184 schema: None,
2185 item: qualified_name.item.clone(),
2186 }) == Ok(qualified_name)
2187 {
2188 None
2189 } else {
2190 Some(qualified_name.qualifiers.schema_spec.clone())
2193 };
2194
2195 let res = PartialItemName {
2196 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2197 schema: schema_spec.map(|spec| {
2198 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2199 .name()
2200 .schema
2201 .clone()
2202 }),
2203 item: qualified_name.item.clone(),
2204 };
2205 assert!(
2206 self.resolve_item_name(&res) == Ok(qualified_name)
2207 || self.resolve_function_name(&res) == Ok(qualified_name)
2208 || self.resolve_type_name(&res) == Ok(qualified_name)
2209 );
2210 res
2211 }
2212
2213 fn add_notice(&self, notice: PlanNotice) {
2214 let _ = self.notices_tx.send(notice.into());
2215 }
2216
2217 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2218 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2219 self.state.comments.get_object_comments(comment_id)
2220 }
2221
2222 fn is_cluster_size_cc(&self, size: &str) -> bool {
2223 self.state
2224 .cluster_replica_sizes
2225 .0
2226 .get(size)
2227 .map_or(false, |a| a.is_cc)
2228 }
2229}
2230
2231#[cfg(test)]
2232mod tests {
2233 use std::collections::{BTreeMap, BTreeSet};
2234 use std::sync::Arc;
2235 use std::{env, iter};
2236
2237 use itertools::Itertools;
2238 use mz_catalog::memory::objects::CatalogItem;
2239 use tokio_postgres::NoTls;
2240 use tokio_postgres::types::Type;
2241 use uuid::Uuid;
2242
2243 use mz_catalog::SYSTEM_CONN_ID;
2244 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2245 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2246 use mz_controller_types::{ClusterId, ReplicaId};
2247 use mz_expr::MirScalarExpr;
2248 use mz_ore::now::to_datetime;
2249 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2250 use mz_persist_client::PersistClient;
2251 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2252 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2253 use mz_repr::role_id::RoleId;
2254 use mz_repr::{
2255 CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2256 SqlScalarType, Timestamp,
2257 };
2258 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2259 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2260 use mz_sql::names::{
2261 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2262 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2263 };
2264 use mz_sql::plan::{
2265 CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2266 QueryLifetime, Scope, StatementContext,
2267 };
2268 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2269 use mz_sql::session::vars::{SystemVars, VarInput};
2270
2271 use crate::catalog::state::LocalExpressionCache;
2272 use crate::catalog::{Catalog, Op};
2273 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2274 use crate::session::Session;
2275
2276 #[mz_ore::test(tokio::test)]
2283 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2285 Catalog::with_debug(|catalog| async move {
2286 struct TestCase {
2287 input: QualifiedItemName,
2288 system_output: PartialItemName,
2289 normal_output: PartialItemName,
2290 }
2291
2292 let test_cases = vec![
2293 TestCase {
2294 input: QualifiedItemName {
2295 qualifiers: ItemQualifiers {
2296 database_spec: ResolvedDatabaseSpecifier::Ambient,
2297 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2298 },
2299 item: "numeric".to_string(),
2300 },
2301 system_output: PartialItemName {
2302 database: None,
2303 schema: None,
2304 item: "numeric".to_string(),
2305 },
2306 normal_output: PartialItemName {
2307 database: None,
2308 schema: None,
2309 item: "numeric".to_string(),
2310 },
2311 },
2312 TestCase {
2313 input: QualifiedItemName {
2314 qualifiers: ItemQualifiers {
2315 database_spec: ResolvedDatabaseSpecifier::Ambient,
2316 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2317 },
2318 item: "mz_array_types".to_string(),
2319 },
2320 system_output: PartialItemName {
2321 database: None,
2322 schema: None,
2323 item: "mz_array_types".to_string(),
2324 },
2325 normal_output: PartialItemName {
2326 database: None,
2327 schema: None,
2328 item: "mz_array_types".to_string(),
2329 },
2330 },
2331 ];
2332
2333 for tc in test_cases {
2334 assert_eq!(
2335 catalog
2336 .for_system_session()
2337 .minimal_qualification(&tc.input),
2338 tc.system_output
2339 );
2340 assert_eq!(
2341 catalog
2342 .for_session(&Session::dummy())
2343 .minimal_qualification(&tc.input),
2344 tc.normal_output
2345 );
2346 }
2347 catalog.expire().await;
2348 })
2349 .await
2350 }
2351
2352 #[mz_ore::test(tokio::test)]
2353 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2355 let persist_client = PersistClient::new_for_tests().await;
2356 let organization_id = Uuid::new_v4();
2357 let bootstrap_args = test_bootstrap_args();
2358 {
2359 let mut catalog = Catalog::open_debug_catalog(
2360 persist_client.clone(),
2361 organization_id.clone(),
2362 &bootstrap_args,
2363 )
2364 .await
2365 .expect("unable to open debug catalog");
2366 assert_eq!(catalog.transient_revision(), 1);
2367 let commit_ts = catalog.current_upper().await;
2368 catalog
2369 .transact(
2370 None,
2371 commit_ts,
2372 None,
2373 vec![Op::CreateDatabase {
2374 name: "test".to_string(),
2375 owner_id: MZ_SYSTEM_ROLE_ID,
2376 }],
2377 )
2378 .await
2379 .expect("failed to transact");
2380 assert_eq!(catalog.transient_revision(), 2);
2381 catalog.expire().await;
2382 }
2383 {
2384 let catalog =
2385 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2386 .await
2387 .expect("unable to open debug catalog");
2388 assert_eq!(catalog.transient_revision(), 1);
2390 catalog.expire().await;
2391 }
2392 }
2393
2394 #[mz_ore::test(tokio::test)]
2395 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2397 Catalog::with_debug(|catalog| async move {
2398 let mz_catalog_schema = (
2399 ResolvedDatabaseSpecifier::Ambient,
2400 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2401 );
2402 let pg_catalog_schema = (
2403 ResolvedDatabaseSpecifier::Ambient,
2404 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2405 );
2406 let mz_temp_schema = (
2407 ResolvedDatabaseSpecifier::Ambient,
2408 SchemaSpecifier::Temporary,
2409 );
2410
2411 let session = Session::dummy();
2413 let conn_catalog = catalog.for_session(&session);
2414 assert_ne!(
2415 conn_catalog.effective_search_path(false),
2416 conn_catalog.search_path
2417 );
2418 assert_ne!(
2419 conn_catalog.effective_search_path(true),
2420 conn_catalog.search_path
2421 );
2422 assert_eq!(
2423 conn_catalog.effective_search_path(false),
2424 vec![
2425 mz_catalog_schema.clone(),
2426 pg_catalog_schema.clone(),
2427 conn_catalog.search_path[0].clone()
2428 ]
2429 );
2430 assert_eq!(
2431 conn_catalog.effective_search_path(true),
2432 vec![
2433 mz_temp_schema.clone(),
2434 mz_catalog_schema.clone(),
2435 pg_catalog_schema.clone(),
2436 conn_catalog.search_path[0].clone()
2437 ]
2438 );
2439
2440 let mut session = Session::dummy();
2442 session
2443 .vars_mut()
2444 .set(
2445 &SystemVars::new(),
2446 "search_path",
2447 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2448 false,
2449 )
2450 .expect("failed to set search_path");
2451 let conn_catalog = catalog.for_session(&session);
2452 assert_ne!(
2453 conn_catalog.effective_search_path(false),
2454 conn_catalog.search_path
2455 );
2456 assert_ne!(
2457 conn_catalog.effective_search_path(true),
2458 conn_catalog.search_path
2459 );
2460 assert_eq!(
2461 conn_catalog.effective_search_path(false),
2462 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2463 );
2464 assert_eq!(
2465 conn_catalog.effective_search_path(true),
2466 vec![
2467 mz_temp_schema.clone(),
2468 mz_catalog_schema.clone(),
2469 pg_catalog_schema.clone()
2470 ]
2471 );
2472
2473 let mut session = Session::dummy();
2474 session
2475 .vars_mut()
2476 .set(
2477 &SystemVars::new(),
2478 "search_path",
2479 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2480 false,
2481 )
2482 .expect("failed to set search_path");
2483 let conn_catalog = catalog.for_session(&session);
2484 assert_ne!(
2485 conn_catalog.effective_search_path(false),
2486 conn_catalog.search_path
2487 );
2488 assert_ne!(
2489 conn_catalog.effective_search_path(true),
2490 conn_catalog.search_path
2491 );
2492 assert_eq!(
2493 conn_catalog.effective_search_path(false),
2494 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2495 );
2496 assert_eq!(
2497 conn_catalog.effective_search_path(true),
2498 vec![
2499 mz_temp_schema.clone(),
2500 pg_catalog_schema.clone(),
2501 mz_catalog_schema.clone()
2502 ]
2503 );
2504
2505 let mut session = Session::dummy();
2506 session
2507 .vars_mut()
2508 .set(
2509 &SystemVars::new(),
2510 "search_path",
2511 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2512 false,
2513 )
2514 .expect("failed to set search_path");
2515 let conn_catalog = catalog.for_session(&session);
2516 assert_ne!(
2517 conn_catalog.effective_search_path(false),
2518 conn_catalog.search_path
2519 );
2520 assert_ne!(
2521 conn_catalog.effective_search_path(true),
2522 conn_catalog.search_path
2523 );
2524 assert_eq!(
2525 conn_catalog.effective_search_path(false),
2526 vec![
2527 mz_catalog_schema.clone(),
2528 pg_catalog_schema.clone(),
2529 mz_temp_schema.clone()
2530 ]
2531 );
2532 assert_eq!(
2533 conn_catalog.effective_search_path(true),
2534 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2535 );
2536 catalog.expire().await;
2537 })
2538 .await
2539 }
2540
2541 #[mz_ore::test(tokio::test)]
2542 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2544 use mz_ore::collections::CollectionExt;
2545 Catalog::with_debug(|catalog| async move {
2546 let conn_catalog = catalog.for_system_session();
2547 let scx = &mut StatementContext::new(None, &conn_catalog);
2548
2549 let parsed = mz_sql_parser::parser::parse_statements(
2550 "create view public.foo as select 1 as bar",
2551 )
2552 .expect("")
2553 .into_element()
2554 .ast;
2555
2556 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2557
2558 assert_eq!(
2560 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2561 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2562 );
2563 catalog.expire().await;
2564 })
2565 .await;
2566 }
2567
2568 #[mz_ore::test(tokio::test)]
2570 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2572 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2573 let column = ", 1";
2574 let view_def_size = view_def.bytes().count();
2575 let column_size = column.bytes().count();
2576 let column_count =
2577 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2578 let columns = iter::repeat(column).take(column_count).join("");
2579 let create_sql = format!("{view_def}{columns})");
2580 let create_sql_check = create_sql.clone();
2581 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2582 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2583 &create_sql
2584 ));
2585
2586 let persist_client = PersistClient::new_for_tests().await;
2587 let organization_id = Uuid::new_v4();
2588 let id = CatalogItemId::User(1);
2589 let gid = GlobalId::User(1);
2590 let bootstrap_args = test_bootstrap_args();
2591 {
2592 let mut catalog = Catalog::open_debug_catalog(
2593 persist_client.clone(),
2594 organization_id.clone(),
2595 &bootstrap_args,
2596 )
2597 .await
2598 .expect("unable to open debug catalog");
2599 let item = catalog
2600 .state()
2601 .deserialize_item(
2602 gid,
2603 &create_sql,
2604 &BTreeMap::new(),
2605 &mut LocalExpressionCache::Closed,
2606 None,
2607 )
2608 .expect("unable to parse view");
2609 let commit_ts = catalog.current_upper().await;
2610 catalog
2611 .transact(
2612 None,
2613 commit_ts,
2614 None,
2615 vec![Op::CreateItem {
2616 item,
2617 name: QualifiedItemName {
2618 qualifiers: ItemQualifiers {
2619 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2620 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2621 },
2622 item: "v".to_string(),
2623 },
2624 id,
2625 owner_id: MZ_SYSTEM_ROLE_ID,
2626 }],
2627 )
2628 .await
2629 .expect("failed to transact");
2630 catalog.expire().await;
2631 }
2632 {
2633 let catalog =
2634 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2635 .await
2636 .expect("unable to open debug catalog");
2637 let view = catalog.get_entry(&id);
2638 assert_eq!("v", view.name.item);
2639 match &view.item {
2640 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2641 item => panic!("expected view, got {}", item.typ()),
2642 }
2643 catalog.expire().await;
2644 }
2645 }
2646
2647 #[mz_ore::test(tokio::test)]
2648 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2650 Catalog::with_debug(|catalog| async move {
2651 let conn_catalog = catalog.for_system_session();
2652
2653 assert_eq!(
2654 mz_sql::catalog::ObjectType::ClusterReplica,
2655 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2656 ClusterId::user(1).expect("1 is a valid ID"),
2657 ReplicaId::User(1)
2658 )))
2659 );
2660 assert_eq!(
2661 mz_sql::catalog::ObjectType::Role,
2662 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2663 );
2664 catalog.expire().await;
2665 })
2666 .await;
2667 }
2668
2669 #[mz_ore::test(tokio::test)]
2670 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2672 Catalog::with_debug(|catalog| async move {
2673 let conn_catalog = catalog.for_system_session();
2674
2675 assert_eq!(
2676 None,
2677 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2678 ClusterId::user(1).expect("1 is a valid ID"),
2679 ReplicaId::User(1),
2680 ))))
2681 );
2682 assert_eq!(
2683 None,
2684 conn_catalog
2685 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2686 );
2687 catalog.expire().await;
2688 })
2689 .await;
2690 }
2691
2692 #[mz_ore::test(tokio::test)]
2693 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2695 Catalog::with_debug(|catalog| async move {
2696 let conn_catalog = catalog.for_system_session();
2697
2698 let builtins_cfg = BuiltinsConfig {
2699 include_continual_tasks: true,
2700 };
2701 for builtin in BUILTINS::iter(&builtins_cfg) {
2702 let (schema, name, expected_desc) = match builtin {
2703 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2704 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2705 Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2706 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2707 Builtin::Log(_)
2708 | Builtin::Type(_)
2709 | Builtin::Func(_)
2710 | Builtin::ContinualTask(_)
2711 | Builtin::Index(_)
2712 | Builtin::Connection(_) => continue,
2713 };
2714 let item = conn_catalog
2715 .resolve_item(&PartialItemName {
2716 database: None,
2717 schema: Some(schema.to_string()),
2718 item: name.to_string(),
2719 })
2720 .expect("unable to resolve item")
2721 .at_version(RelationVersionSelector::Latest);
2722
2723 let actual_desc = item.relation_desc().expect("invalid item type");
2724 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2725 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2726 {
2727 assert_eq!(
2728 actual_name, expected_name,
2729 "item {schema}.{name} column {index} name did not match its expected name"
2730 );
2731 assert_eq!(
2732 actual_typ, expected_typ,
2733 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2734 );
2735 }
2736 assert_eq!(
2737 &*actual_desc, expected_desc,
2738 "item {schema}.{name} did not match its expected RelationDesc"
2739 );
2740 }
2741 catalog.expire().await;
2742 })
2743 .await
2744 }
2745
2746 #[mz_ore::test(tokio::test)]
2749 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2751 async fn inner(catalog: Catalog) {
2752 let (client, connection) = tokio_postgres::connect(
2756 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2757 NoTls,
2758 )
2759 .await
2760 .expect("failed to connect to Postgres");
2761
2762 task::spawn(|| "compare_builtin_postgres", async move {
2763 if let Err(e) = connection.await {
2764 panic!("connection error: {}", e);
2765 }
2766 });
2767
2768 struct PgProc {
2769 name: String,
2770 arg_oids: Vec<u32>,
2771 ret_oid: Option<u32>,
2772 ret_set: bool,
2773 }
2774
2775 struct PgType {
2776 name: String,
2777 ty: String,
2778 elem: u32,
2779 array: u32,
2780 input: u32,
2781 receive: u32,
2782 }
2783
2784 struct PgOper {
2785 oprresult: u32,
2786 name: String,
2787 }
2788
2789 let pg_proc: BTreeMap<_, _> = client
2790 .query(
2791 "SELECT
2792 p.oid,
2793 proname,
2794 proargtypes,
2795 prorettype,
2796 proretset
2797 FROM pg_proc p
2798 JOIN pg_namespace n ON p.pronamespace = n.oid",
2799 &[],
2800 )
2801 .await
2802 .expect("pg query failed")
2803 .into_iter()
2804 .map(|row| {
2805 let oid: u32 = row.get("oid");
2806 let pg_proc = PgProc {
2807 name: row.get("proname"),
2808 arg_oids: row.get("proargtypes"),
2809 ret_oid: row.get("prorettype"),
2810 ret_set: row.get("proretset"),
2811 };
2812 (oid, pg_proc)
2813 })
2814 .collect();
2815
2816 let pg_type: BTreeMap<_, _> = client
2817 .query(
2818 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2819 &[],
2820 )
2821 .await
2822 .expect("pg query failed")
2823 .into_iter()
2824 .map(|row| {
2825 let oid: u32 = row.get("oid");
2826 let pg_type = PgType {
2827 name: row.get("typname"),
2828 ty: row.get("typtype"),
2829 elem: row.get("typelem"),
2830 array: row.get("typarray"),
2831 input: row.get("typinput"),
2832 receive: row.get("typreceive"),
2833 };
2834 (oid, pg_type)
2835 })
2836 .collect();
2837
2838 let pg_oper: BTreeMap<_, _> = client
2839 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2840 .await
2841 .expect("pg query failed")
2842 .into_iter()
2843 .map(|row| {
2844 let oid: u32 = row.get("oid");
2845 let pg_oper = PgOper {
2846 name: row.get("oprname"),
2847 oprresult: row.get("oprresult"),
2848 };
2849 (oid, pg_oper)
2850 })
2851 .collect();
2852
2853 let conn_catalog = catalog.for_system_session();
2854 let resolve_type_oid = |item: &str| {
2855 conn_catalog
2856 .resolve_type(&PartialItemName {
2857 database: None,
2858 schema: Some(PG_CATALOG_SCHEMA.into()),
2861 item: item.to_string(),
2862 })
2863 .expect("unable to resolve type")
2864 .oid()
2865 };
2866
2867 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2868 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2869 .collect();
2870
2871 let mut all_oids = BTreeSet::new();
2872
2873 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2876 [
2877 (Type::NAME, Type::TEXT),
2879 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2880 (Type::TIME, Type::TIMETZ),
2882 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2883 ]
2884 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2885 );
2886 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2887 1619, ]);
2889 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2890 if ignore_return_types.contains(&fn_oid) {
2891 return true;
2892 }
2893 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2894 return true;
2895 }
2896 a == b
2897 };
2898
2899 let builtins_cfg = BuiltinsConfig {
2900 include_continual_tasks: true,
2901 };
2902 for builtin in BUILTINS::iter(&builtins_cfg) {
2903 match builtin {
2904 Builtin::Type(ty) => {
2905 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2906
2907 if ty.oid >= FIRST_MATERIALIZE_OID {
2908 continue;
2911 }
2912
2913 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2916 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2917 });
2918 assert_eq!(
2919 ty.name, pg_ty.name,
2920 "oid {} has name {} in postgres; expected {}",
2921 ty.oid, pg_ty.name, ty.name,
2922 );
2923
2924 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2925 None => (0, 0),
2926 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2927 };
2928 assert_eq!(
2929 typinput_oid, pg_ty.input,
2930 "type {} has typinput OID {:?} in mz but {:?} in pg",
2931 ty.name, typinput_oid, pg_ty.input,
2932 );
2933 assert_eq!(
2934 typreceive_oid, pg_ty.receive,
2935 "type {} has typreceive OID {:?} in mz but {:?} in pg",
2936 ty.name, typreceive_oid, pg_ty.receive,
2937 );
2938 if typinput_oid != 0 {
2939 assert!(
2940 func_oids.contains(&typinput_oid),
2941 "type {} has typinput OID {} that does not exist in pg_proc",
2942 ty.name,
2943 typinput_oid,
2944 );
2945 }
2946 if typreceive_oid != 0 {
2947 assert!(
2948 func_oids.contains(&typreceive_oid),
2949 "type {} has typreceive OID {} that does not exist in pg_proc",
2950 ty.name,
2951 typreceive_oid,
2952 );
2953 }
2954
2955 match &ty.details.typ {
2957 CatalogType::Array { element_reference } => {
2958 let elem_ty = BUILTINS::iter(&builtins_cfg)
2959 .filter_map(|builtin| match builtin {
2960 Builtin::Type(ty @ BuiltinType { name, .. })
2961 if element_reference == name =>
2962 {
2963 Some(ty)
2964 }
2965 _ => None,
2966 })
2967 .next();
2968 let elem_ty = match elem_ty {
2969 Some(ty) => ty,
2970 None => {
2971 panic!("{} is unexpectedly not a type", element_reference)
2972 }
2973 };
2974 assert_eq!(
2975 pg_ty.elem, elem_ty.oid,
2976 "type {} has mismatched element OIDs",
2977 ty.name
2978 )
2979 }
2980 CatalogType::Pseudo => {
2981 assert_eq!(
2982 pg_ty.ty, "p",
2983 "type {} is not a pseudo type as expected",
2984 ty.name
2985 )
2986 }
2987 CatalogType::Range { .. } => {
2988 assert_eq!(
2989 pg_ty.ty, "r",
2990 "type {} is not a range type as expected",
2991 ty.name
2992 );
2993 }
2994 _ => {
2995 assert_eq!(
2996 pg_ty.ty, "b",
2997 "type {} is not a base type as expected",
2998 ty.name
2999 )
3000 }
3001 }
3002
3003 let schema = catalog
3005 .resolve_schema_in_database(
3006 &ResolvedDatabaseSpecifier::Ambient,
3007 ty.schema,
3008 &SYSTEM_CONN_ID,
3009 )
3010 .expect("unable to resolve schema");
3011 let allocated_type = catalog
3012 .resolve_type(
3013 None,
3014 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3015 &PartialItemName {
3016 database: None,
3017 schema: Some(schema.name().schema.clone()),
3018 item: ty.name.to_string(),
3019 },
3020 &SYSTEM_CONN_ID,
3021 )
3022 .expect("unable to resolve type");
3023 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3024 ty
3025 } else {
3026 panic!("unexpectedly not a type")
3027 };
3028 match ty.details.array_id {
3029 Some(array_id) => {
3030 let array_ty = catalog.get_entry(&array_id);
3031 assert_eq!(
3032 pg_ty.array, array_ty.oid,
3033 "type {} has mismatched array OIDs",
3034 allocated_type.name.item,
3035 );
3036 }
3037 None => assert_eq!(
3038 pg_ty.array, 0,
3039 "type {} does not have an array type in mz but does in pg",
3040 allocated_type.name.item,
3041 ),
3042 }
3043 }
3044 Builtin::Func(func) => {
3045 for imp in func.inner.func_impls() {
3046 assert!(
3047 all_oids.insert(imp.oid),
3048 "{} reused oid {}",
3049 func.name,
3050 imp.oid
3051 );
3052
3053 assert!(
3054 imp.oid < FIRST_USER_OID,
3055 "built-in function {} erroneously has OID in user space ({})",
3056 func.name,
3057 imp.oid,
3058 );
3059
3060 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3063 continue;
3064 } else {
3065 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3066 panic!(
3067 "pg_proc missing function {}: oid {}",
3068 func.name, imp.oid
3069 )
3070 })
3071 };
3072 assert_eq!(
3073 func.name, pg_fn.name,
3074 "funcs with oid {} don't match names: {} in mz, {} in pg",
3075 imp.oid, func.name, pg_fn.name
3076 );
3077
3078 let imp_arg_oids = imp
3081 .arg_typs
3082 .iter()
3083 .map(|item| resolve_type_oid(item))
3084 .collect::<Vec<_>>();
3085
3086 if imp_arg_oids != pg_fn.arg_oids {
3087 println!(
3088 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3089 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3090 );
3091 }
3092
3093 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3094
3095 assert!(
3096 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3097 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3098 imp.oid,
3099 func.name,
3100 imp_return_oid,
3101 pg_fn.ret_oid
3102 );
3103
3104 assert_eq!(
3105 imp.return_is_set, pg_fn.ret_set,
3106 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3107 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3108 );
3109 }
3110 }
3111 _ => (),
3112 }
3113 }
3114
3115 for (op, func) in OP_IMPLS.iter() {
3116 for imp in func.func_impls() {
3117 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3118
3119 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3121 continue;
3122 } else {
3123 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3124 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3125 })
3126 };
3127
3128 assert_eq!(*op, pg_op.name);
3129
3130 let imp_return_oid =
3131 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3132 if imp_return_oid != pg_op.oprresult {
3133 panic!(
3134 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3135 imp.oid, op, imp_return_oid, pg_op.oprresult
3136 );
3137 }
3138 }
3139 }
3140 catalog.expire().await;
3141 }
3142
3143 Catalog::with_debug(inner).await
3144 }
3145
    /// Smoke-tests every scalar builtin function and operator implementation by calling
    /// it on combinations of "interesting" datums for its argument types.
    ///
    /// Each individual call is handed to `smoketest_fn` on a blocking task; all task
    /// handles are awaited at the end of the test.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_smoketest_all_builtins() {
        /// Spawns one blocking task per (implementation, argument combination) and
        /// returns the handles of all spawned tasks.
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            // Maps a system type name to its OID.
            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Functions with these names are skipped entirely.
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege",
                "has_type_privilege",
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            // Builtin functions followed by operator implementations, as (name, func).
            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are exercised.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                'outer: for imp in impls {
                    let details = imp.details();
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        // Skip the whole implementation if any argument OID has no
                        // corresponding `mz_pgrepr::Type`.
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // One candidate list per argument: NULL plus the type's interesting
                    // datums.
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    // No arguments means there is nothing to enumerate (and `idxs[0]`
                    // below would be out of bounds).
                    if datums.is_empty() {
                        continue;
                    }

                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    // `None` when the return OID has no corresponding `mz_pgrepr::Type`.
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // `idxs[i]` selects the current candidate for argument `i`. The
                    // vector is advanced like an odometer (last position fastest) until
                    // the first position runs past its candidate list.
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        // Wrap each argument datum as an already-coerced literal.
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable description of the call; also used as the
                        // name of the spawned task.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance to the next combination. Position 0 is deliberately
                        // left overflowed (not reset) so the `while` condition ends the
                        // enumeration.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                if i == 0 {
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        for handle in handles {
            handle.await;
        }
    }
3291
    /// Plans, lowers and evaluates a single builtin function call on literal arguments
    /// and checks the result for internal consistency.
    ///
    /// * `name` - function or operator name, used in assertion messages.
    /// * `call_name` - description of the call, used in the return-type panic message.
    /// * `op` - the operation whose planning callback builds the HIR expression.
    /// * `imp` - the implementation being tested; its `params` are passed to `op`.
    /// * `args` - the argument datums, used for null checks and in messages.
    /// * `catalog` - the catalog providing the session catalog, state and system config.
    /// * `scalars` - the arguments as coerced literal expressions, passed to `op`.
    /// * `return_styp` - expected SQL return type; when `None`, no checks are performed
    ///   on the evaluated result.
    ///
    /// Failures to plan, lower or evaluate the call are silently accepted; the checks
    /// only run when all three succeed.
    ///
    /// # Panics
    ///
    /// Panics when any of the consistency checks below fails.
    fn smoketest_fn(
        name: &&str,
        call_name: String,
        op: &Operation<HirScalarExpr>,
        imp: &FuncImpl<HirScalarExpr>,
        args: Vec<Datum<'_>>,
        catalog: Arc<Catalog>,
        scalars: Vec<CoercibleScalarExpr>,
        return_styp: Option<SqlScalarType>,
    ) {
        // Set up a minimal planning context with an empty scope and relation type.
        let conn_catalog = catalog.for_system_session();
        let pcx = PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
        let ecx = ExprContext {
            qcx: &qcx,
            name: "smoketest",
            scope: &Scope::empty(),
            relation_type: &SqlRelationType::empty(),
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let arena = RowArena::new();
        let mut session = Session::dummy();
        session
            .start_transaction(to_datetime(0), None, None)
            .expect("must succeed");
        let prep_style = ExprPrepOneShot {
            logical_time: EvalTime::Time(Timestamp::MIN),
            session: &session,
            catalog_state: &catalog.state,
        };

        // Plan the call into a HIR expression.
        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
        if let Ok(hir) = res {
            // For eliminable casts, additionally evaluate the expression lowered with
            // cast elimination disabled, to compare against the regular result below.
            let uneliminated_result_row = {
                if let HirScalarExpr::CallUnary { func, .. } = &hir
                    && func.is_eliminable_cast()
                {
                    let mut uneliminated_mir = hir
                        .clone()
                        .lower_uncorrelated(HirToMirConfig {
                            enable_cast_elimination: false,
                            ..catalog.system_config().into()
                        })
                        .expect("lowering eliminable cast should always succeed");
                    prep_style
                        .prep_scalar_expr(&mut uneliminated_mir)
                        .expect("must succeed");

                    uneliminated_mir
                        .eval(&[], &arena)
                        .ok()
                        .map(|datum| Row::pack([datum]))
                } else {
                    None
                }
            };

            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");

                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
                    if let Some(return_styp) = return_styp {
                        let mir_typ = mir.typ(&[]);
                        // The MIR scalar type is compared with the catalog's declared
                        // return type via the soft-assertion macro.
                        soft_assert_eq_or_log!(
                            mir_typ.scalar_type,
                            (&return_styp).into(),
                            "MIR type did not match the catalog type (cast elimination/repr type error)"
                        );
                        // The evaluated datum must be an instance of the MIR type.
                        if !eval_result_datum.is_instance_of(&mir_typ) {
                            panic!(
                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
                            );
                        }
                        // Cast elimination must not change the evaluated datum.
                        if let Some(row) = uneliminated_result_row {
                            let uneliminated_result_datum = row.unpack_first();
                            assert_eq!(
                                uneliminated_result_datum, eval_result_datum,
                                "datums should not change if cast is eliminable"
                            );
                        }
                        // Nullability checks, only when the lowered expression is still
                        // a direct function call on literal arguments.
                        if let Some((introduces_nulls, propagates_nulls)) =
                            call_introduces_propagates_nulls(&mir)
                        {
                            if introduces_nulls {
                                // A function that can introduce nulls must be typed
                                // nullable.
                                assert!(
                                    mir_typ.nullable,
                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                    name, args, mir, mir_typ.nullable
                                );
                            } else {
                                let any_input_null = args.iter().any(|arg| arg.is_null());
                                if !any_input_null {
                                    // No null inputs and no introduced nulls: the
                                    // result must be typed non-nullable.
                                    assert!(
                                        !mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                } else if propagates_nulls {
                                    // A null input that is propagated: the result must
                                    // be typed nullable.
                                    assert!(
                                        mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                }
                            }
                        }
                        // Reducing the expression must agree with evaluating it, both
                        // in value and in scalar type.
                        let mut reduced = mir.clone();
                        reduced.reduce(&[]);
                        match reduced {
                            MirScalarExpr::Literal(reduce_result, ctyp) => {
                                match reduce_result {
                                    Ok(reduce_result_row) => {
                                        let reduce_result_datum = reduce_result_row.unpack_first();
                                        assert_eq!(
                                            reduce_result_datum,
                                            eval_result_datum,
                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                        assert_eq!(
                                            ctyp.scalar_type,
                                            mir_typ.scalar_type,
                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                    }
                                    // A reduction that yields an error literal is not
                                    // checked further.
                                    Err(..) => {}
                                }
                            }
                            _ => unreachable!(
                                "all args are literals, so should have reduced to a literal"
                            ),
                        }
                    }
                }
            }
        }
    }
3474
3475 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3480 match mir_func_call {
3481 MirScalarExpr::CallUnary { func, expr } => {
3482 if expr.is_literal() {
3483 Some((func.introduces_nulls(), func.propagates_nulls()))
3484 } else {
3485 None
3486 }
3487 }
3488 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3489 if expr1.is_literal() && expr2.is_literal() {
3490 Some((func.introduces_nulls(), func.propagates_nulls()))
3491 } else {
3492 None
3493 }
3494 }
3495 MirScalarExpr::CallVariadic { func, exprs } => {
3496 if exprs.iter().all(|arg| arg.is_literal()) {
3497 Some((func.introduces_nulls(), func.propagates_nulls()))
3498 } else {
3499 None
3500 }
3501 }
3502 _ => None,
3503 }
3504 }
3505
3506 #[mz_ore::test(tokio::test)]
3508 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3510 Catalog::with_debug(|catalog| async move {
3511 let conn_catalog = catalog.for_system_session();
3512
3513 for view in BUILTINS::views().filter(|view| {
3514 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3515 }) {
3516 let item = conn_catalog
3517 .resolve_item(&PartialItemName {
3518 database: None,
3519 schema: Some(view.schema.to_string()),
3520 item: view.name.to_string(),
3521 })
3522 .expect("unable to resolve view")
3523 .at_version(RelationVersionSelector::Latest);
3525 let full_name = conn_catalog.resolve_full_name(item.name());
3526 let desc = item.relation_desc().expect("invalid item type");
3527 for col_type in desc.iter_types() {
3528 match &col_type.scalar_type {
3529 typ @ SqlScalarType::UInt16
3530 | typ @ SqlScalarType::UInt32
3531 | typ @ SqlScalarType::UInt64
3532 | typ @ SqlScalarType::MzTimestamp
3533 | typ @ SqlScalarType::List { .. }
3534 | typ @ SqlScalarType::Map { .. }
3535 | typ @ SqlScalarType::MzAclItem => {
3536 panic!("{typ:?} type found in {full_name}");
3537 }
3538 SqlScalarType::AclItem
3539 | SqlScalarType::Bool
3540 | SqlScalarType::Int16
3541 | SqlScalarType::Int32
3542 | SqlScalarType::Int64
3543 | SqlScalarType::Float32
3544 | SqlScalarType::Float64
3545 | SqlScalarType::Numeric { .. }
3546 | SqlScalarType::Date
3547 | SqlScalarType::Time
3548 | SqlScalarType::Timestamp { .. }
3549 | SqlScalarType::TimestampTz { .. }
3550 | SqlScalarType::Interval
3551 | SqlScalarType::PgLegacyChar
3552 | SqlScalarType::Bytes
3553 | SqlScalarType::String
3554 | SqlScalarType::Char { .. }
3555 | SqlScalarType::VarChar { .. }
3556 | SqlScalarType::Jsonb
3557 | SqlScalarType::Uuid
3558 | SqlScalarType::Array(_)
3559 | SqlScalarType::Record { .. }
3560 | SqlScalarType::Oid
3561 | SqlScalarType::RegProc
3562 | SqlScalarType::RegType
3563 | SqlScalarType::RegClass
3564 | SqlScalarType::Int2Vector
3565 | SqlScalarType::Range { .. }
3566 | SqlScalarType::PgLegacyName => {}
3567 }
3568 }
3569 }
3570 catalog.expire().await;
3571 })
3572 .await
3573 }
3574
3575 #[mz_ore::test(tokio::test)]
3578 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3580 Catalog::with_debug(|catalog| async move {
3581 let conn_catalog = catalog.for_system_session();
3582
3583 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3584 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3585
3586 for entry in catalog.entries() {
3587 let schema_spec = entry.name().qualifiers.schema_spec;
3588 let introspection_deps = catalog.introspection_dependencies(entry.id);
3589 if introspection_deps.is_empty() {
3590 assert!(
3591 schema_spec != introspection_schema_spec,
3592 "entry does not depend on introspection sources but is in \
3593 `mz_introspection`: {}",
3594 conn_catalog.resolve_full_name(entry.name()),
3595 );
3596 } else {
3597 assert!(
3598 schema_spec == introspection_schema_spec,
3599 "entry depends on introspection sources but is not in \
3600 `mz_introspection`: {}",
3601 conn_catalog.resolve_full_name(entry.name()),
3602 );
3603 }
3604 }
3605 })
3606 .await
3607 }
3608
3609 #[mz_ore::test(tokio::test)]
3610 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3612 let persist_client = PersistClient::new_for_tests().await;
3613 let bootstrap_args = test_bootstrap_args();
3614 let organization_id = Uuid::new_v4();
3615 let db_name = "DB";
3616
3617 let mut writer_catalog = Catalog::open_debug_catalog(
3618 persist_client.clone(),
3619 organization_id.clone(),
3620 &bootstrap_args,
3621 )
3622 .await
3623 .expect("open_debug_catalog");
3624 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3625 persist_client.clone(),
3626 organization_id.clone(),
3627 &bootstrap_args,
3628 )
3629 .await
3630 .expect("open_debug_read_only_catalog");
3631 assert_err!(writer_catalog.resolve_database(db_name));
3632 assert_err!(read_only_catalog.resolve_database(db_name));
3633
3634 let commit_ts = writer_catalog.current_upper().await;
3635 writer_catalog
3636 .transact(
3637 None,
3638 commit_ts,
3639 None,
3640 vec![Op::CreateDatabase {
3641 name: db_name.to_string(),
3642 owner_id: MZ_SYSTEM_ROLE_ID,
3643 }],
3644 )
3645 .await
3646 .expect("failed to transact");
3647
3648 let write_db = writer_catalog
3649 .resolve_database(db_name)
3650 .expect("resolve_database");
3651 read_only_catalog
3652 .sync_to_current_updates()
3653 .await
3654 .expect("sync_to_current_updates");
3655 let read_db = read_only_catalog
3656 .resolve_database(db_name)
3657 .expect("resolve_database");
3658
3659 assert_eq!(write_db, read_db);
3660
3661 let writer_catalog_fencer =
3662 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3663 .await
3664 .expect("open_debug_catalog for fencer");
3665 let fencer_db = writer_catalog_fencer
3666 .resolve_database(db_name)
3667 .expect("resolve_database for fencer");
3668 assert_eq!(fencer_db, read_db);
3669
3670 let write_fence_err = writer_catalog
3671 .sync_to_current_updates()
3672 .await
3673 .expect_err("sync_to_current_updates for fencer");
3674 assert!(matches!(
3675 write_fence_err,
3676 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3677 ));
3678 let read_fence_err = read_only_catalog
3679 .sync_to_current_updates()
3680 .await
3681 .expect_err("sync_to_current_updates after fencer");
3682 assert!(matches!(
3683 read_fence_err,
3684 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3685 ));
3686
3687 writer_catalog.expire().await;
3688 read_only_catalog.expire().await;
3689 writer_catalog_fencer.expire().await;
3690 }
3691}