1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
66 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
67 SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_transform::dataflow::DataflowMetainfo;
83use mz_transform::notice::OptimizerNotice;
84use smallvec::SmallVec;
85use tokio::sync::MutexGuard;
86use tokio::sync::mpsc::UnboundedSender;
87use uuid::Uuid;
88
89pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
91pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
92pub use crate::catalog::state::CatalogState;
93pub use crate::catalog::transact::{
94 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
95};
96use crate::command::CatalogDump;
97use crate::coord::TargetCluster;
98#[cfg(test)]
99use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
/// The in-memory catalog, pairing the materialized [`CatalogState`] with
/// cached optimizer output and a shared handle to the durable catalog state.
#[derive(Debug)]
pub struct Catalog {
    // In-memory representation of all catalog objects.
    state: CatalogState,
    // Cached optimized/physical dataflow plans and optimizer notices,
    // keyed by `GlobalId`.
    plans: CatalogPlans,
    // Handle to the expression cache, when one is configured.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    // Durable catalog state; shared (not deep-copied) across clones.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    // In-memory revision number; exposed via `transient_revision`.
    transient_revision: u64,
}
144
// Written out manually to make the sharing semantics explicit: cloning a
// `Catalog` clones the in-memory state and plan caches, but the durable
// storage handle is shared via `Arc::clone` — both clones talk to the same
// underlying `DurableCatalogState`.
impl Clone for Catalog {
    fn clone(&self) -> Self {
        Self {
            state: self.state.clone(),
            plans: self.plans.clone(),
            expr_cache_handle: self.expr_cache_handle.clone(),
            storage: Arc::clone(&self.storage),
            transient_revision: self.transient_revision,
        }
    }
}
158
/// Caches of optimizer output, keyed by the `GlobalId` of the object the
/// plan was produced for.
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    // Optimized (MIR) dataflow plan per object.
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    // Physical (compute `Plan`) dataflow plan per object.
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    // Dataflow metainfo (including optimizer notices) per object.
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    // Reverse index: for each dependency id, the notices that depend on it.
    // Maintained by `set_dataflow_metainfo` / `drop_plans_and_metainfos`.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
166
impl Catalog {
    /// Caches the optimized (MIR) dataflow plan for `id`, replacing any
    /// previously cached plan.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_optimized_plan(
        &mut self,
        id: GlobalId,
        plan: DataflowDescription<OptimizedMirRelationExpr>,
    ) {
        self.plans.optimized_plan_by_id.insert(id, plan.into());
    }

    /// Caches the physical dataflow plan for `id`, replacing any previously
    /// cached plan.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_physical_plan(
        &mut self,
        id: GlobalId,
        plan: DataflowDescription<mz_compute_types::plan::Plan>,
    ) {
        self.plans.physical_plan_by_id.insert(id, plan.into());
    }

    /// Returns the cached optimized plan for `id`, if one exists.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_optimized_plan(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
        self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
    }

    /// Returns the cached physical plan for `id`, if one exists.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_physical_plan(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
        self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
    }

    /// Caches the dataflow metainfo for `id` and indexes each contained
    /// optimizer notice under all of its dependency ids.
    #[mz_ore::instrument(level = "trace")]
    pub fn set_dataflow_metainfo(
        &mut self,
        id: GlobalId,
        metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
    ) {
        for notice in metainfo.optimizer_notices.iter() {
            // Index the notice by each of its dependencies, so that dropping
            // a dependency can find and retract the notice later.
            for dep_id in notice.dependencies.iter() {
                let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
                entry.push(Arc::clone(notice))
            }
            if let Some(item_id) = notice.item_id {
                soft_assert_eq_or_log!(
                    item_id,
                    id,
                    "notice.item_id should match the id for whom we are saving the notice"
                );
            }
        }
        self.plans.dataflow_metainfos.insert(id, metainfo);
    }

    /// Returns the cached dataflow metainfo for `id`, if one exists.
    #[mz_ore::instrument(level = "trace")]
    pub fn try_get_dataflow_metainfo(
        &self,
        id: &GlobalId,
    ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
        self.plans.dataflow_metainfos.get(id)
    }

    /// Drops all cached plans, metainfos, and notice index entries for the
    /// objects in `drop_ids`, returning the set of optimizer notices that
    /// were dropped in the process (so callers can retract them elsewhere).
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        let mut dropped_notices = BTreeSet::new();

        // Pass 1: remove plans/metainfos keyed by the dropped ids, and for
        // each notice in a removed metainfo, unlink it from the reverse
        // `notices_by_dep_id` index.
        for id in drop_ids {
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 2: remove the reverse-index entries keyed by the dropped ids
        // themselves; each notice found there is also removed from the
        // metainfo of the item it was recorded against.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 3: a dropped notice may still be indexed under dependency ids
        // that are NOT being dropped; scrub it from those entries too.
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        dropped_notices
    }
}
328
/// A catalog view scoped to a single connection: the shared [`CatalogState`]
/// plus per-connection context (active database, search path, role, etc.).
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    // Borrowed or owned catalog state (`Cow` lets callers hand over either).
    state: Cow<'a, CatalogState>,
    // Item IDs this catalog must treat as unresolvable; populated via
    // `mark_id_unresolvable_for_replanning` (system role only).
    unresolvable_ids: BTreeSet<CatalogItemId>,
    // The connection this view belongs to.
    conn_id: ConnectionId,
    // Name of the connection's active cluster.
    cluster: String,
    // The connection's active database, if any.
    database: Option<DatabaseId>,
    // The connection's schema search path.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    // The role the connection is acting as.
    role_id: RoleId,
    // The session's prepared statements, when session-backed.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    // The session's open portals, when session-backed.
    portals: Option<&'a BTreeMap<String, Portal>>,
    // Channel for delivering notices back to the session.
    notices_tx: UnboundedSender<AdapterNotice>,
}
351
impl ConnCatalog<'_> {
    /// Returns the ID of the connection this catalog view is scoped to.
    pub fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// Returns the underlying catalog state.
    pub fn state(&self) -> &CatalogState {
        &*self.state
    }

    /// Marks `id` as unresolvable through this catalog view.
    ///
    /// # Panics
    ///
    /// Panics if this catalog is not acting as the system role
    /// (`MZ_SYSTEM_ROLE_ID`).
    pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
        assert_eq!(
            self.role_id, MZ_SYSTEM_ROLE_ID,
            "only the system role can mark IDs unresolvable",
        );
        self.unresolvable_ids.insert(id);
    }

    /// Returns the connection's effective search path, optionally including
    /// the temporary schema; delegates to
    /// `CatalogState::effective_search_path`.
    pub fn effective_search_path(
        &self,
        include_temp_schema: bool,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state
            .effective_search_path(&self.search_path, include_temp_schema)
    }
}
391
impl ConnectionResolver for ConnCatalog<'_> {
    /// Resolves the connection identified by `id` into its inlined form by
    /// delegating to the underlying catalog state.
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        self.state().resolve_connection(id)
    }
}
400
401impl Catalog {
    /// Returns the catalog's transient (in-memory) revision number.
    pub fn transient_revision(&self) -> u64 {
        self.transient_revision
    }

    /// Opens a debug catalog backed by a fresh test persist instance and a
    /// random organization ID, passes it to `f`, and returns `f`'s output.
    ///
    /// For use in tests.
    pub async fn with_debug<F, Fut, T>(f: F) -> T
    where
        F: FnOnce(Catalog) -> Fut,
        Fut: Future<Output = T>,
    {
        let persist_client = PersistClient::new_for_tests().await;
        let organization_id = Uuid::new_v4();
        let bootstrap_args = test_bootstrap_args();
        let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
            .await
            .expect("can open debug catalog");
        f(catalog).await
    }

    /// Like [`Catalog::with_debug`], but swaps in a freshly opened durable
    /// storage handle before calling `f`, so the handle looks the way it
    /// would during bootstrap.
    ///
    /// For use in tests.
    pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
    where
        F: FnOnce(Catalog) -> Fut,
        Fut: Future<Output = T>,
    {
        let persist_client = PersistClient::new_for_tests().await;
        let organization_id = Uuid::new_v4();
        let bootstrap_args = test_bootstrap_args();
        let mut catalog =
            Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
                .await
                .expect("can open debug catalog");

        // Re-open the durable catalog state against the same persist
        // instance and organization.
        let now = SYSTEM_TIME.clone();
        let openable_storage = TestCatalogStateBuilder::new(persist_client)
            .with_organization_id(organization_id)
            .with_default_deploy_generation()
            .build()
            .await
            .expect("can create durable catalog");
        let mut storage = openable_storage
            .open(now().into(), &bootstrap_args)
            .await
            .expect("can open durable catalog")
            .0;
        // Drain the updates accumulated so far; we only need the handle to
        // be caught up, the update contents are discarded.
        let _ = storage
            .sync_to_current_updates()
            .await
            .expect("can sync to current updates");
        catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));

        f(catalog).await
    }

    /// Opens a writable debug catalog: an ephemeral catalog suitable for
    /// tests, backed by the given persist client and organization ID and
    /// bootstrapped with `bootstrap_args`.
    pub async fn open_debug_catalog(
        persist_client: PersistClient,
        organization_id: Uuid,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Catalog, anyhow::Error> {
        let now = SYSTEM_TIME.clone();
        let environment_id = None;
        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
            .with_organization_id(organization_id)
            .with_default_deploy_generation()
            .build()
            .await?;
        let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
        let system_parameter_defaults = BTreeMap::default();
        Self::open_debug_catalog_inner(
            persist_client,
            storage,
            now,
            environment_id,
            &DUMMY_BUILD_INFO,
            system_parameter_defaults,
            bootstrap_args,
            None,
        )
        .await
    }
501
502 pub async fn open_debug_read_only_catalog(
507 persist_client: PersistClient,
508 organization_id: Uuid,
509 bootstrap_args: &BootstrapArgs,
510 ) -> Result<Catalog, anyhow::Error> {
511 let now = SYSTEM_TIME.clone();
512 let environment_id = None;
513 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
514 .with_organization_id(organization_id)
515 .build()
516 .await?;
517 let storage = openable_storage
518 .open_read_only(&test_bootstrap_args())
519 .await?;
520 let system_parameter_defaults = BTreeMap::default();
521 Self::open_debug_catalog_inner(
522 persist_client,
523 storage,
524 now,
525 environment_id,
526 &DUMMY_BUILD_INFO,
527 system_parameter_defaults,
528 bootstrap_args,
529 None,
530 )
531 .await
532 }
533
    /// Opens a read-only debug catalog where the persist-facing configuration
    /// (now function, environment ID, build info, system parameter defaults,
    /// expression-cache override) is supplied by the caller instead of
    /// defaulted.
    pub async fn open_debug_read_only_persist_catalog_config(
        persist_client: PersistClient,
        now: NowFn,
        environment_id: EnvironmentId,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
    ) -> Result<Catalog, anyhow::Error> {
        let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
            .with_organization_id(environment_id.organization_id())
            .with_version(
                build_info
                    .version
                    .parse()
                    .expect("build version is parseable"),
            )
            .build()
            .await?;
        let storage = openable_storage.open_read_only(bootstrap_args).await?;
        Self::open_debug_catalog_inner(
            persist_client,
            storage,
            now,
            Some(environment_id),
            build_info,
            system_parameter_defaults,
            bootstrap_args,
            enable_expression_cache_override,
        )
        .await
    }

    /// Shared implementation of the `open_debug_*` constructors: opens a
    /// catalog over the given durable `storage` with test-friendly settings
    /// (unsafe mode on, migrations skipped, in-memory secrets controller,
    /// test license key).
    async fn open_debug_catalog_inner(
        persist_client: PersistClient,
        storage: Box<dyn DurableCatalogState>,
        now: NowFn,
        environment_id: Option<EnvironmentId>,
        build_info: &'static BuildInfo,
        system_parameter_defaults: BTreeMap<String, String>,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
    ) -> Result<Catalog, anyhow::Error> {
        let metrics_registry = &MetricsRegistry::new();
        let secrets_reader = Arc::new(InMemorySecretsController::new());
        let previous_ts = now().into();
        let replica_size = &bootstrap_args.default_cluster_replica_size;
        // NOTE(review): the state config is always marked writable here even
        // when the durable `storage` handle was opened read-only — confirm
        // this is intended for the read-only debug constructors.
        let read_only = false;

        let OpenCatalogResult {
            catalog,
            migrated_storage_collections_0dt: _,
            new_builtin_collections: _,
            builtin_table_updates: _,
            cached_global_exprs: _,
            uncached_local_exprs: _,
        } = Catalog::open(Config {
            storage,
            metrics_registry,
            state: StateConfig {
                unsafe_mode: true,
                all_features: false,
                build_info,
                deploy_generation: 0,
                environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
                read_only,
                now,
                boot_ts: previous_ts,
                skip_migrations: true,
                cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
                // Every builtin cluster uses the bootstrap default replica
                // size with its own default replication factor.
                builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                },
                system_parameter_defaults,
                remote_system_parameters: None,
                availability_zones: vec![],
                egress_addresses: vec![],
                aws_principal_context: None,
                aws_privatelink_availability_zones: None,
                http_host_name: None,
                connection_context: ConnectionContext::for_tests(secrets_reader),
                builtin_item_migration_config: BuiltinItemMigrationConfig {
                    persist_client: persist_client.clone(),
                    read_only,
                    force_migration: None,
                },
                persist_client,
                enable_expression_cache_override,
                helm_chart_version: None,
                external_login_password_mz_system: None,
                license_key: ValidatedLicenseKey::for_tests(),
            },
        })
        .await?;
        Ok(catalog)
    }
