1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::optimize::OptimizerFeatures;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
67 CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
68 SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use uuid::Uuid;
89
90pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
92pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
93pub use crate::catalog::state::CatalogState;
94pub use crate::catalog::transact::{
95 DropObjectInfo, InjectedAuditEvent, Op, ReplicaCreateDropReason, TransactionResult,
96};
97use crate::command::CatalogDump;
98use crate::coord::TargetCluster;
99#[cfg(test)]
100use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
101use crate::session::{Portal, PreparedStatement, Session};
102use crate::util::ResultExt;
103use crate::{AdapterError, AdapterNotice, ExecuteResponse};
104
105mod builtin_table_updates;
106pub(crate) mod consistency;
107mod migrate;
108
109mod apply;
110mod open;
111mod state;
112mod timeline;
113mod transact;
114
115#[derive(Debug)]
138pub struct Catalog {
139 state: CatalogState,
140 plans: CatalogPlans,
141 expr_cache_handle: Option<ExpressionCacheHandle>,
142 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
143 transient_revision: u64,
144}
145
146impl Clone for Catalog {
149 fn clone(&self) -> Self {
150 Self {
151 state: self.state.clone(),
152 plans: self.plans.clone(),
153 expr_cache_handle: self.expr_cache_handle.clone(),
154 storage: Arc::clone(&self.storage),
155 transient_revision: self.transient_revision,
156 }
157 }
158}
159
160#[derive(Default, Debug, Clone)]
161pub struct CatalogPlans {
162 optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
163 physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
164 dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
165 notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
166}
167
168impl Catalog {
169 #[mz_ore::instrument(level = "trace")]
171 pub fn set_optimized_plan(
172 &mut self,
173 id: GlobalId,
174 plan: DataflowDescription<OptimizedMirRelationExpr>,
175 ) {
176 self.plans.optimized_plan_by_id.insert(id, plan.into());
177 }
178
179 #[mz_ore::instrument(level = "trace")]
181 pub fn set_physical_plan(
182 &mut self,
183 id: GlobalId,
184 plan: DataflowDescription<mz_compute_types::plan::Plan>,
185 ) {
186 self.plans.physical_plan_by_id.insert(id, plan.into());
187 }
188
189 #[mz_ore::instrument(level = "trace")]
191 pub fn try_get_optimized_plan(
192 &self,
193 id: &GlobalId,
194 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
195 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
196 }
197
198 #[mz_ore::instrument(level = "trace")]
200 pub fn try_get_physical_plan(
201 &self,
202 id: &GlobalId,
203 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
204 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
205 }
206
207 #[mz_ore::instrument(level = "trace")]
209 pub fn set_dataflow_metainfo(
210 &mut self,
211 id: GlobalId,
212 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
213 ) {
214 for notice in metainfo.optimizer_notices.iter() {
216 for dep_id in notice.dependencies.iter() {
217 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
218 entry.push(Arc::clone(notice))
219 }
220 if let Some(item_id) = notice.item_id {
221 soft_assert_eq_or_log!(
222 item_id,
223 id,
224 "notice.item_id should match the id for whom we are saving the notice"
225 );
226 }
227 }
228 self.plans.dataflow_metainfos.insert(id, metainfo);
230 }
231
232 #[mz_ore::instrument(level = "trace")]
234 pub fn try_get_dataflow_metainfo(
235 &self,
236 id: &GlobalId,
237 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
238 self.plans.dataflow_metainfos.get(id)
239 }
240
241 #[mz_ore::instrument(level = "trace")]
250 pub fn drop_plans_and_metainfos(
251 &mut self,
252 drop_ids: &BTreeSet<GlobalId>,
253 ) -> BTreeSet<Arc<OptimizerNotice>> {
254 let mut dropped_notices = BTreeSet::new();
256
257 for id in drop_ids {
259 self.plans.optimized_plan_by_id.remove(id);
260 self.plans.physical_plan_by_id.remove(id);
261 if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
262 soft_assert_or_log!(
263 metainfo.optimizer_notices.iter().all_unique(),
264 "should have been pushed there by `push_optimizer_notice_dedup`"
265 );
266 for n in metainfo.optimizer_notices.drain(..) {
267 for dep_id in n.dependencies.iter() {
269 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
270 soft_assert_or_log!(
271 notices.iter().any(|x| &n == x),
272 "corrupt notices_by_dep_id"
273 );
274 notices.retain(|x| &n != x)
275 }
276 }
277 dropped_notices.insert(n);
278 }
279 }
280 }
281
282 for id in drop_ids {
284 if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
285 for n in notices.drain(..) {
286 if let Some(item_id) = n.item_id.as_ref() {
288 if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
289 metainfo.optimizer_notices.iter().for_each(|n2| {
290 if let Some(item_id_2) = n2.item_id {
291 soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
292 }
293 });
294 metainfo.optimizer_notices.retain(|x| &n != x);
295 }
296 }
297 dropped_notices.insert(n);
298 }
299 }
300 }
301
302 let mut todo_dep_ids = BTreeSet::new();
305 for notice in dropped_notices.iter() {
306 for dep_id in notice.dependencies.iter() {
307 if !drop_ids.contains(dep_id) {
308 todo_dep_ids.insert(*dep_id);
309 }
310 }
311 }
312 for id in todo_dep_ids {
315 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
316 notices.retain(|n| !dropped_notices.contains(n))
317 }
318 }
319
320 dropped_notices
327 }
328}
329
330#[derive(Debug)]
331pub struct ConnCatalog<'a> {
332 state: Cow<'a, CatalogState>,
333 unresolvable_ids: BTreeSet<CatalogItemId>,
343 conn_id: ConnectionId,
344 cluster: String,
345 database: Option<DatabaseId>,
346 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
347 role_id: RoleId,
348 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
349 portals: Option<&'a BTreeMap<String, Portal>>,
350 notices_tx: UnboundedSender<AdapterNotice>,
351}
352
353impl ConnCatalog<'_> {
354 pub fn conn_id(&self) -> &ConnectionId {
355 &self.conn_id
356 }
357
358 pub fn state(&self) -> &CatalogState {
359 &*self.state
360 }
361
362 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
372 assert_eq!(
373 self.role_id, MZ_SYSTEM_ROLE_ID,
374 "only the system role can mark IDs unresolvable",
375 );
376 self.unresolvable_ids.insert(id);
377 }
378
379 pub fn effective_search_path(
385 &self,
386 include_temp_schema: bool,
387 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
388 self.state
389 .effective_search_path(&self.search_path, include_temp_schema)
390 }
391}
392
393impl ConnectionResolver for ConnCatalog<'_> {
394 fn resolve_connection(
395 &self,
396 id: CatalogItemId,
397 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
398 self.state().resolve_connection(id)
399 }
400}
401
402impl Catalog {
403 pub fn transient_revision(&self) -> u64 {
407 self.transient_revision
408 }
409
410 pub async fn with_debug<F, Fut, T>(f: F) -> T
422 where
423 F: FnOnce(Catalog) -> Fut,
424 Fut: Future<Output = T>,
425 {
426 let persist_client = PersistClient::new_for_tests().await;
427 let organization_id = Uuid::new_v4();
428 let bootstrap_args = test_bootstrap_args();
429 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
430 .await
431 .expect("can open debug catalog");
432 f(catalog).await
433 }
434
435 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
438 where
439 F: FnOnce(Catalog) -> Fut,
440 Fut: Future<Output = T>,
441 {
442 let persist_client = PersistClient::new_for_tests().await;
443 let organization_id = Uuid::new_v4();
444 let bootstrap_args = test_bootstrap_args();
445 let mut catalog =
446 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
447 .await
448 .expect("can open debug catalog");
449
450 let now = SYSTEM_TIME.clone();
452 let openable_storage = TestCatalogStateBuilder::new(persist_client)
453 .with_organization_id(organization_id)
454 .with_default_deploy_generation()
455 .build()
456 .await
457 .expect("can create durable catalog");
458 let mut storage = openable_storage
459 .open(now().into(), &bootstrap_args)
460 .await
461 .expect("can open durable catalog")
462 .0;
463 let _ = storage
465 .sync_to_current_updates()
466 .await
467 .expect("can sync to current updates");
468 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
469
470 f(catalog).await
471 }
472
473 pub async fn open_debug_catalog(
477 persist_client: PersistClient,
478 organization_id: Uuid,
479 bootstrap_args: &BootstrapArgs,
480 ) -> Result<Catalog, anyhow::Error> {
481 let now = SYSTEM_TIME.clone();
482 let environment_id = None;
483 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
484 .with_organization_id(organization_id)
485 .with_default_deploy_generation()
486 .build()
487 .await?;
488 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
489 let system_parameter_defaults = BTreeMap::default();
490 Self::open_debug_catalog_inner(
491 persist_client,
492 storage,
493 now,
494 environment_id,
495 &DUMMY_BUILD_INFO,
496 system_parameter_defaults,
497 bootstrap_args,
498 None,
499 )
500 .await
501 }
502
503 pub async fn open_debug_read_only_catalog(
508 persist_client: PersistClient,
509 organization_id: Uuid,
510 bootstrap_args: &BootstrapArgs,
511 ) -> Result<Catalog, anyhow::Error> {
512 let now = SYSTEM_TIME.clone();
513 let environment_id = None;
514 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
515 .with_organization_id(organization_id)
516 .build()
517 .await?;
518 let storage = openable_storage
519 .open_read_only(&test_bootstrap_args())
520 .await?;
521 let system_parameter_defaults = BTreeMap::default();
522 Self::open_debug_catalog_inner(
523 persist_client,
524 storage,
525 now,
526 environment_id,
527 &DUMMY_BUILD_INFO,
528 system_parameter_defaults,
529 bootstrap_args,
530 None,
531 )
532 .await
533 }
534
535 pub async fn open_debug_read_only_persist_catalog_config(
540 persist_client: PersistClient,
541 now: NowFn,
542 environment_id: EnvironmentId,
543 system_parameter_defaults: BTreeMap<String, String>,
544 build_info: &'static BuildInfo,
545 bootstrap_args: &BootstrapArgs,
546 enable_expression_cache_override: Option<bool>,
547 ) -> Result<Catalog, anyhow::Error> {
548 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
549 .with_organization_id(environment_id.organization_id())
550 .with_version(
551 build_info
552 .version
553 .parse()
554 .expect("build version is parseable"),
555 )
556 .build()
557 .await?;
558 let storage = openable_storage.open_read_only(bootstrap_args).await?;
559 Self::open_debug_catalog_inner(
560 persist_client,
561 storage,
562 now,
563 Some(environment_id),
564 build_info,
565 system_parameter_defaults,
566 bootstrap_args,
567 enable_expression_cache_override,
568 )
569 .await
570 }
571
572 async fn open_debug_catalog_inner(
573 persist_client: PersistClient,
574 storage: Box<dyn DurableCatalogState>,
575 now: NowFn,
576 environment_id: Option<EnvironmentId>,
577 build_info: &'static BuildInfo,
578 system_parameter_defaults: BTreeMap<String, String>,
579 bootstrap_args: &BootstrapArgs,
580 enable_expression_cache_override: Option<bool>,
581 ) -> Result<Catalog, anyhow::Error> {
582 let metrics_registry = &MetricsRegistry::new();
583 let secrets_reader = Arc::new(InMemorySecretsController::new());
584 let previous_ts = now().into();
587 let replica_size = &bootstrap_args.default_cluster_replica_size;
588 let read_only = false;
589
590 let OpenCatalogResult {
591 catalog,
592 migrated_storage_collections_0dt: _,
593 new_builtin_collections: _,
594 builtin_table_updates: _,
595 cached_global_exprs: _,
596 uncached_local_exprs: _,
597 } = Catalog::open(Config {
598 storage,
599 metrics_registry,
600 state: StateConfig {
601 unsafe_mode: true,
602 all_features: false,
603 build_info,
604 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
605 read_only,
606 now,
607 boot_ts: previous_ts,
608 skip_migrations: true,
609 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
610 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
611 size: replica_size.clone(),
612 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
613 },
614 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
615 size: replica_size.clone(),
616 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
617 },
618 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
619 size: replica_size.clone(),
620 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
621 },
622 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
623 size: replica_size.clone(),
624 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
625 },
626 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
627 size: replica_size.clone(),
628 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
629 },
630 system_parameter_defaults,
631 remote_system_parameters: None,
632 availability_zones: vec![],
633 egress_addresses: vec![],
634 aws_principal_context: None,
635 aws_privatelink_availability_zones: None,
636 http_host_name: None,
637 connection_context: ConnectionContext::for_tests(secrets_reader),
638 builtin_item_migration_config: BuiltinItemMigrationConfig {
639 persist_client: persist_client.clone(),
640 read_only,
641 force_migration: None,
642 },
643 persist_client,
644 enable_expression_cache_override,
645 helm_chart_version: None,
646 external_login_password_mz_system: None,
647 license_key: ValidatedLicenseKey::for_tests(),
648 },
649 })
650 .await?;
651 Ok(catalog)
652 }
653
654 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
655 self.state.for_session(session)
656 }
657
658 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
659 self.state.for_sessionless_user(role_id)
660 }
661
662 pub fn for_system_session(&self) -> ConnCatalog<'_> {
663 self.state.for_system_session()
664 }
665
666 async fn storage<'a>(
667 &'a self,
668 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
669 self.storage.lock().await
670 }
671
672 pub async fn current_upper(&self) -> mz_repr::Timestamp {
673 self.storage().await.current_upper().await
674 }
675
676 pub async fn allocate_user_id(
677 &self,
678 commit_ts: mz_repr::Timestamp,
679 ) -> Result<(CatalogItemId, GlobalId), Error> {
680 self.storage()
681 .await
682 .allocate_user_id(commit_ts)
683 .await
684 .maybe_terminate("allocating user ids")
685 .err_into()
686 }
687
688 pub async fn allocate_user_ids(
690 &self,
691 amount: u64,
692 commit_ts: mz_repr::Timestamp,
693 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
694 self.storage()
695 .await
696 .allocate_user_ids(amount, commit_ts)
697 .await
698 .maybe_terminate("allocating user ids")
699 .err_into()
700 }
701
702 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
703 let commit_ts = self.storage().await.current_upper().await;
704 self.allocate_user_id(commit_ts).await
705 }
706
707 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
709 self.storage()
710 .await
711 .get_next_user_item_id()
712 .await
713 .err_into()
714 }
715
716 #[cfg(test)]
717 pub async fn allocate_system_id(
718 &self,
719 commit_ts: mz_repr::Timestamp,
720 ) -> Result<(CatalogItemId, GlobalId), Error> {
721 use mz_ore::collections::CollectionExt;
722
723 let mut storage = self.storage().await;
724 let mut txn = storage.transaction().await?;
725 let id = txn
726 .allocate_system_item_ids(1)
727 .maybe_terminate("allocating system ids")?
728 .into_element();
729 let _ = txn.get_and_commit_op_updates();
731 txn.commit(commit_ts).await?;
732 Ok(id)
733 }
734
735 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
737 self.storage()
738 .await
739 .get_next_system_item_id()
740 .await
741 .err_into()
742 }
743
744 pub async fn allocate_user_cluster_id(
745 &self,
746 commit_ts: mz_repr::Timestamp,
747 ) -> Result<ClusterId, Error> {
748 self.storage()
749 .await
750 .allocate_user_cluster_id(commit_ts)
751 .await
752 .maybe_terminate("allocating user cluster ids")
753 .err_into()
754 }
755
756 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
758 self.storage()
759 .await
760 .get_next_system_replica_id()
761 .await
762 .err_into()
763 }
764
765 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
767 self.storage()
768 .await
769 .get_next_user_replica_id()
770 .await
771 .err_into()
772 }
773
774 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
775 self.state.resolve_database(database_name)
776 }
777
778 pub fn resolve_schema(
779 &self,
780 current_database: Option<&DatabaseId>,
781 database_name: Option<&str>,
782 schema_name: &str,
783 conn_id: &ConnectionId,
784 ) -> Result<&Schema, SqlCatalogError> {
785 self.state
786 .resolve_schema(current_database, database_name, schema_name, conn_id)
787 }
788
789 pub fn resolve_schema_in_database(
790 &self,
791 database_spec: &ResolvedDatabaseSpecifier,
792 schema_name: &str,
793 conn_id: &ConnectionId,
794 ) -> Result<&Schema, SqlCatalogError> {
795 self.state
796 .resolve_schema_in_database(database_spec, schema_name, conn_id)
797 }
798
799 pub fn resolve_replica_in_cluster(
800 &self,
801 cluster_id: &ClusterId,
802 replica_name: &str,
803 ) -> Result<&ClusterReplica, SqlCatalogError> {
804 self.state
805 .resolve_replica_in_cluster(cluster_id, replica_name)
806 }
807
808 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
809 self.state.resolve_system_schema(name)
810 }
811
812 pub fn resolve_search_path(
813 &self,
814 session: &Session,
815 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
816 self.state.resolve_search_path(session)
817 }
818
819 pub fn resolve_entry(
821 &self,
822 current_database: Option<&DatabaseId>,
823 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
824 name: &PartialItemName,
825 conn_id: &ConnectionId,
826 ) -> Result<&CatalogEntry, SqlCatalogError> {
827 self.state
828 .resolve_entry(current_database, search_path, name, conn_id)
829 }
830
831 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
833 self.state.resolve_builtin_table(builtin)
834 }
835
836 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
838 self.state.resolve_builtin_log(builtin).0
839 }
840
841 pub fn resolve_builtin_storage_collection(
843 &self,
844 builtin: &'static BuiltinSource,
845 ) -> CatalogItemId {
846 self.state.resolve_builtin_source(builtin)
847 }
848
849 pub fn resolve_function(
851 &self,
852 current_database: Option<&DatabaseId>,
853 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
854 name: &PartialItemName,
855 conn_id: &ConnectionId,
856 ) -> Result<&CatalogEntry, SqlCatalogError> {
857 self.state
858 .resolve_function(current_database, search_path, name, conn_id)
859 }
860
861 pub fn resolve_type(
863 &self,
864 current_database: Option<&DatabaseId>,
865 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
866 name: &PartialItemName,
867 conn_id: &ConnectionId,
868 ) -> Result<&CatalogEntry, SqlCatalogError> {
869 self.state
870 .resolve_type(current_database, search_path, name, conn_id)
871 }
872
873 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
874 self.state.resolve_cluster(name)
875 }
876
877 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
883 self.state.resolve_builtin_cluster(cluster)
884 }
885
886 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
887 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
888 }
889
890 pub fn resolve_target_cluster(
892 &self,
893 target_cluster: TargetCluster,
894 session: &Session,
895 ) -> Result<&Cluster, AdapterError> {
896 match target_cluster {
897 TargetCluster::CatalogServer => {
898 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
899 }
900 TargetCluster::Active => self.active_cluster(session),
901 TargetCluster::Transaction(cluster_id) => self
902 .try_get_cluster(cluster_id)
903 .ok_or(AdapterError::ConcurrentClusterDrop),
904 }
905 }
906
907 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
908 if session.user().name != SYSTEM_USER.name
912 && session.user().name != SUPPORT_USER.name
913 && session.vars().cluster() == SYSTEM_USER.name
914 {
915 coord_bail!(
916 "system cluster '{}' cannot execute user queries",
917 SYSTEM_USER.name
918 );
919 }
920 let cluster = self.resolve_cluster(session.vars().cluster())?;
921 Ok(cluster)
922 }
923
924 pub fn state(&self) -> &CatalogState {
925 &self.state
926 }
927
928 pub fn resolve_full_name(
929 &self,
930 name: &QualifiedItemName,
931 conn_id: Option<&ConnectionId>,
932 ) -> FullItemName {
933 self.state.resolve_full_name(name, conn_id)
934 }
935
936 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
937 self.state.try_get_entry(id)
938 }
939
940 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
941 self.state.try_get_entry_by_global_id(id)
942 }
943
944 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
945 self.state.get_entry(id)
946 }
947
948 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
949 self.state.get_entry_by_global_id(id)
950 }
951
952 pub fn get_global_ids<'a>(
953 &'a self,
954 id: &CatalogItemId,
955 ) -> impl Iterator<Item = GlobalId> + use<'a> {
956 self.get_entry(id).global_ids()
957 }
958
959 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
960 self.get_entry_by_global_id(id).id()
961 }
962
963 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
964 let item = self.try_get_entry_by_global_id(id)?;
965 Some(item.id())
966 }
967
968 pub fn get_schema(
969 &self,
970 database_spec: &ResolvedDatabaseSpecifier,
971 schema_spec: &SchemaSpecifier,
972 conn_id: &ConnectionId,
973 ) -> &Schema {
974 self.state.get_schema(database_spec, schema_spec, conn_id)
975 }
976
977 pub fn try_get_schema(
978 &self,
979 database_spec: &ResolvedDatabaseSpecifier,
980 schema_spec: &SchemaSpecifier,
981 conn_id: &ConnectionId,
982 ) -> Option<&Schema> {
983 self.state
984 .try_get_schema(database_spec, schema_spec, conn_id)
985 }
986
987 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
988 self.state.get_mz_catalog_schema_id()
989 }
990
991 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
992 self.state.get_pg_catalog_schema_id()
993 }
994
995 pub fn get_information_schema_id(&self) -> SchemaId {
996 self.state.get_information_schema_id()
997 }
998
999 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1000 self.state.get_mz_internal_schema_id()
1001 }
1002
1003 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1004 self.state.get_mz_introspection_schema_id()
1005 }
1006
1007 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1008 self.state.get_mz_unsafe_schema_id()
1009 }
1010
1011 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1012 self.state.system_schema_ids()
1013 }
1014
1015 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1016 self.state.get_database(id)
1017 }
1018
1019 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1020 self.state.try_get_role(id)
1021 }
1022
1023 pub fn get_role(&self, id: &RoleId) -> &Role {
1024 self.state.get_role(id)
1025 }
1026
1027 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1028 self.state.try_get_role_by_name(role_name)
1029 }
1030
1031 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1032 self.state.try_get_role_auth_by_id(id)
1033 }
1034
1035 pub fn create_temporary_schema(
1038 &mut self,
1039 conn_id: &ConnectionId,
1040 owner_id: RoleId,
1041 ) -> Result<(), Error> {
1042 self.state.create_temporary_schema(conn_id, owner_id)
1043 }
1044
1045 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1046 self.state
1048 .temporary_schemas
1049 .get(conn_id)
1050 .map(|schema| schema.items.contains_key(item_name))
1051 .unwrap_or(false)
1052 }
1053
1054 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1057 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1058 return Ok(());
1059 };
1060 if !schema.items.is_empty() {
1061 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1062 }
1063 Ok(())
1064 }
1065
1066 pub(crate) fn object_dependents(
1067 &self,
1068 object_ids: &Vec<ObjectId>,
1069 conn_id: &ConnectionId,
1070 ) -> Vec<ObjectId> {
1071 let mut seen = BTreeSet::new();
1072 self.state.object_dependents(object_ids, conn_id, &mut seen)
1073 }
1074
1075 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1076 FullNameV1 {
1077 database: name.database.to_string(),
1078 schema: name.schema.clone(),
1079 item: name.item.clone(),
1080 }
1081 }
1082
1083 pub fn find_available_cluster_name(&self, name: &str) -> String {
1084 let mut i = 0;
1085 let mut candidate = name.to_string();
1086 while self.state.clusters_by_name.contains_key(&candidate) {
1087 i += 1;
1088 candidate = format!("{}{}", name, i);
1089 }
1090 candidate
1091 }
1092
1093 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1094 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1095 self.cluster_replica_sizes()
1096 .enabled_allocations()
1097 .map(|a| a.0.to_owned())
1098 .collect::<Vec<_>>()
1099 } else {
1100 self.system_config().allowed_cluster_replica_sizes()
1101 }
1102 }
1103
1104 pub fn concretize_replica_location(
1105 &self,
1106 location: mz_catalog::durable::ReplicaLocation,
1107 allowed_sizes: &Vec<String>,
1108 allowed_availability_zones: Option<&[String]>,
1109 ) -> Result<ReplicaLocation, Error> {
1110 self.state
1111 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1112 }
1113
1114 pub(crate) fn ensure_valid_replica_size(
1115 &self,
1116 allowed_sizes: &[String],
1117 size: &String,
1118 ) -> Result<(), Error> {
1119 self.state.ensure_valid_replica_size(allowed_sizes, size)
1120 }
1121
1122 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1123 &self.state.cluster_replica_sizes
1124 }
1125
1126 pub fn get_privileges(
1128 &self,
1129 id: &SystemObjectId,
1130 conn_id: &ConnectionId,
1131 ) -> Option<&PrivilegeMap> {
1132 match id {
1133 SystemObjectId::Object(id) => match id {
1134 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1135 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1136 ObjectId::Schema((database_spec, schema_spec)) => Some(
1137 self.get_schema(database_spec, schema_spec, conn_id)
1138 .privileges(),
1139 ),
1140 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1141 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1142 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1143 },
1144 SystemObjectId::System => Some(&self.state.system_privileges),
1145 }
1146 }
1147
1148 #[mz_ore::instrument(level = "debug")]
1149 pub async fn advance_upper(&self, new_upper: mz_repr::Timestamp) -> Result<(), AdapterError> {
1150 Ok(self.storage().await.advance_upper(new_upper).await?)
1151 }
1152
1153 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1155 self.state.introspection_dependencies(id)
1156 }
1157
1158 pub fn dump(&self) -> Result<CatalogDump, Error> {
1164 Ok(CatalogDump::new(self.state.dump(None)?))
1165 }
1166
1167 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1171 self.state.check_consistency().map_err(|inconsistencies| {
1172 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1173 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1174 })
1175 })
1176 }
1177
1178 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1179 self.state.config()
1180 }
1181
1182 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1183 self.state.entry_by_id.values()
1184 }
1185
1186 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1187 self.entries()
1188 .filter(|entry| entry.is_connection() && entry.id().is_user())
1189 }
1190
1191 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1192 self.entries()
1193 .filter(|entry| entry.is_table() && entry.id().is_user())
1194 }
1195
1196 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1197 self.entries()
1198 .filter(|entry| entry.is_source() && entry.id().is_user())
1199 }
1200
1201 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1202 self.entries()
1203 .filter(|entry| entry.is_sink() && entry.id().is_user())
1204 }
1205
1206 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1207 self.entries()
1208 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1209 }
1210
1211 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1212 self.entries()
1213 .filter(|entry| entry.is_secret() && entry.id().is_user())
1214 }
1215
1216 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1217 self.state.get_network_policy(&network_policy_id)
1218 }
1219
1220 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1221 self.state.try_get_network_policy_by_name(name)
1222 }
1223
1224 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1225 self.state.clusters_by_id.values()
1226 }
1227
1228 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1229 self.state.get_cluster(cluster_id)
1230 }
1231
1232 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1233 self.state.try_get_cluster(cluster_id)
1234 }
1235
1236 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1237 self.clusters().filter(|cluster| cluster.id.is_user())
1238 }
1239
1240 pub fn get_cluster_replica(
1241 &self,
1242 cluster_id: ClusterId,
1243 replica_id: ReplicaId,
1244 ) -> &ClusterReplica {
1245 self.state.get_cluster_replica(cluster_id, replica_id)
1246 }
1247
1248 pub fn try_get_cluster_replica(
1249 &self,
1250 cluster_id: ClusterId,
1251 replica_id: ReplicaId,
1252 ) -> Option<&ClusterReplica> {
1253 self.state.try_get_cluster_replica(cluster_id, replica_id)
1254 }
1255
1256 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1257 self.user_clusters()
1258 .flat_map(|cluster| cluster.user_replicas())
1259 }
1260
1261 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1262 self.state.database_by_id.values()
1263 }
1264
1265 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1266 self.state
1267 .roles_by_id
1268 .values()
1269 .filter(|role| role.is_user())
1270 }
1271
1272 pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1273 self.entries()
1274 .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1275 }
1276
1277 pub fn user_network_policies(&self) -> impl Iterator<Item = &NetworkPolicy> {
1278 self.state
1279 .network_policies_by_id
1280 .iter()
1281 .filter(|(id, _)| id.is_user())
1282 .map(|(_, policy)| policy)
1283 }
1284
1285 pub fn system_privileges(&self) -> &PrivilegeMap {
1286 &self.state.system_privileges
1287 }
1288
1289 pub fn default_privileges(
1290 &self,
1291 ) -> impl Iterator<
1292 Item = (
1293 &DefaultPrivilegeObject,
1294 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1295 ),
1296 > {
1297 self.state.default_privileges.iter()
1298 }
1299
1300 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1301 self.state
1302 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1303 }
1304
1305 pub fn pack_storage_usage_update(
1306 &self,
1307 event: VersionedStorageUsage,
1308 diff: Diff,
1309 ) -> BuiltinTableUpdate {
1310 self.state
1311 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1312 }
1313
1314 pub fn system_config(&self) -> &SystemVars {
1315 self.state.system_config()
1316 }
1317
1318 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1319 self.state.system_config_mut()
1320 }
1321
1322 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1323 self.state.ensure_not_reserved_role(role_id)
1324 }
1325
1326 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1327 self.state.ensure_grantable_role(role_id)
1328 }
1329
1330 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1331 self.state.ensure_not_system_role(role_id)
1332 }
1333
1334 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1335 self.state.ensure_not_predefined_role(role_id)
1336 }
1337
1338 pub fn ensure_not_reserved_network_policy(
1339 &self,
1340 network_policy_id: &NetworkPolicyId,
1341 ) -> Result<(), Error> {
1342 self.state
1343 .ensure_not_reserved_network_policy(network_policy_id)
1344 }
1345
1346 pub fn ensure_not_reserved_object(
1347 &self,
1348 object_id: &ObjectId,
1349 conn_id: &ConnectionId,
1350 ) -> Result<(), Error> {
1351 match object_id {
1352 ObjectId::Cluster(cluster_id) => {
1353 if cluster_id.is_system() {
1354 let cluster = self.get_cluster(*cluster_id);
1355 Err(Error::new(ErrorKind::ReadOnlyCluster(
1356 cluster.name().to_string(),
1357 )))
1358 } else {
1359 Ok(())
1360 }
1361 }
1362 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1363 if replica_id.is_system() {
1364 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1365 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1366 replica.name().to_string(),
1367 )))
1368 } else {
1369 Ok(())
1370 }
1371 }
1372 ObjectId::Database(database_id) => {
1373 if database_id.is_system() {
1374 let database = self.get_database(database_id);
1375 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1376 database.name().to_string(),
1377 )))
1378 } else {
1379 Ok(())
1380 }
1381 }
1382 ObjectId::Schema((database_spec, schema_spec)) => {
1383 if schema_spec.is_system() {
1384 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1385 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1386 schema.name().schema.clone(),
1387 )))
1388 } else {
1389 Ok(())
1390 }
1391 }
1392 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1393 ObjectId::Item(item_id) => {
1394 if item_id.is_system() {
1395 let item = self.get_entry(item_id);
1396 let name = self.resolve_full_name(item.name(), Some(conn_id));
1397 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1398 } else {
1399 Ok(())
1400 }
1401 }
1402 ObjectId::NetworkPolicy(network_policy_id) => {
1403 self.ensure_not_reserved_network_policy(network_policy_id)
1404 }
1405 }
1406 }
1407
1408 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1410 &mut self,
1411 create_sql: &str,
1412 force_if_exists_skip: bool,
1413 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1414 self.state
1415 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1416 }
1417
1418 pub(crate) fn cache_expressions(
1424 &self,
1425 id: GlobalId,
1426 local_mir: Option<OptimizedMirRelationExpr>,
1427 optimizer_features: OptimizerFeatures,
1428 ) {
1429 let Some(mut global_mir) = self.try_get_optimized_plan(&id).cloned() else {
1430 soft_panic_or_log!("optimized plan missing for ID {id}");
1431 return;
1432 };
1433 let Some(mut physical_plan) = self.try_get_physical_plan(&id).cloned() else {
1434 soft_panic_or_log!("physical plan missing for ID {id}");
1435 return;
1436 };
1437 let Some(dataflow_metainfos) = self.try_get_dataflow_metainfo(&id).cloned() else {
1438 soft_panic_or_log!("dataflow metainfo missing for ID {id}");
1439 return;
1440 };
1441
1442 global_mir.as_of = None;
1445 global_mir.until = Default::default();
1446 physical_plan.as_of = None;
1447 physical_plan.until = Default::default();
1448
1449 let mut local_exprs = Vec::new();
1450 if let Some(local_mir) = local_mir {
1451 local_exprs.push((
1452 id,
1453 LocalExpressions {
1454 local_mir,
1455 optimizer_features: optimizer_features.clone(),
1456 },
1457 ));
1458 }
1459 let global_exprs = vec![(
1460 id,
1461 GlobalExpressions {
1462 global_mir,
1463 physical_plan,
1464 dataflow_metainfos,
1465 optimizer_features,
1466 },
1467 )];
1468 let _fut = self.update_expression_cache(local_exprs, global_exprs, Default::default());
1469 }
1470
1471 pub(crate) fn update_expression_cache<'a, 'b>(
1472 &'a self,
1473 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1474 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1475 invalidate_ids: BTreeSet<GlobalId>,
1476 ) -> BoxFuture<'b, ()> {
1477 if let Some(expr_cache) = &self.expr_cache_handle {
1478 expr_cache
1479 .update(
1480 new_local_expressions,
1481 new_global_expressions,
1482 invalidate_ids,
1483 )
1484 .boxed()
1485 } else {
1486 async {}.boxed()
1487 }
1488 }
1489
1490 #[cfg(test)]
1494 async fn sync_to_current_updates(
1495 &mut self,
1496 ) -> Result<
1497 (
1498 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1499 Vec<ParsedStateUpdate>,
1500 ),
1501 CatalogError,
1502 > {
1503 let updates = self.storage().await.sync_to_current_updates().await?;
1504 let (builtin_table_updates, catalog_updates) = self
1505 .state
1506 .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1507 .await;
1508 Ok((builtin_table_updates, catalog_updates))
1509 }
1510}
1511
1512pub fn is_reserved_name(name: &str) -> bool {
1513 BUILTIN_PREFIXES
1514 .iter()
1515 .any(|prefix| name.starts_with(prefix))
1516}
1517
1518pub fn is_reserved_role_name(name: &str) -> bool {
1519 is_reserved_name(name) || is_public_role(name)
1520}
1521
1522pub fn is_public_role(name: &str) -> bool {
1523 name == &*PUBLIC_ROLE_NAME
1524}
1525
1526pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1527 object_type_to_audit_object_type(sql_type.into())
1528}
1529
1530pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1531 match id {
1532 CommentObjectId::Table(_) => ObjectType::Table,
1533 CommentObjectId::View(_) => ObjectType::View,
1534 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1535 CommentObjectId::Source(_) => ObjectType::Source,
1536 CommentObjectId::Sink(_) => ObjectType::Sink,
1537 CommentObjectId::Index(_) => ObjectType::Index,
1538 CommentObjectId::Func(_) => ObjectType::Func,
1539 CommentObjectId::Connection(_) => ObjectType::Connection,
1540 CommentObjectId::Type(_) => ObjectType::Type,
1541 CommentObjectId::Secret(_) => ObjectType::Secret,
1542 CommentObjectId::Role(_) => ObjectType::Role,
1543 CommentObjectId::Database(_) => ObjectType::Database,
1544 CommentObjectId::Schema(_) => ObjectType::Schema,
1545 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1546 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1547 CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1548 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1549 }
1550}
1551
1552pub(crate) fn object_type_to_audit_object_type(
1553 object_type: mz_sql::catalog::ObjectType,
1554) -> ObjectType {
1555 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1556}
1557
1558pub(crate) fn system_object_type_to_audit_object_type(
1559 system_type: &SystemObjectType,
1560) -> ObjectType {
1561 match system_type {
1562 SystemObjectType::Object(object_type) => match object_type {
1563 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1564 mz_sql::catalog::ObjectType::View => ObjectType::View,
1565 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1566 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1567 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1568 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1569 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1570 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1571 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1572 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1573 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1574 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1575 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1576 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1577 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1578 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1579 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1580 },
1581 SystemObjectType::System => ObjectType::System,
1582 }
1583}
1584
1585#[derive(Debug, Copy, Clone)]
1586pub enum UpdatePrivilegeVariant {
1587 Grant,
1588 Revoke,
1589}
1590
1591impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1592 fn from(variant: UpdatePrivilegeVariant) -> Self {
1593 match variant {
1594 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1595 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1596 }
1597 }
1598}
1599
1600impl From<UpdatePrivilegeVariant> for EventType {
1601 fn from(variant: UpdatePrivilegeVariant) -> Self {
1602 match variant {
1603 UpdatePrivilegeVariant::Grant => EventType::Grant,
1604 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1605 }
1606 }
1607}
1608
1609impl ConnCatalog<'_> {
1610 fn resolve_item_name(
1611 &self,
1612 name: &PartialItemName,
1613 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1614 self.resolve_item(name).map(|entry| entry.name())
1615 }
1616
1617 fn resolve_function_name(
1618 &self,
1619 name: &PartialItemName,
1620 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1621 self.resolve_function(name).map(|entry| entry.name())
1622 }
1623
1624 fn resolve_type_name(
1625 &self,
1626 name: &PartialItemName,
1627 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1628 self.resolve_type(name).map(|entry| entry.name())
1629 }
1630}
1631
1632impl ExprHumanizer for ConnCatalog<'_> {
1633 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1634 let entry = self.state.try_get_entry_by_global_id(&id)?;
1635 Some(self.resolve_full_name(entry.name()).to_string())
1636 }
1637
1638 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1639 let entry = self.state.try_get_entry_by_global_id(&id)?;
1640 Some(entry.name().item.clone())
1641 }
1642
1643 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1644 let entry = self.state.try_get_entry_by_global_id(&id)?;
1645 Some(self.resolve_full_name(entry.name()).into_parts())
1646 }
1647
1648 fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1649 use SqlScalarType::*;
1650
1651 match typ {
1652 Array(t) => format!("{}[]", self.humanize_sql_scalar_type(t, postgres_compat)),
1653 List {
1654 custom_id: Some(item_id),
1655 ..
1656 }
1657 | Map {
1658 custom_id: Some(item_id),
1659 ..
1660 } => {
1661 let item = self.get_item(item_id);
1662 self.minimal_qualification(item.name()).to_string()
1663 }
1664 List { element_type, .. } => {
1665 format!(
1666 "{} list",
1667 self.humanize_sql_scalar_type(element_type, postgres_compat)
1668 )
1669 }
1670 Map { value_type, .. } => format!(
1671 "map[{}=>{}]",
1672 self.humanize_sql_scalar_type(&SqlScalarType::String, postgres_compat),
1673 self.humanize_sql_scalar_type(value_type, postgres_compat)
1674 ),
1675 Record {
1676 custom_id: Some(item_id),
1677 ..
1678 } => {
1679 let item = self.get_item(item_id);
1680 self.minimal_qualification(item.name()).to_string()
1681 }
1682 Record { fields, .. } => format!(
1683 "record({})",
1684 fields
1685 .iter()
1686 .map(|f| format!(
1687 "{}: {}",
1688 f.0,
1689 self.humanize_sql_column_type(&f.1, postgres_compat)
1690 ))
1691 .join(",")
1692 ),
1693 PgLegacyChar => "\"char\"".into(),
1694 Char { length } if !postgres_compat => match length {
1695 None => "char".into(),
1696 Some(length) => format!("char({})", length.into_u32()),
1697 },
1698 VarChar { max_length } if !postgres_compat => match max_length {
1699 None => "varchar".into(),
1700 Some(length) => format!("varchar({})", length.into_u32()),
1701 },
1702 UInt16 => "uint2".into(),
1703 UInt32 => "uint4".into(),
1704 UInt64 => "uint8".into(),
1705 ty => {
1706 let pgrepr_type = mz_pgrepr::Type::from(ty);
1707 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1708
1709 let res = if self
1710 .effective_search_path(true)
1711 .iter()
1712 .any(|(_, schema)| schema == &pg_catalog_schema)
1713 {
1714 pgrepr_type.name().to_string()
1715 } else {
1716 let name = QualifiedItemName {
1719 qualifiers: ItemQualifiers {
1720 database_spec: ResolvedDatabaseSpecifier::Ambient,
1721 schema_spec: pg_catalog_schema,
1722 },
1723 item: pgrepr_type.name().to_string(),
1724 };
1725 self.resolve_full_name(&name).to_string()
1726 };
1727 res
1728 }
1729 }
1730 }
1731
1732 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1733 let entry = self.state.try_get_entry_by_global_id(&id)?;
1734
1735 match entry.index() {
1736 Some(index) => {
1737 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1738 let mut on_names = on_desc
1739 .iter_names()
1740 .map(|col_name| col_name.to_string())
1741 .collect::<Vec<_>>();
1742
1743 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1744
1745 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1749 let mut ix_names = vec![String::new(); ix_arity];
1750
1751 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1753 let on_name = on_names.get_mut(on_pos).expect("on_name");
1754 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1755 std::mem::swap(on_name, ix_name);
1756 }
1757
1758 Some(ix_names) }
1760 None => {
1761 let desc = self.state.try_get_desc_by_global_id(&id)?;
1762 let column_names = desc
1763 .iter_names()
1764 .map(|col_name| col_name.to_string())
1765 .collect();
1766
1767 Some(column_names)
1768 }
1769 }
1770 }
1771
1772 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1773 let desc = self.state.try_get_desc_by_global_id(&id)?;
1774 Some(desc.get_name(column).to_string())
1775 }
1776
1777 fn id_exists(&self, id: GlobalId) -> bool {
1778 self.state.entry_by_global_id.contains_key(&id)
1779 }
1780}
1781
1782impl SessionCatalog for ConnCatalog<'_> {
1783 fn active_role_id(&self) -> &RoleId {
1784 &self.role_id
1785 }
1786
1787 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1788 self.prepared_statements
1789 .as_ref()
1790 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1791 .flatten()
1792 }
1793
1794 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1795 self.portals
1796 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1797 }
1798
1799 fn active_database(&self) -> Option<&DatabaseId> {
1800 self.database.as_ref()
1801 }
1802
1803 fn active_cluster(&self) -> &str {
1804 &self.cluster
1805 }
1806
1807 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1808 &self.search_path
1809 }
1810
1811 fn resolve_database(
1812 &self,
1813 database_name: &str,
1814 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1815 Ok(self.state.resolve_database(database_name)?)
1816 }
1817
1818 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1819 self.state
1820 .database_by_id
1821 .get(id)
1822 .expect("database doesn't exist")
1823 }
1824
1825 #[allow(clippy::as_conversions)]
1827 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1828 self.state
1829 .database_by_id
1830 .values()
1831 .map(|database| database as &dyn CatalogDatabase)
1832 .collect()
1833 }
1834
1835 fn resolve_schema(
1836 &self,
1837 database_name: Option<&str>,
1838 schema_name: &str,
1839 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1840 Ok(self.state.resolve_schema(
1841 self.database.as_ref(),
1842 database_name,
1843 schema_name,
1844 &self.conn_id,
1845 )?)
1846 }
1847
1848 fn resolve_schema_in_database(
1849 &self,
1850 database_spec: &ResolvedDatabaseSpecifier,
1851 schema_name: &str,
1852 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1853 Ok(self
1854 .state
1855 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1856 }
1857
1858 fn get_schema(
1859 &self,
1860 database_spec: &ResolvedDatabaseSpecifier,
1861 schema_spec: &SchemaSpecifier,
1862 ) -> &dyn CatalogSchema {
1863 self.state
1864 .get_schema(database_spec, schema_spec, &self.conn_id)
1865 }
1866
1867 #[allow(clippy::as_conversions)]
1869 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1870 self.get_databases()
1871 .into_iter()
1872 .flat_map(|database| database.schemas().into_iter())
1873 .chain(
1874 self.state
1875 .ambient_schemas_by_id
1876 .values()
1877 .chain(self.state.temporary_schemas.values())
1878 .map(|schema| schema as &dyn CatalogSchema),
1879 )
1880 .collect()
1881 }
1882
1883 fn get_mz_internal_schema_id(&self) -> SchemaId {
1884 self.state().get_mz_internal_schema_id()
1885 }
1886
1887 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1888 self.state().get_mz_unsafe_schema_id()
1889 }
1890
1891 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1892 self.state.is_system_schema_specifier(schema)
1893 }
1894
1895 fn resolve_role(
1896 &self,
1897 role_name: &str,
1898 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1899 match self.state.try_get_role_by_name(role_name) {
1900 Some(role) => Ok(role),
1901 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1902 }
1903 }
1904
1905 fn resolve_network_policy(
1906 &self,
1907 policy_name: &str,
1908 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1909 match self.state.try_get_network_policy_by_name(policy_name) {
1910 Some(policy) => Ok(policy),
1911 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1912 }
1913 }
1914
1915 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1916 Some(self.state.roles_by_id.get(id)?)
1917 }
1918
1919 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1920 self.state.get_role(id)
1921 }
1922
1923 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1924 #[allow(clippy::as_conversions)]
1926 self.state
1927 .roles_by_id
1928 .values()
1929 .map(|role| role as &dyn CatalogRole)
1930 .collect()
1931 }
1932
1933 fn mz_system_role_id(&self) -> RoleId {
1934 MZ_SYSTEM_ROLE_ID
1935 }
1936
1937 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1938 self.state.collect_role_membership(id)
1939 }
1940
1941 fn get_network_policy(
1942 &self,
1943 id: &NetworkPolicyId,
1944 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1945 self.state.get_network_policy(id)
1946 }
1947
1948 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1949 #[allow(clippy::as_conversions)]
1951 self.state
1952 .network_policies_by_id
1953 .values()
1954 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1955 .collect()
1956 }
1957
1958 fn resolve_cluster(
1959 &self,
1960 cluster_name: Option<&str>,
1961 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1962 Ok(self
1963 .state
1964 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1965 }
1966
1967 fn resolve_cluster_replica(
1968 &self,
1969 cluster_replica_name: &QualifiedReplica,
1970 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1971 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1972 }
1973
1974 fn resolve_item(
1975 &self,
1976 name: &PartialItemName,
1977 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1978 let r = self.state.resolve_entry(
1979 self.database.as_ref(),
1980 &self.effective_search_path(true),
1981 name,
1982 &self.conn_id,
1983 )?;
1984 if self.unresolvable_ids.contains(&r.id()) {
1985 Err(SqlCatalogError::UnknownItem(name.to_string()))
1986 } else {
1987 Ok(r)
1988 }
1989 }
1990
1991 fn resolve_function(
1992 &self,
1993 name: &PartialItemName,
1994 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1995 let r = self.state.resolve_function(
1996 self.database.as_ref(),
1997 &self.effective_search_path(false),
1998 name,
1999 &self.conn_id,
2000 )?;
2001
2002 if self.unresolvable_ids.contains(&r.id()) {
2003 Err(SqlCatalogError::UnknownFunction {
2004 name: name.to_string(),
2005 alternative: None,
2006 })
2007 } else {
2008 Ok(r)
2009 }
2010 }
2011
2012 fn resolve_type(
2013 &self,
2014 name: &PartialItemName,
2015 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2016 let r = self.state.resolve_type(
2017 self.database.as_ref(),
2018 &self.effective_search_path(false),
2019 name,
2020 &self.conn_id,
2021 )?;
2022
2023 if self.unresolvable_ids.contains(&r.id()) {
2024 Err(SqlCatalogError::UnknownType {
2025 name: name.to_string(),
2026 })
2027 } else {
2028 Ok(r)
2029 }
2030 }
2031
2032 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2033 self.state.get_system_type(name)
2034 }
2035
2036 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2037 Some(self.state.try_get_entry(id)?)
2038 }
2039
2040 fn try_get_item_by_global_id(
2041 &self,
2042 id: &GlobalId,
2043 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2044 let entry = self.state.try_get_entry_by_global_id(id)?;
2045 let entry = match &entry.item {
2046 CatalogItem::Table(table) => {
2047 let (version, _gid) = table
2048 .collections
2049 .iter()
2050 .find(|(_version, gid)| *gid == id)
2051 .expect("catalog out of sync, mismatched GlobalId");
2052 entry.at_version(RelationVersionSelector::Specific(*version))
2053 }
2054 _ => entry.at_version(RelationVersionSelector::Latest),
2055 };
2056 Some(entry)
2057 }
2058
2059 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2060 self.state.get_entry(id)
2061 }
2062
2063 fn get_item_by_global_id(
2064 &self,
2065 id: &GlobalId,
2066 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2067 let entry = self.state.get_entry_by_global_id(id);
2068 let entry = match &entry.item {
2069 CatalogItem::Table(table) => {
2070 let (version, _gid) = table
2071 .collections
2072 .iter()
2073 .find(|(_version, gid)| *gid == id)
2074 .expect("catalog out of sync, mismatched GlobalId");
2075 entry.at_version(RelationVersionSelector::Specific(*version))
2076 }
2077 _ => entry.at_version(RelationVersionSelector::Latest),
2078 };
2079 entry
2080 }
2081
2082 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2083 self.get_schemas()
2084 .into_iter()
2085 .flat_map(|schema| schema.item_ids())
2086 .map(|id| self.get_item(&id))
2087 .collect()
2088 }
2089
2090 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2091 self.state
2092 .get_item_by_name(name, &self.conn_id)
2093 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2094 }
2095
2096 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2097 self.state
2098 .get_type_by_name(name, &self.conn_id)
2099 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2100 }
2101
2102 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2103 &self.state.clusters_by_id[&id]
2104 }
2105
2106 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2107 self.state
2108 .clusters_by_id
2109 .values()
2110 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2111 .collect()
2112 }
2113
2114 fn get_cluster_replica(
2115 &self,
2116 cluster_id: ClusterId,
2117 replica_id: ReplicaId,
2118 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2119 let cluster = self.get_cluster(cluster_id);
2120 cluster.replica(replica_id)
2121 }
2122
2123 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2124 self.get_clusters()
2125 .into_iter()
2126 .flat_map(|cluster| cluster.replicas().into_iter())
2127 .collect()
2128 }
2129
2130 fn get_system_privileges(&self) -> &PrivilegeMap {
2131 &self.state.system_privileges
2132 }
2133
2134 fn get_default_privileges(
2135 &self,
2136 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2137 self.state
2138 .default_privileges
2139 .iter()
2140 .map(|(object, acl_items)| (object, acl_items.collect()))
2141 .collect()
2142 }
2143
2144 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2145 self.state.find_available_name(name, &self.conn_id)
2146 }
2147
2148 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2149 self.state.resolve_full_name(name, Some(&self.conn_id))
2150 }
2151
2152 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2153 self.state.resolve_full_schema_name(name)
2154 }
2155
2156 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2157 self.state.get_entry_by_global_id(global_id).id()
2158 }
2159
2160 fn resolve_global_id(
2161 &self,
2162 item_id: &CatalogItemId,
2163 version: RelationVersionSelector,
2164 ) -> GlobalId {
2165 self.state
2166 .get_entry(item_id)
2167 .at_version(version)
2168 .global_id()
2169 }
2170
2171 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2172 self.state.config()
2173 }
2174
2175 fn now(&self) -> EpochMillis {
2176 (self.state.config().now)()
2177 }
2178
2179 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2180 self.state.aws_privatelink_availability_zones.clone()
2181 }
2182
2183 fn system_vars(&self) -> &SystemVars {
2184 &self.state.system_configuration
2185 }
2186
2187 fn system_vars_mut(&mut self) -> &mut SystemVars {
2188 Arc::make_mut(&mut self.state.to_mut().system_configuration)
2189 }
2190
2191 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2192 self.state().get_owner_id(id, self.conn_id())
2193 }
2194
2195 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2196 match id {
2197 SystemObjectId::System => Some(&self.state.system_privileges),
2198 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2199 Some(self.get_cluster(*id).privileges())
2200 }
2201 SystemObjectId::Object(ObjectId::Database(id)) => {
2202 Some(self.get_database(id).privileges())
2203 }
2204 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2205 self.state
2208 .try_get_schema(database_spec, schema_spec, &self.conn_id)
2209 .map(|schema| schema.privileges())
2210 }
2211 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2212 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2213 Some(self.get_network_policy(id).privileges())
2214 }
2215 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2216 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2217 }
2218 }
2219
2220 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2221 let mut seen = BTreeSet::new();
2222 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2223 }
2224
2225 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2226 let mut seen = BTreeSet::new();
2227 self.state.item_dependents(id, &mut seen)
2228 }
2229
2230 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2231 rbac::all_object_privileges(object_type)
2232 }
2233
2234 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2235 self.state.get_object_type(object_id)
2236 }
2237
2238 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2239 self.state.get_system_object_type(id)
2240 }
2241
2242 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2249 if qualified_name.qualifiers.schema_spec.is_temporary() {
2250 return qualified_name.item.clone().into();
2258 }
2259
2260 let database_id = match &qualified_name.qualifiers.database_spec {
2261 ResolvedDatabaseSpecifier::Ambient => None,
2262 ResolvedDatabaseSpecifier::Id(id)
2263 if self.database.is_some() && self.database == Some(*id) =>
2264 {
2265 None
2266 }
2267 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2268 };
2269
2270 let schema_spec = if database_id.is_none()
2271 && self.resolve_item_name(&PartialItemName {
2272 database: None,
2273 schema: None,
2274 item: qualified_name.item.clone(),
2275 }) == Ok(qualified_name)
2276 || self.resolve_function_name(&PartialItemName {
2277 database: None,
2278 schema: None,
2279 item: qualified_name.item.clone(),
2280 }) == Ok(qualified_name)
2281 || self.resolve_type_name(&PartialItemName {
2282 database: None,
2283 schema: None,
2284 item: qualified_name.item.clone(),
2285 }) == Ok(qualified_name)
2286 {
2287 None
2288 } else {
2289 Some(qualified_name.qualifiers.schema_spec.clone())
2292 };
2293
2294 let res = PartialItemName {
2295 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2296 schema: schema_spec.map(|spec| {
2297 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2298 .name()
2299 .schema
2300 .clone()
2301 }),
2302 item: qualified_name.item.clone(),
2303 };
2304 assert!(
2305 self.resolve_item_name(&res) == Ok(qualified_name)
2306 || self.resolve_function_name(&res) == Ok(qualified_name)
2307 || self.resolve_type_name(&res) == Ok(qualified_name)
2308 );
2309 res
2310 }
2311
2312 fn add_notice(&self, notice: PlanNotice) {
2313 let _ = self.notices_tx.send(notice.into());
2314 }
2315
2316 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2317 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2318 self.state.comments.get_object_comments(comment_id)
2319 }
2320
2321 fn is_cluster_size_cc(&self, size: &str) -> bool {
2322 self.state
2323 .cluster_replica_sizes
2324 .0
2325 .get(size)
2326 .map_or(false, |a| a.is_cc)
2327 }
2328}
2329
2330#[cfg(test)]
2331mod tests {
2332 use std::collections::{BTreeMap, BTreeSet};
2333 use std::sync::Arc;
2334 use std::{env, iter};
2335
2336 use itertools::Itertools;
2337 use mz_catalog::memory::objects::CatalogItem;
2338 use tokio_postgres::NoTls;
2339 use tokio_postgres::types::Type;
2340 use uuid::Uuid;
2341
2342 use mz_catalog::SYSTEM_CONN_ID;
2343 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2344 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2345 use mz_controller_types::{ClusterId, ReplicaId};
2346 use mz_expr::MirScalarExpr;
2347 use mz_ore::now::to_datetime;
2348 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2349 use mz_persist_client::PersistClient;
2350 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2351 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2352 use mz_repr::role_id::RoleId;
2353 use mz_repr::{
2354 CatalogItemId, Datum, GlobalId, RelationVersionSelector, Row, RowArena, SqlRelationType,
2355 SqlScalarType, Timestamp,
2356 };
2357 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2358 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2359 use mz_sql::names::{
2360 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2361 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2362 };
2363 use mz_sql::plan::{
2364 CoercibleScalarExpr, ExprContext, HirScalarExpr, HirToMirConfig, PlanContext, QueryContext,
2365 QueryLifetime, Scope, StatementContext,
2366 };
2367 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2368 use mz_sql::session::vars::{SystemVars, VarInput};
2369
2370 use crate::catalog::state::LocalExpressionCache;
2371 use crate::catalog::{Catalog, Op};
2372 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2373 use crate::session::Session;
2374
2375 #[mz_ore::test(tokio::test)]
2382 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2384 Catalog::with_debug(|catalog| async move {
2385 struct TestCase {
2386 input: QualifiedItemName,
2387 system_output: PartialItemName,
2388 normal_output: PartialItemName,
2389 }
2390
2391 let test_cases = vec![
2392 TestCase {
2393 input: QualifiedItemName {
2394 qualifiers: ItemQualifiers {
2395 database_spec: ResolvedDatabaseSpecifier::Ambient,
2396 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2397 },
2398 item: "numeric".to_string(),
2399 },
2400 system_output: PartialItemName {
2401 database: None,
2402 schema: None,
2403 item: "numeric".to_string(),
2404 },
2405 normal_output: PartialItemName {
2406 database: None,
2407 schema: None,
2408 item: "numeric".to_string(),
2409 },
2410 },
2411 TestCase {
2412 input: QualifiedItemName {
2413 qualifiers: ItemQualifiers {
2414 database_spec: ResolvedDatabaseSpecifier::Ambient,
2415 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2416 },
2417 item: "mz_array_types".to_string(),
2418 },
2419 system_output: PartialItemName {
2420 database: None,
2421 schema: None,
2422 item: "mz_array_types".to_string(),
2423 },
2424 normal_output: PartialItemName {
2425 database: None,
2426 schema: None,
2427 item: "mz_array_types".to_string(),
2428 },
2429 },
2430 ];
2431
2432 for tc in test_cases {
2433 assert_eq!(
2434 catalog
2435 .for_system_session()
2436 .minimal_qualification(&tc.input),
2437 tc.system_output
2438 );
2439 assert_eq!(
2440 catalog
2441 .for_session(&Session::dummy())
2442 .minimal_qualification(&tc.input),
2443 tc.normal_output
2444 );
2445 }
2446 catalog.expire().await;
2447 })
2448 .await
2449 }
2450
2451 #[mz_ore::test(tokio::test)]
2452 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2454 let persist_client = PersistClient::new_for_tests().await;
2455 let organization_id = Uuid::new_v4();
2456 let bootstrap_args = test_bootstrap_args();
2457 {
2458 let mut catalog = Catalog::open_debug_catalog(
2459 persist_client.clone(),
2460 organization_id.clone(),
2461 &bootstrap_args,
2462 )
2463 .await
2464 .expect("unable to open debug catalog");
2465 assert_eq!(catalog.transient_revision(), 1);
2466 let commit_ts = catalog.current_upper().await;
2467 catalog
2468 .transact(
2469 None,
2470 commit_ts,
2471 None,
2472 vec![Op::CreateDatabase {
2473 name: "test".to_string(),
2474 owner_id: MZ_SYSTEM_ROLE_ID,
2475 }],
2476 )
2477 .await
2478 .expect("failed to transact");
2479 assert_eq!(catalog.transient_revision(), 2);
2480 catalog.expire().await;
2481 }
2482 {
2483 let catalog =
2484 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2485 .await
2486 .expect("unable to open debug catalog");
2487 assert_eq!(catalog.transient_revision(), 1);
2489 catalog.expire().await;
2490 }
2491 }
2492
2493 #[mz_ore::test(tokio::test)]
2494 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2496 Catalog::with_debug(|catalog| async move {
2497 let mz_catalog_schema = (
2498 ResolvedDatabaseSpecifier::Ambient,
2499 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2500 );
2501 let pg_catalog_schema = (
2502 ResolvedDatabaseSpecifier::Ambient,
2503 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2504 );
2505 let mz_temp_schema = (
2506 ResolvedDatabaseSpecifier::Ambient,
2507 SchemaSpecifier::Temporary,
2508 );
2509
2510 let session = Session::dummy();
2512 let conn_catalog = catalog.for_session(&session);
2513 assert_ne!(
2514 conn_catalog.effective_search_path(false),
2515 conn_catalog.search_path
2516 );
2517 assert_ne!(
2518 conn_catalog.effective_search_path(true),
2519 conn_catalog.search_path
2520 );
2521 assert_eq!(
2522 conn_catalog.effective_search_path(false),
2523 vec![
2524 mz_catalog_schema.clone(),
2525 pg_catalog_schema.clone(),
2526 conn_catalog.search_path[0].clone()
2527 ]
2528 );
2529 assert_eq!(
2530 conn_catalog.effective_search_path(true),
2531 vec![
2532 mz_temp_schema.clone(),
2533 mz_catalog_schema.clone(),
2534 pg_catalog_schema.clone(),
2535 conn_catalog.search_path[0].clone()
2536 ]
2537 );
2538
2539 let mut session = Session::dummy();
2541 session
2542 .vars_mut()
2543 .set(
2544 &SystemVars::new(),
2545 "search_path",
2546 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2547 false,
2548 )
2549 .expect("failed to set search_path");
2550 let conn_catalog = catalog.for_session(&session);
2551 assert_ne!(
2552 conn_catalog.effective_search_path(false),
2553 conn_catalog.search_path
2554 );
2555 assert_ne!(
2556 conn_catalog.effective_search_path(true),
2557 conn_catalog.search_path
2558 );
2559 assert_eq!(
2560 conn_catalog.effective_search_path(false),
2561 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2562 );
2563 assert_eq!(
2564 conn_catalog.effective_search_path(true),
2565 vec![
2566 mz_temp_schema.clone(),
2567 mz_catalog_schema.clone(),
2568 pg_catalog_schema.clone()
2569 ]
2570 );
2571
2572 let mut session = Session::dummy();
2573 session
2574 .vars_mut()
2575 .set(
2576 &SystemVars::new(),
2577 "search_path",
2578 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2579 false,
2580 )
2581 .expect("failed to set search_path");
2582 let conn_catalog = catalog.for_session(&session);
2583 assert_ne!(
2584 conn_catalog.effective_search_path(false),
2585 conn_catalog.search_path
2586 );
2587 assert_ne!(
2588 conn_catalog.effective_search_path(true),
2589 conn_catalog.search_path
2590 );
2591 assert_eq!(
2592 conn_catalog.effective_search_path(false),
2593 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2594 );
2595 assert_eq!(
2596 conn_catalog.effective_search_path(true),
2597 vec![
2598 mz_temp_schema.clone(),
2599 pg_catalog_schema.clone(),
2600 mz_catalog_schema.clone()
2601 ]
2602 );
2603
2604 let mut session = Session::dummy();
2605 session
2606 .vars_mut()
2607 .set(
2608 &SystemVars::new(),
2609 "search_path",
2610 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2611 false,
2612 )
2613 .expect("failed to set search_path");
2614 let conn_catalog = catalog.for_session(&session);
2615 assert_ne!(
2616 conn_catalog.effective_search_path(false),
2617 conn_catalog.search_path
2618 );
2619 assert_ne!(
2620 conn_catalog.effective_search_path(true),
2621 conn_catalog.search_path
2622 );
2623 assert_eq!(
2624 conn_catalog.effective_search_path(false),
2625 vec![
2626 mz_catalog_schema.clone(),
2627 pg_catalog_schema.clone(),
2628 mz_temp_schema.clone()
2629 ]
2630 );
2631 assert_eq!(
2632 conn_catalog.effective_search_path(true),
2633 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2634 );
2635 catalog.expire().await;
2636 })
2637 .await
2638 }
2639
2640 #[mz_ore::test(tokio::test)]
2641 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2643 use mz_ore::collections::CollectionExt;
2644 Catalog::with_debug(|catalog| async move {
2645 let conn_catalog = catalog.for_system_session();
2646 let scx = &mut StatementContext::new(None, &conn_catalog);
2647
2648 let parsed = mz_sql_parser::parser::parse_statements(
2649 "create view public.foo as select 1 as bar",
2650 )
2651 .expect("")
2652 .into_element()
2653 .ast;
2654
2655 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2656
2657 assert_eq!(
2659 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2660 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2661 );
2662 catalog.expire().await;
2663 })
2664 .await;
2665 }
2666
2667 #[mz_ore::test(tokio::test)]
2669 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2671 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2672 let column = ", 1";
2673 let view_def_size = view_def.bytes().count();
2674 let column_size = column.bytes().count();
2675 let column_count =
2676 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2677 let columns = iter::repeat(column).take(column_count).join("");
2678 let create_sql = format!("{view_def}{columns})");
2679 let create_sql_check = create_sql.clone();
2680 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2681 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2682 &create_sql
2683 ));
2684
2685 let persist_client = PersistClient::new_for_tests().await;
2686 let organization_id = Uuid::new_v4();
2687 let id = CatalogItemId::User(1);
2688 let gid = GlobalId::User(1);
2689 let bootstrap_args = test_bootstrap_args();
2690 {
2691 let mut catalog = Catalog::open_debug_catalog(
2692 persist_client.clone(),
2693 organization_id.clone(),
2694 &bootstrap_args,
2695 )
2696 .await
2697 .expect("unable to open debug catalog");
2698 let item = catalog
2699 .state()
2700 .deserialize_item(
2701 gid,
2702 &create_sql,
2703 &BTreeMap::new(),
2704 &mut LocalExpressionCache::Closed,
2705 None,
2706 )
2707 .expect("unable to parse view");
2708 let commit_ts = catalog.current_upper().await;
2709 catalog
2710 .transact(
2711 None,
2712 commit_ts,
2713 None,
2714 vec![Op::CreateItem {
2715 item,
2716 name: QualifiedItemName {
2717 qualifiers: ItemQualifiers {
2718 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2719 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2720 },
2721 item: "v".to_string(),
2722 },
2723 id,
2724 owner_id: MZ_SYSTEM_ROLE_ID,
2725 }],
2726 )
2727 .await
2728 .expect("failed to transact");
2729 catalog.expire().await;
2730 }
2731 {
2732 let catalog =
2733 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2734 .await
2735 .expect("unable to open debug catalog");
2736 let view = catalog.get_entry(&id);
2737 assert_eq!("v", view.name.item);
2738 match &view.item {
2739 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2740 item => panic!("expected view, got {}", item.typ()),
2741 }
2742 catalog.expire().await;
2743 }
2744 }
2745
2746 #[mz_ore::test(tokio::test)]
2747 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2749 Catalog::with_debug(|catalog| async move {
2750 let conn_catalog = catalog.for_system_session();
2751
2752 assert_eq!(
2753 mz_sql::catalog::ObjectType::ClusterReplica,
2754 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2755 ClusterId::user(1).expect("1 is a valid ID"),
2756 ReplicaId::User(1)
2757 )))
2758 );
2759 assert_eq!(
2760 mz_sql::catalog::ObjectType::Role,
2761 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2762 );
2763 catalog.expire().await;
2764 })
2765 .await;
2766 }
2767
2768 #[mz_ore::test(tokio::test)]
2769 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2771 Catalog::with_debug(|catalog| async move {
2772 let conn_catalog = catalog.for_system_session();
2773
2774 assert_eq!(
2775 None,
2776 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2777 ClusterId::user(1).expect("1 is a valid ID"),
2778 ReplicaId::User(1),
2779 ))))
2780 );
2781 assert_eq!(
2782 None,
2783 conn_catalog
2784 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2785 );
2786 catalog.expire().await;
2787 })
2788 .await;
2789 }
2790
2791 #[mz_ore::test(tokio::test)]
2792 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2794 Catalog::with_debug(|catalog| async move {
2795 let conn_catalog = catalog.for_system_session();
2796
2797 let builtins_cfg = BuiltinsConfig {
2798 include_continual_tasks: true,
2799 };
2800 for builtin in BUILTINS::iter(&builtins_cfg) {
2801 let (schema, name, expected_desc) = match builtin {
2802 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2803 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2804 Builtin::MaterializedView(mv) => (&mv.schema, &mv.name, &mv.desc),
2805 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2806 Builtin::Log(_)
2807 | Builtin::Type(_)
2808 | Builtin::Func(_)
2809 | Builtin::ContinualTask(_)
2810 | Builtin::Index(_)
2811 | Builtin::Connection(_) => continue,
2812 };
2813 let item = conn_catalog
2814 .resolve_item(&PartialItemName {
2815 database: None,
2816 schema: Some(schema.to_string()),
2817 item: name.to_string(),
2818 })
2819 .expect("unable to resolve item")
2820 .at_version(RelationVersionSelector::Latest);
2821
2822 let actual_desc = item.relation_desc().expect("invalid item type");
2823 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2824 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2825 {
2826 assert_eq!(
2827 actual_name, expected_name,
2828 "item {schema}.{name} column {index} name did not match its expected name"
2829 );
2830 assert_eq!(
2831 actual_typ, expected_typ,
2832 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2833 );
2834 }
2835 assert_eq!(
2836 &*actual_desc, expected_desc,
2837 "item {schema}.{name} did not match its expected RelationDesc"
2838 );
2839 }
2840 catalog.expire().await;
2841 })
2842 .await
2843 }
2844
2845 #[mz_ore::test(tokio::test)]
2848 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2850 async fn inner(catalog: Catalog) {
2851 let (client, connection) = tokio_postgres::connect(
2855 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2856 NoTls,
2857 )
2858 .await
2859 .expect("failed to connect to Postgres");
2860
2861 task::spawn(|| "compare_builtin_postgres", async move {
2862 if let Err(e) = connection.await {
2863 panic!("connection error: {}", e);
2864 }
2865 });
2866
2867 struct PgProc {
2868 name: String,
2869 arg_oids: Vec<u32>,
2870 ret_oid: Option<u32>,
2871 ret_set: bool,
2872 }
2873
2874 struct PgType {
2875 name: String,
2876 ty: String,
2877 elem: u32,
2878 array: u32,
2879 input: u32,
2880 receive: u32,
2881 }
2882
2883 struct PgOper {
2884 oprresult: u32,
2885 name: String,
2886 }
2887
2888 let pg_proc: BTreeMap<_, _> = client
2889 .query(
2890 "SELECT
2891 p.oid,
2892 proname,
2893 proargtypes,
2894 prorettype,
2895 proretset
2896 FROM pg_proc p
2897 JOIN pg_namespace n ON p.pronamespace = n.oid",
2898 &[],
2899 )
2900 .await
2901 .expect("pg query failed")
2902 .into_iter()
2903 .map(|row| {
2904 let oid: u32 = row.get("oid");
2905 let pg_proc = PgProc {
2906 name: row.get("proname"),
2907 arg_oids: row.get("proargtypes"),
2908 ret_oid: row.get("prorettype"),
2909 ret_set: row.get("proretset"),
2910 };
2911 (oid, pg_proc)
2912 })
2913 .collect();
2914
2915 let pg_type: BTreeMap<_, _> = client
2916 .query(
2917 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2918 &[],
2919 )
2920 .await
2921 .expect("pg query failed")
2922 .into_iter()
2923 .map(|row| {
2924 let oid: u32 = row.get("oid");
2925 let pg_type = PgType {
2926 name: row.get("typname"),
2927 ty: row.get("typtype"),
2928 elem: row.get("typelem"),
2929 array: row.get("typarray"),
2930 input: row.get("typinput"),
2931 receive: row.get("typreceive"),
2932 };
2933 (oid, pg_type)
2934 })
2935 .collect();
2936
2937 let pg_oper: BTreeMap<_, _> = client
2938 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2939 .await
2940 .expect("pg query failed")
2941 .into_iter()
2942 .map(|row| {
2943 let oid: u32 = row.get("oid");
2944 let pg_oper = PgOper {
2945 name: row.get("oprname"),
2946 oprresult: row.get("oprresult"),
2947 };
2948 (oid, pg_oper)
2949 })
2950 .collect();
2951
2952 let conn_catalog = catalog.for_system_session();
2953 let resolve_type_oid = |item: &str| {
2954 conn_catalog
2955 .resolve_type(&PartialItemName {
2956 database: None,
2957 schema: Some(PG_CATALOG_SCHEMA.into()),
2960 item: item.to_string(),
2961 })
2962 .expect("unable to resolve type")
2963 .oid()
2964 };
2965
2966 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2967 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2968 .collect();
2969
2970 let mut all_oids = BTreeSet::new();
2971
2972 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2975 [
2976 (Type::NAME, Type::TEXT),
2978 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2979 (Type::TIME, Type::TIMETZ),
2981 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2982 ]
2983 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2984 );
2985 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2986 1619, ]);
2988 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2989 if ignore_return_types.contains(&fn_oid) {
2990 return true;
2991 }
2992 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2993 return true;
2994 }
2995 a == b
2996 };
2997
2998 let builtins_cfg = BuiltinsConfig {
2999 include_continual_tasks: true,
3000 };
3001 for builtin in BUILTINS::iter(&builtins_cfg) {
3002 match builtin {
3003 Builtin::Type(ty) => {
3004 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3005
3006 if ty.oid >= FIRST_MATERIALIZE_OID {
3007 continue;
3010 }
3011
3012 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3015 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3016 });
3017 assert_eq!(
3018 ty.name, pg_ty.name,
3019 "oid {} has name {} in postgres; expected {}",
3020 ty.oid, pg_ty.name, ty.name,
3021 );
3022
3023 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3024 None => (0, 0),
3025 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3026 };
3027 assert_eq!(
3028 typinput_oid, pg_ty.input,
3029 "type {} has typinput OID {:?} in mz but {:?} in pg",
3030 ty.name, typinput_oid, pg_ty.input,
3031 );
3032 assert_eq!(
3033 typreceive_oid, pg_ty.receive,
3034 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3035 ty.name, typreceive_oid, pg_ty.receive,
3036 );
3037 if typinput_oid != 0 {
3038 assert!(
3039 func_oids.contains(&typinput_oid),
3040 "type {} has typinput OID {} that does not exist in pg_proc",
3041 ty.name,
3042 typinput_oid,
3043 );
3044 }
3045 if typreceive_oid != 0 {
3046 assert!(
3047 func_oids.contains(&typreceive_oid),
3048 "type {} has typreceive OID {} that does not exist in pg_proc",
3049 ty.name,
3050 typreceive_oid,
3051 );
3052 }
3053
3054 match &ty.details.typ {
3056 CatalogType::Array { element_reference } => {
3057 let elem_ty = BUILTINS::iter(&builtins_cfg)
3058 .filter_map(|builtin| match builtin {
3059 Builtin::Type(ty @ BuiltinType { name, .. })
3060 if element_reference == name =>
3061 {
3062 Some(ty)
3063 }
3064 _ => None,
3065 })
3066 .next();
3067 let elem_ty = match elem_ty {
3068 Some(ty) => ty,
3069 None => {
3070 panic!("{} is unexpectedly not a type", element_reference)
3071 }
3072 };
3073 assert_eq!(
3074 pg_ty.elem, elem_ty.oid,
3075 "type {} has mismatched element OIDs",
3076 ty.name
3077 )
3078 }
3079 CatalogType::Pseudo => {
3080 assert_eq!(
3081 pg_ty.ty, "p",
3082 "type {} is not a pseudo type as expected",
3083 ty.name
3084 )
3085 }
3086 CatalogType::Range { .. } => {
3087 assert_eq!(
3088 pg_ty.ty, "r",
3089 "type {} is not a range type as expected",
3090 ty.name
3091 );
3092 }
3093 _ => {
3094 assert_eq!(
3095 pg_ty.ty, "b",
3096 "type {} is not a base type as expected",
3097 ty.name
3098 )
3099 }
3100 }
3101
3102 let schema = catalog
3104 .resolve_schema_in_database(
3105 &ResolvedDatabaseSpecifier::Ambient,
3106 ty.schema,
3107 &SYSTEM_CONN_ID,
3108 )
3109 .expect("unable to resolve schema");
3110 let allocated_type = catalog
3111 .resolve_type(
3112 None,
3113 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3114 &PartialItemName {
3115 database: None,
3116 schema: Some(schema.name().schema.clone()),
3117 item: ty.name.to_string(),
3118 },
3119 &SYSTEM_CONN_ID,
3120 )
3121 .expect("unable to resolve type");
3122 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3123 ty
3124 } else {
3125 panic!("unexpectedly not a type")
3126 };
3127 match ty.details.array_id {
3128 Some(array_id) => {
3129 let array_ty = catalog.get_entry(&array_id);
3130 assert_eq!(
3131 pg_ty.array, array_ty.oid,
3132 "type {} has mismatched array OIDs",
3133 allocated_type.name.item,
3134 );
3135 }
3136 None => assert_eq!(
3137 pg_ty.array, 0,
3138 "type {} does not have an array type in mz but does in pg",
3139 allocated_type.name.item,
3140 ),
3141 }
3142 }
3143 Builtin::Func(func) => {
3144 for imp in func.inner.func_impls() {
3145 assert!(
3146 all_oids.insert(imp.oid),
3147 "{} reused oid {}",
3148 func.name,
3149 imp.oid
3150 );
3151
3152 assert!(
3153 imp.oid < FIRST_USER_OID,
3154 "built-in function {} erroneously has OID in user space ({})",
3155 func.name,
3156 imp.oid,
3157 );
3158
3159 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3162 continue;
3163 } else {
3164 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3165 panic!(
3166 "pg_proc missing function {}: oid {}",
3167 func.name, imp.oid
3168 )
3169 })
3170 };
3171 assert_eq!(
3172 func.name, pg_fn.name,
3173 "funcs with oid {} don't match names: {} in mz, {} in pg",
3174 imp.oid, func.name, pg_fn.name
3175 );
3176
3177 let imp_arg_oids = imp
3180 .arg_typs
3181 .iter()
3182 .map(|item| resolve_type_oid(item))
3183 .collect::<Vec<_>>();
3184
3185 if imp_arg_oids != pg_fn.arg_oids {
3186 println!(
3187 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3188 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3189 );
3190 }
3191
3192 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3193
3194 assert!(
3195 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3196 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3197 imp.oid,
3198 func.name,
3199 imp_return_oid,
3200 pg_fn.ret_oid
3201 );
3202
3203 assert_eq!(
3204 imp.return_is_set, pg_fn.ret_set,
3205 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3206 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3207 );
3208 }
3209 }
3210 _ => (),
3211 }
3212 }
3213
3214 for (op, func) in OP_IMPLS.iter() {
3215 for imp in func.func_impls() {
3216 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3217
3218 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3220 continue;
3221 } else {
3222 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3223 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3224 })
3225 };
3226
3227 assert_eq!(*op, pg_op.name);
3228
3229 let imp_return_oid =
3230 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3231 if imp_return_oid != pg_op.oprresult {
3232 panic!(
3233 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3234 imp.oid, op, imp_return_oid, pg_op.oprresult
3235 );
3236 }
3237 }
3238 }
3239 catalog.expire().await;
3240 }
3241
3242 Catalog::with_debug(inner).await
3243 }
3244
3245 #[mz_ore::test(tokio::test)]
3247 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3249 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3250 let catalog = Arc::new(catalog);
3251 let conn_catalog = catalog.for_system_session();
3252
3253 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3254 let mut handles = Vec::new();
3255
3256 let ignore_names = BTreeSet::from([
3258 "avg",
3259 "avg_internal_v1",
3260 "bool_and",
3261 "bool_or",
3262 "has_table_privilege", "has_type_privilege", "mod",
3265 "mz_panic",
3266 "mz_sleep",
3267 "pow",
3268 "stddev_pop",
3269 "stddev_samp",
3270 "stddev",
3271 "var_pop",
3272 "var_samp",
3273 "variance",
3274 ]);
3275
3276 let fns = BUILTINS::funcs()
3277 .map(|func| (&func.name, func.inner))
3278 .chain(OP_IMPLS.iter());
3279
3280 for (name, func) in fns {
3281 if ignore_names.contains(name) {
3282 continue;
3283 }
3284 let Func::Scalar(impls) = func else {
3285 continue;
3286 };
3287
3288 'outer: for imp in impls {
3289 let details = imp.details();
3290 let mut styps = Vec::new();
3291 for item in details.arg_typs.iter() {
3292 let oid = resolve_type_oid(item);
3293 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3294 continue 'outer;
3295 };
3296 styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3297 }
3298 let datums = styps
3299 .iter()
3300 .map(|styp| {
3301 let mut datums = vec![Datum::Null];
3302 datums.extend(styp.interesting_datums());
3303 datums
3304 })
3305 .collect::<Vec<_>>();
3306 if datums.is_empty() {
3308 continue;
3309 }
3310
3311 let return_oid = details
3312 .return_typ
3313 .map(resolve_type_oid)
3314 .expect("must exist");
3315 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3316 .ok()
3317 .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3318
3319 let mut idxs = vec![0; datums.len()];
3320 while idxs[0] < datums[0].len() {
3321 let mut args = Vec::with_capacity(idxs.len());
3322 for i in 0..(datums.len()) {
3323 args.push(datums[i][idxs[i]]);
3324 }
3325
3326 let op = &imp.op;
3327 let scalars = args
3328 .iter()
3329 .enumerate()
3330 .map(|(i, datum)| {
3331 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3332 datum.clone(),
3333 styps[i].clone(),
3334 ))
3335 })
3336 .collect();
3337
3338 let call_name = format!(
3339 "{name}({}) (oid: {})",
3340 args.iter()
3341 .map(|d| d.to_string())
3342 .collect::<Vec<_>>()
3343 .join(", "),
3344 imp.oid
3345 );
3346 let catalog = Arc::clone(&catalog);
3347 let call_name_fn = call_name.clone();
3348 let return_styp = return_styp.clone();
3349 let handle = task::spawn_blocking(
3350 || call_name,
3351 move || {
3352 smoketest_fn(
3353 name,
3354 call_name_fn,
3355 op,
3356 imp,
3357 args,
3358 catalog,
3359 scalars,
3360 return_styp,
3361 )
3362 },
3363 );
3364 handles.push(handle);
3365
3366 for i in (0..datums.len()).rev() {
3368 idxs[i] += 1;
3369 if idxs[i] >= datums[i].len() {
3370 if i == 0 {
3371 break;
3372 }
3373 idxs[i] = 0;
3374 continue;
3375 } else {
3376 break;
3377 }
3378 }
3379 }
3380 }
3381 }
3382 handles
3383 }
3384
3385 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3386 for handle in handles {
3387 handle.await;
3388 }
3389 }
3390
3391 fn smoketest_fn(
3392 name: &&str,
3393 call_name: String,
3394 op: &Operation<HirScalarExpr>,
3395 imp: &FuncImpl<HirScalarExpr>,
3396 args: Vec<Datum<'_>>,
3397 catalog: Arc<Catalog>,
3398 scalars: Vec<CoercibleScalarExpr>,
3399 return_styp: Option<SqlScalarType>,
3400 ) {
3401 let conn_catalog = catalog.for_system_session();
3402 let pcx = PlanContext::zero();
3403 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3404 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3405 let ecx = ExprContext {
3406 qcx: &qcx,
3407 name: "smoketest",
3408 scope: &Scope::empty(),
3409 relation_type: &SqlRelationType::empty(),
3410 allow_aggregates: false,
3411 allow_subqueries: false,
3412 allow_parameters: false,
3413 allow_windows: false,
3414 };
3415 let arena = RowArena::new();
3416 let mut session = Session::<Timestamp>::dummy();
3417 session
3418 .start_transaction(to_datetime(0), None, None)
3419 .expect("must succeed");
3420 let prep_style = ExprPrepOneShot {
3421 logical_time: EvalTime::Time(Timestamp::MIN),
3422 session: &session,
3423 catalog_state: &catalog.state,
3424 };
3425
3426 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3429 if let Ok(hir) = res {
3430 let uneliminated_result_row = {
3431 if let HirScalarExpr::CallUnary { func, .. } = &hir
3432 && func.is_eliminable_cast()
3433 {
3434 let mut uneliminated_mir = hir
3435 .clone()
3436 .lower_uncorrelated(HirToMirConfig {
3437 enable_cast_elimination: false,
3438 ..catalog.system_config().into()
3439 })
3440 .expect("lowering eliminable cast should always succeed");
3441 prep_style
3442 .prep_scalar_expr(&mut uneliminated_mir)
3443 .expect("must succeed");
3444
3445 uneliminated_mir
3447 .eval(&[], &arena)
3448 .ok()
3449 .map(|datum| Row::pack([datum]))
3450 } else {
3451 None
3452 }
3453 };
3454
3455 if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3456 prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3458
3459 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3460 if let Some(return_styp) = return_styp {
3461 let mir_typ = mir.typ(&[]);
3462 soft_assert_eq_or_log!(
3465 mir_typ.scalar_type,
3466 (&return_styp).into(),
3467 "MIR type did not match the catalog type (cast elimination/repr type error)"
3468 );
3469 if !eval_result_datum.is_instance_of(&mir_typ) {
3473 panic!(
3474 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3475 );
3476 }
3477 if let Some(row) = uneliminated_result_row {
3479 let uneliminated_result_datum = row.unpack_first();
3480 assert_eq!(
3481 uneliminated_result_datum, eval_result_datum,
3482 "datums should not change if cast is eliminable"
3483 );
3484 }
3485 if let Some((introduces_nulls, propagates_nulls)) =
3488 call_introduces_propagates_nulls(&mir)
3489 {
3490 if introduces_nulls {
3491 assert!(
3495 mir_typ.nullable,
3496 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3497 name, args, mir, mir_typ.nullable
3498 );
3499 } else {
3500 let any_input_null = args.iter().any(|arg| arg.is_null());
3501 if !any_input_null {
3502 assert!(
3503 !mir_typ.nullable,
3504 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3505 name, args, mir, mir_typ.nullable
3506 );
3507 } else if propagates_nulls {
3508 assert!(
3511 mir_typ.nullable,
3512 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3513 name, args, mir, mir_typ.nullable
3514 );
3515 }
3516 }
3521 }
3522 let mut reduced = mir.clone();
3525 reduced.reduce(&[]);
3526 match reduced {
3527 MirScalarExpr::Literal(reduce_result, ctyp) => {
3528 match reduce_result {
3529 Ok(reduce_result_row) => {
3530 let reduce_result_datum = reduce_result_row.unpack_first();
3531 assert_eq!(
3532 reduce_result_datum,
3533 eval_result_datum,
3534 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3535 name,
3536 args,
3537 mir,
3538 eval_result_datum,
3539 mir_typ.scalar_type,
3540 reduce_result_datum,
3541 ctyp.scalar_type
3542 );
3543 assert_eq!(
3549 ctyp.scalar_type,
3550 mir_typ.scalar_type,
3551 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3552 name,
3553 args,
3554 mir,
3555 eval_result_datum,
3556 mir_typ.scalar_type,
3557 reduce_result_datum,
3558 ctyp.scalar_type
3559 );
3560 }
3561 Err(..) => {} }
3563 }
3564 _ => unreachable!(
3565 "all args are literals, so should have reduced to a literal"
3566 ),
3567 }
3568 }
3569 }
3570 }
3571 }
3572 }
3573
3574 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3579 match mir_func_call {
3580 MirScalarExpr::CallUnary { func, expr } => {
3581 if expr.is_literal() {
3582 Some((func.introduces_nulls(), func.propagates_nulls()))
3583 } else {
3584 None
3585 }
3586 }
3587 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3588 if expr1.is_literal() && expr2.is_literal() {
3589 Some((func.introduces_nulls(), func.propagates_nulls()))
3590 } else {
3591 None
3592 }
3593 }
3594 MirScalarExpr::CallVariadic { func, exprs } => {
3595 if exprs.iter().all(|arg| arg.is_literal()) {
3596 Some((func.introduces_nulls(), func.propagates_nulls()))
3597 } else {
3598 None
3599 }
3600 }
3601 _ => None,
3602 }
3603 }
3604
3605 #[mz_ore::test(tokio::test)]
3607 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3609 Catalog::with_debug(|catalog| async move {
3610 let conn_catalog = catalog.for_system_session();
3611
3612 for view in BUILTINS::views().filter(|view| {
3613 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3614 }) {
3615 let item = conn_catalog
3616 .resolve_item(&PartialItemName {
3617 database: None,
3618 schema: Some(view.schema.to_string()),
3619 item: view.name.to_string(),
3620 })
3621 .expect("unable to resolve view")
3622 .at_version(RelationVersionSelector::Latest);
3624 let full_name = conn_catalog.resolve_full_name(item.name());
3625 let desc = item.relation_desc().expect("invalid item type");
3626 for col_type in desc.iter_types() {
3627 match &col_type.scalar_type {
3628 typ @ SqlScalarType::UInt16
3629 | typ @ SqlScalarType::UInt32
3630 | typ @ SqlScalarType::UInt64
3631 | typ @ SqlScalarType::MzTimestamp
3632 | typ @ SqlScalarType::List { .. }
3633 | typ @ SqlScalarType::Map { .. }
3634 | typ @ SqlScalarType::MzAclItem => {
3635 panic!("{typ:?} type found in {full_name}");
3636 }
3637 SqlScalarType::AclItem
3638 | SqlScalarType::Bool
3639 | SqlScalarType::Int16
3640 | SqlScalarType::Int32
3641 | SqlScalarType::Int64
3642 | SqlScalarType::Float32
3643 | SqlScalarType::Float64
3644 | SqlScalarType::Numeric { .. }
3645 | SqlScalarType::Date
3646 | SqlScalarType::Time
3647 | SqlScalarType::Timestamp { .. }
3648 | SqlScalarType::TimestampTz { .. }
3649 | SqlScalarType::Interval
3650 | SqlScalarType::PgLegacyChar
3651 | SqlScalarType::Bytes
3652 | SqlScalarType::String
3653 | SqlScalarType::Char { .. }
3654 | SqlScalarType::VarChar { .. }
3655 | SqlScalarType::Jsonb
3656 | SqlScalarType::Uuid
3657 | SqlScalarType::Array(_)
3658 | SqlScalarType::Record { .. }
3659 | SqlScalarType::Oid
3660 | SqlScalarType::RegProc
3661 | SqlScalarType::RegType
3662 | SqlScalarType::RegClass
3663 | SqlScalarType::Int2Vector
3664 | SqlScalarType::Range { .. }
3665 | SqlScalarType::PgLegacyName => {}
3666 }
3667 }
3668 }
3669 catalog.expire().await;
3670 })
3671 .await
3672 }
3673
3674 #[mz_ore::test(tokio::test)]
3677 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3679 Catalog::with_debug(|catalog| async move {
3680 let conn_catalog = catalog.for_system_session();
3681
3682 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3683 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3684
3685 for entry in catalog.entries() {
3686 let schema_spec = entry.name().qualifiers.schema_spec;
3687 let introspection_deps = catalog.introspection_dependencies(entry.id);
3688 if introspection_deps.is_empty() {
3689 assert!(
3690 schema_spec != introspection_schema_spec,
3691 "entry does not depend on introspection sources but is in \
3692 `mz_introspection`: {}",
3693 conn_catalog.resolve_full_name(entry.name()),
3694 );
3695 } else {
3696 assert!(
3697 schema_spec == introspection_schema_spec,
3698 "entry depends on introspection sources but is not in \
3699 `mz_introspection`: {}",
3700 conn_catalog.resolve_full_name(entry.name()),
3701 );
3702 }
3703 }
3704 })
3705 .await
3706 }
3707
3708 #[mz_ore::test(tokio::test)]
3709 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3711 let persist_client = PersistClient::new_for_tests().await;
3712 let bootstrap_args = test_bootstrap_args();
3713 let organization_id = Uuid::new_v4();
3714 let db_name = "DB";
3715
3716 let mut writer_catalog = Catalog::open_debug_catalog(
3717 persist_client.clone(),
3718 organization_id.clone(),
3719 &bootstrap_args,
3720 )
3721 .await
3722 .expect("open_debug_catalog");
3723 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3724 persist_client.clone(),
3725 organization_id.clone(),
3726 &bootstrap_args,
3727 )
3728 .await
3729 .expect("open_debug_read_only_catalog");
3730 assert_err!(writer_catalog.resolve_database(db_name));
3731 assert_err!(read_only_catalog.resolve_database(db_name));
3732
3733 let commit_ts = writer_catalog.current_upper().await;
3734 writer_catalog
3735 .transact(
3736 None,
3737 commit_ts,
3738 None,
3739 vec![Op::CreateDatabase {
3740 name: db_name.to_string(),
3741 owner_id: MZ_SYSTEM_ROLE_ID,
3742 }],
3743 )
3744 .await
3745 .expect("failed to transact");
3746
3747 let write_db = writer_catalog
3748 .resolve_database(db_name)
3749 .expect("resolve_database");
3750 read_only_catalog
3751 .sync_to_current_updates()
3752 .await
3753 .expect("sync_to_current_updates");
3754 let read_db = read_only_catalog
3755 .resolve_database(db_name)
3756 .expect("resolve_database");
3757
3758 assert_eq!(write_db, read_db);
3759
3760 let writer_catalog_fencer =
3761 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3762 .await
3763 .expect("open_debug_catalog for fencer");
3764 let fencer_db = writer_catalog_fencer
3765 .resolve_database(db_name)
3766 .expect("resolve_database for fencer");
3767 assert_eq!(fencer_db, read_db);
3768
3769 let write_fence_err = writer_catalog
3770 .sync_to_current_updates()
3771 .await
3772 .expect_err("sync_to_current_updates for fencer");
3773 assert!(matches!(
3774 write_fence_err,
3775 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3776 ));
3777 let read_fence_err = read_only_catalog
3778 .sync_to_current_updates()
3779 .await
3780 .expect_err("sync_to_current_updates after fencer");
3781 assert!(matches!(
3782 read_fence_err,
3783 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3784 ));
3785
3786 writer_catalog.expire().await;
3787 read_only_catalog.expire().await;
3788 writer_catalog_fencer.expire().await;
3789 }
3790}