1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, STORAGE_USAGE_ID_ALLOC_KEY, TestCatalogStateBuilder,
39 test_bootstrap_args,
40};
41use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
42use mz_catalog::memory::error::{Error, ErrorKind};
43use mz_catalog::memory::objects::{
44 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
45 NetworkPolicy, Role, RoleAuth, Schema,
46};
47use mz_compute_types::dataflows::DataflowDescription;
48use mz_controller::clusters::ReplicaLocation;
49use mz_controller_types::{ClusterId, ReplicaId};
50use mz_expr::OptimizedMirRelationExpr;
51use mz_license_keys::ValidatedLicenseKey;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68 SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94 DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114#[derive(Debug)]
137pub struct Catalog {
138 state: CatalogState,
139 expr_cache_handle: Option<ExpressionCacheHandle>,
140 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141 transient_revision: u64,
142}
143
144impl Clone for Catalog {
147 fn clone(&self) -> Self {
148 Self {
149 state: self.state.clone(),
150 expr_cache_handle: self.expr_cache_handle.clone(),
151 storage: Arc::clone(&self.storage),
152 transient_revision: self.transient_revision,
153 }
154 }
155}
156
157impl Catalog {
158 #[mz_ore::instrument(level = "trace")]
164 pub fn set_optimized_plan(
165 &mut self,
166 id: GlobalId,
167 plan: DataflowDescription<OptimizedMirRelationExpr>,
168 ) {
169 self.state.set_optimized_plan(id, plan);
170 }
171
172 #[mz_ore::instrument(level = "trace")]
178 pub fn set_physical_plan(
179 &mut self,
180 id: GlobalId,
181 plan: DataflowDescription<mz_compute_types::plan::Plan>,
182 ) {
183 self.state.set_physical_plan(id, plan);
184 }
185
186 #[mz_ore::instrument(level = "trace")]
188 pub fn try_get_optimized_plan(
189 &self,
190 id: &GlobalId,
191 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
192 let entry = self.state.try_get_entry_by_global_id(id)?;
193 entry.item().optimized_plan().map(AsRef::as_ref)
194 }
195
196 #[mz_ore::instrument(level = "trace")]
198 pub fn try_get_physical_plan(
199 &self,
200 id: &GlobalId,
201 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202 let entry = self.state.try_get_entry_by_global_id(id)?;
203 entry.item().physical_plan().map(AsRef::as_ref)
204 }
205
206 #[mz_ore::instrument(level = "trace")]
212 pub fn set_dataflow_metainfo(
213 &mut self,
214 id: GlobalId,
215 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
216 ) {
217 self.state.set_dataflow_metainfo(id, metainfo);
218 }
219
220 #[mz_ore::instrument(level = "trace")]
222 pub fn try_get_dataflow_metainfo(
223 &self,
224 id: &GlobalId,
225 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
226 let entry = self.state.try_get_entry_by_global_id(id)?;
227 entry.item().dataflow_metainfo()
228 }
229}
230
/// A view of the catalog state scoped to a single connection.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The catalog state, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Item IDs registered through `mark_id_unresolvable_for_replanning`.
    /// NOTE(review): presumably name resolution skips these — the consumer
    /// is outside this view.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// The connection this view belongs to.
    conn_id: ConnectionId,
    /// Cluster name. NOTE(review): presumably the session's active cluster — confirm.
    cluster: String,
    /// Database for this view, if any. NOTE(review): presumably the
    /// session's current database — confirm.
    database: Option<DatabaseId>,
    /// Search path as resolved `(database, schema)` pairs; input to
    /// `effective_search_path`.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// The role this view acts as.
    role_id: RoleId,
    /// Prepared statements of the session, when available.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Portals of the session, when available.
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Channel on which notices are sent.
    notices_tx: UnboundedSender<AdapterNotice>,
    /// NOTE(review): presumably limits resolution to user objects — confirm
    /// against the code that reads this flag.
    restrict_to_user_objects: bool,
}
254
255impl ConnCatalog<'_> {
256 pub fn conn_id(&self) -> &ConnectionId {
257 &self.conn_id
258 }
259
260 pub fn state(&self) -> &CatalogState {
261 &*self.state
262 }
263
264 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
274 assert_eq!(
275 self.role_id, MZ_SYSTEM_ROLE_ID,
276 "only the system role can mark IDs unresolvable",
277 );
278 self.unresolvable_ids.insert(id);
279 }
280
281 pub fn effective_search_path(
287 &self,
288 include_temp_schema: bool,
289 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
290 self.state
291 .effective_search_path(&self.search_path, include_temp_schema)
292 }
293}
294
295impl ConnectionResolver for ConnCatalog<'_> {
296 fn resolve_connection(
297 &self,
298 id: CatalogItemId,
299 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
300 self.state().resolve_connection(id)
301 }
302}
303
304impl Catalog {
305 pub fn transient_revision(&self) -> u64 {
309 self.transient_revision
310 }
311
312 pub async fn with_debug<F, Fut, T>(f: F) -> T
324 where
325 F: FnOnce(Catalog) -> Fut,
326 Fut: Future<Output = T>,
327 {
328 let persist_client = PersistClient::new_for_tests().await;
329 let organization_id = Uuid::new_v4();
330 let bootstrap_args = test_bootstrap_args();
331 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
332 .await
333 .expect("can open debug catalog");
334 f(catalog).await
335 }
336
337 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
340 where
341 F: FnOnce(Catalog) -> Fut,
342 Fut: Future<Output = T>,
343 {
344 let persist_client = PersistClient::new_for_tests().await;
345 let organization_id = Uuid::new_v4();
346 let bootstrap_args = test_bootstrap_args();
347 let mut catalog =
348 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
349 .await
350 .expect("can open debug catalog");
351
352 let now = SYSTEM_TIME.clone();
354 let openable_storage = TestCatalogStateBuilder::new(persist_client)
355 .with_organization_id(organization_id)
356 .with_default_deploy_generation()
357 .build()
358 .await
359 .expect("can create durable catalog");
360 let mut storage = openable_storage
361 .open(now().into(), &bootstrap_args)
362 .await
363 .expect("can open durable catalog")
364 .0;
365 let _ = storage
367 .sync_to_current_updates()
368 .await
369 .expect("can sync to current updates");
370 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
371
372 f(catalog).await
373 }
374
375 pub async fn open_debug_catalog(
379 persist_client: PersistClient,
380 organization_id: Uuid,
381 bootstrap_args: &BootstrapArgs,
382 ) -> Result<Catalog, anyhow::Error> {
383 let now = SYSTEM_TIME.clone();
384 let environment_id = None;
385 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
386 .with_organization_id(organization_id)
387 .with_default_deploy_generation()
388 .build()
389 .await?;
390 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
391 let system_parameter_defaults = BTreeMap::default();
392 Self::open_debug_catalog_inner(
393 persist_client,
394 storage,
395 now,
396 environment_id,
397 &DUMMY_BUILD_INFO,
398 system_parameter_defaults,
399 bootstrap_args,
400 None,
401 )
402 .await
403 }
404
405 pub async fn open_debug_read_only_catalog(
410 persist_client: PersistClient,
411 organization_id: Uuid,
412 bootstrap_args: &BootstrapArgs,
413 ) -> Result<Catalog, anyhow::Error> {
414 let now = SYSTEM_TIME.clone();
415 let environment_id = None;
416 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
417 .with_organization_id(organization_id)
418 .build()
419 .await?;
420 let storage = openable_storage
421 .open_read_only(&test_bootstrap_args())
422 .await?;
423 let system_parameter_defaults = BTreeMap::default();
424 Self::open_debug_catalog_inner(
425 persist_client,
426 storage,
427 now,
428 environment_id,
429 &DUMMY_BUILD_INFO,
430 system_parameter_defaults,
431 bootstrap_args,
432 None,
433 )
434 .await
435 }
436
437 pub async fn open_debug_read_only_persist_catalog_config(
442 persist_client: PersistClient,
443 now: NowFn,
444 environment_id: EnvironmentId,
445 system_parameter_defaults: BTreeMap<String, String>,
446 build_info: &'static BuildInfo,
447 bootstrap_args: &BootstrapArgs,
448 enable_expression_cache_override: Option<bool>,
449 ) -> Result<Catalog, anyhow::Error> {
450 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
451 .with_organization_id(environment_id.organization_id())
452 .with_version(
453 build_info
454 .version
455 .parse()
456 .expect("build version is parseable"),
457 )
458 .build()
459 .await?;
460 let storage = openable_storage.open_read_only(bootstrap_args).await?;
461 Self::open_debug_catalog_inner(
462 persist_client,
463 storage,
464 now,
465 Some(environment_id),
466 build_info,
467 system_parameter_defaults,
468 bootstrap_args,
469 enable_expression_cache_override,
470 )
471 .await
472 }
473
474 async fn open_debug_catalog_inner(
475 persist_client: PersistClient,
476 storage: Box<dyn DurableCatalogState>,
477 now: NowFn,
478 environment_id: Option<EnvironmentId>,
479 build_info: &'static BuildInfo,
480 system_parameter_defaults: BTreeMap<String, String>,
481 bootstrap_args: &BootstrapArgs,
482 enable_expression_cache_override: Option<bool>,
483 ) -> Result<Catalog, anyhow::Error> {
484 let metrics_registry = &MetricsRegistry::new();
485 let secrets_reader = Arc::new(InMemorySecretsController::new());
486 let previous_ts = now().into();
489 let replica_size = &bootstrap_args.default_cluster_replica_size;
490 let read_only = false;
491
492 let OpenCatalogResult {
493 catalog,
494 migrated_storage_collections_0dt: _,
495 new_builtin_collections: _,
496 builtin_table_updates: _,
497 cached_global_exprs: _,
498 uncached_local_exprs: _,
499 } = Catalog::open(Config {
500 storage,
501 metrics_registry,
502 state: StateConfig {
503 unsafe_mode: true,
504 all_features: false,
505 build_info,
506 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
507 read_only,
508 now,
509 boot_ts: previous_ts,
510 skip_migrations: true,
511 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
512 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
513 size: replica_size.clone(),
514 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
515 },
516 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
517 size: replica_size.clone(),
518 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
519 },
520 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
521 size: replica_size.clone(),
522 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
523 },
524 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
525 size: replica_size.clone(),
526 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
527 },
528 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
529 size: replica_size.clone(),
530 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
531 },
532 system_parameter_defaults,
533 remote_system_parameters: None,
534 availability_zones: vec![],
535 egress_addresses: vec![],
536 aws_principal_context: None,
537 aws_privatelink_availability_zones: None,
538 http_host_name: None,
539 connection_context: ConnectionContext::for_tests(secrets_reader),
540 builtin_item_migration_config: BuiltinItemMigrationConfig {
541 persist_client: persist_client.clone(),
542 read_only,
543 force_migration: None,
544 },
545 persist_client,
546 enable_expression_cache_override,
547 helm_chart_version: None,
548 external_login_password_mz_system: None,
549 license_key: ValidatedLicenseKey::for_tests(),
550 },
551 })
552 .await?;
553 Ok(catalog)
554 }
555
556 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
557 self.state.for_session(session)
558 }
559
560 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
561 self.state.for_sessionless_user(role_id)
562 }
563
564 pub fn for_system_session(&self) -> ConnCatalog<'_> {
565 self.state.for_system_session()
566 }
567
568 async fn storage<'a>(
569 &'a self,
570 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
571 self.storage.lock().await
572 }
573
574 pub async fn current_upper(&self) -> mz_repr::Timestamp {
575 self.storage().await.current_upper().await
576 }
577
578 pub async fn allocate_user_id(
579 &self,
580 commit_ts: mz_repr::Timestamp,
581 ) -> Result<(CatalogItemId, GlobalId), Error> {
582 self.storage()
583 .await
584 .allocate_user_id(commit_ts)
585 .await
586 .maybe_terminate("allocating user ids")
587 .err_into()
588 }
589
590 pub async fn allocate_user_ids(
592 &self,
593 amount: u64,
594 commit_ts: mz_repr::Timestamp,
595 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
596 self.storage()
597 .await
598 .allocate_user_ids(amount, commit_ts)
599 .await
600 .maybe_terminate("allocating user ids")
601 .err_into()
602 }
603
604 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
605 let commit_ts = self.storage().await.current_upper().await;
606 self.allocate_user_id(commit_ts).await
607 }
608
609 pub async fn allocate_storage_usage_id(
617 &self,
618 commit_ts: mz_repr::Timestamp,
619 ) -> Result<u64, Error> {
620 use mz_ore::collections::CollectionExt;
621
622 self.storage()
623 .await
624 .allocate_id(STORAGE_USAGE_ID_ALLOC_KEY, 1, commit_ts)
625 .await
626 .maybe_terminate("allocating storage usage id")
627 .map(|ids| ids.into_element())
628 .err_into()
629 }
630
631 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
633 self.storage()
634 .await
635 .get_next_user_item_id()
636 .await
637 .err_into()
638 }
639
640 #[cfg(test)]
641 pub async fn allocate_system_id(
642 &self,
643 commit_ts: mz_repr::Timestamp,
644 ) -> Result<(CatalogItemId, GlobalId), Error> {
645 use mz_ore::collections::CollectionExt;
646
647 let mut storage = self.storage().await;
648 let mut txn = storage.transaction().await?;
649 let id = txn
650 .allocate_system_item_ids(1)
651 .maybe_terminate("allocating system ids")?
652 .into_element();
653 let _ = txn.get_and_commit_op_updates();
655 txn.commit(commit_ts).await?;
656 Ok(id)
657 }
658
659 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
661 self.storage()
662 .await
663 .get_next_system_item_id()
664 .await
665 .err_into()
666 }
667
668 pub async fn allocate_user_cluster_id(
669 &self,
670 commit_ts: mz_repr::Timestamp,
671 ) -> Result<ClusterId, Error> {
672 self.storage()
673 .await
674 .allocate_user_cluster_id(commit_ts)
675 .await
676 .maybe_terminate("allocating user cluster ids")
677 .err_into()
678 }
679
680 pub async fn allocate_user_replica_ids(
683 &self,
684 amount: u64,
685 commit_ts: mz_repr::Timestamp,
686 ) -> Result<Vec<ReplicaId>, Error> {
687 self.storage()
688 .await
689 .allocate_user_replica_ids(amount, commit_ts)
690 .await
691 .maybe_terminate("allocating user replica ids")
692 .err_into()
693 }
694
695 pub async fn allocate_system_replica_ids(
698 &self,
699 amount: u64,
700 commit_ts: mz_repr::Timestamp,
701 ) -> Result<Vec<ReplicaId>, Error> {
702 self.storage()
703 .await
704 .allocate_system_replica_ids(amount, commit_ts)
705 .await
706 .maybe_terminate("allocating system replica ids")
707 .err_into()
708 }
709
710 pub async fn allocate_replica_ids(
713 &self,
714 cluster_id: ClusterId,
715 amount: u64,
716 commit_ts: mz_repr::Timestamp,
717 ) -> Result<Vec<ReplicaId>, Error> {
718 if cluster_id.is_system() {
719 self.allocate_system_replica_ids(amount, commit_ts).await
720 } else {
721 self.allocate_user_replica_ids(amount, commit_ts).await
722 }
723 }
724
725 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
727 self.storage()
728 .await
729 .get_next_system_replica_id()
730 .await
731 .err_into()
732 }
733
734 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
736 self.storage()
737 .await
738 .get_next_user_replica_id()
739 .await
740 .err_into()
741 }
742
743 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
744 self.state.resolve_database(database_name)
745 }
746
747 pub fn resolve_schema(
748 &self,
749 current_database: Option<&DatabaseId>,
750 database_name: Option<&str>,
751 schema_name: &str,
752 conn_id: &ConnectionId,
753 ) -> Result<&Schema, SqlCatalogError> {
754 self.state
755 .resolve_schema(current_database, database_name, schema_name, conn_id)
756 }
757
758 pub fn resolve_schema_in_database(
759 &self,
760 database_spec: &ResolvedDatabaseSpecifier,
761 schema_name: &str,
762 conn_id: &ConnectionId,
763 ) -> Result<&Schema, SqlCatalogError> {
764 self.state
765 .resolve_schema_in_database(database_spec, schema_name, conn_id)
766 }
767
768 pub fn resolve_replica_in_cluster(
769 &self,
770 cluster_id: &ClusterId,
771 replica_name: &str,
772 ) -> Result<&ClusterReplica, SqlCatalogError> {
773 self.state
774 .resolve_replica_in_cluster(cluster_id, replica_name)
775 }
776
777 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
778 self.state.resolve_system_schema(name)
779 }
780
781 pub fn resolve_search_path(
782 &self,
783 session: &Session,
784 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
785 self.state.resolve_search_path(session)
786 }
787
788 pub fn resolve_entry(
790 &self,
791 current_database: Option<&DatabaseId>,
792 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
793 name: &PartialItemName,
794 conn_id: &ConnectionId,
795 ) -> Result<&CatalogEntry, SqlCatalogError> {
796 self.state
797 .resolve_entry(current_database, search_path, name, conn_id)
798 }
799
800 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
802 self.state.resolve_builtin_table(builtin)
803 }
804
805 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
807 self.state.resolve_builtin_log(builtin).0
808 }
809
810 pub fn resolve_builtin_storage_collection(
812 &self,
813 builtin: &'static BuiltinSource,
814 ) -> CatalogItemId {
815 self.state.resolve_builtin_source(builtin)
816 }
817
818 pub fn resolve_function(
820 &self,
821 current_database: Option<&DatabaseId>,
822 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
823 name: &PartialItemName,
824 conn_id: &ConnectionId,
825 ) -> Result<&CatalogEntry, SqlCatalogError> {
826 self.state
827 .resolve_function(current_database, search_path, name, conn_id)
828 }
829
830 pub fn resolve_type(
832 &self,
833 current_database: Option<&DatabaseId>,
834 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
835 name: &PartialItemName,
836 conn_id: &ConnectionId,
837 ) -> Result<&CatalogEntry, SqlCatalogError> {
838 self.state
839 .resolve_type(current_database, search_path, name, conn_id)
840 }
841
842 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
843 self.state.resolve_cluster(name)
844 }
845
846 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
852 self.state.resolve_builtin_cluster(cluster)
853 }
854
855 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
856 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
857 }
858
859 pub fn resolve_target_cluster(
861 &self,
862 target_cluster: TargetCluster,
863 session: &Session,
864 ) -> Result<&Cluster, AdapterError> {
865 match target_cluster {
866 TargetCluster::CatalogServer => {
867 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
868 }
869 TargetCluster::Active => self.active_cluster(session),
870 TargetCluster::Transaction(cluster_id) => self
871 .try_get_cluster(cluster_id)
872 .ok_or(AdapterError::ConcurrentClusterDrop),
873 }
874 }
875
876 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
877 if session.user().name != SYSTEM_USER.name
881 && session.user().name != SUPPORT_USER.name
882 && session.vars().cluster() == SYSTEM_USER.name
883 {
884 coord_bail!(
885 "system cluster '{}' cannot execute user queries",
886 SYSTEM_USER.name
887 );
888 }
889 let cluster = self.resolve_cluster(session.vars().cluster())?;
890 Ok(cluster)
891 }
892
893 pub fn state(&self) -> &CatalogState {
894 &self.state
895 }
896
897 pub fn resolve_full_name(
898 &self,
899 name: &QualifiedItemName,
900 conn_id: Option<&ConnectionId>,
901 ) -> FullItemName {
902 self.state.resolve_full_name(name, conn_id)
903 }
904
905 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
906 self.state.try_get_entry(id)
907 }
908
909 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
910 self.state.try_get_entry_by_global_id(id)
911 }
912
913 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
914 self.state.get_entry(id)
915 }
916
917 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
918 self.state.get_entry_by_global_id(id)
919 }
920
921 pub fn get_global_ids<'a>(
922 &'a self,
923 id: &CatalogItemId,
924 ) -> impl Iterator<Item = GlobalId> + use<'a> {
925 self.get_entry(id).global_ids()
926 }
927
928 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
929 self.get_entry_by_global_id(id).id()
930 }
931
932 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
933 let item = self.try_get_entry_by_global_id(id)?;
934 Some(item.id())
935 }
936
937 pub fn get_schema(
938 &self,
939 database_spec: &ResolvedDatabaseSpecifier,
940 schema_spec: &SchemaSpecifier,
941 conn_id: &ConnectionId,
942 ) -> &Schema {
943 self.state.get_schema(database_spec, schema_spec, conn_id)
944 }
945
946 pub fn try_get_schema(
947 &self,
948 database_spec: &ResolvedDatabaseSpecifier,
949 schema_spec: &SchemaSpecifier,
950 conn_id: &ConnectionId,
951 ) -> Option<&Schema> {
952 self.state
953 .try_get_schema(database_spec, schema_spec, conn_id)
954 }
955
956 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
957 self.state.get_mz_catalog_schema_id()
958 }
959
960 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
961 self.state.get_pg_catalog_schema_id()
962 }
963
964 pub fn get_information_schema_id(&self) -> SchemaId {
965 self.state.get_information_schema_id()
966 }
967
968 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
969 self.state.get_mz_internal_schema_id()
970 }
971
972 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
973 self.state.get_mz_introspection_schema_id()
974 }
975
976 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
977 self.state.get_mz_unsafe_schema_id()
978 }
979
980 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
981 self.state.system_schema_ids()
982 }
983
984 pub fn get_database(&self, id: &DatabaseId) -> &Database {
985 self.state.get_database(id)
986 }
987
988 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
989 self.state.try_get_role(id)
990 }
991
992 pub fn get_role(&self, id: &RoleId) -> &Role {
993 self.state.get_role(id)
994 }
995
996 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
997 self.state.try_get_role_by_name(role_name)
998 }
999
1000 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1001 self.state.try_get_role_auth_by_id(id)
1002 }
1003
1004 pub fn create_temporary_schema(
1007 &mut self,
1008 conn_id: &ConnectionId,
1009 owner_id: RoleId,
1010 ) -> Result<(), Error> {
1011 self.state.create_temporary_schema(conn_id, owner_id)
1012 }
1013
1014 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1015 self.state
1017 .temporary_schemas
1018 .get(conn_id)
1019 .map(|schema| schema.items.contains_key(item_name))
1020 .unwrap_or(false)
1021 }
1022
1023 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1026 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1027 return Ok(());
1028 };
1029 if !schema.items.is_empty() {
1030 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1031 }
1032 Ok(())
1033 }
1034
1035 pub(crate) fn object_dependents(
1036 &self,
1037 object_ids: &Vec<ObjectId>,
1038 conn_id: &ConnectionId,
1039 ) -> Vec<ObjectId> {
1040 let mut seen = BTreeSet::new();
1041 self.state.object_dependents(object_ids, conn_id, &mut seen)
1042 }
1043
1044 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1045 FullNameV1 {
1046 database: name.database.to_string(),
1047 schema: name.schema.clone(),
1048 item: name.item.clone(),
1049 }
1050 }
1051
1052 pub fn find_available_cluster_name(&self, name: &str) -> String {
1053 let mut i = 0;
1054 let mut candidate = name.to_string();
1055 while self.state.clusters_by_name.contains_key(&candidate) {
1056 i += 1;
1057 candidate = format!("{}{}", name, i);
1058 }
1059 candidate
1060 }
1061
1062 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1063 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1064 self.cluster_replica_sizes()
1065 .enabled_allocations()
1066 .map(|a| a.0.to_owned())
1067 .collect::<Vec<_>>()
1068 } else {
1069 self.system_config().allowed_cluster_replica_sizes()
1070 }
1071 }
1072
1073 pub fn concretize_replica_location(
1074 &self,
1075 location: mz_catalog::durable::ReplicaLocation,
1076 allowed_sizes: &Vec<String>,
1077 allowed_availability_zones: Option<&[String]>,
1078 allow_disabled: bool,
1079 ) -> Result<ReplicaLocation, Error> {
1080 self.state.concretize_replica_location(
1081 location,
1082 allowed_sizes,
1083 allowed_availability_zones,
1084 allow_disabled,
1085 )
1086 }
1087
1088 pub(crate) fn ensure_valid_replica_size(
1089 &self,
1090 allowed_sizes: &[String],
1091 size: &String,
1092 allow_disabled: bool,
1093 ) -> Result<(), Error> {
1094 self.state
1095 .ensure_valid_replica_size(allowed_sizes, size, allow_disabled)
1096 }
1097
1098 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1099 &self.state.cluster_replica_sizes
1100 }
1101
1102 pub fn get_privileges(
1104 &self,
1105 id: &SystemObjectId,
1106 conn_id: &ConnectionId,
1107 ) -> Option<&PrivilegeMap> {
1108 match id {
1109 SystemObjectId::Object(id) => match id {
1110 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1111 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1112 ObjectId::Schema((database_spec, schema_spec)) => Some(
1113 self.get_schema(database_spec, schema_spec, conn_id)
1114 .privileges(),
1115 ),
1116 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1117 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1118 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1119 },
1120 SystemObjectId::System => Some(&self.state.system_privileges),
1121 }
1122 }
1123
1124 #[mz_ore::instrument(level = "debug")]
1125 pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1126 Ok(self.storage().await.advance_upper(new_upper).await?)
1127 }
1128
1129 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1131 self.state.introspection_dependencies(id)
1132 }
1133
1134 pub fn dump(&self) -> Result<CatalogDump, Error> {
1140 Ok(CatalogDump::new(self.state.dump(None)?))
1141 }
1142
1143 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1147 self.state.check_consistency().map_err(|inconsistencies| {
1148 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1149 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1150 })
1151 })
1152 }
1153
1154 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1155 self.state.config()
1156 }
1157
1158 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1159 self.state.entry_by_id.values()
1160 }
1161
1162 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1163 self.entries()
1164 .filter(|entry| entry.is_connection() && entry.id().is_user())
1165 }
1166
1167 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1168 self.entries()
1169 .filter(|entry| entry.is_table() && entry.id().is_user())
1170 }
1171
1172 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1173 self.entries()
1174 .filter(|entry| entry.is_source() && entry.id().is_user())
1175 }
1176
1177 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1178 self.entries()
1179 .filter(|entry| entry.is_sink() && entry.id().is_user())
1180 }
1181
1182 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1183 self.entries()
1184 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1185 }
1186
1187 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1188 self.entries()
1189 .filter(|entry| entry.is_secret() && entry.id().is_user())
1190 }
1191
1192 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1193 self.state.get_network_policy(&network_policy_id)
1194 }
1195
1196 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1197 self.state.try_get_network_policy_by_name(name)
1198 }
1199
1200 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1201 self.state.clusters_by_id.values()
1202 }
1203
1204 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1205 self.state.get_cluster(cluster_id)
1206 }
1207
1208 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1209 self.state.try_get_cluster(cluster_id)
1210 }
1211
1212 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1213 self.clusters().filter(|cluster| cluster.id.is_user())
1214 }
1215
1216 pub fn get_cluster_replica(
1217 &self,
1218 cluster_id: ClusterId,
1219 replica_id: ReplicaId,
1220 ) -> &ClusterReplica {
1221 self.state.get_cluster_replica(cluster_id, replica_id)
1222 }
1223
1224 pub fn try_get_cluster_replica(
1225 &self,
1226 cluster_id: ClusterId,
1227 replica_id: ReplicaId,
1228 ) -> Option<&ClusterReplica> {
1229 self.state.try_get_cluster_replica(cluster_id, replica_id)
1230 }
1231
1232 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1233 self.user_clusters()
1234 .flat_map(|cluster| cluster.user_replicas())
1235 }
1236
1237 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1238 self.state.database_by_id.values()
1239 }
1240
1241 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1242 self.state
1243 .roles_by_id
1244 .values()
1245 .filter(|role| role.is_user())
1246 }
1247
1248 pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1249 self.state
1250 .network_policies_by_id
1251 .iter()
1252 .filter(|(id, _)| id.is_user())
1253 .map(|(_, policy)| policy)
1254 }
1255
1256 pub fn system_privileges(&self) -> &PrivilegeMap {
1257 &self.state.system_privileges
1258 }
1259
1260 pub fn default_privileges(
1261 &self,
1262 ) -> impl Iterator<
1263 Item = (
1264 &DefaultPrivilegeObject,
1265 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1266 ),
1267 > {
1268 self.state.default_privileges.iter()
1269 }
1270
1271 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1272 self.state
1273 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1274 }
1275
1276 pub fn pack_storage_usage_update(
1277 &self,
1278 event: VersionedStorageUsage,
1279 diff: Diff,
1280 ) -> BuiltinTableUpdate {
1281 self.state
1282 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1283 }
1284
1285 pub fn system_config(&self) -> &SystemVars {
1286 self.state.system_config()
1287 }
1288
1289 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1290 self.state.system_config_mut()
1291 }
1292
1293 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1294 self.state.ensure_not_reserved_role(role_id)
1295 }
1296
1297 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1298 self.state.ensure_grantable_role(role_id)
1299 }
1300
1301 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1302 self.state.ensure_not_system_role(role_id)
1303 }
1304
1305 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1306 self.state.ensure_not_predefined_role(role_id)
1307 }
1308
1309 pub fn ensure_not_reserved_network_policy(
1310 &self,
1311 network_policy_id: &NetworkPolicyId,
1312 ) -> Result<(), Error> {
1313 self.state
1314 .ensure_not_reserved_network_policy(network_policy_id)
1315 }
1316
1317 pub fn ensure_not_reserved_object(
1318 &self,
1319 object_id: &ObjectId,
1320 conn_id: &ConnectionId,
1321 ) -> Result<(), Error> {
1322 match object_id {
1323 ObjectId::Cluster(cluster_id) => {
1324 if cluster_id.is_system() {
1325 let cluster = self.get_cluster(*cluster_id);
1326 Err(Error::new(ErrorKind::ReadOnlyCluster(
1327 cluster.name().to_string(),
1328 )))
1329 } else {
1330 Ok(())
1331 }
1332 }
1333 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1334 if replica_id.is_system() {
1335 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1336 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1337 replica.name().to_string(),
1338 )))
1339 } else {
1340 Ok(())
1341 }
1342 }
1343 ObjectId::Database(database_id) => {
1344 if database_id.is_system() {
1345 let database = self.get_database(database_id);
1346 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1347 database.name().to_string(),
1348 )))
1349 } else {
1350 Ok(())
1351 }
1352 }
1353 ObjectId::Schema((database_spec, schema_spec)) => {
1354 if schema_spec.is_system() {
1355 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1356 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1357 schema.name().schema.clone(),
1358 )))
1359 } else {
1360 Ok(())
1361 }
1362 }
1363 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1364 ObjectId::Item(item_id) => {
1365 if item_id.is_system() {
1366 let item = self.get_entry(item_id);
1367 let name = self.resolve_full_name(item.name(), Some(conn_id));
1368 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1369 } else {
1370 Ok(())
1371 }
1372 }
1373 ObjectId::NetworkPolicy(network_policy_id) => {
1374 self.ensure_not_reserved_network_policy(network_policy_id)
1375 }
1376 }
1377 }
1378
1379 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1381 &mut self,
1382 create_sql: &str,
1383 force_if_exists_skip: bool,
1384 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1385 self.state
1386 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1387 }
1388
1389 pub(crate) fn cache_expressions(
1400 &self,
1401 id: GlobalId,
1402 local_mir: Option<OptimizedMirRelationExpr>,
1403 mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
1404 mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
1405 dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
1406 optimizer_features: OptimizerFeatures,
1407 ) -> BoxFuture<'static, ()> {
1408 global_mir.as_of = None;
1412 global_mir.until = Default::default();
1413 physical_plan.as_of = None;
1414 physical_plan.until = Default::default();
1415
1416 let mut local_exprs = Vec::new();
1417 if let Some(local_mir) = local_mir {
1418 local_exprs.push((
1419 id,
1420 LocalExpressions {
1421 local_mir,
1422 optimizer_features: optimizer_features.clone(),
1423 },
1424 ));
1425 }
1426 let global_exprs = vec![(
1427 id,
1428 GlobalExpressions {
1429 global_mir,
1430 physical_plan,
1431 dataflow_metainfos,
1432 optimizer_features,
1433 },
1434 )];
1435 self.update_expression_cache(local_exprs, global_exprs, Default::default())
1436 }
1437
1438 pub(crate) fn update_expression_cache<'a, 'b>(
1439 &'a self,
1440 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1441 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1442 invalidate_ids: BTreeSet<GlobalId>,
1443 ) -> BoxFuture<'b, ()> {
1444 if let Some(expr_cache) = &self.expr_cache_handle {
1445 expr_cache
1446 .update(
1447 new_local_expressions,
1448 new_global_expressions,
1449 invalidate_ids,
1450 )
1451 .boxed()
1452 } else {
1453 async {}.boxed()
1454 }
1455 }
1456
1457 #[cfg(test)]
1461 async fn sync_to_current_updates(
1462 &mut self,
1463 ) -> Result<
1464 (
1465 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1466 Vec<ParsedStateUpdate>,
1467 ),
1468 CatalogError,
1469 > {
1470 let updates = self.storage().await.sync_to_current_updates().await?;
1471 let (builtin_table_updates, catalog_updates) = self
1472 .state
1473 .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1474 .await;
1475 Ok((builtin_table_updates, catalog_updates))
1476 }
1477}
1478
1479pub fn is_reserved_name(name: &str) -> bool {
1480 BUILTIN_PREFIXES
1481 .iter()
1482 .any(|prefix| name.starts_with(prefix))
1483}
1484
1485pub fn is_reserved_role_name(name: &str) -> bool {
1486 is_reserved_name(name) || is_public_role(name)
1487}
1488
1489pub fn is_public_role(name: &str) -> bool {
1490 name == &*PUBLIC_ROLE_NAME
1491}
1492
1493pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1494 object_type_to_audit_object_type(sql_type.into())
1495}
1496
1497pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1498 match id {
1499 CommentObjectId::Table(_) => ObjectType::Table,
1500 CommentObjectId::View(_) => ObjectType::View,
1501 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1502 CommentObjectId::Source(_) => ObjectType::Source,
1503 CommentObjectId::Sink(_) => ObjectType::Sink,
1504 CommentObjectId::Index(_) => ObjectType::Index,
1505 CommentObjectId::Func(_) => ObjectType::Func,
1506 CommentObjectId::Connection(_) => ObjectType::Connection,
1507 CommentObjectId::Type(_) => ObjectType::Type,
1508 CommentObjectId::Secret(_) => ObjectType::Secret,
1509 CommentObjectId::Role(_) => ObjectType::Role,
1510 CommentObjectId::Database(_) => ObjectType::Database,
1511 CommentObjectId::Schema(_) => ObjectType::Schema,
1512 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1513 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1514 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1515 }
1516}
1517
1518pub(crate) fn object_type_to_audit_object_type(
1519 object_type: mz_sql::catalog::ObjectType,
1520) -> ObjectType {
1521 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1522}
1523
1524pub(crate) fn system_object_type_to_audit_object_type(
1525 system_type: &SystemObjectType,
1526) -> ObjectType {
1527 match system_type {
1528 SystemObjectType::Object(object_type) => match object_type {
1529 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1530 mz_sql::catalog::ObjectType::View => ObjectType::View,
1531 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1532 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1533 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1534 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1535 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1536 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1537 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1538 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1539 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1540 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1541 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1542 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1543 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1544 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1545 },
1546 SystemObjectType::System => ObjectType::System,
1547 }
1548}
1549
/// The direction of a privilege update.
///
/// Converts into the matching [`ExecuteResponse`] and audit [`EventType`].
#[derive(Clone, Copy, Debug)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1555
1556impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1557 fn from(variant: UpdatePrivilegeVariant) -> Self {
1558 match variant {
1559 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1560 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1561 }
1562 }
1563}
1564
1565impl From<UpdatePrivilegeVariant> for EventType {
1566 fn from(variant: UpdatePrivilegeVariant) -> Self {
1567 match variant {
1568 UpdatePrivilegeVariant::Grant => EventType::Grant,
1569 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1570 }
1571 }
1572}
1573
1574impl ConnCatalog<'_> {
1575 fn resolve_item_name(
1576 &self,
1577 name: &PartialItemName,
1578 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1579 self.resolve_item(name).map(|entry| entry.name())
1580 }
1581
1582 fn resolve_function_name(
1583 &self,
1584 name: &PartialItemName,
1585 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1586 self.resolve_function(name).map(|entry| entry.name())
1587 }
1588
1589 fn resolve_type_name(
1590 &self,
1591 name: &PartialItemName,
1592 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1593 self.resolve_type(name).map(|entry| entry.name())
1594 }
1595}
1596
1597impl ExprHumanizer for ConnCatalog<'_> {
1598 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1599 let entry = self.state.try_get_entry_by_global_id(&id)?;
1600 Some(self.resolve_full_name(entry.name()).to_string())
1601 }
1602
1603 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1604 let entry = self.state.try_get_entry_by_global_id(&id)?;
1605 Some(entry.name().item.clone())
1606 }
1607
1608 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1609 let entry = self.state.try_get_entry_by_global_id(&id)?;
1610 Some(self.resolve_full_name(entry.name()).into_parts())
1611 }
1612
1613 fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1614 use SqlScalarType::*;
1615
1616 match typ {
1617 Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1618 List {
1619 custom_id: Some(item_id),
1620 ..
1621 }
1622 | Map {
1623 custom_id: Some(item_id),
1624 ..
1625 } => {
1626 let item = self.get_item(item_id);
1627 self.minimal_qualification(item.name()).to_string()
1628 }
1629 List { element_type, .. } => {
1630 format!(
1631 "{} list",
1632 self.humanize_sql_scalar_type(element_type, postgres_compat)
1633 )
1634 }
1635 Map { value_type, .. } => format!(
1636 "map[{}=>{}]",
1637 self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1638 self.humanize_sql_scalar_type(value_type, postgres_compat)
1639 ),
1640 Record {
1641 custom_id: Some(item_id),
1642 ..
1643 } => {
1644 let item = self.get_item(item_id);
1645 self.minimal_qualification(item.name()).to_string()
1646 }
1647 Record { fields, .. } => format!(
1648 "record({})",
1649 fields
1650 .iter()
1651 .map(|f| format!(
1652 "{}: {}",
1653 f.0,
1654 self.humanize_sql_column_type(&f.1, postgres_compat)
1655 ))
1656 .join(",")
1657 ),
1658 PgLegacyChar => "\"char\"".into(),
1659 Char { length } if !postgres_compat => match length {
1660 None => "char".into(),
1661 Some(length) => format!("char({})", length.into_u32()),
1662 },
1663 VarChar { max_length } if !postgres_compat => match max_length {
1664 None => "varchar".into(),
1665 Some(length) => format!("varchar({})", length.into_u32()),
1666 },
1667 UInt16 => "uint2".into(),
1668 UInt32 => "uint4".into(),
1669 UInt64 => "uint8".into(),
1670 ty => {
1671 let pgrepr_type = mz_pgrepr::Type::from(ty);
1672 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1673
1674 let res = if self
1675 .effective_search_path(true)
1676 .iter()
1677 .any(|(_, schema)| schema == &pg_catalog_schema)
1678 {
1679 pgrepr_type.name().to_string()
1680 } else {
1681 let name = QualifiedItemName {
1684 qualifiers: ItemQualifiers {
1685 database_spec: ResolvedDatabaseSpecifier::Ambient,
1686 schema_spec: pg_catalog_schema,
1687 },
1688 item: pgrepr_type.name().to_string(),
1689 };
1690 self.resolve_full_name(&name).to_string()
1691 };
1692 res
1693 }
1694 }
1695 }
1696
1697 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1698 let entry = self.state.try_get_entry_by_global_id(&id)?;
1699
1700 match entry.index() {
1701 Some(index) => {
1702 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1703 let mut on_names = on_desc
1704 .iter_names()
1705 .map(|col_name| col_name.to_string())
1706 .collect::<Vec<_>>();
1707
1708 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1709
1710 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1714 let mut ix_names = vec![String::new(); ix_arity];
1715
1716 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1718 let on_name = on_names.get_mut(on_pos).expect("on_name");
1719 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1720 std::mem::swap(on_name, ix_name);
1721 }
1722
1723 Some(ix_names) }
1725 None => {
1726 let desc = self.state.try_get_desc_by_global_id(&id)?;
1727 let column_names = desc
1728 .iter_names()
1729 .map(|col_name| col_name.to_string())
1730 .collect();
1731
1732 Some(column_names)
1733 }
1734 }
1735 }
1736
1737 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1738 let desc = self.state.try_get_desc_by_global_id(&id)?;
1739 Some(desc.get_name(column).to_string())
1740 }
1741
1742 fn id_exists(&self, id: GlobalId) -> bool {
1743 self.state.entry_by_global_id.contains_key(&id)
1744 }
1745}
1746
1747impl SessionCatalog for ConnCatalog<'_> {
1748 fn active_role_id(&self) -> &RoleId {
1749 &self.role_id
1750 }
1751
1752 fn restrict_to_user_objects(&self) -> bool {
1753 self.restrict_to_user_objects
1754 }
1755
1756 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1757 self.prepared_statements
1758 .as_ref()
1759 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1760 .flatten()
1761 }
1762
1763 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1764 self.portals
1765 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1766 }
1767
1768 fn active_database(&self) -> Option<&DatabaseId> {
1769 self.database.as_ref()
1770 }
1771
1772 fn active_cluster(&self) -> &str {
1773 &self.cluster
1774 }
1775
1776 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1777 &self.search_path
1778 }
1779
1780 fn resolve_database(
1781 &self,
1782 database_name: &str,
1783 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1784 Ok(self.state.resolve_database(database_name)?)
1785 }
1786
1787 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1788 self.state
1789 .database_by_id
1790 .get(id)
1791 .expect("database doesn't exist")
1792 }
1793
1794 #[allow(clippy::as_conversions)]
1796 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1797 self.state
1798 .database_by_id
1799 .values()
1800 .map(|database| database as &dyn CatalogDatabase)
1801 .collect()
1802 }
1803
1804 fn resolve_schema(
1805 &self,
1806 database_name: Option<&str>,
1807 schema_name: &str,
1808 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1809 Ok(self.state.resolve_schema(
1810 self.database.as_ref(),
1811 database_name,
1812 schema_name,
1813 &self.conn_id,
1814 )?)
1815 }
1816
1817 fn resolve_schema_in_database(
1818 &self,
1819 database_spec: &ResolvedDatabaseSpecifier,
1820 schema_name: &str,
1821 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1822 Ok(self
1823 .state
1824 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1825 }
1826
1827 fn get_schema(
1828 &self,
1829 database_spec: &ResolvedDatabaseSpecifier,
1830 schema_spec: &SchemaSpecifier,
1831 ) -> &dyn CatalogSchema {
1832 self.state
1833 .get_schema(database_spec, schema_spec, &self.conn_id)
1834 }
1835
1836 #[allow(clippy::as_conversions)]
1838 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1839 self.get_databases()
1840 .into_iter()
1841 .flat_map(|database| database.schemas().into_iter())
1842 .chain(
1843 self.state
1844 .ambient_schemas_by_id
1845 .values()
1846 .chain(self.state.temporary_schemas.values())
1847 .map(|schema| schema as &dyn CatalogSchema),
1848 )
1849 .collect()
1850 }
1851
1852 fn get_mz_internal_schema_id(&self) -> SchemaId {
1853 self.state().get_mz_internal_schema_id()
1854 }
1855
1856 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1857 self.state().get_mz_unsafe_schema_id()
1858 }
1859
1860 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1861 self.state.is_system_schema_specifier(schema)
1862 }
1863
1864 fn resolve_role(
1865 &self,
1866 role_name: &str,
1867 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1868 match self.state.try_get_role_by_name(role_name) {
1869 Some(role) => Ok(role),
1870 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1871 }
1872 }
1873
1874 fn resolve_network_policy(
1875 &self,
1876 policy_name: &str,
1877 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1878 match self.state.try_get_network_policy_by_name(policy_name) {
1879 Some(policy) => Ok(policy),
1880 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1881 }
1882 }
1883
1884 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1885 Some(self.state.roles_by_id.get(id)?)
1886 }
1887
1888 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1889 self.state.get_role(id)
1890 }
1891
1892 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1893 #[allow(clippy::as_conversions)]
1895 self.state
1896 .roles_by_id
1897 .values()
1898 .map(|role| role as &dyn CatalogRole)
1899 .collect()
1900 }
1901
1902 fn mz_system_role_id(&self) -> RoleId {
1903 MZ_SYSTEM_ROLE_ID
1904 }
1905
1906 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1907 self.state.collect_role_membership(id)
1908 }
1909
1910 fn get_network_policy(
1911 &self,
1912 id: &NetworkPolicyId,
1913 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1914 self.state.get_network_policy(id)
1915 }
1916
1917 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1918 #[allow(clippy::as_conversions)]
1920 self.state
1921 .network_policies_by_id
1922 .values()
1923 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1924 .collect()
1925 }
1926
1927 fn resolve_cluster(
1928 &self,
1929 cluster_name: Option<&str>,
1930 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1931 Ok(self
1932 .state
1933 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1934 }
1935
1936 fn resolve_cluster_replica(
1937 &self,
1938 cluster_replica_name: &QualifiedReplica,
1939 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1940 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1941 }
1942
1943 fn resolve_item(
1944 &self,
1945 name: &PartialItemName,
1946 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1947 let r = self.state.resolve_entry(
1948 self.database.as_ref(),
1949 &self.effective_search_path(true),
1950 name,
1951 &self.conn_id,
1952 )?;
1953 if self.unresolvable_ids.contains(&r.id()) {
1954 Err(SqlCatalogError::UnknownItem(name.to_string()))
1955 } else {
1956 Ok(r)
1957 }
1958 }
1959
1960 fn resolve_function(
1961 &self,
1962 name: &PartialItemName,
1963 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1964 let r = self.state.resolve_function(
1965 self.database.as_ref(),
1966 &self.effective_search_path(false),
1967 name,
1968 &self.conn_id,
1969 )?;
1970
1971 if self.unresolvable_ids.contains(&r.id()) {
1972 Err(SqlCatalogError::UnknownFunction {
1973 name: name.to_string(),
1974 alternative: None,
1975 })
1976 } else {
1977 Ok(r)
1978 }
1979 }
1980
1981 fn resolve_type(
1982 &self,
1983 name: &PartialItemName,
1984 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1985 let r = self.state.resolve_type(
1986 self.database.as_ref(),
1987 &self.effective_search_path(false),
1988 name,
1989 &self.conn_id,
1990 )?;
1991
1992 if self.unresolvable_ids.contains(&r.id()) {
1993 Err(SqlCatalogError::UnknownType {
1994 name: name.to_string(),
1995 })
1996 } else {
1997 Ok(r)
1998 }
1999 }
2000
2001 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2002 self.state.get_system_type(name)
2003 }
2004
2005 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2006 Some(self.state.try_get_entry(id)?)
2007 }
2008
2009 fn try_get_item_by_global_id(
2010 &self,
2011 id: &GlobalId,
2012 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2013 let entry = self.state.try_get_entry_by_global_id(id)?;
2014 let entry = match &entry.item {
2015 CatalogItem::Table(table) => {
2016 let (version, _gid) = table
2017 .collections
2018 .iter()
2019 .find(|(_version, gid)| *gid == id)
2020 .expect("catalog out of sync, mismatched GlobalId");
2021 entry.at_version(RelationVersionSelector::Specific(*version))
2022 }
2023 _ => entry.at_version(RelationVersionSelector::Latest),
2024 };
2025 Some(entry)
2026 }
2027
2028 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2029 self.state.get_entry(id)
2030 }
2031
2032 fn get_item_by_global_id(
2033 &self,
2034 id: &GlobalId,
2035 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2036 let entry = self.state.get_entry_by_global_id(id);
2037 let entry = match &entry.item {
2038 CatalogItem::Table(table) => {
2039 let (version, _gid) = table
2040 .collections
2041 .iter()
2042 .find(|(_version, gid)| *gid == id)
2043 .expect("catalog out of sync, mismatched GlobalId");
2044 entry.at_version(RelationVersionSelector::Specific(*version))
2045 }
2046 _ => entry.at_version(RelationVersionSelector::Latest),
2047 };
2048 entry
2049 }
2050
2051 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2052 self.get_schemas()
2053 .into_iter()
2054 .flat_map(|schema| schema.item_ids())
2055 .map(|id| self.get_item(&id))
2056 .collect()
2057 }
2058
2059 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2060 self.state
2061 .get_item_by_name(name, &self.conn_id)
2062 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2063 }
2064
2065 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2066 self.state
2067 .get_type_by_name(name, &self.conn_id)
2068 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2069 }
2070
2071 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2072 &self.state.clusters_by_id[&id]
2073 }
2074
2075 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2076 self.state
2077 .clusters_by_id
2078 .values()
2079 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2080 .collect()
2081 }
2082
2083 fn get_cluster_replica(
2084 &self,
2085 cluster_id: ClusterId,
2086 replica_id: ReplicaId,
2087 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2088 let cluster = self.get_cluster(cluster_id);
2089 cluster.replica(replica_id)
2090 }
2091
2092 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2093 self.get_clusters()
2094 .into_iter()
2095 .flat_map(|cluster| cluster.replicas().into_iter())
2096 .collect()
2097 }
2098
2099 fn get_system_privileges(&self) -> &PrivilegeMap {
2100 &self.state.system_privileges
2101 }
2102
2103 fn get_default_privileges(
2104 &self,
2105 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2106 self.state
2107 .default_privileges
2108 .iter()
2109 .map(|(object, acl_items)| (object, acl_items.collect()))
2110 .collect()
2111 }
2112
2113 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2114 self.state.find_available_name(name, &self.conn_id)
2115 }
2116
2117 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2118 self.state.resolve_full_name(name, Some(&self.conn_id))
2119 }
2120
2121 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2122 self.state.resolve_full_schema_name(name)
2123 }
2124
2125 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2126 self.state.get_entry_by_global_id(global_id).id()
2127 }
2128
2129 fn resolve_global_id(
2130 &self,
2131 item_id: &CatalogItemId,
2132 version: RelationVersionSelector,
2133 ) -> GlobalId {
2134 self.state
2135 .get_entry(item_id)
2136 .at_version(version)
2137 .global_id()
2138 }
2139
2140 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2141 self.state.config()
2142 }
2143
2144 fn now(&self) -> EpochMillis {
2145 (self.state.config().now)()
2146 }
2147
2148 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2149 self.state.aws_privatelink_availability_zones.clone()
2150 }
2151
2152 fn system_vars(&self) -> &SystemVars {
2153 &self.state.system_configuration
2154 }
2155
2156 fn system_vars_mut(&mut self) -> &mut SystemVars {
2157 Arc::make_mut(&mut self.state.to_mut().system_configuration)
2158 }
2159
2160 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2161 self.state().get_owner_id(id, self.conn_id())
2162 }
2163
2164 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2165 match id {
2166 SystemObjectId::System => Some(&self.state.system_privileges),
2167 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2168 Some(self.get_cluster(*id).privileges())
2169 }
2170 SystemObjectId::Object(ObjectId::Database(id)) => {
2171 Some(self.get_database(id).privileges())
2172 }
2173 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2174 self.state
2177 .try_get_schema(database_spec, schema_spec, &self.conn_id)
2178 .map(|schema| schema.privileges())
2179 }
2180 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2181 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2182 Some(self.get_network_policy(id).privileges())
2183 }
2184 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2185 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2186 }
2187 }
2188
2189 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2190 let mut seen = BTreeSet::new();
2191 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2192 }
2193
2194 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2195 let mut seen = BTreeSet::new();
2196 self.state.item_dependents(id, &mut seen)
2197 }
2198
2199 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2200 rbac::all_object_privileges(object_type)
2201 }
2202
2203 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2204 self.state.get_object_type(object_id)
2205 }
2206
2207 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2208 self.state.get_system_object_type(id)
2209 }
2210
2211 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2218 if qualified_name.qualifiers.schema_spec.is_temporary() {
2219 return qualified_name.item.clone().into();
2227 }
2228
2229 let database_id = match &qualified_name.qualifiers.database_spec {
2230 ResolvedDatabaseSpecifier::Ambient => None,
2231 ResolvedDatabaseSpecifier::Id(id)
2232 if self.database.is_some() && self.database == Some(*id) =>
2233 {
2234 None
2235 }
2236 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2237 };
2238
2239 let schema_spec = if database_id.is_none()
2240 && self.resolve_item_name(&PartialItemName {
2241 database: None,
2242 schema: None,
2243 item: qualified_name.item.clone(),
2244 }) == Ok(qualified_name)
2245 || self.resolve_function_name(&PartialItemName {
2246 database: None,
2247 schema: None,
2248 item: qualified_name.item.clone(),
2249 }) == Ok(qualified_name)
2250 || self.resolve_type_name(&PartialItemName {
2251 database: None,
2252 schema: None,
2253 item: qualified_name.item.clone(),
2254 }) == Ok(qualified_name)
2255 {
2256 None
2257 } else {
2258 Some(qualified_name.qualifiers.schema_spec.clone())
2261 };
2262
2263 let res = PartialItemName {
2264 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2265 schema: schema_spec.map(|spec| {
2266 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2267 .name()
2268 .schema
2269 .clone()
2270 }),
2271 item: qualified_name.item.clone(),
2272 };
2273 assert!(
2274 self.resolve_item_name(&res) == Ok(qualified_name)
2275 || self.resolve_function_name(&res) == Ok(qualified_name)
2276 || self.resolve_type_name(&res) == Ok(qualified_name)
2277 );
2278 res
2279 }
2280
2281 fn add_notice(&self, notice: PlanNotice) {
2282 let _ = self.notices_tx.send(notice.into());
2283 }
2284
2285 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2286 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2287 self.state.comments.get_object_comments(comment_id)
2288 }
2289
2290 fn is_cluster_size_cc(&self, size: &str) -> bool {
2291 self.state
2292 .cluster_replica_sizes
2293 .0
2294 .get(size)
2295 .map_or(false, |a| a.is_cc)
2296 }
2297}
2298
2299#[cfg(test)]
2300mod tests {
2301 use std::collections::{BTreeMap, BTreeSet};
2302 use std::sync::Arc;
2303 use std::{env, iter};
2304
2305 use itertools::Itertools;
2306 use mz_catalog::memory::objects::CatalogItem;
2307 use mz_postgres_util::{query, sql};
2308 use tokio_postgres::NoTls;
2309 use tokio_postgres::types::Type;
2310 use uuid::Uuid;
2311
2312 use mz_catalog::SYSTEM_CONN_ID;
2313 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2314 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2315 use mz_controller_types::{ClusterId, ReplicaId};
2316 use mz_expr::{Eval, MirScalarExpr};
2317 use mz_ore::now::to_datetime;
2318 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2319 use mz_persist_client::PersistClient;
2320 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2321 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2322 use mz_repr::role_id::RoleId;
2323 use mz_repr::{
2324 CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2325 SqlScalarType, Timestamp,
2326 };
2327 use mz_sql::catalog::{CatalogSchema, CatalogType, SessionCatalog};
2328 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2329 use mz_sql::names::{
2330 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2331 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2332 };
2333 use mz_sql::plan::{
2334 CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2335 QueryLifetime, Scope, StatementContext,
2336 };
2337 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2338 use mz_sql::session::vars::{SystemVars, VarInput};
2339
2340 use crate::catalog::state::LocalExpressionCache;
2341 use crate::catalog::{Catalog, Op};
2342 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2343 use crate::session::Session;
2344
2345 #[mz_ore::test(tokio::test)]
2352 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2354 Catalog::with_debug(|catalog| async move {
2355 struct TestCase {
2356 input: QualifiedItemName,
2357 system_output: PartialItemName,
2358 normal_output: PartialItemName,
2359 }
2360
2361 let test_cases = vec![
2362 TestCase {
2363 input: QualifiedItemName {
2364 qualifiers: ItemQualifiers {
2365 database_spec: ResolvedDatabaseSpecifier::Ambient,
2366 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2367 },
2368 item: "numeric".to_string(),
2369 },
2370 system_output: PartialItemName {
2371 database: None,
2372 schema: None,
2373 item: "numeric".to_string(),
2374 },
2375 normal_output: PartialItemName {
2376 database: None,
2377 schema: None,
2378 item: "numeric".to_string(),
2379 },
2380 },
2381 TestCase {
2382 input: QualifiedItemName {
2383 qualifiers: ItemQualifiers {
2384 database_spec: ResolvedDatabaseSpecifier::Ambient,
2385 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2386 },
2387 item: "mz_array_types".to_string(),
2388 },
2389 system_output: PartialItemName {
2390 database: None,
2391 schema: None,
2392 item: "mz_array_types".to_string(),
2393 },
2394 normal_output: PartialItemName {
2395 database: None,
2396 schema: None,
2397 item: "mz_array_types".to_string(),
2398 },
2399 },
2400 ];
2401
2402 for tc in test_cases {
2403 assert_eq!(
2404 catalog
2405 .for_system_session()
2406 .minimal_qualification(&tc.input),
2407 tc.system_output
2408 );
2409 assert_eq!(
2410 catalog
2411 .for_session(&Session::dummy())
2412 .minimal_qualification(&tc.input),
2413 tc.normal_output
2414 );
2415 }
2416 catalog.expire().await;
2417 })
2418 .await
2419 }
2420
2421 #[mz_ore::test(tokio::test)]
2422 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2424 let persist_client = PersistClient::new_for_tests().await;
2425 let organization_id = Uuid::new_v4();
2426 let bootstrap_args = test_bootstrap_args();
2427 {
2428 let mut catalog = Catalog::open_debug_catalog(
2429 persist_client.clone(),
2430 organization_id.clone(),
2431 &bootstrap_args,
2432 )
2433 .await
2434 .expect("unable to open debug catalog");
2435 assert_eq!(catalog.transient_revision(), 1);
2436 let commit_ts = catalog.current_upper().await;
2437 catalog
2438 .transact(
2439 None,
2440 commit_ts,
2441 None,
2442 vec![Op::CreateDatabase {
2443 name: "test".to_string(),
2444 owner_id: MZ_SYSTEM_ROLE_ID,
2445 }],
2446 )
2447 .await
2448 .expect("failed to transact");
2449 assert_eq!(catalog.transient_revision(), 2);
2450 catalog.expire().await;
2451 }
2452 {
2453 let catalog =
2454 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2455 .await
2456 .expect("unable to open debug catalog");
2457 assert_eq!(catalog.transient_revision(), 1);
2459 catalog.expire().await;
2460 }
2461 }
2462
2463 #[mz_ore::test(tokio::test)]
2464 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2466 Catalog::with_debug(|catalog| async move {
2467 let mz_catalog_schema = (
2468 ResolvedDatabaseSpecifier::Ambient,
2469 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2470 );
2471 let pg_catalog_schema = (
2472 ResolvedDatabaseSpecifier::Ambient,
2473 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2474 );
2475 let mz_temp_schema = (
2476 ResolvedDatabaseSpecifier::Ambient,
2477 SchemaSpecifier::Temporary,
2478 );
2479
2480 let session = Session::dummy();
2482 let conn_catalog = catalog.for_session(&session);
2483 assert_ne!(
2484 conn_catalog.effective_search_path(false),
2485 conn_catalog.search_path
2486 );
2487 assert_ne!(
2488 conn_catalog.effective_search_path(true),
2489 conn_catalog.search_path
2490 );
2491 assert_eq!(
2492 conn_catalog.effective_search_path(false),
2493 vec![
2494 mz_catalog_schema.clone(),
2495 pg_catalog_schema.clone(),
2496 conn_catalog.search_path[0].clone()
2497 ]
2498 );
2499 assert_eq!(
2500 conn_catalog.effective_search_path(true),
2501 vec![
2502 mz_temp_schema.clone(),
2503 mz_catalog_schema.clone(),
2504 pg_catalog_schema.clone(),
2505 conn_catalog.search_path[0].clone()
2506 ]
2507 );
2508
2509 let mut session = Session::dummy();
2511 session
2512 .vars_mut()
2513 .set(
2514 &SystemVars::new(),
2515 "search_path",
2516 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2517 false,
2518 )
2519 .expect("failed to set search_path");
2520 let conn_catalog = catalog.for_session(&session);
2521 assert_ne!(
2522 conn_catalog.effective_search_path(false),
2523 conn_catalog.search_path
2524 );
2525 assert_ne!(
2526 conn_catalog.effective_search_path(true),
2527 conn_catalog.search_path
2528 );
2529 assert_eq!(
2530 conn_catalog.effective_search_path(false),
2531 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2532 );
2533 assert_eq!(
2534 conn_catalog.effective_search_path(true),
2535 vec![
2536 mz_temp_schema.clone(),
2537 mz_catalog_schema.clone(),
2538 pg_catalog_schema.clone()
2539 ]
2540 );
2541
2542 let mut session = Session::dummy();
2543 session
2544 .vars_mut()
2545 .set(
2546 &SystemVars::new(),
2547 "search_path",
2548 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2549 false,
2550 )
2551 .expect("failed to set search_path");
2552 let conn_catalog = catalog.for_session(&session);
2553 assert_ne!(
2554 conn_catalog.effective_search_path(false),
2555 conn_catalog.search_path
2556 );
2557 assert_ne!(
2558 conn_catalog.effective_search_path(true),
2559 conn_catalog.search_path
2560 );
2561 assert_eq!(
2562 conn_catalog.effective_search_path(false),
2563 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2564 );
2565 assert_eq!(
2566 conn_catalog.effective_search_path(true),
2567 vec![
2568 mz_temp_schema.clone(),
2569 pg_catalog_schema.clone(),
2570 mz_catalog_schema.clone()
2571 ]
2572 );
2573
2574 let mut session = Session::dummy();
2575 session
2576 .vars_mut()
2577 .set(
2578 &SystemVars::new(),
2579 "search_path",
2580 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2581 false,
2582 )
2583 .expect("failed to set search_path");
2584 let conn_catalog = catalog.for_session(&session);
2585 assert_ne!(
2586 conn_catalog.effective_search_path(false),
2587 conn_catalog.search_path
2588 );
2589 assert_ne!(
2590 conn_catalog.effective_search_path(true),
2591 conn_catalog.search_path
2592 );
2593 assert_eq!(
2594 conn_catalog.effective_search_path(false),
2595 vec![
2596 mz_catalog_schema.clone(),
2597 pg_catalog_schema.clone(),
2598 mz_temp_schema.clone()
2599 ]
2600 );
2601 assert_eq!(
2602 conn_catalog.effective_search_path(true),
2603 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2604 );
2605 catalog.expire().await;
2606 })
2607 .await
2608 }
2609
2610 #[mz_ore::test(tokio::test)]
2611 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2613 use mz_ore::collections::CollectionExt;
2614 Catalog::with_debug(|catalog| async move {
2615 let conn_catalog = catalog.for_system_session();
2616 let scx = &mut StatementContext::new(None, &conn_catalog);
2617
2618 let parsed = mz_sql_parser::parser::parse_statements(
2619 "create view public.foo as select 1 as bar",
2620 )
2621 .expect("")
2622 .into_element()
2623 .ast;
2624
2625 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2626
2627 assert_eq!(
2629 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2630 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2631 );
2632 catalog.expire().await;
2633 })
2634 .await;
2635 }
2636
2637 #[mz_ore::test(tokio::test)]
2639 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2641 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2642 let column = ", 1";
2643 let view_def_size = view_def.bytes().count();
2644 let column_size = column.bytes().count();
2645 let column_count =
2646 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2647 let columns = iter::repeat(column).take(column_count).join("");
2648 let create_sql = format!("{view_def}{columns})");
2649 let create_sql_check = create_sql.clone();
2650 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2651 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2652 &create_sql
2653 ));
2654
2655 let persist_client = PersistClient::new_for_tests().await;
2656 let organization_id = Uuid::new_v4();
2657 let id = CatalogItemId::User(1);
2658 let gid = GlobalId::User(1);
2659 let bootstrap_args = test_bootstrap_args();
2660 {
2661 let mut catalog = Catalog::open_debug_catalog(
2662 persist_client.clone(),
2663 organization_id.clone(),
2664 &bootstrap_args,
2665 )
2666 .await
2667 .expect("unable to open debug catalog");
2668 let item = catalog
2669 .state()
2670 .deserialize_item(
2671 gid,
2672 &create_sql,
2673 &BTreeMap::new(),
2674 &mut LocalExpressionCache::Closed,
2675 None,
2676 )
2677 .expect("unable to parse view");
2678 let commit_ts = catalog.current_upper().await;
2679 catalog
2680 .transact(
2681 None,
2682 commit_ts,
2683 None,
2684 vec![Op::CreateItem {
2685 item,
2686 name: QualifiedItemName {
2687 qualifiers: ItemQualifiers {
2688 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2689 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2690 },
2691 item: "v".to_string(),
2692 },
2693 id,
2694 owner_id: MZ_SYSTEM_ROLE_ID,
2695 }],
2696 )
2697 .await
2698 .expect("failed to transact");
2699 catalog.expire().await;
2700 }
2701 {
2702 let catalog =
2703 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2704 .await
2705 .expect("unable to open debug catalog");
2706 let view = catalog.get_entry(&id);
2707 assert_eq!("v", view.name.item);
2708 match &view.item {
2709 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2710 item => panic!("expected view, got {}", item.typ()),
2711 }
2712 catalog.expire().await;
2713 }
2714 }
2715
2716 #[mz_ore::test(tokio::test)]
2717 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2719 Catalog::with_debug(|catalog| async move {
2720 let conn_catalog = catalog.for_system_session();
2721
2722 assert_eq!(
2723 mz_sql::catalog::ObjectType::ClusterReplica,
2724 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2725 ClusterId::user(1).expect("1 is a valid ID"),
2726 ReplicaId::User(1)
2727 )))
2728 );
2729 assert_eq!(
2730 mz_sql::catalog::ObjectType::Role,
2731 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2732 );
2733 catalog.expire().await;
2734 })
2735 .await;
2736 }
2737
2738 #[mz_ore::test(tokio::test)]
2739 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2741 Catalog::with_debug(|catalog| async move {
2742 let conn_catalog = catalog.for_system_session();
2743
2744 assert_eq!(
2745 None,
2746 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2747 ClusterId::user(1).expect("1 is a valid ID"),
2748 ReplicaId::User(1),
2749 ))))
2750 );
2751 assert_eq!(
2752 None,
2753 conn_catalog
2754 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2755 );
2756 catalog.expire().await;
2757 })
2758 .await;
2759 }
2760
2761 #[mz_ore::test(tokio::test)]
2762 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2764 Catalog::with_debug(|catalog| async move {
2765 let conn_catalog = catalog.for_system_session();
2766
2767 for builtin in BUILTINS::iter() {
2768 let (schema, name, expected_desc) = match builtin {
2769 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2770 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2771 Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2772 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2773 Builtin::Log(_)
2774 | Builtin::Type(_)
2775 | Builtin::Func(_)
2776 | Builtin::Index(_)
2777 | Builtin::Connection(_) => continue,
2778 };
2779 let item = conn_catalog
2780 .resolve_item(&PartialItemName {
2781 database: None,
2782 schema: Some(schema.to_string()),
2783 item: name.to_string(),
2784 })
2785 .expect("unable to resolve item")
2786 .at_version(RelationVersionSelector::Latest);
2787
2788 let actual_desc = item.relation_desc().expect("invalid item type");
2789 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2790 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2791 {
2792 assert_eq!(
2793 actual_name, expected_name,
2794 "item {schema}.{name} column {index} name did not match its expected name"
2795 );
2796 assert_eq!(
2797 actual_typ, expected_typ,
2798 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2799 );
2800 }
2801 assert_eq!(
2802 &*actual_desc, expected_desc,
2803 "item {schema}.{name} did not match its expected RelationDesc"
2804 );
2805 }
2806 catalog.expire().await;
2807 })
2808 .await
2809 }
2810
2811 #[mz_ore::test(tokio::test)]
2814 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2816 async fn inner(catalog: Catalog) {
2817 let (client, connection) = tokio_postgres::connect(
2821 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2822 NoTls,
2823 )
2824 .await
2825 .expect("failed to connect to Postgres");
2826
2827 task::spawn(|| "compare_builtin_postgres", async move {
2828 if let Err(e) = connection.await {
2829 panic!("connection error: {}", e);
2830 }
2831 });
2832
2833 struct PgProc {
2834 name: String,
2835 arg_oids: Vec<u32>,
2836 ret_oid: Option<u32>,
2837 ret_set: bool,
2838 }
2839
2840 struct PgType {
2841 name: String,
2842 ty: String,
2843 elem: u32,
2844 array: u32,
2845 input: u32,
2846 receive: u32,
2847 }
2848
2849 struct PgOper {
2850 oprresult: u32,
2851 name: String,
2852 }
2853
2854 let pg_proc: BTreeMap<_, _> = query(
2855 &client,
2856 sql!(
2857 "SELECT
2858 p.oid,
2859 proname,
2860 proargtypes,
2861 prorettype,
2862 proretset
2863 FROM pg_proc p
2864 JOIN pg_namespace n ON p.pronamespace = n.oid"
2865 ),
2866 &[],
2867 )
2868 .await
2869 .expect("pg query failed")
2870 .into_iter()
2871 .map(|row| {
2872 let oid: u32 = row.get("oid");
2873 let pg_proc = PgProc {
2874 name: row.get("proname"),
2875 arg_oids: row.get("proargtypes"),
2876 ret_oid: row.get("prorettype"),
2877 ret_set: row.get("proretset"),
2878 };
2879 (oid, pg_proc)
2880 })
2881 .collect();
2882
2883 let pg_type: BTreeMap<_, _> = query(
2884 &client,
2885 sql!(
2886 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type"
2887 ),
2888 &[],
2889 )
2890 .await
2891 .expect("pg query failed")
2892 .into_iter()
2893 .map(|row| {
2894 let oid: u32 = row.get("oid");
2895 let pg_type = PgType {
2896 name: row.get("typname"),
2897 ty: row.get("typtype"),
2898 elem: row.get("typelem"),
2899 array: row.get("typarray"),
2900 input: row.get("typinput"),
2901 receive: row.get("typreceive"),
2902 };
2903 (oid, pg_type)
2904 })
2905 .collect();
2906
2907 let pg_oper: BTreeMap<_, _> = query(
2908 &client,
2909 sql!("SELECT oid, oprname, oprresult FROM pg_operator"),
2910 &[],
2911 )
2912 .await
2913 .expect("pg query failed")
2914 .into_iter()
2915 .map(|row| {
2916 let oid: u32 = row.get("oid");
2917 let pg_oper = PgOper {
2918 name: row.get("oprname"),
2919 oprresult: row.get("oprresult"),
2920 };
2921 (oid, pg_oper)
2922 })
2923 .collect();
2924
2925 let conn_catalog = catalog.for_system_session();
2926 let resolve_type_oid = |item: &str| {
2927 conn_catalog
2928 .resolve_type(&PartialItemName {
2929 database: None,
2930 schema: Some(PG_CATALOG_SCHEMA.into()),
2933 item: item.to_string(),
2934 })
2935 .expect("unable to resolve type")
2936 .oid()
2937 };
2938
2939 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2940 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2941 .collect();
2942
2943 let mut all_oids = BTreeSet::new();
2944
2945 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2948 [
2949 (Type::NAME, Type::TEXT),
2951 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2952 (Type::TIME, Type::TIMETZ),
2954 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2955 ]
2956 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2957 );
2958 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2959 1619, ]);
2961 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2962 if ignore_return_types.contains(&fn_oid) {
2963 return true;
2964 }
2965 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2966 return true;
2967 }
2968 a == b
2969 };
2970
2971 for builtin in BUILTINS::iter() {
2972 match builtin {
2973 Builtin::Type(ty) => {
2974 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2975
2976 if ty.oid >= FIRST_MATERIALIZE_OID {
2977 continue;
2980 }
2981
2982 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2985 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2986 });
2987 assert_eq!(
2988 ty.name, pg_ty.name,
2989 "oid {} has name {} in postgres; expected {}",
2990 ty.oid, pg_ty.name, ty.name,
2991 );
2992
2993 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
2994 None => (0, 0),
2995 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
2996 };
2997 assert_eq!(
2998 typinput_oid, pg_ty.input,
2999 "type {} has typinput OID {:?} in mz but {:?} in pg",
3000 ty.name, typinput_oid, pg_ty.input,
3001 );
3002 assert_eq!(
3003 typreceive_oid, pg_ty.receive,
3004 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3005 ty.name, typreceive_oid, pg_ty.receive,
3006 );
3007 if typinput_oid != 0 {
3008 assert!(
3009 func_oids.contains(&typinput_oid),
3010 "type {} has typinput OID {} that does not exist in pg_proc",
3011 ty.name,
3012 typinput_oid,
3013 );
3014 }
3015 if typreceive_oid != 0 {
3016 assert!(
3017 func_oids.contains(&typreceive_oid),
3018 "type {} has typreceive OID {} that does not exist in pg_proc",
3019 ty.name,
3020 typreceive_oid,
3021 );
3022 }
3023
3024 match &ty.details.typ {
3026 CatalogType::Array { element_reference } => {
3027 let elem_ty = BUILTINS::iter()
3028 .filter_map(|builtin| match builtin {
3029 Builtin::Type(ty @ BuiltinType { name, .. })
3030 if element_reference == name =>
3031 {
3032 Some(ty)
3033 }
3034 _ => None,
3035 })
3036 .next();
3037 let elem_ty = match elem_ty {
3038 Some(ty) => ty,
3039 None => {
3040 panic!("{} is unexpectedly not a type", element_reference)
3041 }
3042 };
3043 assert_eq!(
3044 pg_ty.elem, elem_ty.oid,
3045 "type {} has mismatched element OIDs",
3046 ty.name
3047 )
3048 }
3049 CatalogType::Pseudo => {
3050 assert_eq!(
3051 pg_ty.ty, "p",
3052 "type {} is not a pseudo type as expected",
3053 ty.name
3054 )
3055 }
3056 CatalogType::Range { .. } => {
3057 assert_eq!(
3058 pg_ty.ty, "r",
3059 "type {} is not a range type as expected",
3060 ty.name
3061 );
3062 }
3063 _ => {
3064 assert_eq!(
3065 pg_ty.ty, "b",
3066 "type {} is not a base type as expected",
3067 ty.name
3068 )
3069 }
3070 }
3071
3072 let schema = catalog
3074 .resolve_schema_in_database(
3075 &ResolvedDatabaseSpecifier::Ambient,
3076 ty.schema,
3077 &SYSTEM_CONN_ID,
3078 )
3079 .expect("unable to resolve schema");
3080 let allocated_type = catalog
3081 .resolve_type(
3082 None,
3083 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3084 &PartialItemName {
3085 database: None,
3086 schema: Some(schema.name().schema.clone()),
3087 item: ty.name.to_string(),
3088 },
3089 &SYSTEM_CONN_ID,
3090 )
3091 .expect("unable to resolve type");
3092 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3093 ty
3094 } else {
3095 panic!("unexpectedly not a type")
3096 };
3097 match ty.details.array_id {
3098 Some(array_id) => {
3099 let array_ty = catalog.get_entry(&array_id);
3100 assert_eq!(
3101 pg_ty.array, array_ty.oid,
3102 "type {} has mismatched array OIDs",
3103 allocated_type.name.item,
3104 );
3105 }
3106 None => assert_eq!(
3107 pg_ty.array, 0,
3108 "type {} does not have an array type in mz but does in pg",
3109 allocated_type.name.item,
3110 ),
3111 }
3112 }
3113 Builtin::Func(func) => {
3114 for imp in func.inner.func_impls() {
3115 assert!(
3116 all_oids.insert(imp.oid),
3117 "{} reused oid {}",
3118 func.name,
3119 imp.oid
3120 );
3121
3122 assert!(
3123 imp.oid < FIRST_USER_OID,
3124 "built-in function {} erroneously has OID in user space ({})",
3125 func.name,
3126 imp.oid,
3127 );
3128
3129 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3132 continue;
3133 } else {
3134 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3135 panic!(
3136 "pg_proc missing function {}: oid {}",
3137 func.name, imp.oid
3138 )
3139 })
3140 };
3141 assert_eq!(
3142 func.name, pg_fn.name,
3143 "funcs with oid {} don't match names: {} in mz, {} in pg",
3144 imp.oid, func.name, pg_fn.name
3145 );
3146
3147 let imp_arg_oids = imp
3150 .arg_typs
3151 .iter()
3152 .map(|item| resolve_type_oid(item))
3153 .collect::<Vec<_>>();
3154
3155 if imp_arg_oids != pg_fn.arg_oids {
3156 println!(
3157 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3158 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3159 );
3160 }
3161
3162 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3163
3164 assert!(
3165 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3166 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3167 imp.oid,
3168 func.name,
3169 imp_return_oid,
3170 pg_fn.ret_oid
3171 );
3172
3173 assert_eq!(
3174 imp.return_is_set, pg_fn.ret_set,
3175 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3176 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3177 );
3178 }
3179 }
3180 _ => (),
3181 }
3182 }
3183
3184 for (op, func) in OP_IMPLS.iter() {
3185 for imp in func.func_impls() {
3186 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3187
3188 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3190 continue;
3191 } else {
3192 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3193 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3194 })
3195 };
3196
3197 assert_eq!(*op, pg_op.name);
3198
3199 let imp_return_oid =
3200 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3201 if imp_return_oid != pg_op.oprresult {
3202 panic!(
3203 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3204 imp.oid, op, imp_return_oid, pg_op.oprresult
3205 );
3206 }
3207 }
3208 }
3209 catalog.expire().await;
3210 }
3211
3212 Catalog::with_debug(inner).await
3213 }
3214
    /// Runs `smoketest_fn` for every scalar builtin function and operator
    /// implementation over combinations of "interesting" argument datums.
    ///
    /// One blocking task is spawned per argument combination; all task handles
    /// are awaited before the test returns.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_smoketest_all_builtins() {
        // Spawns the smoketest tasks and returns their handles.
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Names of functions that are excluded from the smoketest.
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege",
                "has_type_privilege",
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            // Builtin functions followed by operator implementations.
            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are exercised.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                'outer: for imp in impls {
                    let details = imp.details();
                    // Scalar type of each argument; the whole implementation is
                    // skipped if any argument OID has no `mz_pgrepr::Type`.
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // Candidate datums per argument: NULL plus the type's
                    // interesting datums.
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    // No arguments means nothing to enumerate (and `datums[0]`
                    // below would be out of bounds).
                    if datums.is_empty() {
                        continue;
                    }

                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    // `None` when the return OID has no `mz_pgrepr::Type`.
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // `idxs[i]` selects the current datum for argument `i`. The
                    // loop ends once the first argument's index runs off the end.
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        // Wrap each datum as an already-coerced literal expression.
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable call description; used as the task name
                        // and passed to `smoketest_fn`.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance `idxs` like an odometer, rightmost position
                        // first: a position that overflows resets to 0 and
                        // carries to the left, except position 0, which is left
                        // past the end to terminate the `while` loop.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                if i == 0 {
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        for handle in handles {
            handle.await;
        }
    }