653
    /// Returns a [`ConnCatalog`] scoped to `session`.
    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        self.state.for_session(session)
    }

    /// Returns a [`ConnCatalog`] for `role_id` without any session state.
    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        self.state.for_sessionless_user(role_id)
    }

    /// Returns a [`ConnCatalog`] scoped to the system session.
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.state.for_system_session()
    }

    /// Locks and returns the durable catalog state.
    async fn storage<'a>(
        &'a self,
    ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
        self.storage.lock().await
    }

    /// Returns the current upper of the durable catalog state.
    pub async fn current_upper(&self) -> mz_repr::Timestamp {
        self.storage().await.current_upper().await
    }

    /// Allocates a new user item/global ID pair, committing at `commit_ts`.
    pub async fn allocate_user_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        self.storage()
            .await
            .allocate_user_id(commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }

    /// Allocates `amount` new user item/global ID pairs, committing at
    /// `commit_ts`.
    pub async fn allocate_user_ids(
        &self,
        amount: u64,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
        self.storage()
            .await
            .allocate_user_ids(amount, commit_ts)
            .await
            .maybe_terminate("allocating user ids")
            .err_into()
    }

    /// Like [`Catalog::allocate_user_id`], committing at the durable state's
    /// current upper. For use in tests.
    pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
        let commit_ts = self.storage().await.current_upper().await;
        self.allocate_user_id(commit_ts).await
    }

    /// Returns the next user item ID without allocating it.
    pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_item_id()
            .await
            .err_into()
    }

    /// Allocates a new system item/global ID pair inside a one-off durable
    /// transaction committed at `commit_ts`. Test-only helper.
    #[cfg(test)]
    pub async fn allocate_system_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<(CatalogItemId, GlobalId), Error> {
        use mz_ore::collections::CollectionExt;

        let mut storage = self.storage().await;
        let mut txn = storage.transaction().await?;
        let id = txn
            .allocate_system_item_ids(1)
            .maybe_terminate("allocating system ids")?
            .into_element();
        // Drain the op updates the transaction produced; this helper only
        // needs the allocated ID, not the updates.
        let _ = txn.get_and_commit_op_updates();
        txn.commit(commit_ts).await?;
        Ok(id)
    }

    /// Returns the next system item ID without allocating it.
    pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_item_id()
            .await
            .err_into()
    }

    /// Allocates a new user cluster ID, committing at `commit_ts`.
    pub async fn allocate_user_cluster_id(
        &self,
        commit_ts: mz_repr::Timestamp,
    ) -> Result<ClusterId, Error> {
        self.storage()
            .await
            .allocate_user_cluster_id(commit_ts)
            .await
            .maybe_terminate("allocating user cluster ids")
            .err_into()
    }

    /// Returns the next system replica ID without allocating it.
    pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_system_replica_id()
            .await
            .err_into()
    }

    /// Returns the next user replica ID without allocating it.
    pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
        self.storage()
            .await
            .get_next_user_replica_id()
            .await
            .err_into()
    }
773
    /// Resolves `database_name` to a [`Database`].
    pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
        self.state.resolve_database(database_name)
    }

    /// Resolves a schema by optional database name and schema name, falling
    /// back to `current_database`; `conn_id` is used to resolve temporary
    /// schemas.
    pub fn resolve_schema(
        &self,
        current_database: Option<&DatabaseId>,
        database_name: Option<&str>,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema(current_database, database_name, schema_name, conn_id)
    }

    /// Resolves `schema_name` within the database identified by
    /// `database_spec`.
    pub fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
        conn_id: &ConnectionId,
    ) -> Result<&Schema, SqlCatalogError> {
        self.state
            .resolve_schema_in_database(database_spec, schema_name, conn_id)
    }

    /// Resolves `replica_name` within the cluster identified by `cluster_id`.
    pub fn resolve_replica_in_cluster(
        &self,
        cluster_id: &ClusterId,
        replica_name: &str,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        self.state
            .resolve_replica_in_cluster(cluster_id, replica_name)
    }

    /// Resolves the named system schema to its [`SchemaId`].
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.state.resolve_system_schema(name)
    }

    /// Returns the resolved search path for `session`.
    pub fn resolve_search_path(
        &self,
        session: &Session,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        self.state.resolve_search_path(session)
    }

    /// Resolves `name` to a [`CatalogEntry`] using the given current database
    /// and search path.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_entry(current_database, search_path, name, conn_id)
    }

    /// Returns the [`CatalogItemId`] of the given builtin table.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.state.resolve_builtin_table(builtin)
    }

    /// Returns the [`CatalogItemId`] of the given builtin log.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
        self.state.resolve_builtin_log(builtin).0
    }

    /// Returns the [`CatalogItemId`] of the given builtin storage collection
    /// (source).
    pub fn resolve_builtin_storage_collection(
        &self,
        builtin: &'static BuiltinSource,
    ) -> CatalogItemId {
        self.state.resolve_builtin_source(builtin)
    }

    /// Like [`Catalog::resolve_entry`], but restricted to functions.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_function(current_database, search_path, name, conn_id)
    }

    /// Like [`Catalog::resolve_entry`], but restricted to types.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.state
            .resolve_type(current_database, search_path, name, conn_id)
    }

    /// Resolves `name` to a [`Cluster`].
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        self.state.resolve_cluster(name)
    }

    /// Returns the [`Cluster`] for the given builtin cluster definition.
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        self.state.resolve_builtin_cluster(cluster)
    }

    /// Returns the ID of the `mz_catalog_server` builtin cluster.
    pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
        &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
    }

    /// Resolves `target_cluster` to a concrete [`Cluster`] for `session`.
    pub fn resolve_target_cluster(
        &self,
        target_cluster: TargetCluster,
        session: &Session,
    ) -> Result<&Cluster, AdapterError> {
        match target_cluster {
            TargetCluster::CatalogServer => {
                Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
            }
            TargetCluster::Active => self.active_cluster(session),
            // A transaction pinned a specific cluster, which may have been
            // dropped concurrently.
            TargetCluster::Transaction(cluster_id) => self
                .try_get_cluster(cluster_id)
                .ok_or(AdapterError::ConcurrentClusterDrop),
        }
    }
906
907 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
908 if session.user().name != SYSTEM_USER.name
912 && session.user().name != SUPPORT_USER.name
913 && session.vars().cluster() == SYSTEM_USER.name
914 {
915 coord_bail!(
916 "system cluster '{}' cannot execute user queries",
917 SYSTEM_USER.name
918 );
919 }
920 let cluster = self.resolve_cluster(session.vars().cluster())?;
921 Ok(cluster)
922 }
923
    /// Returns the catalog's in-memory state.
    pub fn state(&self) -> &CatalogState {
        &self.state
    }

    /// Resolves `name` to a [`FullItemName`]; `conn_id` is used to resolve
    /// temporary-schema names.
    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    /// Returns the entry for `id`, if it exists.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.state.try_get_entry(id)
    }

    /// Returns the entry that owns the collection `id`, if it exists.
    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        self.state.try_get_entry_by_global_id(id)
    }

    /// Returns the entry for `id`; delegates to `CatalogState::get_entry`.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    /// Returns the collection entry that owns the collection `id`.
    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    /// Returns all [`GlobalId`]s belonging to the item `id`.
    pub fn get_global_ids<'a>(
        &'a self,
        id: &CatalogItemId,
    ) -> impl Iterator<Item = GlobalId> + use<'a> {
        self.get_entry(id).global_ids()
    }

    /// Returns the [`CatalogItemId`] of the item that owns collection `id`.
    pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
        self.get_entry_by_global_id(id).id()
    }

    /// Like [`Catalog::resolve_item_id`], but returns `None` when the
    /// collection is unknown.
    pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
        let item = self.try_get_entry_by_global_id(id)?;
        Some(item.id())
    }

    /// Returns the schema identified by the given specifiers; `conn_id`
    /// resolves temporary schemas.
    pub fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> &Schema {
        self.state.get_schema(database_spec, schema_spec, conn_id)
    }

    /// Like [`Catalog::get_schema`], but returns `None` when the schema does
    /// not exist.
    pub fn try_get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
        conn_id: &ConnectionId,
    ) -> Option<&Schema> {
        self.state
            .try_get_schema(database_spec, schema_spec, conn_id)
    }

    /// Returns the ID of the `mz_catalog` system schema.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.state.get_mz_catalog_schema_id()
    }

    /// Returns the ID of the `pg_catalog` system schema.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.state.get_pg_catalog_schema_id()
    }

    /// Returns the ID of the `information_schema` system schema.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.state.get_information_schema_id()
    }

    /// Returns the ID of the `mz_internal` system schema.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state.get_mz_internal_schema_id()
    }

    /// Returns the ID of the `mz_introspection` system schema.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.state.get_mz_introspection_schema_id()
    }

    /// Returns the ID of the `mz_unsafe` system schema.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state.get_mz_unsafe_schema_id()
    }

    /// Returns the IDs of all system schemas.
    pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
        self.state.system_schema_ids()
    }

    /// Returns the database identified by `id`.
    pub fn get_database(&self, id: &DatabaseId) -> &Database {
        self.state.get_database(id)
    }

    /// Returns the role identified by `id`, if it exists.
    pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.state.try_get_role(id)
    }

    /// Returns the role identified by `id`.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.state.get_role(id)
    }

    /// Returns the role named `role_name`, if it exists.
    pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.state.try_get_role_by_name(role_name)
    }

    /// Returns the authentication info for role `id`, if any.
    pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.state.try_get_role_auth_by_id(id)
    }

    /// Creates a temporary schema for `conn_id` owned by `owner_id`.
    pub fn create_temporary_schema(
        &mut self,
        conn_id: &ConnectionId,
        owner_id: RoleId,
    ) -> Result<(), Error> {
        self.state.create_temporary_schema(conn_id, owner_id)
    }
1044
1045 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1046 self.state
1048 .temporary_schemas
1049 .get(conn_id)
1050 .map(|schema| schema.items.contains_key(item_name))
1051 .unwrap_or(false)
1052 }
1053
1054 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1057 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1058 return Ok(());
1059 };
1060 if !schema.items.is_empty() {
1061 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1062 }
1063 Ok(())
1064 }
1065
    /// Returns all objects that transitively depend on the given objects;
    /// `conn_id` resolves temporary items.
    pub(crate) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
    ) -> Vec<ObjectId> {
        // `seen` guards against visiting (and reporting) an object twice.
        let mut seen = BTreeSet::new();
        self.state.object_dependents(object_ids, conn_id, &mut seen)
    }

    /// Converts a [`FullItemName`] into the audit-log [`FullNameV1`] form.
    fn full_name_detail(name: &FullItemName) -> FullNameV1 {
        FullNameV1 {
            database: name.database.to_string(),
            schema: name.schema.clone(),
            item: name.item.clone(),
        }
    }
1082
1083 pub fn find_available_cluster_name(&self, name: &str) -> String {
1084 let mut i = 0;
1085 let mut candidate = name.to_string();
1086 while self.state.clusters_by_name.contains_key(&candidate) {
1087 i += 1;
1088 candidate = format!("{}{}", name, i);
1089 }
1090 candidate
1091 }
1092
1093 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1094 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1095 self.cluster_replica_sizes()
1096 .enabled_allocations()
1097 .map(|a| a.0.to_owned())
1098 .collect::<Vec<_>>()
1099 } else {
1100 self.system_config().allowed_cluster_replica_sizes()
1101 }
1102 }
1103
    /// Turns a durable replica location into a concrete [`ReplicaLocation`],
    /// validating against the allowed sizes and availability zones.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }

    /// Ensures `size` is a valid replica size among `allowed_sizes`.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }

    /// Returns the map of known cluster replica size allocations.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }

    /// Returns the privileges attached to `id`, or `None` for object kinds
    /// that carry no privileges (cluster replicas and roles).
    pub fn get_privileges(
        &self,
        id: &SystemObjectId,
        conn_id: &ConnectionId,
    ) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::Object(id) => match id {
                ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
                ObjectId::Database(id) => Some(self.get_database(id).privileges()),
                ObjectId::Schema((database_spec, schema_spec)) => Some(
                    self.get_schema(database_spec, schema_spec, conn_id)
                        .privileges(),
                ),
                ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
                ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
                ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
            },
            SystemObjectId::System => Some(&self.state.system_privileges),
        }
    }

    /// Confirms with the durable catalog state that this process is still
    /// the leader.
    #[mz_ore::instrument(level = "debug")]
    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
        Ok(self.storage().await.confirm_leadership().await?)
    }

    /// Returns the introspection dependencies of the item identified by `id`.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }

    /// Serializes the in-memory catalog state into a [`CatalogDump`].
    pub fn dump(&self) -> Result<CatalogDump, Error> {
        Ok(CatalogDump::new(self.state.dump(None)?))
    }

    /// Checks the in-memory state for internal consistency, returning any
    /// inconsistencies serialized as JSON on failure.
    pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
        self.state.check_consistency().map_err(|inconsistencies| {
            serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
                serde_json::Value::String("failed to serialize inconsistencies".to_string())
            })
        })
    }
1177
1178 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1179 self.state.config()
1180 }
1181
    /// Returns an iterator over every entry in the catalog.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }

    /// Returns an iterator over all user-created connections.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created tables.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created sources.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created sinks.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created materialized views.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }

    /// Returns an iterator over all user-created secrets.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1215
    /// Returns the network policy with the given id, panicking if it does
    /// not exist (the lookup delegates to the in-memory state).
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }

    /// Returns the network policy with the given name, if one exists.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }

    /// Returns an iterator over every cluster in the catalog.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }

    /// Returns the cluster with the given id, panicking if it does not exist.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }

    /// Returns the cluster with the given id, if one exists.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }

    /// Returns an iterator over all user-created clusters.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }

    /// Returns the named replica of the given cluster, panicking if either
    /// does not exist.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }

    /// Returns the named replica of the given cluster, if it exists.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }

    /// Returns an iterator over the user replicas of all user clusters.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }

    /// Returns an iterator over every database in the catalog.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }

    /// Returns an iterator over all user-created roles.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }

    /// Returns an iterator over all user-created continual tasks.
    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
    }

    /// Returns the set of system-wide privileges.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1280
    /// Returns an iterator over all default-privilege entries, pairing each
    /// object with an iterator over its default ACL items.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }

    /// Generates the builtin-table updates that record item `id` with the
    /// given `diff`, resolved against the builtin tables.
    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
        self.state
            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
    }

    /// Generates the builtin-table update that records the given storage
    /// usage `event` with the given `diff`.
    pub fn pack_storage_usage_update(
        &self,
        event: VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate {
        self.state
            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
    }

    /// Returns the system configuration (read-only).
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }

    /// Returns the system configuration (mutable).
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1313
    /// Returns an error if `role_id` refers to a reserved role.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }

    /// Returns an error if `role_id` refers to a role that cannot be granted.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }

    /// Returns an error if `role_id` refers to a system role.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }

    /// Returns an error if `role_id` refers to a predefined role.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }

    /// Returns an error if `network_policy_id` refers to a reserved
    /// network policy.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1337
1338 pub fn ensure_not_reserved_object(
1339 &self,
1340 object_id: &ObjectId,
1341 conn_id: &ConnectionId,
1342 ) -> Result<(), Error> {
1343 match object_id {
1344 ObjectId::Cluster(cluster_id) => {
1345 if cluster_id.is_system() {
1346 let cluster = self.get_cluster(*cluster_id);
1347 Err(Error::new(ErrorKind::ReadOnlyCluster(
1348 cluster.name().to_string(),
1349 )))
1350 } else {
1351 Ok(())
1352 }
1353 }
1354 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1355 if replica_id.is_system() {
1356 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1357 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1358 replica.name().to_string(),
1359 )))
1360 } else {
1361 Ok(())
1362 }
1363 }
1364 ObjectId::Database(database_id) => {
1365 if database_id.is_system() {
1366 let database = self.get_database(database_id);
1367 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1368 database.name().to_string(),
1369 )))
1370 } else {
1371 Ok(())
1372 }
1373 }
1374 ObjectId::Schema((database_spec, schema_spec)) => {
1375 if schema_spec.is_system() {
1376 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1377 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1378 schema.name().schema.clone(),
1379 )))
1380 } else {
1381 Ok(())
1382 }
1383 }
1384 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1385 ObjectId::Item(item_id) => {
1386 if item_id.is_system() {
1387 let item = self.get_entry(item_id);
1388 let name = self.resolve_full_name(item.name(), Some(conn_id));
1389 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1390 } else {
1391 Ok(())
1392 }
1393 }
1394 ObjectId::NetworkPolicy(network_policy_id) => {
1395 self.ensure_not_reserved_network_policy(network_policy_id)
1396 }
1397 }
1398 }
1399
    /// Deserializes `create_sql` into a plan and its resolved ids, with the
    /// "enable for item parsing" feature flags applied by the state.
    ///
    /// `force_if_exists_skip` is forwarded to the state-level implementation.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1409
1410 pub(crate) fn update_expression_cache<'a, 'b>(
1411 &'a self,
1412 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1413 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1414 ) -> BoxFuture<'b, ()> {
1415 if let Some(expr_cache) = &self.expr_cache_handle {
1416 expr_cache
1417 .update(
1418 new_local_expressions,
1419 new_global_expressions,
1420 Default::default(),
1421 )
1422 .boxed()
1423 } else {
1424 async {}.boxed()
1425 }
1426 }
1427
    /// Test-only helper: pulls all outstanding updates from durable storage
    /// and applies them to the in-memory state, returning the resulting
    /// builtin-table updates and parsed state updates.
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        let updates = self.storage().await.sync_to_current_updates().await?;
        let (builtin_table_updates, catalog_updates) = self
            .state
            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
            .await;
        Ok((builtin_table_updates, catalog_updates))
    }
1448}
1449
1450pub fn is_reserved_name(name: &str) -> bool {
1451 BUILTIN_PREFIXES
1452 .iter()
1453 .any(|prefix| name.starts_with(prefix))
1454}
1455
/// Reports whether `name` is unavailable for user roles: either a reserved
/// builtin-prefixed name or the public pseudo-role.
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}

/// Reports whether `name` is the public pseudo-role.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1463
/// Converts a SQL catalog item type into the audit-log [`ObjectType`].
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}

/// Converts a comment target id into the audit-log [`ObjectType`] of the
/// object it comments on.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}

/// Converts a SQL object type into the audit-log [`ObjectType`].
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}

/// Converts a SQL system object type into the audit-log [`ObjectType`].
/// Both matches are intentionally exhaustive so that adding a variant to
/// either enum forces this mapping to be revisited.
pub(crate) fn system_object_type_to_audit_object_type(
    system_type: &SystemObjectType,
) -> ObjectType {
    match system_type {
        SystemObjectType::Object(object_type) => match object_type {
            mz_sql::catalog::ObjectType::Table => ObjectType::Table,
            mz_sql::catalog::ObjectType::View => ObjectType::View,
            mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
            mz_sql::catalog::ObjectType::Source => ObjectType::Source,
            mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
            mz_sql::catalog::ObjectType::Index => ObjectType::Index,
            mz_sql::catalog::ObjectType::Type => ObjectType::Type,
            mz_sql::catalog::ObjectType::Role => ObjectType::Role,
            mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
            mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
            mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
            mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
            mz_sql::catalog::ObjectType::Database => ObjectType::Database,
            mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
            mz_sql::catalog::ObjectType::Func => ObjectType::Func,
            mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
            mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
        },
        SystemObjectType::System => ObjectType::System,
    }
}
1522
/// Whether a privilege-update operation grants or revokes privileges.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    Grant,
    Revoke,
}
1528
/// Maps a privilege-update variant to the execute response reported to the
/// client after the operation completes.
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}