3360
    /// Plans, lowers, evaluates and reduces a single builtin function call on
    /// literal arguments, asserting that the results are mutually consistent.
    ///
    /// * `name` - function or operator name, used in assertion messages.
    /// * `call_name` - description of the call, used in the return-type panic message.
    /// * `op` / `imp` - the operation to invoke and the implementation it belongs to.
    /// * `args` - the argument datums; used for null checks and in messages.
    /// * `catalog` - catalog supplying the session catalog and system config.
    /// * `scalars` - the arguments as coerced expressions, passed to the operation.
    /// * `return_styp` - expected return type; when `None`, none of the checks
    ///   on the evaluated result are performed.
    ///
    /// Planning, lowering and evaluation errors are ignored; only inconsistencies
    /// between successful results cause a panic.
    fn smoketest_fn(
        name: &&str,
        call_name: String,
        op: &Operation<HirScalarExpr>,
        imp: &FuncImpl<HirScalarExpr>,
        args: Vec<Datum<'_>>,
        catalog: Arc<Catalog>,
        scalars: Vec<CoercibleScalarExpr>,
        return_styp: Option<SqlScalarType>,
    ) {
        let conn_catalog = catalog.for_system_session();
        let pcx = PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
        // Expression context with an empty scope and with aggregates,
        // subqueries, parameters and window functions disallowed.
        let ecx = ExprContext {
            qcx: &qcx,
            name: "smoketest",
            scope: &Scope::empty(),
            relation_type: &SqlRelationType::empty(),
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let arena = RowArena::new();
        let mut session = Session::dummy();
        session
            .start_transaction(to_datetime(0), None, None)
            .expect("must succeed");
        let prep_style = ExprPrepOneShot {
            logical_time: EvalTime::Time(Timestamp::MIN),
            session: &session,
            catalog_state: &catalog.state,
        };

        // Build the HIR expression for the call; a planning error ends the check.
        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
        if let Ok(hir) = res {
            // For a unary call whose function is an eliminable cast, also lower
            // with cast elimination disabled and evaluate, so the result can be
            // compared with the regular lowering further down. `None` otherwise,
            // or when that evaluation fails.
            let uneliminated_result_row = {
                if let HirScalarExpr::CallUnary { func, .. } = &hir
                    && func.is_eliminable_cast()
                {
                    let mut uneliminated_mir = hir
                        .clone()
                        .lower_uncorrelated(HirToMirConfig {
                            enable_cast_elimination: false,
                            ..catalog.system_config().into()
                        })
                        .expect("lowering eliminable cast should always succeed");
                    prep_style
                        .prep_scalar_expr(&mut uneliminated_mir)
                        .expect("must succeed");

                    uneliminated_mir
                        .eval(&[], &arena)
                        .ok()
                        .map(|datum| Row::pack([datum]))
                } else {
                    None
                }
            };

            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");

                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
                    if let Some(return_styp) = return_styp {
                        let mir_typ = mir.typ(&[]);
                        // The MIR scalar type is compared with the expected
                        // return type via a soft assertion.
                        soft_assert_eq_or_log!(
                            mir_typ.scalar_type,
                            (&return_styp).into(),
                            "MIR type did not match the catalog type (cast elimination/repr type error)"
                        );
                        // The evaluated datum must be an instance of the MIR type.
                        if !eval_result_datum.is_instance_of(&mir_typ) {
                            panic!(
                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
                            );
                        }
                        // Cast elimination must not change the resulting datum.
                        if let Some(row) = uneliminated_result_row {
                            let uneliminated_result_datum = row.unpack_first();
                            assert_eq!(
                                uneliminated_result_datum, eval_result_datum,
                                "datums should not change if cast is eliminable"
                            );
                        }
                        // Nullability checks, performed only when the MIR is still
                        // a function call whose arguments are all literals.
                        if let Some((introduces_nulls, propagates_nulls)) =
                            call_introduces_propagates_nulls(&mir)
                        {
                            if introduces_nulls {
                                // A function that introduces nulls must be typed
                                // nullable.
                                assert!(
                                    mir_typ.nullable,
                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                    name, args, mir, mir_typ.nullable
                                );
                            } else {
                                let any_input_null = args.iter().any(|arg| arg.is_null());
                                if !any_input_null {
                                    // No null input and no introduced nulls: the
                                    // type must be non-nullable.
                                    assert!(
                                        !mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                } else if propagates_nulls {
                                    // A null input to a null-propagating function:
                                    // the type must be nullable.
                                    assert!(
                                        mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                }
                            }
                        }
                        // Reducing the expression must yield a literal that agrees
                        // with direct evaluation in both datum and scalar type.
                        let mut reduced = mir.clone();
                        reduced.reduce(&[]);
                        match reduced {
                            MirScalarExpr::Literal(reduce_result, ctyp) => {
                                match reduce_result {
                                    Ok(reduce_result_row) => {
                                        let reduce_result_datum = reduce_result_row.unpack_first();
                                        assert_eq!(
                                            reduce_result_datum,
                                            eval_result_datum,
                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                        assert_eq!(
                                            ctyp.scalar_type,
                                            mir_typ.scalar_type,
                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                    }
                                    // A literal error from reduction is ignored.
                                    Err(..) => {}
                                }
                            }
                            _ => unreachable!(
                                "all args are literals, so should have reduced to a literal"
                            ),
                        }
                    }
                }
            }
        }
    }
3543
3544 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3549 match mir_func_call {
3550 MirScalarExpr::CallUnary { func, expr } => {
3551 if expr.is_literal() {
3552 Some((func.introduces_nulls(), func.propagates_nulls()))
3553 } else {
3554 None
3555 }
3556 }
3557 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3558 if expr1.is_literal() && expr2.is_literal() {
3559 Some((func.introduces_nulls(), func.propagates_nulls()))
3560 } else {
3561 None
3562 }
3563 }
3564 MirScalarExpr::CallVariadic { func, exprs } => {
3565 if exprs.iter().all(|arg| arg.is_literal()) {
3566 Some((func.introduces_nulls(), func.propagates_nulls()))
3567 } else {
3568 None
3569 }
3570 }
3571 _ => None,
3572 }
3573 }
3574
3575 #[mz_ore::test(tokio::test)]
3577 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3579 Catalog::with_debug(|catalog| async move {
3580 let conn_catalog = catalog.for_system_session();
3581
3582 for view in BUILTINS::views().filter(|view| {
3583 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3584 }) {
3585 let item = conn_catalog
3586 .resolve_item(&PartialItemName {
3587 database: None,
3588 schema: Some(view.schema.to_string()),
3589 item: view.name.to_string(),
3590 })
3591 .expect("unable to resolve view")
3592 .at_version(RelationVersionSelector::Latest);
3594 let full_name = conn_catalog.resolve_full_name(item.name());
3595 let desc = item.relation_desc().expect("invalid item type");
3596 for col_type in desc.iter_types() {
3597 match &col_type.scalar_type {
3598 typ @ SqlScalarType::UInt16
3599 | typ @ SqlScalarType::UInt32
3600 | typ @ SqlScalarType::UInt64
3601 | typ @ SqlScalarType::MzTimestamp
3602 | typ @ SqlScalarType::List { .. }
3603 | typ @ SqlScalarType::Map { .. }
3604 | typ @ SqlScalarType::MzAclItem => {
3605 panic!("{typ:?} type found in {full_name}");
3606 }
3607 SqlScalarType::AclItem
3608 | SqlScalarType::Bool
3609 | SqlScalarType::Int16
3610 | SqlScalarType::Int32
3611 | SqlScalarType::Int64
3612 | SqlScalarType::Float32
3613 | SqlScalarType::Float64
3614 | SqlScalarType::Numeric { .. }
3615 | SqlScalarType::Date
3616 | SqlScalarType::Time
3617 | SqlScalarType::Timestamp { .. }
3618 | SqlScalarType::TimestampTz { .. }
3619 | SqlScalarType::Interval
3620 | SqlScalarType::PgLegacyChar
3621 | SqlScalarType::Bytes
3622 | SqlScalarType::String
3623 | SqlScalarType::Char { .. }
3624 | SqlScalarType::VarChar { .. }
3625 | SqlScalarType::Jsonb
3626 | SqlScalarType::Uuid
3627 | SqlScalarType::Array(_)
3628 | SqlScalarType::Record { .. }
3629 | SqlScalarType::Oid
3630 | SqlScalarType::RegProc
3631 | SqlScalarType::RegType
3632 | SqlScalarType::RegClass
3633 | SqlScalarType::Int2Vector
3634 | SqlScalarType::Range { .. }
3635 | SqlScalarType::PgLegacyName => {}
3636 }
3637 }
3638 }
3639 catalog.expire().await;
3640 })
3641 .await
3642 }
3643
3644 #[mz_ore::test(tokio::test)]
3647 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3649 Catalog::with_debug(|catalog| async move {
3650 let conn_catalog = catalog.for_system_session();
3651
3652 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3653 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3654
3655 for entry in catalog.entries() {
3656 let schema_spec = entry.name().qualifiers.schema_spec;
3657 let introspection_deps = catalog.introspection_dependencies(entry.id);
3658 if introspection_deps.is_empty() {
3659 assert!(
3660 schema_spec != introspection_schema_spec,
3661 "entry does not depend on introspection sources but is in \
3662 `mz_introspection`: {}",
3663 conn_catalog.resolve_full_name(entry.name()),
3664 );
3665 } else {
3666 assert!(
3667 schema_spec == introspection_schema_spec,
3668 "entry depends on introspection sources but is not in \
3669 `mz_introspection`: {}",
3670 conn_catalog.resolve_full_name(entry.name()),
3671 );
3672 }
3673 }
3674 })
3675 .await
3676 }
3677
3678 #[mz_ore::test(tokio::test)]
3679 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3681 let persist_client = PersistClient::new_for_tests().await;
3682 let bootstrap_args = test_bootstrap_args();
3683 let organization_id = Uuid::new_v4();
3684 let db_name = "DB";
3685
3686 let mut writer_catalog = Catalog::open_debug_catalog(
3687 persist_client.clone(),
3688 organization_id.clone(),
3689 &bootstrap_args,
3690 )
3691 .await
3692 .expect("open_debug_catalog");
3693 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3694 persist_client.clone(),
3695 organization_id.clone(),
3696 &bootstrap_args,
3697 )
3698 .await
3699 .expect("open_debug_read_only_catalog");
3700 assert_err!(writer_catalog.resolve_database(db_name));
3701 assert_err!(read_only_catalog.resolve_database(db_name));
3702
3703 let commit_ts = writer_catalog.current_upper().await;
3704 writer_catalog
3705 .transact(
3706 None,
3707 commit_ts,
3708 None,
3709 vec![Op::CreateDatabase {
3710 name: db_name.to_string(),
3711 owner_id: MZ_SYSTEM_ROLE_ID,
3712 }],
3713 )
3714 .await
3715 .expect("failed to transact");
3716
3717 let write_db = writer_catalog
3718 .resolve_database(db_name)
3719 .expect("resolve_database");
3720 read_only_catalog
3721 .sync_to_current_updates()
3722 .await
3723 .expect("sync_to_current_updates");
3724 let read_db = read_only_catalog
3725 .resolve_database(db_name)
3726 .expect("resolve_database");
3727
3728 assert_eq!(write_db, read_db);
3729
3730 let writer_catalog_fencer =
3731 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3732 .await
3733 .expect("open_debug_catalog for fencer");
3734 let fencer_db = writer_catalog_fencer
3735 .resolve_database(db_name)
3736 .expect("resolve_database for fencer");
3737 assert_eq!(fencer_db, read_db);
3738
3739 let write_fence_err = writer_catalog
3740 .sync_to_current_updates()
3741 .await
3742 .expect_err("sync_to_current_updates for fencer");
3743 assert!(matches!(
3744 write_fence_err,
3745 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3746 ));
3747 let read_fence_err = read_only_catalog
3748 .sync_to_current_updates()
3749 .await
3750 .expect_err("sync_to_current_updates after fencer");
3751 assert!(matches!(
3752 read_fence_err,
3753 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3754 ));
3755
3756 writer_catalog.expire().await;
3757 read_only_catalog.expire().await;
3758 writer_catalog_fencer.expire().await;
3759 }
3760}