/// Maps a privilege-update variant to the audit-log event type recorded for
/// the operation.
impl From<UpdatePrivilegeVariant> for EventType {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1546
1547impl ConnCatalog<'_> {
1548 fn resolve_item_name(
1549 &self,
1550 name: &PartialItemName,
1551 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1552 self.resolve_item(name).map(|entry| entry.name())
1553 }
1554
1555 fn resolve_function_name(
1556 &self,
1557 name: &PartialItemName,
1558 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1559 self.resolve_function(name).map(|entry| entry.name())
1560 }
1561
1562 fn resolve_type_name(
1563 &self,
1564 name: &PartialItemName,
1565 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1566 self.resolve_type(name).map(|entry| entry.name())
1567 }
1568}
1569
1570impl ExprHumanizer for ConnCatalog<'_> {
    /// Renders `id` as its fully qualified item name, if the id is known.
    fn humanize_id(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).to_string())
    }

    /// Renders `id` as its bare (unqualified) item name, if the id is known.
    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(entry.name().item.clone())
    }

    /// Renders `id` as the parts of its fully qualified name, if the id is
    /// known.
    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).into_parts())
    }
1585
    /// Renders a scalar type as a human-readable string.
    ///
    /// Custom (user-named) list/map/record types render as their minimally
    /// qualified catalog name. When `postgres_compat` is set, `char`/
    /// `varchar` fall through to their Postgres spellings instead of the
    /// length-annotated forms.
    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        use SqlScalarType::*;

        match typ {
            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
            // Custom list and map types are displayed by catalog name rather
            // than structurally.
            List {
                custom_id: Some(item_id),
                ..
            }
            | Map {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            List { element_type, .. } => {
                format!(
                    "{} list",
                    self.humanize_scalar_type(element_type, postgres_compat)
                )
            }
            Map { value_type, .. } => format!(
                "map[{}=>{}]",
                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
                self.humanize_scalar_type(value_type, postgres_compat)
            ),
            // Custom record types likewise render by catalog name.
            Record {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            Record { fields, .. } => format!(
                "record({})",
                fields
                    .iter()
                    .map(|f| format!(
                        "{}: {}",
                        f.0,
                        self.humanize_column_type(&f.1, postgres_compat)
                    ))
                    .join(",")
            ),
            PgLegacyChar => "\"char\"".into(),
            Char { length } if !postgres_compat => match length {
                None => "char".into(),
                Some(length) => format!("char({})", length.into_u32()),
            },
            VarChar { max_length } if !postgres_compat => match max_length {
                None => "varchar".into(),
                Some(length) => format!("varchar({})", length.into_u32()),
            },
            UInt16 => "uint2".into(),
            UInt32 => "uint4".into(),
            UInt64 => "uint8".into(),
            // Everything else renders via its pgrepr name. The name is left
            // unqualified only if pg_catalog is on the effective search path;
            // otherwise it is fully qualified as pg_catalog.<name>.
            ty => {
                let pgrepr_type = mz_pgrepr::Type::from(ty);
                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());

                let res = if self
                    .effective_search_path(true)
                    .iter()
                    .any(|(_, schema)| schema == &pg_catalog_schema)
                {
                    pgrepr_type.name().to_string()
                } else {
                    let name = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: pg_catalog_schema,
                        },
                        item: pgrepr_type.name().to_string(),
                    };
                    self.resolve_full_name(&name).to_string()
                };
                res
            }
        }
    }
1669
    /// Returns the column names of the collection identified by `id`.
    ///
    /// For indexes, the names are derived from the indexed collection's
    /// description by applying the index's arrangement permutation; for all
    /// other entries, the names come directly from the collection's own
    /// description.
    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;

        match entry.index() {
            Some(index) => {
                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
                let mut on_names = on_desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect::<Vec<_>>();

                // Permutation mapping positions in the indexed collection to
                // positions in the arranged output.
                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());

                // The arranged output's arity is the largest target position
                // plus one (zero when the permutation is empty).
                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
                let mut ix_names = vec![String::new(); ix_arity];

                // Move each source name into its permuted output slot; swap
                // avoids cloning the strings.
                for (on_pos, ix_pos) in p.into_iter().enumerate() {
                    let on_name = on_names.get_mut(on_pos).expect("on_name");
                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
                    std::mem::swap(on_name, ix_name);
                }

                Some(ix_names)
            }
            None => {
                let desc = self.state.try_get_desc_by_global_id(&id)?;
                let column_names = desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect();

                Some(column_names)
            }
        }
    }
1709
    /// Returns the name of column `column` of the collection identified by
    /// `id`, if the collection's description is known.
    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
        let desc = self.state.try_get_desc_by_global_id(&id)?;
        Some(desc.get_name(column).to_string())
    }

    /// Reports whether `id` refers to a known catalog entry.
    fn id_exists(&self, id: GlobalId) -> bool {
        self.state.entry_by_global_id.contains_key(&id)
    }
1718}
1719
1720impl SessionCatalog for ConnCatalog<'_> {
    /// Returns the id of the role this connection is running as.
    fn active_role_id(&self) -> &RoleId {
        &self.role_id
    }
1724
1725 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1726 self.prepared_statements
1727 .as_ref()
1728 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1729 .flatten()
1730 }
1731
    /// Returns the statement description of the portal named `portal_name`,
    /// without verifying the portal, if portals are available and it exists.
    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
        self.portals
            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
    }

    /// Returns the connection's active database, if one is set.
    fn active_database(&self) -> Option<&DatabaseId> {
        self.database.as_ref()
    }

    /// Returns the name of the connection's active cluster.
    fn active_cluster(&self) -> &str {
        &self.cluster
    }

    /// Returns the connection's schema search path.
    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
        &self.search_path
    }
1748
    /// Resolves `database_name` to its catalog database.
    fn resolve_database(
        &self,
        database_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
        Ok(self.state.resolve_database(database_name)?)
    }

    /// Returns the database with the given id, panicking if it does not
    /// exist.
    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
        self.state
            .database_by_id
            .get(id)
            .expect("database doesn't exist")
    }

    /// Returns every database in the catalog as trait objects.
    #[allow(clippy::as_conversions)]
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
        self.state
            .database_by_id
            .values()
            .map(|database| database as &dyn CatalogDatabase)
            .collect()
    }

    /// Resolves `schema_name` (optionally qualified by `database_name`)
    /// relative to this connection's active database.
    fn resolve_schema(
        &self,
        database_name: Option<&str>,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self.state.resolve_schema(
            self.database.as_ref(),
            database_name,
            schema_name,
            &self.conn_id,
        )?)
    }

    /// Resolves `schema_name` within the given database specifier.
    fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self
            .state
            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
    }

    /// Returns the schema identified by the given specifiers, panicking if it
    /// does not exist.
    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema {
        self.state
            .get_schema(database_spec, schema_spec, &self.conn_id)
    }

    /// Returns every schema: database schemas, ambient schemas, and this
    /// connection's temporary schemas.
    #[allow(clippy::as_conversions)]
    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.get_databases()
            .into_iter()
            .flat_map(|database| database.schemas().into_iter())
            .chain(
                self.state
                    .ambient_schemas_by_id
                    .values()
                    .chain(self.state.temporary_schemas.values())
                    .map(|schema| schema as &dyn CatalogSchema),
            )
            .collect()
    }

    /// Returns the id of the mz_internal schema.
    fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state().get_mz_internal_schema_id()
    }

    /// Returns the id of the mz_unsafe schema.
    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state().get_mz_unsafe_schema_id()
    }
1828
    /// Reports whether `schema` refers to a system schema.
    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
        self.state.is_system_schema_specifier(schema)
    }

    /// Resolves `role_name` to its catalog role.
    fn resolve_role(
        &self,
        role_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
        match self.state.try_get_role_by_name(role_name) {
            Some(role) => Ok(role),
            None => Err(SqlCatalogError::UnknownRole(role_name.into())),
        }
    }

    /// Resolves `policy_name` to its catalog network policy.
    fn resolve_network_policy(
        &self,
        policy_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
        match self.state.try_get_network_policy_by_name(policy_name) {
            Some(policy) => Ok(policy),
            None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
        }
    }

    /// Returns the role with the given id, if it exists.
    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
        Some(self.state.roles_by_id.get(id)?)
    }

    /// Returns the role with the given id, panicking if it does not exist.
    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
        self.state.get_role(id)
    }

    /// Returns every role in the catalog as trait objects.
    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
        #[allow(clippy::as_conversions)]
        self.state
            .roles_by_id
            .values()
            .map(|role| role as &dyn CatalogRole)
            .collect()
    }

    /// Returns the id of the mz_system role.
    fn mz_system_role_id(&self) -> RoleId {
        MZ_SYSTEM_ROLE_ID
    }

    /// Returns the transitive set of roles that `id` is a member of.
    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        self.state.collect_role_membership(id)
    }

    /// Returns the network policy with the given id, panicking if it does
    /// not exist.
    fn get_network_policy(
        &self,
        id: &NetworkPolicyId,
    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
        self.state.get_network_policy(id)
    }

    /// Returns every network policy in the catalog as trait objects.
    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
        #[allow(clippy::as_conversions)]
        self.state
            .network_policies_by_id
            .values()
            .map(|policy| policy as &dyn CatalogNetworkPolicy)
            .collect()
    }
1895
    /// Resolves `cluster_name` — or the connection's active cluster when
    /// `None` — to its catalog cluster.
    fn resolve_cluster(
        &self,
        cluster_name: Option<&str>,
    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
        Ok(self
            .state
            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
    }

    /// Resolves a qualified replica name to its catalog cluster replica.
    fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
    }

    /// Resolves `name` to a catalog item using the active database and
    /// effective search path.
    ///
    /// Items whose ids are in `unresolvable_ids` are reported as unknown,
    /// even though they exist in the state.
    fn resolve_item(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
        let r = self.state.resolve_entry(
            self.database.as_ref(),
            &self.effective_search_path(true),
            name,
            &self.conn_id,
        )?;
        if self.unresolvable_ids.contains(&r.id()) {
            Err(SqlCatalogError::UnknownItem(name.to_string()))
        } else {
            Ok(r)
        }
    }

    /// Resolves `name` to a catalog function, honoring `unresolvable_ids`
    /// as in [`Self::resolve_item`].
    fn resolve_function(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
        let r = self.state.resolve_function(
            self.database.as_ref(),
            &self.effective_search_path(false),
            name,
            &self.conn_id,
        )?;

        if self.unresolvable_ids.contains(&r.id()) {
            Err(SqlCatalogError::UnknownFunction {
                name: name.to_string(),
                alternative: None,
            })
        } else {
            Ok(r)
        }
    }

    /// Resolves `name` to a catalog type, honoring `unresolvable_ids` as in
    /// [`Self::resolve_item`].
    fn resolve_type(
        &self,
        name: &PartialItemName,
    ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
        let r = self.state.resolve_type(
            self.database.as_ref(),
            &self.effective_search_path(false),
            name,
            &self.conn_id,
        )?;

        if self.unresolvable_ids.contains(&r.id()) {
            Err(SqlCatalogError::UnknownType {
                name: name.to_string(),
            })
        } else {
            Ok(r)
        }
    }
1969
    /// Returns the system type with the given name, panicking if it does not
    /// exist.
    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_system_type(name)
    }

    /// Returns the item with the given id, if it exists.
    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
        Some(self.state.try_get_entry(id)?)
    }

    /// Returns the collection item addressed by `id`, if it exists, pinned
    /// to the relation version that `id` names.
    ///
    /// Tables can have multiple versions, so the matching version is looked
    /// up from the table's collections; everything else uses its latest
    /// version.
    fn try_get_item_by_global_id(
        &self,
        id: &GlobalId,
    ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
        let entry = self.state.try_get_entry_by_global_id(id)?;
        let entry = match &entry.item {
            CatalogItem::Table(table) => {
                let (version, _gid) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("catalog out of sync, mismatched GlobalId");
                entry.at_version(RelationVersionSelector::Specific(*version))
            }
            _ => entry.at_version(RelationVersionSelector::Latest),
        };
        Some(entry)
    }

    /// Returns the item with the given id, panicking if it does not exist.
    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_entry(id)
    }

    /// Like [`Self::try_get_item_by_global_id`], but panics if `id` does not
    /// exist.
    fn get_item_by_global_id(
        &self,
        id: &GlobalId,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        let entry = self.state.get_entry_by_global_id(id);
        let entry = match &entry.item {
            CatalogItem::Table(table) => {
                let (version, _gid) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("catalog out of sync, mismatched GlobalId");
                entry.at_version(RelationVersionSelector::Specific(*version))
            }
            _ => entry.at_version(RelationVersionSelector::Latest),
        };
        entry
    }

    /// Returns every item reachable through every schema.
    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
        self.get_schemas()
            .into_iter()
            .flat_map(|schema| schema.item_ids())
            .map(|id| self.get_item(&id))
            .collect()
    }

    /// Returns the item with the given qualified name, if it exists.
    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_item_by_name(name, &self.conn_id)
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }

    /// Returns the type with the given qualified name, if it exists.
    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_type_by_name(name, &self.conn_id)
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }
2039
    /// Returns the cluster with the given id, panicking if it does not exist.
    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
        &self.state.clusters_by_id[&id]
    }

    /// Returns every cluster in the catalog as trait objects.
    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
        self.state
            .clusters_by_id
            .values()
            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
            .collect()
    }

    /// Returns the named replica of the given cluster.
    fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
        let cluster = self.get_cluster(cluster_id);
        cluster.replica(replica_id)
    }

    /// Returns every replica of every cluster.
    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
        self.get_clusters()
            .into_iter()
            .flat_map(|cluster| cluster.replicas().into_iter())
            .collect()
    }

    /// Returns the set of system-wide privileges.
    fn get_system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }

    /// Returns all default-privilege entries, with each object's ACL items
    /// collected into a `Vec`.
    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
        self.state
            .default_privileges
            .iter()
            .map(|(object, acl_items)| (object, acl_items.collect()))
            .collect()
    }
2081
    /// Returns a name based on `name` that is not already taken, as computed
    /// by the in-memory state for this connection.
    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
        self.state.find_available_name(name, &self.conn_id)
    }

    /// Resolves a qualified item name to its full name, using this
    /// connection's id to resolve temporary schemas.
    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
        self.state.resolve_full_name(name, Some(&self.conn_id))
    }

    /// Resolves a qualified schema name to its full name.
    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        self.state.resolve_full_schema_name(name)
    }

    /// Returns the catalog item id that owns the collection `global_id`.
    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
        self.state.get_entry_by_global_id(global_id).id()
    }

    /// Returns the global id of `item_id` at the requested relation version.
    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId {
        self.state
            .get_entry(item_id)
            .at_version(version)
            .global_id()
    }

    /// Returns the catalog configuration.
    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }

    /// Returns the current time according to the catalog's `now` function.
    fn now(&self) -> EpochMillis {
        (self.state.config().now)()
    }

    /// Returns the configured AWS PrivateLink availability zones, if any.
    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
        self.state.aws_privatelink_availability_zones.clone()
    }

    /// Returns the system configuration (read-only).
    fn system_vars(&self) -> &SystemVars {
        &self.state.system_configuration
    }

    /// Returns the system configuration (mutable); clones the underlying
    /// state first if it is shared (copy-on-write via `to_mut`).
    fn system_vars_mut(&mut self) -> &mut SystemVars {
        &mut self.state.to_mut().system_configuration
    }

    /// Returns the owner of the object identified by `id`, if it has one.
    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
        self.state().get_owner_id(id, self.conn_id())
    }
2132
    /// Looks up the privileges attached to the object identified by `id`.
    ///
    /// Returns `None` for cluster replicas and roles (which carry no
    /// privileges) and for schemas that cannot be resolved.
    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::System => Some(&self.state.system_privileges),
            SystemObjectId::Object(ObjectId::Cluster(id)) => {
                Some(self.get_cluster(*id).privileges())
            }
            SystemObjectId::Object(ObjectId::Database(id)) => {
                Some(self.get_database(id).privileges())
            }
            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
                self.state
                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
                    .map(|schema| schema.privileges())
            }
            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
                Some(self.get_network_policy(id).privileges())
            }
            SystemObjectId::Object(ObjectId::ClusterReplica(_))
            | SystemObjectId::Object(ObjectId::Role(_)) => None,
        }
    }

    /// Returns the transitive dependents of the given objects.
    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.object_dependents(ids, &self.conn_id, &mut seen)
    }

    /// Returns the transitive dependents of the item identified by `id`.
    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.item_dependents(id, &mut seen)
    }

    /// Returns the full set of privileges applicable to the given object
    /// type.
    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
        rbac::all_object_privileges(object_type)
    }

    /// Returns the type of the object identified by `object_id`.
    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        self.state.get_object_type(object_id)
    }

    /// Returns the system object type of the object identified by `id`.
    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
        self.state.get_system_object_type(id)
    }
2179
2180 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2187 if qualified_name.qualifiers.schema_spec.is_temporary() {
2188 return qualified_name.item.clone().into();
2196 }
2197
2198 let database_id = match &qualified_name.qualifiers.database_spec {
2199 ResolvedDatabaseSpecifier::Ambient => None,
2200 ResolvedDatabaseSpecifier::Id(id)
2201 if self.database.is_some() && self.database == Some(*id) =>
2202 {
2203 None
2204 }
2205 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2206 };
2207
2208 let schema_spec = if database_id.is_none()
2209 && self.resolve_item_name(&PartialItemName {
2210 database: None,
2211 schema: None,
2212 item: qualified_name.item.clone(),
2213 }) == Ok(qualified_name)
2214 || self.resolve_function_name(&PartialItemName {
2215 database: None,
2216 schema: None,
2217 item: qualified_name.item.clone(),
2218 }) == Ok(qualified_name)
2219 || self.resolve_type_name(&PartialItemName {
2220 database: None,
2221 schema: None,
2222 item: qualified_name.item.clone(),
2223 }) == Ok(qualified_name)
2224 {
2225 None
2226 } else {
2227 Some(qualified_name.qualifiers.schema_spec.clone())
2230 };
2231
2232 let res = PartialItemName {
2233 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2234 schema: schema_spec.map(|spec| {
2235 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2236 .name()
2237 .schema
2238 .clone()
2239 }),
2240 item: qualified_name.item.clone(),
2241 };
2242 assert!(
2243 self.resolve_item_name(&res) == Ok(qualified_name)
2244 || self.resolve_function_name(&res) == Ok(qualified_name)
2245 || self.resolve_type_name(&res) == Ok(qualified_name)
2246 );
2247 res
2248 }
2249
    /// Sends `notice` to the session's notice channel; a failed send (e.g.
    /// a disconnected receiver) is deliberately ignored.
    fn add_notice(&self, notice: PlanNotice) {
        let _ = self.notices_tx.send(notice.into());
    }

    /// Returns the comments attached to item `id`, keyed by column position
    /// (`None` being the object-level comment), if any exist.
    fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
        let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
        self.state.comments.get_object_comments(comment_id)
    }
2258
2259 fn is_cluster_size_cc(&self, size: &str) -> bool {
2260 self.state
2261 .cluster_replica_sizes
2262 .0
2263 .get(size)
2264 .map_or(false, |a| a.is_cc)
2265 }
2266}
2267
2268#[cfg(test)]
2269mod tests {
2270 use std::collections::{BTreeMap, BTreeSet};
2271 use std::sync::Arc;
2272 use std::{env, iter};
2273
2274 use itertools::Itertools;
2275 use mz_catalog::memory::objects::CatalogItem;
2276 use tokio_postgres::NoTls;
2277 use tokio_postgres::types::Type;
2278 use uuid::Uuid;
2279
2280 use mz_catalog::SYSTEM_CONN_ID;
2281 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2282 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2283 use mz_controller_types::{ClusterId, ReplicaId};
2284 use mz_expr::MirScalarExpr;
2285 use mz_ore::now::to_datetime;
2286 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2287 use mz_persist_client::PersistClient;
2288 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2289 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2290 use mz_repr::role_id::RoleId;
2291 use mz_repr::{
2292 CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2293 SqlScalarType, Timestamp,
2294 };
2295 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2296 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2297 use mz_sql::names::{
2298 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2299 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2300 };
2301 use mz_sql::plan::{
2302 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2303 Scope, StatementContext,
2304 };
2305 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2306 use mz_sql::session::vars::{SystemVars, VarInput};
2307
2308 use crate::catalog::state::LocalExpressionCache;
2309 use crate::catalog::{Catalog, Op};
2310 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2311 use crate::session::Session;
2312
2313 #[mz_ore::test(tokio::test)]
2320 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2322 Catalog::with_debug(|catalog| async move {
2323 struct TestCase {
2324 input: QualifiedItemName,
2325 system_output: PartialItemName,
2326 normal_output: PartialItemName,
2327 }
2328
2329 let test_cases = vec![
2330 TestCase {
2331 input: QualifiedItemName {
2332 qualifiers: ItemQualifiers {
2333 database_spec: ResolvedDatabaseSpecifier::Ambient,
2334 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2335 },
2336 item: "numeric".to_string(),
2337 },
2338 system_output: PartialItemName {
2339 database: None,
2340 schema: None,
2341 item: "numeric".to_string(),
2342 },
2343 normal_output: PartialItemName {
2344 database: None,
2345 schema: None,
2346 item: "numeric".to_string(),
2347 },
2348 },
2349 TestCase {
2350 input: QualifiedItemName {
2351 qualifiers: ItemQualifiers {
2352 database_spec: ResolvedDatabaseSpecifier::Ambient,
2353 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2354 },
2355 item: "mz_array_types".to_string(),
2356 },
2357 system_output: PartialItemName {
2358 database: None,
2359 schema: None,
2360 item: "mz_array_types".to_string(),
2361 },
2362 normal_output: PartialItemName {
2363 database: None,
2364 schema: None,
2365 item: "mz_array_types".to_string(),
2366 },
2367 },
2368 ];
2369
2370 for tc in test_cases {
2371 assert_eq!(
2372 catalog
2373 .for_system_session()
2374 .minimal_qualification(&tc.input),
2375 tc.system_output
2376 );
2377 assert_eq!(
2378 catalog
2379 .for_session(&Session::dummy())
2380 .minimal_qualification(&tc.input),
2381 tc.normal_output
2382 );
2383 }
2384 catalog.expire().await;
2385 })
2386 .await
2387 }
2388
2389 #[mz_ore::test(tokio::test)]
2390 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2392 let persist_client = PersistClient::new_for_tests().await;
2393 let organization_id = Uuid::new_v4();
2394 let bootstrap_args = test_bootstrap_args();
2395 {
2396 let mut catalog = Catalog::open_debug_catalog(
2397 persist_client.clone(),
2398 organization_id.clone(),
2399 &bootstrap_args,
2400 )
2401 .await
2402 .expect("unable to open debug catalog");
2403 assert_eq!(catalog.transient_revision(), 1);
2404 let commit_ts = catalog.current_upper().await;
2405 catalog
2406 .transact(
2407 None,
2408 commit_ts,
2409 None,
2410 vec![Op::CreateDatabase {
2411 name: "test".to_string(),
2412 owner_id: MZ_SYSTEM_ROLE_ID,
2413 }],
2414 )
2415 .await
2416 .expect("failed to transact");
2417 assert_eq!(catalog.transient_revision(), 2);
2418 catalog.expire().await;
2419 }
2420 {
2421 let catalog =
2422 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2423 .await
2424 .expect("unable to open debug catalog");
2425 assert_eq!(catalog.transient_revision(), 1);
2427 catalog.expire().await;
2428 }
2429 }
2430
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_effective_search_path() {
        Catalog::with_debug(|catalog| async move {
            // The schemas that `effective_search_path` may implicitly prepend
            // to the session's configured search path.
            let mz_catalog_schema = (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
            );
            let pg_catalog_schema = (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
            );
            let mz_temp_schema = (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Temporary,
            );

            // Default search path: mz_catalog and pg_catalog are prepended;
            // the temp schema is additionally prepended when the flag is set.
            let session = Session::dummy();
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone(),
                    conn_catalog.search_path[0].clone()
                ]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![
                    mz_temp_schema.clone(),
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone(),
                    conn_catalog.search_path[0].clone()
                ]
            );

            // An explicit pg_catalog in the search path is kept in place and
            // not prepended a second time; mz_catalog still is.
            let mut session = Session::dummy();
            session
                .vars_mut()
                .set(
                    &SystemVars::new(),
                    "search_path",
                    VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
                    false,
                )
                .expect("failed to set search_path");
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![
                    mz_temp_schema.clone(),
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone()
                ]
            );

            // Likewise an explicit mz_catalog stays put; only pg_catalog is
            // prepended (ahead of it).
            let mut session = Session::dummy();
            session
                .vars_mut()
                .set(
                    &SystemVars::new(),
                    "search_path",
                    VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
                    false,
                )
                .expect("failed to set search_path");
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![
                    mz_temp_schema.clone(),
                    pg_catalog_schema.clone(),
                    mz_catalog_schema.clone()
                ]
            );

            // An explicit temp schema stays in its configured position and is
            // not prepended again even when `include_temp` is true.
            let mut session = Session::dummy();
            session
                .vars_mut()
                .set(
                    &SystemVars::new(),
                    "search_path",
                    VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
                    false,
                )
                .expect("failed to set search_path");
            let conn_catalog = catalog.for_session(&session);
            assert_ne!(
                conn_catalog.effective_search_path(false),
                conn_catalog.search_path
            );
            assert_ne!(
                conn_catalog.effective_search_path(true),
                conn_catalog.search_path
            );
            assert_eq!(
                conn_catalog.effective_search_path(false),
                vec![
                    mz_catalog_schema.clone(),
                    pg_catalog_schema.clone(),
                    mz_temp_schema.clone()
                ]
            );
            assert_eq!(
                conn_catalog.effective_search_path(true),
                vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
            );
            catalog.expire().await;
        })
        .await
    }
2577
2578 #[mz_ore::test(tokio::test)]
2579 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2581 use mz_ore::collections::CollectionExt;
2582 Catalog::with_debug(|catalog| async move {
2583 let conn_catalog = catalog.for_system_session();
2584 let scx = &mut StatementContext::new(None, &conn_catalog);
2585
2586 let parsed = mz_sql_parser::parser::parse_statements(
2587 "create view public.foo as select 1 as bar",
2588 )
2589 .expect("")
2590 .into_element()
2591 .ast;
2592
2593 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2594
2595 assert_eq!(
2597 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2598 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2599 );
2600 catalog.expire().await;
2601 })
2602 .await;
2603 }
2604
2605 #[mz_ore::test(tokio::test)]
2607 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2609 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2610 let column = ", 1";
2611 let view_def_size = view_def.bytes().count();
2612 let column_size = column.bytes().count();
2613 let column_count =
2614 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2615 let columns = iter::repeat(column).take(column_count).join("");
2616 let create_sql = format!("{view_def}{columns})");
2617 let create_sql_check = create_sql.clone();
2618 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2619 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2620 &create_sql
2621 ));
2622
2623 let persist_client = PersistClient::new_for_tests().await;
2624 let organization_id = Uuid::new_v4();
2625 let id = CatalogItemId::User(1);
2626 let gid = GlobalId::User(1);
2627 let bootstrap_args = test_bootstrap_args();
2628 {
2629 let mut catalog = Catalog::open_debug_catalog(
2630 persist_client.clone(),
2631 organization_id.clone(),
2632 &bootstrap_args,
2633 )
2634 .await
2635 .expect("unable to open debug catalog");
2636 let item = catalog
2637 .state()
2638 .deserialize_item(
2639 gid,
2640 &create_sql,
2641 &BTreeMap::new(),
2642 &mut LocalExpressionCache::Closed,
2643 None,
2644 )
2645 .expect("unable to parse view");
2646 let commit_ts = catalog.current_upper().await;
2647 catalog
2648 .transact(
2649 None,
2650 commit_ts,
2651 None,
2652 vec![Op::CreateItem {
2653 item,
2654 name: QualifiedItemName {
2655 qualifiers: ItemQualifiers {
2656 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2657 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2658 },
2659 item: "v".to_string(),
2660 },
2661 id,
2662 owner_id: MZ_SYSTEM_ROLE_ID,
2663 }],
2664 )
2665 .await
2666 .expect("failed to transact");
2667 catalog.expire().await;
2668 }
2669 {
2670 let catalog =
2671 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2672 .await
2673 .expect("unable to open debug catalog");
2674 let view = catalog.get_entry(&id);
2675 assert_eq!("v", view.name.item);
2676 match &view.item {
2677 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2678 item => panic!("expected view, got {}", item.typ()),
2679 }
2680 catalog.expire().await;
2681 }
2682 }
2683
2684 #[mz_ore::test(tokio::test)]
2685 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2687 Catalog::with_debug(|catalog| async move {
2688 let conn_catalog = catalog.for_system_session();
2689
2690 assert_eq!(
2691 mz_sql::catalog::ObjectType::ClusterReplica,
2692 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2693 ClusterId::user(1).expect("1 is a valid ID"),
2694 ReplicaId::User(1)
2695 )))
2696 );
2697 assert_eq!(
2698 mz_sql::catalog::ObjectType::Role,
2699 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2700 );
2701 catalog.expire().await;
2702 })
2703 .await;
2704 }
2705
2706 #[mz_ore::test(tokio::test)]
2707 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2709 Catalog::with_debug(|catalog| async move {
2710 let conn_catalog = catalog.for_system_session();
2711
2712 assert_eq!(
2713 None,
2714 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2715 ClusterId::user(1).expect("1 is a valid ID"),
2716 ReplicaId::User(1),
2717 ))))
2718 );
2719 assert_eq!(
2720 None,
2721 conn_catalog
2722 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2723 );
2724 catalog.expire().await;
2725 })
2726 .await;
2727 }
2728
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn verify_builtin_descs() {
        Catalog::with_debug(|catalog| async move {
            let conn_catalog = catalog.for_system_session();

            let builtins_cfg = BuiltinsConfig {
                include_continual_tasks: true,
            };
            // Every builtin relation (table/view/source) must resolve in the
            // catalog with exactly the RelationDesc declared in its builtin
            // definition.
            for builtin in BUILTINS::iter(&builtins_cfg) {
                let (schema, name, expected_desc) = match builtin {
                    Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
                    Builtin::View(v) => (&v.schema, &v.name, &v.desc),
                    Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
                    // Builtins without a declared relation desc are skipped.
                    Builtin::Log(_)
                    | Builtin::Type(_)
                    | Builtin::Func(_)
                    | Builtin::ContinualTask(_)
                    | Builtin::Index(_)
                    | Builtin::Connection(_) => continue,
                };
                let item = conn_catalog
                    .resolve_item(&PartialItemName {
                        database: None,
                        schema: Some(schema.to_string()),
                        item: name.to_string(),
                    })
                    .expect("unable to resolve item")
                    .at_version(RelationVersionSelector::Latest);

                let actual_desc = item.relation_desc().expect("invalid item type");
                // Compare column-by-column first so a mismatch yields a
                // targeted message; `zip_eq` panics if the column counts
                // differ.
                for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
                    actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
                {
                    assert_eq!(
                        actual_name, expected_name,
                        "item {schema}.{name} column {index} name did not match its expected name"
                    );
                    assert_eq!(
                        actual_typ, expected_typ,
                        "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
                    );
                }
                // Whole-desc comparison catches anything beyond per-column
                // names and types.
                assert_eq!(
                    &*actual_desc, expected_desc,
                    "item {schema}.{name} did not match its expected RelationDesc"
                );
            }
            catalog.expire().await;
        })
        .await
    }
2781
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_compare_builtins_postgres() {
        // Compares mz's builtin types, functions, and operators against a
        // live PostgreSQL instance (POSTGRES_URL env var, defaulting to a
        // local server).
        async fn inner(catalog: Catalog) {
            let (client, connection) = tokio_postgres::connect(
                &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
                NoTls,
            )
            .await
            .expect("failed to connect to Postgres");

            // Drive the connection on a background task; the test uses only
            // `client`.
            task::spawn(|| "compare_builtin_postgres", async move {
                if let Err(e) = connection.await {
                    panic!("connection error: {}", e);
                }
            });

            // Minimal projections of the pg catalog rows compared below.
            struct PgProc {
                name: String,
                arg_oids: Vec<u32>,
                ret_oid: Option<u32>,
                ret_set: bool,
            }

            struct PgType {
                name: String,
                ty: String,
                elem: u32,
                array: u32,
                input: u32,
                receive: u32,
            }

            struct PgOper {
                oprresult: u32,
                name: String,
            }

            // Snapshot pg_proc keyed by OID.
            let pg_proc: BTreeMap<_, _> = client
                .query(
                    "SELECT
                        p.oid,
                        proname,
                        proargtypes,
                        prorettype,
                        proretset
                    FROM pg_proc p
                    JOIN pg_namespace n ON p.pronamespace = n.oid",
                    &[],
                )
                .await
                .expect("pg query failed")
                .into_iter()
                .map(|row| {
                    let oid: u32 = row.get("oid");
                    let pg_proc = PgProc {
                        name: row.get("proname"),
                        arg_oids: row.get("proargtypes"),
                        ret_oid: row.get("prorettype"),
                        ret_set: row.get("proretset"),
                    };
                    (oid, pg_proc)
                })
                .collect();

            // Snapshot pg_type keyed by OID.
            let pg_type: BTreeMap<_, _> = client
                .query(
                    "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
                    &[],
                )
                .await
                .expect("pg query failed")
                .into_iter()
                .map(|row| {
                    let oid: u32 = row.get("oid");
                    let pg_type = PgType {
                        name: row.get("typname"),
                        ty: row.get("typtype"),
                        elem: row.get("typelem"),
                        array: row.get("typarray"),
                        input: row.get("typinput"),
                        receive: row.get("typreceive"),
                    };
                    (oid, pg_type)
                })
                .collect();

            // Snapshot pg_operator keyed by OID.
            let pg_oper: BTreeMap<_, _> = client
                .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
                .await
                .expect("pg query failed")
                .into_iter()
                .map(|row| {
                    let oid: u32 = row.get("oid");
                    let pg_oper = PgOper {
                        name: row.get("oprname"),
                        oprresult: row.get("oprresult"),
                    };
                    (oid, pg_oper)
                })
                .collect();

            let conn_catalog = catalog.for_system_session();
            // Resolves a pg_catalog type name to its OID in mz.
            let resolve_type_oid = |item: &str| {
                conn_catalog
                    .resolve_type(&PartialItemName {
                        database: None,
                        schema: Some(PG_CATALOG_SCHEMA.into()),
                        item: item.to_string(),
                    })
                    .expect("unable to resolve type")
                    .oid()
            };

            // All OIDs of builtin function implementations, used to validate
            // typinput/typreceive references.
            let func_oids: BTreeSet<_> = BUILTINS::funcs()
                .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
                .collect();

            // Tracks every OID seen so far to detect accidental reuse.
            let mut all_oids = BTreeSet::new();

            // Type pairs deliberately treated as interchangeable when
            // comparing return types between mz and pg.
            let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
                [
                    (Type::NAME, Type::TEXT),
                    (Type::NAME_ARRAY, Type::TEXT_ARRAY),
                    (Type::TIME, Type::TIMETZ),
                    (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
                ]
                .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
            );
            // Functions whose return type is intentionally not compared.
            // NOTE(review): oid 1619 is believed to be `pg_typeof` — confirm
            // against pg_proc.
            let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
                1619,
            ]);
            let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
                if ignore_return_types.contains(&fn_oid) {
                    return true;
                }
                if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
                    return true;
                }
                a == b
            };

            let builtins_cfg = BuiltinsConfig {
                include_continual_tasks: true,
            };
            for builtin in BUILTINS::iter(&builtins_cfg) {
                match builtin {
                    Builtin::Type(ty) => {
                        assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);

                        // OIDs in the Materialize-only range have no pg
                        // counterpart to compare against.
                        if ty.oid >= FIRST_MATERIALIZE_OID {
                            continue;
                        }

                        let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
                            panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
                        });
                        assert_eq!(
                            ty.name, pg_ty.name,
                            "oid {} has name {} in postgres; expected {}",
                            ty.oid, pg_ty.name, ty.name,
                        );

                        // typinput/typreceive OIDs must match pg exactly and,
                        // when nonzero, refer to real builtin functions.
                        let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
                            None => (0, 0),
                            Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
                        };
                        assert_eq!(
                            typinput_oid, pg_ty.input,
                            "type {} has typinput OID {:?} in mz but {:?} in pg",
                            ty.name, typinput_oid, pg_ty.input,
                        );
                        assert_eq!(
                            typreceive_oid, pg_ty.receive,
                            "type {} has typreceive OID {:?} in mz but {:?} in pg",
                            ty.name, typreceive_oid, pg_ty.receive,
                        );
                        if typinput_oid != 0 {
                            assert!(
                                func_oids.contains(&typinput_oid),
                                "type {} has typinput OID {} that does not exist in pg_proc",
                                ty.name,
                                typinput_oid,
                            );
                        }
                        if typreceive_oid != 0 {
                            assert!(
                                func_oids.contains(&typreceive_oid),
                                "type {} has typreceive OID {} that does not exist in pg_proc",
                                ty.name,
                                typreceive_oid,
                            );
                        }

                        // Category-specific checks: array element links and
                        // pg's typtype classification ('p'/'r'/'b').
                        match &ty.details.typ {
                            CatalogType::Array { element_reference } => {
                                let elem_ty = BUILTINS::iter(&builtins_cfg)
                                    .filter_map(|builtin| match builtin {
                                        Builtin::Type(ty @ BuiltinType { name, .. })
                                            if element_reference == name =>
                                        {
                                            Some(ty)
                                        }
                                        _ => None,
                                    })
                                    .next();
                                let elem_ty = match elem_ty {
                                    Some(ty) => ty,
                                    None => {
                                        panic!("{} is unexpectedly not a type", element_reference)
                                    }
                                };
                                assert_eq!(
                                    pg_ty.elem, elem_ty.oid,
                                    "type {} has mismatched element OIDs",
                                    ty.name
                                )
                            }
                            CatalogType::Pseudo => {
                                assert_eq!(
                                    pg_ty.ty, "p",
                                    "type {} is not a pseudo type as expected",
                                    ty.name
                                )
                            }
                            CatalogType::Range { .. } => {
                                assert_eq!(
                                    pg_ty.ty, "r",
                                    "type {} is not a range type as expected",
                                    ty.name
                                );
                            }
                            _ => {
                                assert_eq!(
                                    pg_ty.ty, "b",
                                    "type {} is not a base type as expected",
                                    ty.name
                                )
                            }
                        }

                        // The catalog's allocated type must agree with pg on
                        // the presence and OID of the companion array type.
                        let schema = catalog
                            .resolve_schema_in_database(
                                &ResolvedDatabaseSpecifier::Ambient,
                                ty.schema,
                                &SYSTEM_CONN_ID,
                            )
                            .expect("unable to resolve schema");
                        let allocated_type = catalog
                            .resolve_type(
                                None,
                                &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
                                &PartialItemName {
                                    database: None,
                                    schema: Some(schema.name().schema.clone()),
                                    item: ty.name.to_string(),
                                },
                                &SYSTEM_CONN_ID,
                            )
                            .expect("unable to resolve type");
                        let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
                            ty
                        } else {
                            panic!("unexpectedly not a type")
                        };
                        match ty.details.array_id {
                            Some(array_id) => {
                                let array_ty = catalog.get_entry(&array_id);
                                assert_eq!(
                                    pg_ty.array, array_ty.oid,
                                    "type {} has mismatched array OIDs",
                                    allocated_type.name.item,
                                );
                            }
                            None => assert_eq!(
                                pg_ty.array, 0,
                                "type {} does not have an array type in mz but does in pg",
                                allocated_type.name.item,
                            ),
                        }
                    }
                    Builtin::Func(func) => {
                        for imp in func.inner.func_impls() {
                            assert!(
                                all_oids.insert(imp.oid),
                                "{} reused oid {}",
                                func.name,
                                imp.oid
                            );

                            assert!(
                                imp.oid < FIRST_USER_OID,
                                "built-in function {} erroneously has OID in user space ({})",
                                func.name,
                                imp.oid,
                            );

                            // Functions in the unpinned OID range need not
                            // exist in pg.
                            let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
                                continue;
                            } else {
                                pg_proc.get(&imp.oid).unwrap_or_else(|| {
                                    panic!(
                                        "pg_proc missing function {}: oid {}",
                                        func.name, imp.oid
                                    )
                                })
                            };
                            assert_eq!(
                                func.name, pg_fn.name,
                                "funcs with oid {} don't match names: {} in mz, {} in pg",
                                imp.oid, func.name, pg_fn.name
                            );

                            let imp_arg_oids = imp
                                .arg_typs
                                .iter()
                                .map(|item| resolve_type_oid(item))
                                .collect::<Vec<_>>();

                            // Argument mismatches are only printed, not
                            // fatal. NOTE(review): deliberately lenient —
                            // confirm.
                            if imp_arg_oids != pg_fn.arg_oids {
                                println!(
                                    "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
                                    imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
                                );
                            }

                            let imp_return_oid = imp.return_typ.map(resolve_type_oid);

                            assert!(
                                is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
                                "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
                                imp.oid,
                                func.name,
                                imp_return_oid,
                                pg_fn.ret_oid
                            );

                            assert_eq!(
                                imp.return_is_set, pg_fn.ret_set,
                                "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
                                imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
                            );
                        }
                    }
                    _ => (),
                }
            }

            // Operators are checked separately against pg_operator.
            for (op, func) in OP_IMPLS.iter() {
                for imp in func.func_impls() {
                    assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);

                    let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
                        continue;
                    } else {
                        pg_oper.get(&imp.oid).unwrap_or_else(|| {
                            panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
                        })
                    };

                    assert_eq!(*op, pg_op.name);

                    let imp_return_oid =
                        imp.return_typ.map(resolve_type_oid).expect("must have oid");
                    if imp_return_oid != pg_op.oprresult {
                        panic!(
                            "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
                            imp.oid, op, imp_return_oid, pg_op.oprresult
                        );
                    }
                }
            }
            catalog.expire().await;
        }

        Catalog::with_debug(inner).await
    }
3181
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_smoketest_all_builtins() {
        // Spawns one blocking task per (scalar function impl, argument
        // combination) and returns the join handles for the caller to await.
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Functions excluded from smoketesting. NOTE(review): per-entry
            // reasons are not recorded here; the list includes side-effecting
            // functions such as mz_panic/mz_sleep.
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege",
                "has_type_privilege",
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            // All builtin functions plus all operator implementations.
            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are smoketested.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                'outer: for imp in impls {
                    let details = imp.details();
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        // Skip impls whose argument types have no pgrepr
                        // equivalent.
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // Candidate datums per argument: NULL plus the type's
                    // "interesting" values.
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    if datums.is_empty() {
                        // Zero-argument functions have nothing to enumerate.
                        continue;
                    }

                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // Odometer-style enumeration of the cartesian product of
                    // candidate datums across all argument positions.
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable identifier for failure messages and
                        // the task name.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance the odometer: bump the last position,
                        // carrying into earlier positions on overflow; the
                        // loop guard terminates once position 0 overflows.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                if i == 0 {
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        for handle in handles {
            handle.await;
        }
    }
3327
    /// Smoketests a single scalar function implementation on one concrete
    /// argument list: plans the call, lowers and prepares the resulting
    /// expression, evaluates it, and cross-checks the result against the
    /// cataloged return type, the derived nullability, and `reduce`.
    fn smoketest_fn(
        name: &&str,
        call_name: String,
        op: &Operation<HirScalarExpr>,
        imp: &FuncImpl<HirScalarExpr>,
        args: Vec<Datum<'_>>,
        catalog: Arc<Catalog>,
        scalars: Vec<CoercibleScalarExpr>,
        return_styp: Option<SqlScalarType>,
    ) {
        // Minimal planning context for a one-shot expression.
        let conn_catalog = catalog.for_system_session();
        let pcx = PlanContext::zero();
        let scx = StatementContext::new(Some(&pcx), &conn_catalog);
        let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
        let ecx = ExprContext {
            qcx: &qcx,
            name: "smoketest",
            scope: &Scope::empty(),
            relation_type: &SqlRelationType::empty(),
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let arena = RowArena::new();
        let mut session = Session::<Timestamp>::dummy();
        session
            .start_transaction(to_datetime(0), None, None)
            .expect("must succeed");
        let prep_style = ExprPrepOneShot {
            logical_time: EvalTime::Time(Timestamp::MIN),
            session: &session,
            catalog_state: &catalog.state,
        };

        // Plan the call. Planning, lowering, and evaluation are each allowed
        // to fail (e.g. invalid argument combinations); only successful paths
        // are checked further.
        let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
        if let Ok(hir) = res {
            if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
                prep_style.prep_scalar_expr(&mut mir).expect("must succeed");

                if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
                    if let Some(return_styp) = return_styp {
                        let mir_typ = mir.typ(&[]);
                        // The MIR-derived type should agree with the cataloged
                        // return type (logged, not fatal).
                        soft_assert_eq_or_log!(
                            mir_typ.scalar_type,
                            return_styp,
                            "MIR type did not match the catalog type (cast elimination/repr type error)"
                        );
                        // The evaluated datum must inhabit the derived type.
                        if !eval_result_datum.is_instance_of_sql(&mir_typ) {
                            panic!(
                                "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
                            );
                        }
                        // Check derived nullability against the function's
                        // introduces/propagates-nulls flags (only when the
                        // call is on all-literal arguments).
                        if let Some((introduces_nulls, propagates_nulls)) =
                            call_introduces_propagates_nulls(&mir)
                        {
                            if introduces_nulls {
                                // A null-introducing call must be nullable
                                // regardless of its inputs.
                                assert!(
                                    mir_typ.nullable,
                                    "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                    name, args, mir, mir_typ.nullable
                                );
                            } else {
                                let any_input_null = args.iter().any(|arg| arg.is_null());
                                if !any_input_null {
                                    // No null inputs and no null introduction
                                    // means the result must be non-nullable.
                                    assert!(
                                        !mir_typ.nullable,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                } else {
                                    // With a null input, nullability must
                                    // track null propagation.
                                    assert_eq!(
                                        mir_typ.nullable, propagates_nulls,
                                        "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
                                        name, args, mir, mir_typ.nullable
                                    );
                                }
                            }
                        }
                        // `reduce` on an all-literal call must fold to a
                        // literal whose datum and type match direct `eval`.
                        let mut reduced = mir.clone();
                        reduced.reduce(&[]);
                        match reduced {
                            MirScalarExpr::Literal(reduce_result, ctyp) => {
                                match reduce_result {
                                    Ok(reduce_result_row) => {
                                        let reduce_result_datum = reduce_result_row.unpack_first();
                                        assert_eq!(
                                            reduce_result_datum,
                                            eval_result_datum,
                                            "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                        assert_eq!(
                                            ctyp.scalar_type,
                                            mir_typ.scalar_type,
                                            "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
                                            name,
                                            args,
                                            mir,
                                            eval_result_datum,
                                            mir_typ.scalar_type,
                                            reduce_result_datum,
                                            ctyp.scalar_type
                                        );
                                    }
                                    // Reduction errors are acceptable; only
                                    // successful folds are compared.
                                    Err(..) => {}
                                }
                            }
                            _ => unreachable!(
                                "all args are literals, so should have reduced to a literal"
                            ),
                        }
                    }
                }
            }
        }
    }
3471
3472 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3477 match mir_func_call {
3478 MirScalarExpr::CallUnary { func, expr } => {
3479 if expr.is_literal() {
3480 Some((func.introduces_nulls(), func.propagates_nulls()))
3481 } else {
3482 None
3483 }
3484 }
3485 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3486 if expr1.is_literal() && expr2.is_literal() {
3487 Some((func.introduces_nulls(), func.propagates_nulls()))
3488 } else {
3489 None
3490 }
3491 }
3492 MirScalarExpr::CallVariadic { func, exprs } => {
3493 if exprs.iter().all(|arg| arg.is_literal()) {
3494 Some((func.introduces_nulls(), func.propagates_nulls()))
3495 } else {
3496 None
3497 }
3498 }
3499 _ => None,
3500 }
3501 }
3502
3503 #[mz_ore::test(tokio::test)]
3505 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3507 Catalog::with_debug(|catalog| async move {
3508 let conn_catalog = catalog.for_system_session();
3509
3510 for view in BUILTINS::views().filter(|view| {
3511 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3512 }) {
3513 let item = conn_catalog
3514 .resolve_item(&PartialItemName {
3515 database: None,
3516 schema: Some(view.schema.to_string()),
3517 item: view.name.to_string(),
3518 })
3519 .expect("unable to resolve view")
3520 .at_version(RelationVersionSelector::Latest);
3522 let full_name = conn_catalog.resolve_full_name(item.name());
3523 let desc = item.relation_desc().expect("invalid item type");
3524 for col_type in desc.iter_types() {
3525 match &col_type.scalar_type {
3526 typ @ SqlScalarType::UInt16
3527 | typ @ SqlScalarType::UInt32
3528 | typ @ SqlScalarType::UInt64
3529 | typ @ SqlScalarType::MzTimestamp
3530 | typ @ SqlScalarType::List { .. }
3531 | typ @ SqlScalarType::Map { .. }
3532 | typ @ SqlScalarType::MzAclItem => {
3533 panic!("{typ:?} type found in {full_name}");
3534 }
3535 SqlScalarType::AclItem
3536 | SqlScalarType::Bool
3537 | SqlScalarType::Int16
3538 | SqlScalarType::Int32
3539 | SqlScalarType::Int64
3540 | SqlScalarType::Float32
3541 | SqlScalarType::Float64
3542 | SqlScalarType::Numeric { .. }
3543 | SqlScalarType::Date
3544 | SqlScalarType::Time
3545 | SqlScalarType::Timestamp { .. }
3546 | SqlScalarType::TimestampTz { .. }
3547 | SqlScalarType::Interval
3548 | SqlScalarType::PgLegacyChar
3549 | SqlScalarType::Bytes
3550 | SqlScalarType::String
3551 | SqlScalarType::Char { .. }
3552 | SqlScalarType::VarChar { .. }
3553 | SqlScalarType::Jsonb
3554 | SqlScalarType::Uuid
3555 | SqlScalarType::Array(_)
3556 | SqlScalarType::Record { .. }
3557 | SqlScalarType::Oid
3558 | SqlScalarType::RegProc
3559 | SqlScalarType::RegType
3560 | SqlScalarType::RegClass
3561 | SqlScalarType::Int2Vector
3562 | SqlScalarType::Range { .. }
3563 | SqlScalarType::PgLegacyName => {}
3564 }
3565 }
3566 }
3567 catalog.expire().await;
3568 })
3569 .await
3570 }
3571
3572 #[mz_ore::test(tokio::test)]
3575 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3577 Catalog::with_debug(|catalog| async move {
3578 let conn_catalog = catalog.for_system_session();
3579
3580 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3581 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3582
3583 for entry in catalog.entries() {
3584 let schema_spec = entry.name().qualifiers.schema_spec;
3585 let introspection_deps = catalog.introspection_dependencies(entry.id);
3586 if introspection_deps.is_empty() {
3587 assert!(
3588 schema_spec != introspection_schema_spec,
3589 "entry does not depend on introspection sources but is in \
3590 `mz_introspection`: {}",
3591 conn_catalog.resolve_full_name(entry.name()),
3592 );
3593 } else {
3594 assert!(
3595 schema_spec == introspection_schema_spec,
3596 "entry depends on introspection sources but is not in \
3597 `mz_introspection`: {}",
3598 conn_catalog.resolve_full_name(entry.name()),
3599 );
3600 }
3601 }
3602 })
3603 .await
3604 }
3605
3606 #[mz_ore::test(tokio::test)]
3607 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3609 let persist_client = PersistClient::new_for_tests().await;
3610 let bootstrap_args = test_bootstrap_args();
3611 let organization_id = Uuid::new_v4();
3612 let db_name = "DB";
3613
3614 let mut writer_catalog = Catalog::open_debug_catalog(
3615 persist_client.clone(),
3616 organization_id.clone(),
3617 &bootstrap_args,
3618 )
3619 .await
3620 .expect("open_debug_catalog");
3621 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3622 persist_client.clone(),
3623 organization_id.clone(),
3624 &bootstrap_args,
3625 )
3626 .await
3627 .expect("open_debug_read_only_catalog");
3628 assert_err!(writer_catalog.resolve_database(db_name));
3629 assert_err!(read_only_catalog.resolve_database(db_name));
3630
3631 let commit_ts = writer_catalog.current_upper().await;
3632 writer_catalog
3633 .transact(
3634 None,
3635 commit_ts,
3636 None,
3637 vec![Op::CreateDatabase {
3638 name: db_name.to_string(),
3639 owner_id: MZ_SYSTEM_ROLE_ID,
3640 }],
3641 )
3642 .await
3643 .expect("failed to transact");
3644
3645 let write_db = writer_catalog
3646 .resolve_database(db_name)
3647 .expect("resolve_database");
3648 read_only_catalog
3649 .sync_to_current_updates()
3650 .await
3651 .expect("sync_to_current_updates");
3652 let read_db = read_only_catalog
3653 .resolve_database(db_name)
3654 .expect("resolve_database");
3655
3656 assert_eq!(write_db, read_db);
3657
3658 let writer_catalog_fencer =
3659 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3660 .await
3661 .expect("open_debug_catalog for fencer");
3662 let fencer_db = writer_catalog_fencer
3663 .resolve_database(db_name)
3664 .expect("resolve_database for fencer");
3665 assert_eq!(fencer_db, read_db);
3666
3667 let write_fence_err = writer_catalog
3668 .sync_to_current_updates()
3669 .await
3670 .expect_err("sync_to_current_updates for fencer");
3671 assert!(matches!(
3672 write_fence_err,
3673 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3674 ));
3675 let read_fence_err = read_only_catalog
3676 .sync_to_current_updates()
3677 .await
3678 .expect_err("sync_to_current_updates after fencer");
3679 assert!(matches!(
3680 read_fence_err,
3681 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3682 ));
3683
3684 writer_catalog.expire().await;
3685 read_only_catalog.expire().await;
3686 writer_catalog_fencer.expire().await;
3687 }
3688}