1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_ore::collections::HashSet;
51use mz_ore::metrics::MetricsRegistry;
52use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
53use mz_ore::result::ResultExt as _;
54use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
55use mz_persist_client::PersistClient;
56use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
57use mz_repr::explain::ExprHumanizer;
58use mz_repr::namespaces::MZ_TEMP_SCHEMA;
59use mz_repr::network_policy_id::NetworkPolicyId;
60use mz_repr::role_id::RoleId;
61use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, ScalarType};
62use mz_secrets::InMemorySecretsController;
63use mz_sql::catalog::{
64 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
65 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
66 CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
67 DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
68};
69use mz_sql::names::{
70 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
71 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
72 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
73};
74use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
75use mz_sql::rbac;
76use mz_sql::session::metadata::SessionMetadata;
77use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
78use mz_sql::session::vars::SystemVars;
79use mz_sql_parser::ast::QualifiedReplica;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
82use mz_storage_types::read_policy::ReadPolicy;
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100use crate::session::{PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod transact;
112
113#[derive(Debug)]
136pub struct Catalog {
137 state: CatalogState,
138 plans: CatalogPlans,
139 expr_cache_handle: Option<ExpressionCacheHandle>,
140 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
141 transient_revision: u64,
142}
143
144impl Clone for Catalog {
147 fn clone(&self) -> Self {
148 Self {
149 state: self.state.clone(),
150 plans: self.plans.clone(),
151 expr_cache_handle: self.expr_cache_handle.clone(),
152 storage: Arc::clone(&self.storage),
153 transient_revision: self.transient_revision,
154 }
155 }
156}
157
158#[derive(Default, Debug, Clone)]
159pub struct CatalogPlans {
160 optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
161 physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
162 dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
163 notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
164}
165
166impl Catalog {
167 #[mz_ore::instrument(level = "trace")]
169 pub fn set_optimized_plan(
170 &mut self,
171 id: GlobalId,
172 plan: DataflowDescription<OptimizedMirRelationExpr>,
173 ) {
174 self.plans.optimized_plan_by_id.insert(id, plan.into());
175 }
176
177 #[mz_ore::instrument(level = "trace")]
179 pub fn set_physical_plan(
180 &mut self,
181 id: GlobalId,
182 plan: DataflowDescription<mz_compute_types::plan::Plan>,
183 ) {
184 self.plans.physical_plan_by_id.insert(id, plan.into());
185 }
186
187 #[mz_ore::instrument(level = "trace")]
189 pub fn try_get_optimized_plan(
190 &self,
191 id: &GlobalId,
192 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
193 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
194 }
195
196 #[mz_ore::instrument(level = "trace")]
198 pub fn try_get_physical_plan(
199 &self,
200 id: &GlobalId,
201 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
202 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
203 }
204
205 #[mz_ore::instrument(level = "trace")]
207 pub fn set_dataflow_metainfo(
208 &mut self,
209 id: GlobalId,
210 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
211 ) {
212 for notice in metainfo.optimizer_notices.iter() {
214 for dep_id in notice.dependencies.iter() {
215 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
216 entry.push(Arc::clone(notice))
217 }
218 if let Some(item_id) = notice.item_id {
219 soft_assert_eq_or_log!(
220 item_id,
221 id,
222 "notice.item_id should match the id for whom we are saving the notice"
223 );
224 }
225 }
226 self.plans.dataflow_metainfos.insert(id, metainfo);
228 }
229
230 #[mz_ore::instrument(level = "trace")]
232 pub fn try_get_dataflow_metainfo(
233 &self,
234 id: &GlobalId,
235 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
236 self.plans.dataflow_metainfos.get(id)
237 }
238
239 #[mz_ore::instrument(level = "trace")]
248 pub fn drop_plans_and_metainfos(
249 &mut self,
250 drop_ids: &BTreeSet<GlobalId>,
251 ) -> BTreeSet<Arc<OptimizerNotice>> {
252 let mut dropped_notices = BTreeSet::new();
254
255 for id in drop_ids {
257 self.plans.optimized_plan_by_id.remove(id);
258 self.plans.physical_plan_by_id.remove(id);
259 if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
260 soft_assert_or_log!(
261 metainfo.optimizer_notices.iter().all_unique(),
262 "should have been pushed there by `push_optimizer_notice_dedup`"
263 );
264 for n in metainfo.optimizer_notices.drain(..) {
265 for dep_id in n.dependencies.iter() {
267 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
268 soft_assert_or_log!(
269 notices.iter().any(|x| &n == x),
270 "corrupt notices_by_dep_id"
271 );
272 notices.retain(|x| &n != x)
273 }
274 }
275 dropped_notices.insert(n);
276 }
277 }
278 }
279
280 for id in drop_ids {
282 if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
283 for n in notices.drain(..) {
284 if let Some(item_id) = n.item_id.as_ref() {
286 if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
287 metainfo.optimizer_notices.iter().for_each(|n2| {
288 if let Some(item_id_2) = n2.item_id {
289 soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
290 }
291 });
292 metainfo.optimizer_notices.retain(|x| &n != x);
293 }
294 }
295 dropped_notices.insert(n);
296 }
297 }
298 }
299
300 let mut todo_dep_ids = BTreeSet::new();
303 for notice in dropped_notices.iter() {
304 for dep_id in notice.dependencies.iter() {
305 if !drop_ids.contains(dep_id) {
306 todo_dep_ids.insert(*dep_id);
307 }
308 }
309 }
310 for id in todo_dep_ids {
313 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
314 notices.retain(|n| !dropped_notices.contains(n))
315 }
316 }
317
318 if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
319 use mz_ore::str::{bracketed, separated};
320 let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
321 let bad_notices = bad_notices.map(|n| {
322 let mut dataflow_metainfo_occurrences = Vec::new();
325 for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
326 if meta_info.optimizer_notices.contains(n) {
327 dataflow_metainfo_occurrences.push(id);
328 }
329 }
330 let mut notices_by_dep_id_occurrences = Vec::new();
332 for (id, notices) in self.plans.notices_by_dep_id.iter() {
333 if notices.iter().contains(n) {
334 notices_by_dep_id_occurrences.push(id);
335 }
336 }
337 format!(
338 "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
339 dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
340 n.id,
341 n.kind,
342 n.dependencies,
343 Arc::strong_count(n),
344 dataflow_metainfo_occurrences,
345 notices_by_dep_id_occurrences
346 )
347 });
348 let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
349 error!(
350 "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
351 bad_notices = {bad_notices}; \
352 drop_ids = {drop_ids:?}"
353 );
354 }
355
356 dropped_notices
357 }
358
359 pub fn source_read_policies(
363 &self,
364 id: CatalogItemId,
365 ) -> Vec<(CatalogItemId, ReadPolicy<mz_repr::Timestamp>)> {
366 let mut policies = Vec::new();
367 let cws = self.state.source_compaction_windows([id]);
368 for (cw, items) in cws {
369 for id in items {
370 policies.push((id, cw.into()));
371 }
372 }
373 policies
374 }
375
376 pub(crate) fn invalidate_for_index(
387 &self,
388 ons: impl Iterator<Item = GlobalId>,
389 ) -> BTreeSet<GlobalId> {
390 let mut dependencies = BTreeSet::new();
391 let mut queue = VecDeque::new();
392 let mut seen = HashSet::new();
393 for on in ons {
394 let entry = self.get_entry_by_global_id(&on);
395 dependencies.insert(on);
396 seen.insert(entry.id);
397 let uses = entry.uses();
398 queue.extend(uses.clone());
399 }
400
401 while let Some(cur) = queue.pop_front() {
402 let entry = self.get_entry(&cur);
403 if seen.insert(cur) {
404 let global_ids = entry.global_ids();
405 match entry.item_type() {
406 CatalogItemType::Table
407 | CatalogItemType::Source
408 | CatalogItemType::MaterializedView
409 | CatalogItemType::Sink
410 | CatalogItemType::Index
411 | CatalogItemType::Type
412 | CatalogItemType::Func
413 | CatalogItemType::Secret
414 | CatalogItemType::Connection
415 | CatalogItemType::ContinualTask => {
416 dependencies.extend(global_ids);
417 }
418 CatalogItemType::View => {
419 dependencies.extend(global_ids);
420 queue.extend(entry.uses());
421 }
422 }
423 }
424 }
425 dependencies
426 }
427}
428
429#[derive(Debug)]
430pub struct ConnCatalog<'a> {
431 state: Cow<'a, CatalogState>,
432 unresolvable_ids: BTreeSet<CatalogItemId>,
442 conn_id: ConnectionId,
443 cluster: String,
444 database: Option<DatabaseId>,
445 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
446 role_id: RoleId,
447 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
448 notices_tx: UnboundedSender<AdapterNotice>,
449}
450
451impl ConnCatalog<'_> {
452 pub fn conn_id(&self) -> &ConnectionId {
453 &self.conn_id
454 }
455
456 pub fn state(&self) -> &CatalogState {
457 &*self.state
458 }
459
460 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
470 assert_eq!(
471 self.role_id, MZ_SYSTEM_ROLE_ID,
472 "only the system role can mark IDs unresolvable",
473 );
474 self.unresolvable_ids.insert(id);
475 }
476
477 pub fn effective_search_path(
483 &self,
484 include_temp_schema: bool,
485 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
486 self.state
487 .effective_search_path(&self.search_path, include_temp_schema)
488 }
489}
490
491impl ConnectionResolver for ConnCatalog<'_> {
492 fn resolve_connection(
493 &self,
494 id: CatalogItemId,
495 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
496 self.state().resolve_connection(id)
497 }
498}
499
500impl Catalog {
501 pub fn transient_revision(&self) -> u64 {
505 self.transient_revision
506 }
507
508 pub async fn with_debug<F, Fut, T>(f: F) -> T
520 where
521 F: FnOnce(Catalog) -> Fut,
522 Fut: Future<Output = T>,
523 {
524 let persist_client = PersistClient::new_for_tests().await;
525 let organization_id = Uuid::new_v4();
526 let bootstrap_args = test_bootstrap_args();
527 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
528 .await
529 .expect("can open debug catalog");
530 f(catalog).await
531 }
532
533 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
536 where
537 F: FnOnce(Catalog) -> Fut,
538 Fut: Future<Output = T>,
539 {
540 let persist_client = PersistClient::new_for_tests().await;
541 let organization_id = Uuid::new_v4();
542 let bootstrap_args = test_bootstrap_args();
543 let mut catalog =
544 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
545 .await
546 .expect("can open debug catalog");
547
548 let now = SYSTEM_TIME.clone();
550 let openable_storage = TestCatalogStateBuilder::new(persist_client)
551 .with_organization_id(organization_id)
552 .with_default_deploy_generation()
553 .build()
554 .await
555 .expect("can create durable catalog");
556 let mut storage = openable_storage
557 .open(now().into(), &bootstrap_args)
558 .await
559 .expect("can open durable catalog")
560 .0;
561 let _ = storage
563 .sync_to_current_updates()
564 .await
565 .expect("can sync to current updates");
566 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
567
568 f(catalog).await
569 }
570
571 pub async fn open_debug_catalog(
575 persist_client: PersistClient,
576 organization_id: Uuid,
577 bootstrap_args: &BootstrapArgs,
578 ) -> Result<Catalog, anyhow::Error> {
579 let now = SYSTEM_TIME.clone();
580 let environment_id = None;
581 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
582 .with_organization_id(organization_id)
583 .with_default_deploy_generation()
584 .build()
585 .await?;
586 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
587 let system_parameter_defaults = BTreeMap::default();
588 Self::open_debug_catalog_inner(
589 persist_client,
590 storage,
591 now,
592 environment_id,
593 &DUMMY_BUILD_INFO,
594 system_parameter_defaults,
595 bootstrap_args,
596 None,
597 )
598 .await
599 }
600
601 pub async fn open_debug_read_only_catalog(
606 persist_client: PersistClient,
607 organization_id: Uuid,
608 bootstrap_args: &BootstrapArgs,
609 ) -> Result<Catalog, anyhow::Error> {
610 let now = SYSTEM_TIME.clone();
611 let environment_id = None;
612 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
613 .with_organization_id(organization_id)
614 .build()
615 .await?;
616 let storage = openable_storage
617 .open_read_only(&test_bootstrap_args())
618 .await?;
619 let system_parameter_defaults = BTreeMap::default();
620 Self::open_debug_catalog_inner(
621 persist_client,
622 storage,
623 now,
624 environment_id,
625 &DUMMY_BUILD_INFO,
626 system_parameter_defaults,
627 bootstrap_args,
628 None,
629 )
630 .await
631 }
632
633 pub async fn open_debug_read_only_persist_catalog_config(
638 persist_client: PersistClient,
639 now: NowFn,
640 environment_id: EnvironmentId,
641 system_parameter_defaults: BTreeMap<String, String>,
642 build_info: &'static BuildInfo,
643 bootstrap_args: &BootstrapArgs,
644 enable_expression_cache_override: Option<bool>,
645 ) -> Result<Catalog, anyhow::Error> {
646 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
647 .with_organization_id(environment_id.organization_id())
648 .with_version(
649 build_info
650 .version
651 .parse()
652 .expect("build version is parseable"),
653 )
654 .build()
655 .await?;
656 let storage = openable_storage.open_read_only(bootstrap_args).await?;
657 Self::open_debug_catalog_inner(
658 persist_client,
659 storage,
660 now,
661 Some(environment_id),
662 build_info,
663 system_parameter_defaults,
664 bootstrap_args,
665 enable_expression_cache_override,
666 )
667 .await
668 }
669
670 async fn open_debug_catalog_inner(
671 persist_client: PersistClient,
672 storage: Box<dyn DurableCatalogState>,
673 now: NowFn,
674 environment_id: Option<EnvironmentId>,
675 build_info: &'static BuildInfo,
676 system_parameter_defaults: BTreeMap<String, String>,
677 bootstrap_args: &BootstrapArgs,
678 enable_expression_cache_override: Option<bool>,
679 ) -> Result<Catalog, anyhow::Error> {
680 let metrics_registry = &MetricsRegistry::new();
681 let secrets_reader = Arc::new(InMemorySecretsController::new());
682 let previous_ts = now().into();
685 let replica_size = &bootstrap_args.default_cluster_replica_size;
686 let read_only = false;
687
688 let OpenCatalogResult {
689 catalog,
690 migrated_storage_collections_0dt: _,
691 new_builtin_collections: _,
692 builtin_table_updates: _,
693 cached_global_exprs: _,
694 uncached_local_exprs: _,
695 } = Catalog::open(Config {
696 storage,
697 metrics_registry,
698 state: StateConfig {
699 unsafe_mode: true,
700 all_features: false,
701 build_info,
702 environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()),
703 read_only,
704 now,
705 boot_ts: previous_ts,
706 skip_migrations: true,
707 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
708 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
709 size: replica_size.clone(),
710 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711 },
712 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
713 size: replica_size.clone(),
714 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
715 },
716 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
717 size: replica_size.clone(),
718 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
719 },
720 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
721 size: replica_size.clone(),
722 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
723 },
724 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
725 size: replica_size.clone(),
726 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
727 },
728 system_parameter_defaults,
729 remote_system_parameters: None,
730 availability_zones: vec![],
731 egress_addresses: vec![],
732 aws_principal_context: None,
733 aws_privatelink_availability_zones: None,
734 http_host_name: None,
735 connection_context: ConnectionContext::for_tests(secrets_reader),
736 builtin_item_migration_config: BuiltinItemMigrationConfig {
737 persist_client: persist_client.clone(),
738 read_only,
739 },
740 persist_client,
741 enable_expression_cache_override,
742 enable_0dt_deployment: true,
743 helm_chart_version: None,
744 },
745 })
746 .await?;
747 Ok(catalog)
748 }
749
750 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
751 self.state.for_session(session)
752 }
753
754 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog {
755 self.state.for_sessionless_user(role_id)
756 }
757
758 pub fn for_system_session(&self) -> ConnCatalog {
759 self.state.for_system_session()
760 }
761
762 async fn storage<'a>(
763 &'a self,
764 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
765 self.storage.lock().await
766 }
767
768 pub async fn current_upper(&self) -> mz_repr::Timestamp {
769 self.storage().await.current_upper().await
770 }
771
772 pub async fn allocate_user_id(
773 &self,
774 commit_ts: mz_repr::Timestamp,
775 ) -> Result<(CatalogItemId, GlobalId), Error> {
776 self.storage()
777 .await
778 .allocate_user_id(commit_ts)
779 .await
780 .maybe_terminate("allocating user ids")
781 .err_into()
782 }
783
784 pub async fn allocate_user_ids(
786 &self,
787 amount: u64,
788 commit_ts: mz_repr::Timestamp,
789 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
790 self.storage()
791 .await
792 .allocate_user_ids(amount, commit_ts)
793 .await
794 .maybe_terminate("allocating user ids")
795 .err_into()
796 }
797
798 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
799 let commit_ts = self.storage().await.current_upper().await;
800 self.allocate_user_id(commit_ts).await
801 }
802
803 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
805 self.storage()
806 .await
807 .get_next_user_item_id()
808 .await
809 .err_into()
810 }
811
812 #[cfg(test)]
813 pub async fn allocate_system_id(
814 &self,
815 commit_ts: mz_repr::Timestamp,
816 ) -> Result<(CatalogItemId, GlobalId), Error> {
817 use mz_ore::collections::CollectionExt;
818
819 let mut storage = self.storage().await;
820 let mut txn = storage.transaction().await?;
821 let id = txn
822 .allocate_system_item_ids(1)
823 .maybe_terminate("allocating system ids")?
824 .into_element();
825 let _ = txn.get_and_commit_op_updates();
827 txn.commit(commit_ts).await?;
828 Ok(id)
829 }
830
831 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
833 self.storage()
834 .await
835 .get_next_system_item_id()
836 .await
837 .err_into()
838 }
839
840 pub async fn allocate_user_cluster_id(
841 &self,
842 commit_ts: mz_repr::Timestamp,
843 ) -> Result<ClusterId, Error> {
844 self.storage()
845 .await
846 .allocate_user_cluster_id(commit_ts)
847 .await
848 .maybe_terminate("allocating user cluster ids")
849 .err_into()
850 }
851
852 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
854 self.storage()
855 .await
856 .get_next_system_replica_id()
857 .await
858 .err_into()
859 }
860
861 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
863 self.storage()
864 .await
865 .get_next_user_replica_id()
866 .await
867 .err_into()
868 }
869
870 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
871 self.state.resolve_database(database_name)
872 }
873
874 pub fn resolve_schema(
875 &self,
876 current_database: Option<&DatabaseId>,
877 database_name: Option<&str>,
878 schema_name: &str,
879 conn_id: &ConnectionId,
880 ) -> Result<&Schema, SqlCatalogError> {
881 self.state
882 .resolve_schema(current_database, database_name, schema_name, conn_id)
883 }
884
885 pub fn resolve_schema_in_database(
886 &self,
887 database_spec: &ResolvedDatabaseSpecifier,
888 schema_name: &str,
889 conn_id: &ConnectionId,
890 ) -> Result<&Schema, SqlCatalogError> {
891 self.state
892 .resolve_schema_in_database(database_spec, schema_name, conn_id)
893 }
894
895 pub fn resolve_replica_in_cluster(
896 &self,
897 cluster_id: &ClusterId,
898 replica_name: &str,
899 ) -> Result<&ClusterReplica, SqlCatalogError> {
900 self.state
901 .resolve_replica_in_cluster(cluster_id, replica_name)
902 }
903
904 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
905 self.state.resolve_system_schema(name)
906 }
907
908 pub fn resolve_search_path(
909 &self,
910 session: &Session,
911 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
912 self.state.resolve_search_path(session)
913 }
914
915 pub fn resolve_entry(
917 &self,
918 current_database: Option<&DatabaseId>,
919 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
920 name: &PartialItemName,
921 conn_id: &ConnectionId,
922 ) -> Result<&CatalogEntry, SqlCatalogError> {
923 self.state
924 .resolve_entry(current_database, search_path, name, conn_id)
925 }
926
927 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
929 self.state.resolve_builtin_table(builtin)
930 }
931
932 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
934 self.state.resolve_builtin_log(builtin).0
935 }
936
937 pub fn resolve_builtin_storage_collection(
939 &self,
940 builtin: &'static BuiltinSource,
941 ) -> CatalogItemId {
942 self.state.resolve_builtin_source(builtin)
943 }
944
945 pub fn resolve_function(
947 &self,
948 current_database: Option<&DatabaseId>,
949 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
950 name: &PartialItemName,
951 conn_id: &ConnectionId,
952 ) -> Result<&CatalogEntry, SqlCatalogError> {
953 self.state
954 .resolve_function(current_database, search_path, name, conn_id)
955 }
956
957 pub fn resolve_type(
959 &self,
960 current_database: Option<&DatabaseId>,
961 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
962 name: &PartialItemName,
963 conn_id: &ConnectionId,
964 ) -> Result<&CatalogEntry, SqlCatalogError> {
965 self.state
966 .resolve_type(current_database, search_path, name, conn_id)
967 }
968
969 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
970 self.state.resolve_cluster(name)
971 }
972
973 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
979 self.state.resolve_builtin_cluster(cluster)
980 }
981
982 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
983 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
984 }
985
986 pub fn resolve_target_cluster(
988 &self,
989 target_cluster: TargetCluster,
990 session: &Session,
991 ) -> Result<&Cluster, AdapterError> {
992 match target_cluster {
993 TargetCluster::CatalogServer => {
994 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
995 }
996 TargetCluster::Active => self.active_cluster(session),
997 TargetCluster::Transaction(cluster_id) => self
998 .try_get_cluster(cluster_id)
999 .ok_or(AdapterError::ConcurrentClusterDrop),
1000 }
1001 }
1002
1003 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
1004 if session.user().name != SYSTEM_USER.name
1008 && session.user().name != SUPPORT_USER.name
1009 && session.vars().cluster() == SYSTEM_USER.name
1010 {
1011 coord_bail!(
1012 "system cluster '{}' cannot execute user queries",
1013 SYSTEM_USER.name
1014 );
1015 }
1016 let cluster = self.resolve_cluster(session.vars().cluster())?;
1017 Ok(cluster)
1018 }
1019
1020 pub fn state(&self) -> &CatalogState {
1021 &self.state
1022 }
1023
1024 pub fn resolve_full_name(
1025 &self,
1026 name: &QualifiedItemName,
1027 conn_id: Option<&ConnectionId>,
1028 ) -> FullItemName {
1029 self.state.resolve_full_name(name, conn_id)
1030 }
1031
1032 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1033 self.state.try_get_entry(id)
1034 }
1035
1036 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1037 self.state.try_get_entry_by_global_id(id)
1038 }
1039
1040 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1041 self.state.get_entry(id)
1042 }
1043
1044 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1045 self.state.get_entry_by_global_id(id)
1046 }
1047
1048 pub fn get_global_ids<'a>(
1049 &'a self,
1050 id: &CatalogItemId,
1051 ) -> impl Iterator<Item = GlobalId> + use<'a> {
1052 self.get_entry(id).global_ids()
1053 }
1054
1055 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1056 self.get_entry_by_global_id(id).id()
1057 }
1058
1059 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1060 let item = self.try_get_entry_by_global_id(id)?;
1061 Some(item.id())
1062 }
1063
1064 pub fn get_schema(
1065 &self,
1066 database_spec: &ResolvedDatabaseSpecifier,
1067 schema_spec: &SchemaSpecifier,
1068 conn_id: &ConnectionId,
1069 ) -> &Schema {
1070 self.state.get_schema(database_spec, schema_spec, conn_id)
1071 }
1072
1073 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1074 self.state.get_mz_catalog_schema_id()
1075 }
1076
1077 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1078 self.state.get_pg_catalog_schema_id()
1079 }
1080
1081 pub fn get_information_schema_id(&self) -> SchemaId {
1082 self.state.get_information_schema_id()
1083 }
1084
1085 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1086 self.state.get_mz_internal_schema_id()
1087 }
1088
1089 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1090 self.state.get_mz_introspection_schema_id()
1091 }
1092
1093 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1094 self.state.get_mz_unsafe_schema_id()
1095 }
1096
1097 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1098 self.state.system_schema_ids()
1099 }
1100
1101 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1102 self.state.get_database(id)
1103 }
1104
1105 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1106 self.state.try_get_role(id)
1107 }
1108
1109 pub fn get_role(&self, id: &RoleId) -> &Role {
1110 self.state.get_role(id)
1111 }
1112
1113 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1114 self.state.try_get_role_by_name(role_name)
1115 }
1116
1117 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1118 self.state.try_get_role_auth_by_id(id)
1119 }
1120
1121 pub fn create_temporary_schema(
1124 &mut self,
1125 conn_id: &ConnectionId,
1126 owner_id: RoleId,
1127 ) -> Result<(), Error> {
1128 self.state.create_temporary_schema(conn_id, owner_id)
1129 }
1130
1131 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1132 self.state.temporary_schemas[conn_id]
1133 .items
1134 .contains_key(item_name)
1135 }
1136
1137 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1140 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1141 return Ok(());
1142 };
1143 if !schema.items.is_empty() {
1144 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1145 }
1146 Ok(())
1147 }
1148
1149 pub(crate) fn object_dependents(
1150 &self,
1151 object_ids: &Vec<ObjectId>,
1152 conn_id: &ConnectionId,
1153 ) -> Vec<ObjectId> {
1154 let mut seen = BTreeSet::new();
1155 self.state.object_dependents(object_ids, conn_id, &mut seen)
1156 }
1157
1158 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1159 FullNameV1 {
1160 database: name.database.to_string(),
1161 schema: name.schema.clone(),
1162 item: name.item.clone(),
1163 }
1164 }
1165
1166 pub fn find_available_cluster_name(&self, name: &str) -> String {
1167 let mut i = 0;
1168 let mut candidate = name.to_string();
1169 while self.state.clusters_by_name.contains_key(&candidate) {
1170 i += 1;
1171 candidate = format!("{}{}", name, i);
1172 }
1173 candidate
1174 }
1175
1176 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1177 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1178 self.cluster_replica_sizes()
1179 .enabled_allocations()
1180 .map(|a| a.0.to_owned())
1181 .collect::<Vec<_>>()
1182 } else {
1183 self.system_config().allowed_cluster_replica_sizes()
1184 }
1185 }
1186
1187 pub fn concretize_replica_location(
1188 &self,
1189 location: mz_catalog::durable::ReplicaLocation,
1190 allowed_sizes: &Vec<String>,
1191 allowed_availability_zones: Option<&[String]>,
1192 ) -> Result<ReplicaLocation, Error> {
1193 self.state
1194 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1195 }
1196
1197 pub(crate) fn ensure_valid_replica_size(
1198 &self,
1199 allowed_sizes: &[String],
1200 size: &String,
1201 ) -> Result<(), Error> {
1202 self.state.ensure_valid_replica_size(allowed_sizes, size)
1203 }
1204
1205 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1206 &self.state.cluster_replica_sizes
1207 }
1208
1209 pub fn get_privileges(
1211 &self,
1212 id: &SystemObjectId,
1213 conn_id: &ConnectionId,
1214 ) -> Option<&PrivilegeMap> {
1215 match id {
1216 SystemObjectId::Object(id) => match id {
1217 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1218 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1219 ObjectId::Schema((database_spec, schema_spec)) => Some(
1220 self.get_schema(database_spec, schema_spec, conn_id)
1221 .privileges(),
1222 ),
1223 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1224 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1225 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1226 },
1227 SystemObjectId::System => Some(&self.state.system_privileges),
1228 }
1229 }
1230
1231 #[mz_ore::instrument(level = "debug")]
1232 pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1233 Ok(self.storage().await.confirm_leadership().await?)
1234 }
1235
1236 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1238 self.state.introspection_dependencies(id)
1239 }
1240
1241 pub fn dump(&self) -> Result<CatalogDump, Error> {
1247 Ok(CatalogDump::new(self.state.dump(None)?))
1248 }
1249
1250 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1254 self.state.check_consistency().map_err(|inconsistencies| {
1255 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1256 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1257 })
1258 })
1259 }
1260
1261 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1262 self.state.config()
1263 }
1264
1265 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1266 self.state.entry_by_id.values()
1267 }
1268
1269 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1270 self.entries()
1271 .filter(|entry| entry.is_connection() && entry.id().is_user())
1272 }
1273
1274 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1275 self.entries()
1276 .filter(|entry| entry.is_table() && entry.id().is_user())
1277 }
1278
1279 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1280 self.entries()
1281 .filter(|entry| entry.is_source() && entry.id().is_user())
1282 }
1283
1284 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1285 self.entries()
1286 .filter(|entry| entry.is_sink() && entry.id().is_user())
1287 }
1288
1289 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1290 self.entries()
1291 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1292 }
1293
1294 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1295 self.entries()
1296 .filter(|entry| entry.is_secret() && entry.id().is_user())
1297 }
1298
1299 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1300 self.state.get_network_policy(&network_policy_id)
1301 }
1302
1303 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1304 self.state.try_get_network_policy_by_name(name)
1305 }
1306
1307 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1308 self.state.clusters_by_id.values()
1309 }
1310
1311 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1312 self.state.get_cluster(cluster_id)
1313 }
1314
1315 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1316 self.state.try_get_cluster(cluster_id)
1317 }
1318
1319 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1320 self.clusters().filter(|cluster| cluster.id.is_user())
1321 }
1322
1323 pub fn get_cluster_replica(
1324 &self,
1325 cluster_id: ClusterId,
1326 replica_id: ReplicaId,
1327 ) -> &ClusterReplica {
1328 self.state.get_cluster_replica(cluster_id, replica_id)
1329 }
1330
1331 pub fn try_get_cluster_replica(
1332 &self,
1333 cluster_id: ClusterId,
1334 replica_id: ReplicaId,
1335 ) -> Option<&ClusterReplica> {
1336 self.state.try_get_cluster_replica(cluster_id, replica_id)
1337 }
1338
1339 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1340 self.user_clusters()
1341 .flat_map(|cluster| cluster.user_replicas())
1342 }
1343
1344 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1345 self.state.database_by_id.values()
1346 }
1347
1348 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1349 self.state
1350 .roles_by_id
1351 .values()
1352 .filter(|role| role.is_user())
1353 }
1354
1355 pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1356 self.entries()
1357 .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1358 }
1359
1360 pub fn system_privileges(&self) -> &PrivilegeMap {
1361 &self.state.system_privileges
1362 }
1363
1364 pub fn default_privileges(
1365 &self,
1366 ) -> impl Iterator<
1367 Item = (
1368 &DefaultPrivilegeObject,
1369 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1370 ),
1371 > {
1372 self.state.default_privileges.iter()
1373 }
1374
1375 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1376 self.state
1377 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1378 }
1379
1380 pub fn pack_storage_usage_update(
1381 &self,
1382 event: VersionedStorageUsage,
1383 diff: Diff,
1384 ) -> BuiltinTableUpdate {
1385 self.state
1386 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1387 }
1388
1389 pub fn system_config(&self) -> &SystemVars {
1390 self.state.system_config()
1391 }
1392
1393 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1394 self.state.system_config_mut()
1395 }
1396
1397 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1398 self.state.ensure_not_reserved_role(role_id)
1399 }
1400
1401 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1402 self.state.ensure_grantable_role(role_id)
1403 }
1404
1405 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1406 self.state.ensure_not_system_role(role_id)
1407 }
1408
1409 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1410 self.state.ensure_not_predefined_role(role_id)
1411 }
1412
1413 pub fn ensure_not_reserved_network_policy(
1414 &self,
1415 network_policy_id: &NetworkPolicyId,
1416 ) -> Result<(), Error> {
1417 self.state
1418 .ensure_not_reserved_network_policy(network_policy_id)
1419 }
1420
1421 pub fn ensure_not_reserved_object(
1422 &self,
1423 object_id: &ObjectId,
1424 conn_id: &ConnectionId,
1425 ) -> Result<(), Error> {
1426 match object_id {
1427 ObjectId::Cluster(cluster_id) => {
1428 if cluster_id.is_system() {
1429 let cluster = self.get_cluster(*cluster_id);
1430 Err(Error::new(ErrorKind::ReadOnlyCluster(
1431 cluster.name().to_string(),
1432 )))
1433 } else {
1434 Ok(())
1435 }
1436 }
1437 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1438 if replica_id.is_system() {
1439 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1440 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1441 replica.name().to_string(),
1442 )))
1443 } else {
1444 Ok(())
1445 }
1446 }
1447 ObjectId::Database(database_id) => {
1448 if database_id.is_system() {
1449 let database = self.get_database(database_id);
1450 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1451 database.name().to_string(),
1452 )))
1453 } else {
1454 Ok(())
1455 }
1456 }
1457 ObjectId::Schema((database_spec, schema_spec)) => {
1458 if schema_spec.is_system() {
1459 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1460 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1461 schema.name().schema.clone(),
1462 )))
1463 } else {
1464 Ok(())
1465 }
1466 }
1467 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1468 ObjectId::Item(item_id) => {
1469 if item_id.is_system() {
1470 let item = self.get_entry(item_id);
1471 let name = self.resolve_full_name(item.name(), Some(conn_id));
1472 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1473 } else {
1474 Ok(())
1475 }
1476 }
1477 ObjectId::NetworkPolicy(network_policy_id) => {
1478 self.ensure_not_reserved_network_policy(network_policy_id)
1479 }
1480 }
1481 }
1482
1483 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1485 &mut self,
1486 create_sql: &str,
1487 force_if_exists_skip: bool,
1488 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1489 self.state
1490 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1491 }
1492
1493 pub(crate) fn update_expression_cache<'a, 'b>(
1494 &'a self,
1495 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1496 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1497 ) -> BoxFuture<'b, ()> {
1498 if let Some(expr_cache) = &self.expr_cache_handle {
1499 let ons = new_local_expressions
1500 .iter()
1501 .map(|(id, _)| id)
1502 .chain(new_global_expressions.iter().map(|(id, _)| id))
1503 .map(|id| self.get_entry_by_global_id(id))
1504 .filter_map(|entry| entry.index().map(|index| index.on));
1505 let invalidate_ids = self.invalidate_for_index(ons);
1506 expr_cache
1507 .update(
1508 new_local_expressions,
1509 new_global_expressions,
1510 invalidate_ids,
1511 )
1512 .boxed()
1513 } else {
1514 async {}.boxed()
1515 }
1516 }
1517
1518 #[cfg(test)]
1522 async fn sync_to_current_updates(
1523 &mut self,
1524 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
1525 let updates = self.storage().await.sync_to_current_updates().await?;
1526 let builtin_table_updates = self.state.apply_updates(updates)?;
1527 Ok(builtin_table_updates)
1528 }
1529}
1530
1531pub fn is_reserved_name(name: &str) -> bool {
1532 BUILTIN_PREFIXES
1533 .iter()
1534 .any(|prefix| name.starts_with(prefix))
1535}
1536
1537pub fn is_reserved_role_name(name: &str) -> bool {
1538 is_reserved_name(name) || is_public_role(name)
1539}
1540
1541pub fn is_public_role(name: &str) -> bool {
1542 name == &*PUBLIC_ROLE_NAME
1543}
1544
1545pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1546 object_type_to_audit_object_type(sql_type.into())
1547}
1548
1549pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1550 match id {
1551 CommentObjectId::Table(_) => ObjectType::Table,
1552 CommentObjectId::View(_) => ObjectType::View,
1553 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1554 CommentObjectId::Source(_) => ObjectType::Source,
1555 CommentObjectId::Sink(_) => ObjectType::Sink,
1556 CommentObjectId::Index(_) => ObjectType::Index,
1557 CommentObjectId::Func(_) => ObjectType::Func,
1558 CommentObjectId::Connection(_) => ObjectType::Connection,
1559 CommentObjectId::Type(_) => ObjectType::Type,
1560 CommentObjectId::Secret(_) => ObjectType::Secret,
1561 CommentObjectId::Role(_) => ObjectType::Role,
1562 CommentObjectId::Database(_) => ObjectType::Database,
1563 CommentObjectId::Schema(_) => ObjectType::Schema,
1564 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1565 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1566 CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1567 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1568 }
1569}
1570
1571pub(crate) fn object_type_to_audit_object_type(
1572 object_type: mz_sql::catalog::ObjectType,
1573) -> ObjectType {
1574 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1575}
1576
1577pub(crate) fn system_object_type_to_audit_object_type(
1578 system_type: &SystemObjectType,
1579) -> ObjectType {
1580 match system_type {
1581 SystemObjectType::Object(object_type) => match object_type {
1582 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1583 mz_sql::catalog::ObjectType::View => ObjectType::View,
1584 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1585 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1586 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1587 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1588 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1589 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1590 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1591 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1592 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1593 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1594 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1595 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1596 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1597 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1598 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1599 },
1600 SystemObjectType::System => ObjectType::System,
1601 }
1602}
1603
1604#[derive(Debug, Copy, Clone)]
1605pub enum UpdatePrivilegeVariant {
1606 Grant,
1607 Revoke,
1608}
1609
1610impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1611 fn from(variant: UpdatePrivilegeVariant) -> Self {
1612 match variant {
1613 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1614 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1615 }
1616 }
1617}
1618
1619impl From<UpdatePrivilegeVariant> for EventType {
1620 fn from(variant: UpdatePrivilegeVariant) -> Self {
1621 match variant {
1622 UpdatePrivilegeVariant::Grant => EventType::Grant,
1623 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1624 }
1625 }
1626}
1627
1628impl ConnCatalog<'_> {
1629 fn resolve_item_name(
1630 &self,
1631 name: &PartialItemName,
1632 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1633 self.resolve_item(name).map(|entry| entry.name())
1634 }
1635
1636 fn resolve_function_name(
1637 &self,
1638 name: &PartialItemName,
1639 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1640 self.resolve_function(name).map(|entry| entry.name())
1641 }
1642
1643 fn resolve_type_name(
1644 &self,
1645 name: &PartialItemName,
1646 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1647 self.resolve_type(name).map(|entry| entry.name())
1648 }
1649}
1650
1651impl ExprHumanizer for ConnCatalog<'_> {
1652 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1653 let entry = self.state.try_get_entry_by_global_id(&id)?;
1654 Some(self.resolve_full_name(entry.name()).to_string())
1655 }
1656
1657 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1658 let entry = self.state.try_get_entry_by_global_id(&id)?;
1659 Some(entry.name().item.clone())
1660 }
1661
1662 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1663 let entry = self.state.try_get_entry_by_global_id(&id)?;
1664 Some(self.resolve_full_name(entry.name()).into_parts())
1665 }
1666
1667 fn humanize_scalar_type(&self, typ: &ScalarType, postgres_compat: bool) -> String {
1668 use ScalarType::*;
1669
1670 match typ {
1671 Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1672 List {
1673 custom_id: Some(item_id),
1674 ..
1675 }
1676 | Map {
1677 custom_id: Some(item_id),
1678 ..
1679 } => {
1680 let item = self.get_item(item_id);
1681 self.minimal_qualification(item.name()).to_string()
1682 }
1683 List { element_type, .. } => {
1684 format!(
1685 "{} list",
1686 self.humanize_scalar_type(element_type, postgres_compat)
1687 )
1688 }
1689 Map { value_type, .. } => format!(
1690 "map[{}=>{}]",
1691 self.humanize_scalar_type(&ScalarType::String, postgres_compat),
1692 self.humanize_scalar_type(value_type, postgres_compat)
1693 ),
1694 Record {
1695 custom_id: Some(item_id),
1696 ..
1697 } => {
1698 let item = self.get_item(item_id);
1699 self.minimal_qualification(item.name()).to_string()
1700 }
1701 Record { fields, .. } => format!(
1702 "record({})",
1703 fields
1704 .iter()
1705 .map(|f| format!(
1706 "{}: {}",
1707 f.0,
1708 self.humanize_column_type(&f.1, postgres_compat)
1709 ))
1710 .join(",")
1711 ),
1712 PgLegacyChar => "\"char\"".into(),
1713 Char { length } if !postgres_compat => match length {
1714 None => "char".into(),
1715 Some(length) => format!("char({})", length.into_u32()),
1716 },
1717 VarChar { max_length } if !postgres_compat => match max_length {
1718 None => "varchar".into(),
1719 Some(length) => format!("varchar({})", length.into_u32()),
1720 },
1721 UInt16 => "uint2".into(),
1722 UInt32 => "uint4".into(),
1723 UInt64 => "uint8".into(),
1724 ty => {
1725 let pgrepr_type = mz_pgrepr::Type::from(ty);
1726 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1727
1728 let res = if self
1729 .effective_search_path(true)
1730 .iter()
1731 .any(|(_, schema)| schema == &pg_catalog_schema)
1732 {
1733 pgrepr_type.name().to_string()
1734 } else {
1735 let name = QualifiedItemName {
1738 qualifiers: ItemQualifiers {
1739 database_spec: ResolvedDatabaseSpecifier::Ambient,
1740 schema_spec: pg_catalog_schema,
1741 },
1742 item: pgrepr_type.name().to_string(),
1743 };
1744 self.resolve_full_name(&name).to_string()
1745 };
1746 res
1747 }
1748 }
1749 }
1750
1751 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1752 let entry = self.state.try_get_entry_by_global_id(&id)?;
1753
1754 match entry.index() {
1755 Some(index) => {
1756 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1757 let mut on_names = on_desc
1758 .iter_names()
1759 .map(|col_name| col_name.to_string())
1760 .collect::<Vec<_>>();
1761
1762 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1763
1764 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1768 let mut ix_names = vec![String::new(); ix_arity];
1769
1770 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1772 let on_name = on_names.get_mut(on_pos).expect("on_name");
1773 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1774 std::mem::swap(on_name, ix_name);
1775 }
1776
1777 Some(ix_names) }
1779 None => {
1780 let desc = self.state.try_get_desc_by_global_id(&id)?;
1781 let column_names = desc
1782 .iter_names()
1783 .map(|col_name| col_name.to_string())
1784 .collect();
1785
1786 Some(column_names)
1787 }
1788 }
1789 }
1790
1791 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1792 let desc = self.state.try_get_desc_by_global_id(&id)?;
1793 Some(desc.get_name(column).to_string())
1794 }
1795
1796 fn id_exists(&self, id: GlobalId) -> bool {
1797 self.state.entry_by_global_id.contains_key(&id)
1798 }
1799}
1800
1801impl SessionCatalog for ConnCatalog<'_> {
1802 fn active_role_id(&self) -> &RoleId {
1803 &self.role_id
1804 }
1805
1806 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1807 self.prepared_statements
1808 .as_ref()
1809 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1810 .flatten()
1811 }
1812
1813 fn active_database(&self) -> Option<&DatabaseId> {
1814 self.database.as_ref()
1815 }
1816
1817 fn active_cluster(&self) -> &str {
1818 &self.cluster
1819 }
1820
1821 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1822 &self.search_path
1823 }
1824
1825 fn resolve_database(
1826 &self,
1827 database_name: &str,
1828 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1829 Ok(self.state.resolve_database(database_name)?)
1830 }
1831
1832 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1833 self.state
1834 .database_by_id
1835 .get(id)
1836 .expect("database doesn't exist")
1837 }
1838
1839 #[allow(clippy::as_conversions)]
1841 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1842 self.state
1843 .database_by_id
1844 .values()
1845 .map(|database| database as &dyn CatalogDatabase)
1846 .collect()
1847 }
1848
1849 fn resolve_schema(
1850 &self,
1851 database_name: Option<&str>,
1852 schema_name: &str,
1853 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1854 Ok(self.state.resolve_schema(
1855 self.database.as_ref(),
1856 database_name,
1857 schema_name,
1858 &self.conn_id,
1859 )?)
1860 }
1861
1862 fn resolve_schema_in_database(
1863 &self,
1864 database_spec: &ResolvedDatabaseSpecifier,
1865 schema_name: &str,
1866 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1867 Ok(self
1868 .state
1869 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1870 }
1871
1872 fn get_schema(
1873 &self,
1874 database_spec: &ResolvedDatabaseSpecifier,
1875 schema_spec: &SchemaSpecifier,
1876 ) -> &dyn CatalogSchema {
1877 self.state
1878 .get_schema(database_spec, schema_spec, &self.conn_id)
1879 }
1880
1881 #[allow(clippy::as_conversions)]
1883 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1884 self.get_databases()
1885 .into_iter()
1886 .flat_map(|database| database.schemas().into_iter())
1887 .chain(
1888 self.state
1889 .ambient_schemas_by_id
1890 .values()
1891 .chain(self.state.temporary_schemas.values())
1892 .map(|schema| schema as &dyn CatalogSchema),
1893 )
1894 .collect()
1895 }
1896
1897 fn get_mz_internal_schema_id(&self) -> SchemaId {
1898 self.state().get_mz_internal_schema_id()
1899 }
1900
1901 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1902 self.state().get_mz_unsafe_schema_id()
1903 }
1904
1905 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1906 self.state.is_system_schema_specifier(schema)
1907 }
1908
1909 fn resolve_role(
1910 &self,
1911 role_name: &str,
1912 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1913 match self.state.try_get_role_by_name(role_name) {
1914 Some(role) => Ok(role),
1915 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1916 }
1917 }
1918
1919 fn resolve_network_policy(
1920 &self,
1921 policy_name: &str,
1922 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1923 match self.state.try_get_network_policy_by_name(policy_name) {
1924 Some(policy) => Ok(policy),
1925 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1926 }
1927 }
1928
1929 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1930 Some(self.state.roles_by_id.get(id)?)
1931 }
1932
1933 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1934 self.state.get_role(id)
1935 }
1936
1937 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1938 #[allow(clippy::as_conversions)]
1940 self.state
1941 .roles_by_id
1942 .values()
1943 .map(|role| role as &dyn CatalogRole)
1944 .collect()
1945 }
1946
1947 fn mz_system_role_id(&self) -> RoleId {
1948 MZ_SYSTEM_ROLE_ID
1949 }
1950
1951 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1952 self.state.collect_role_membership(id)
1953 }
1954
1955 fn get_network_policy(
1956 &self,
1957 id: &NetworkPolicyId,
1958 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1959 self.state.get_network_policy(id)
1960 }
1961
1962 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1963 #[allow(clippy::as_conversions)]
1965 self.state
1966 .network_policies_by_id
1967 .values()
1968 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1969 .collect()
1970 }
1971
1972 fn resolve_cluster(
1973 &self,
1974 cluster_name: Option<&str>,
1975 ) -> Result<&dyn mz_sql::catalog::CatalogCluster, SqlCatalogError> {
1976 Ok(self
1977 .state
1978 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1979 }
1980
1981 fn resolve_cluster_replica(
1982 &self,
1983 cluster_replica_name: &QualifiedReplica,
1984 ) -> Result<&dyn CatalogClusterReplica, SqlCatalogError> {
1985 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1986 }
1987
1988 fn resolve_item(
1989 &self,
1990 name: &PartialItemName,
1991 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1992 let r = self.state.resolve_entry(
1993 self.database.as_ref(),
1994 &self.effective_search_path(true),
1995 name,
1996 &self.conn_id,
1997 )?;
1998 if self.unresolvable_ids.contains(&r.id()) {
1999 Err(SqlCatalogError::UnknownItem(name.to_string()))
2000 } else {
2001 Ok(r)
2002 }
2003 }
2004
2005 fn resolve_function(
2006 &self,
2007 name: &PartialItemName,
2008 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2009 let r = self.state.resolve_function(
2010 self.database.as_ref(),
2011 &self.effective_search_path(false),
2012 name,
2013 &self.conn_id,
2014 )?;
2015
2016 if self.unresolvable_ids.contains(&r.id()) {
2017 Err(SqlCatalogError::UnknownFunction {
2018 name: name.to_string(),
2019 alternative: None,
2020 })
2021 } else {
2022 Ok(r)
2023 }
2024 }
2025
2026 fn resolve_type(
2027 &self,
2028 name: &PartialItemName,
2029 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2030 let r = self.state.resolve_type(
2031 self.database.as_ref(),
2032 &self.effective_search_path(false),
2033 name,
2034 &self.conn_id,
2035 )?;
2036
2037 if self.unresolvable_ids.contains(&r.id()) {
2038 Err(SqlCatalogError::UnknownType {
2039 name: name.to_string(),
2040 })
2041 } else {
2042 Ok(r)
2043 }
2044 }
2045
2046 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2047 self.state.get_system_type(name)
2048 }
2049
2050 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2051 Some(self.state.try_get_entry(id)?)
2052 }
2053
2054 fn try_get_item_by_global_id(
2055 &self,
2056 id: &GlobalId,
2057 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2058 let entry = self.state.try_get_entry_by_global_id(id)?;
2059 let entry = match &entry.item {
2060 CatalogItem::Table(table) => {
2061 let (version, _gid) = table
2062 .collections
2063 .iter()
2064 .find(|(_version, gid)| *gid == id)
2065 .expect("catalog out of sync, mismatched GlobalId");
2066 entry.at_version(RelationVersionSelector::Specific(*version))
2067 }
2068 _ => entry.at_version(RelationVersionSelector::Latest),
2069 };
2070 Some(entry)
2071 }
2072
2073 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2074 self.state.get_entry(id)
2075 }
2076
2077 fn get_item_by_global_id(
2078 &self,
2079 id: &GlobalId,
2080 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2081 let entry = self.state.get_entry_by_global_id(id);
2082 let entry = match &entry.item {
2083 CatalogItem::Table(table) => {
2084 let (version, _gid) = table
2085 .collections
2086 .iter()
2087 .find(|(_version, gid)| *gid == id)
2088 .expect("catalog out of sync, mismatched GlobalId");
2089 entry.at_version(RelationVersionSelector::Specific(*version))
2090 }
2091 _ => entry.at_version(RelationVersionSelector::Latest),
2092 };
2093 entry
2094 }
2095
2096 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2097 self.get_schemas()
2098 .into_iter()
2099 .flat_map(|schema| schema.item_ids())
2100 .map(|id| self.get_item(&id))
2101 .collect()
2102 }
2103
2104 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2105 self.state
2106 .get_item_by_name(name, &self.conn_id)
2107 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2108 }
2109
2110 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2111 self.state
2112 .get_type_by_name(name, &self.conn_id)
2113 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2114 }
2115
2116 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster {
2117 &self.state.clusters_by_id[&id]
2118 }
2119
2120 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster> {
2121 self.state
2122 .clusters_by_id
2123 .values()
2124 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2125 .collect()
2126 }
2127
2128 fn get_cluster_replica(
2129 &self,
2130 cluster_id: ClusterId,
2131 replica_id: ReplicaId,
2132 ) -> &dyn mz_sql::catalog::CatalogClusterReplica {
2133 let cluster = self.get_cluster(cluster_id);
2134 cluster.replica(replica_id)
2135 }
2136
2137 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica> {
2138 self.get_clusters()
2139 .into_iter()
2140 .flat_map(|cluster| cluster.replicas().into_iter())
2141 .collect()
2142 }
2143
2144 fn get_system_privileges(&self) -> &PrivilegeMap {
2145 &self.state.system_privileges
2146 }
2147
2148 fn get_default_privileges(
2149 &self,
2150 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2151 self.state
2152 .default_privileges
2153 .iter()
2154 .map(|(object, acl_items)| (object, acl_items.collect()))
2155 .collect()
2156 }
2157
2158 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2159 self.state.find_available_name(name, &self.conn_id)
2160 }
2161
2162 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2163 self.state.resolve_full_name(name, Some(&self.conn_id))
2164 }
2165
2166 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2167 self.state.resolve_full_schema_name(name)
2168 }
2169
2170 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2171 self.state.get_entry_by_global_id(global_id).id()
2172 }
2173
2174 fn resolve_global_id(
2175 &self,
2176 item_id: &CatalogItemId,
2177 version: RelationVersionSelector,
2178 ) -> GlobalId {
2179 self.state
2180 .get_entry(item_id)
2181 .at_version(version)
2182 .global_id()
2183 }
2184
2185 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2186 self.state.config()
2187 }
2188
2189 fn now(&self) -> EpochMillis {
2190 (self.state.config().now)()
2191 }
2192
2193 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2194 self.state.aws_privatelink_availability_zones.clone()
2195 }
2196
2197 fn system_vars(&self) -> &SystemVars {
2198 &self.state.system_configuration
2199 }
2200
2201 fn system_vars_mut(&mut self) -> &mut SystemVars {
2202 &mut self.state.to_mut().system_configuration
2203 }
2204
2205 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2206 self.state().get_owner_id(id, self.conn_id())
2207 }
2208
2209 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2210 match id {
2211 SystemObjectId::System => Some(&self.state.system_privileges),
2212 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2213 Some(self.get_cluster(*id).privileges())
2214 }
2215 SystemObjectId::Object(ObjectId::Database(id)) => {
2216 Some(self.get_database(id).privileges())
2217 }
2218 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2219 Some(self.get_schema(database_spec, schema_spec).privileges())
2220 }
2221 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2222 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2223 Some(self.get_network_policy(id).privileges())
2224 }
2225 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2226 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2227 }
2228 }
2229
2230 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2231 let mut seen = BTreeSet::new();
2232 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2233 }
2234
2235 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2236 let mut seen = BTreeSet::new();
2237 self.state.item_dependents(id, &mut seen)
2238 }
2239
2240 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2241 rbac::all_object_privileges(object_type)
2242 }
2243
2244 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2245 self.state.get_object_type(object_id)
2246 }
2247
2248 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2249 self.state.get_system_object_type(id)
2250 }
2251
2252 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2255 let database_id = match &qualified_name.qualifiers.database_spec {
2256 ResolvedDatabaseSpecifier::Ambient => None,
2257 ResolvedDatabaseSpecifier::Id(id)
2258 if self.database.is_some() && self.database == Some(*id) =>
2259 {
2260 None
2261 }
2262 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2263 };
2264
2265 let schema_spec = if database_id.is_none()
2266 && self.resolve_item_name(&PartialItemName {
2267 database: None,
2268 schema: None,
2269 item: qualified_name.item.clone(),
2270 }) == Ok(qualified_name)
2271 || self.resolve_function_name(&PartialItemName {
2272 database: None,
2273 schema: None,
2274 item: qualified_name.item.clone(),
2275 }) == Ok(qualified_name)
2276 || self.resolve_type_name(&PartialItemName {
2277 database: None,
2278 schema: None,
2279 item: qualified_name.item.clone(),
2280 }) == Ok(qualified_name)
2281 {
2282 None
2283 } else {
2284 Some(qualified_name.qualifiers.schema_spec.clone())
2287 };
2288
2289 let res = PartialItemName {
2290 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2291 schema: schema_spec.map(|spec| {
2292 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2293 .name()
2294 .schema
2295 .clone()
2296 }),
2297 item: qualified_name.item.clone(),
2298 };
2299 assert!(
2300 self.resolve_item_name(&res) == Ok(qualified_name)
2301 || self.resolve_function_name(&res) == Ok(qualified_name)
2302 || self.resolve_type_name(&res) == Ok(qualified_name)
2303 );
2304 res
2305 }
2306
2307 fn add_notice(&self, notice: PlanNotice) {
2308 let _ = self.notices_tx.send(notice.into());
2309 }
2310
2311 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2312 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2313 self.state.comments.get_object_comments(comment_id)
2314 }
2315
2316 fn is_cluster_size_cc(&self, size: &str) -> bool {
2317 self.state
2318 .cluster_replica_sizes
2319 .0
2320 .get(size)
2321 .map_or(false, |a| a.is_cc)
2322 }
2323}
2324
2325#[cfg(test)]
2326mod tests {
2327 use std::collections::{BTreeMap, BTreeSet};
2328 use std::sync::Arc;
2329 use std::{env, iter};
2330
2331 use itertools::Itertools;
2332 use mz_catalog::memory::objects::CatalogItem;
2333 use tokio_postgres::NoTls;
2334 use tokio_postgres::types::Type;
2335 use uuid::Uuid;
2336
2337 use mz_catalog::SYSTEM_CONN_ID;
2338 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2339 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2340 use mz_controller_types::{ClusterId, ReplicaId};
2341 use mz_expr::MirScalarExpr;
2342 use mz_ore::now::to_datetime;
2343 use mz_ore::{assert_err, assert_ok, task};
2344 use mz_persist_client::PersistClient;
2345 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2346 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2347 use mz_repr::role_id::RoleId;
2348 use mz_repr::{
2349 CatalogItemId, Datum, GlobalId, RelationType, RelationVersionSelector, RowArena,
2350 ScalarType, Timestamp,
2351 };
2352 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2353 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2354 use mz_sql::names::{
2355 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2356 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2357 };
2358 use mz_sql::plan::{
2359 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2360 Scope, StatementContext,
2361 };
2362 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2363 use mz_sql::session::vars::{SystemVars, VarInput};
2364
2365 use crate::catalog::state::LocalExpressionCache;
2366 use crate::catalog::{Catalog, Op};
2367 use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2368 use crate::session::Session;
2369
2370 #[mz_ore::test(tokio::test)]
2377 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2379 Catalog::with_debug(|catalog| async move {
2380 struct TestCase {
2381 input: QualifiedItemName,
2382 system_output: PartialItemName,
2383 normal_output: PartialItemName,
2384 }
2385
2386 let test_cases = vec![
2387 TestCase {
2388 input: QualifiedItemName {
2389 qualifiers: ItemQualifiers {
2390 database_spec: ResolvedDatabaseSpecifier::Ambient,
2391 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2392 },
2393 item: "numeric".to_string(),
2394 },
2395 system_output: PartialItemName {
2396 database: None,
2397 schema: None,
2398 item: "numeric".to_string(),
2399 },
2400 normal_output: PartialItemName {
2401 database: None,
2402 schema: None,
2403 item: "numeric".to_string(),
2404 },
2405 },
2406 TestCase {
2407 input: QualifiedItemName {
2408 qualifiers: ItemQualifiers {
2409 database_spec: ResolvedDatabaseSpecifier::Ambient,
2410 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2411 },
2412 item: "mz_array_types".to_string(),
2413 },
2414 system_output: PartialItemName {
2415 database: None,
2416 schema: None,
2417 item: "mz_array_types".to_string(),
2418 },
2419 normal_output: PartialItemName {
2420 database: None,
2421 schema: None,
2422 item: "mz_array_types".to_string(),
2423 },
2424 },
2425 ];
2426
2427 for tc in test_cases {
2428 assert_eq!(
2429 catalog
2430 .for_system_session()
2431 .minimal_qualification(&tc.input),
2432 tc.system_output
2433 );
2434 assert_eq!(
2435 catalog
2436 .for_session(&Session::dummy())
2437 .minimal_qualification(&tc.input),
2438 tc.normal_output
2439 );
2440 }
2441 catalog.expire().await;
2442 })
2443 .await
2444 }
2445
2446 #[mz_ore::test(tokio::test)]
2447 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2449 let persist_client = PersistClient::new_for_tests().await;
2450 let organization_id = Uuid::new_v4();
2451 let bootstrap_args = test_bootstrap_args();
2452 {
2453 let mut catalog = Catalog::open_debug_catalog(
2454 persist_client.clone(),
2455 organization_id.clone(),
2456 &bootstrap_args,
2457 )
2458 .await
2459 .expect("unable to open debug catalog");
2460 assert_eq!(catalog.transient_revision(), 1);
2461 let commit_ts = catalog.current_upper().await;
2462 catalog
2463 .transact(
2464 None,
2465 commit_ts,
2466 None,
2467 vec![Op::CreateDatabase {
2468 name: "test".to_string(),
2469 owner_id: MZ_SYSTEM_ROLE_ID,
2470 }],
2471 )
2472 .await
2473 .expect("failed to transact");
2474 assert_eq!(catalog.transient_revision(), 2);
2475 catalog.expire().await;
2476 }
2477 {
2478 let catalog =
2479 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2480 .await
2481 .expect("unable to open debug catalog");
2482 assert_eq!(catalog.transient_revision(), 1);
2484 catalog.expire().await;
2485 }
2486 }
2487
2488 #[mz_ore::test(tokio::test)]
2489 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2491 Catalog::with_debug(|catalog| async move {
2492 let mz_catalog_schema = (
2493 ResolvedDatabaseSpecifier::Ambient,
2494 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2495 );
2496 let pg_catalog_schema = (
2497 ResolvedDatabaseSpecifier::Ambient,
2498 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2499 );
2500 let mz_temp_schema = (
2501 ResolvedDatabaseSpecifier::Ambient,
2502 SchemaSpecifier::Temporary,
2503 );
2504
2505 let session = Session::dummy();
2507 let conn_catalog = catalog.for_session(&session);
2508 assert_ne!(
2509 conn_catalog.effective_search_path(false),
2510 conn_catalog.search_path
2511 );
2512 assert_ne!(
2513 conn_catalog.effective_search_path(true),
2514 conn_catalog.search_path
2515 );
2516 assert_eq!(
2517 conn_catalog.effective_search_path(false),
2518 vec![
2519 mz_catalog_schema.clone(),
2520 pg_catalog_schema.clone(),
2521 conn_catalog.search_path[0].clone()
2522 ]
2523 );
2524 assert_eq!(
2525 conn_catalog.effective_search_path(true),
2526 vec![
2527 mz_temp_schema.clone(),
2528 mz_catalog_schema.clone(),
2529 pg_catalog_schema.clone(),
2530 conn_catalog.search_path[0].clone()
2531 ]
2532 );
2533
2534 let mut session = Session::dummy();
2536 session
2537 .vars_mut()
2538 .set(
2539 &SystemVars::new(),
2540 "search_path",
2541 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2542 false,
2543 )
2544 .expect("failed to set search_path");
2545 let conn_catalog = catalog.for_session(&session);
2546 assert_ne!(
2547 conn_catalog.effective_search_path(false),
2548 conn_catalog.search_path
2549 );
2550 assert_ne!(
2551 conn_catalog.effective_search_path(true),
2552 conn_catalog.search_path
2553 );
2554 assert_eq!(
2555 conn_catalog.effective_search_path(false),
2556 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2557 );
2558 assert_eq!(
2559 conn_catalog.effective_search_path(true),
2560 vec![
2561 mz_temp_schema.clone(),
2562 mz_catalog_schema.clone(),
2563 pg_catalog_schema.clone()
2564 ]
2565 );
2566
2567 let mut session = Session::dummy();
2568 session
2569 .vars_mut()
2570 .set(
2571 &SystemVars::new(),
2572 "search_path",
2573 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2574 false,
2575 )
2576 .expect("failed to set search_path");
2577 let conn_catalog = catalog.for_session(&session);
2578 assert_ne!(
2579 conn_catalog.effective_search_path(false),
2580 conn_catalog.search_path
2581 );
2582 assert_ne!(
2583 conn_catalog.effective_search_path(true),
2584 conn_catalog.search_path
2585 );
2586 assert_eq!(
2587 conn_catalog.effective_search_path(false),
2588 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2589 );
2590 assert_eq!(
2591 conn_catalog.effective_search_path(true),
2592 vec![
2593 mz_temp_schema.clone(),
2594 pg_catalog_schema.clone(),
2595 mz_catalog_schema.clone()
2596 ]
2597 );
2598
2599 let mut session = Session::dummy();
2600 session
2601 .vars_mut()
2602 .set(
2603 &SystemVars::new(),
2604 "search_path",
2605 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2606 false,
2607 )
2608 .expect("failed to set search_path");
2609 let conn_catalog = catalog.for_session(&session);
2610 assert_ne!(
2611 conn_catalog.effective_search_path(false),
2612 conn_catalog.search_path
2613 );
2614 assert_ne!(
2615 conn_catalog.effective_search_path(true),
2616 conn_catalog.search_path
2617 );
2618 assert_eq!(
2619 conn_catalog.effective_search_path(false),
2620 vec![
2621 mz_catalog_schema.clone(),
2622 pg_catalog_schema.clone(),
2623 mz_temp_schema.clone()
2624 ]
2625 );
2626 assert_eq!(
2627 conn_catalog.effective_search_path(true),
2628 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2629 );
2630 catalog.expire().await;
2631 })
2632 .await
2633 }
2634
2635 #[mz_ore::test(tokio::test)]
2636 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2638 use mz_ore::collections::CollectionExt;
2639 Catalog::with_debug(|catalog| async move {
2640 let conn_catalog = catalog.for_system_session();
2641 let scx = &mut StatementContext::new(None, &conn_catalog);
2642
2643 let parsed = mz_sql_parser::parser::parse_statements(
2644 "create view public.foo as select 1 as bar",
2645 )
2646 .expect("")
2647 .into_element()
2648 .ast;
2649
2650 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2651
2652 assert_eq!(
2654 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2655 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2656 );
2657 catalog.expire().await;
2658 })
2659 .await;
2660 }
2661
2662 #[mz_ore::test(tokio::test)]
2664 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2666 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2667 let column = ", 1";
2668 let view_def_size = view_def.bytes().count();
2669 let column_size = column.bytes().count();
2670 let column_count =
2671 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2672 let columns = iter::repeat(column).take(column_count).join("");
2673 let create_sql = format!("{view_def}{columns})");
2674 let create_sql_check = create_sql.clone();
2675 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2676 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2677 &create_sql
2678 ));
2679
2680 let persist_client = PersistClient::new_for_tests().await;
2681 let organization_id = Uuid::new_v4();
2682 let id = CatalogItemId::User(1);
2683 let gid = GlobalId::User(1);
2684 let bootstrap_args = test_bootstrap_args();
2685 {
2686 let mut catalog = Catalog::open_debug_catalog(
2687 persist_client.clone(),
2688 organization_id.clone(),
2689 &bootstrap_args,
2690 )
2691 .await
2692 .expect("unable to open debug catalog");
2693 let item = catalog
2694 .state()
2695 .deserialize_item(
2696 gid,
2697 &create_sql,
2698 &BTreeMap::new(),
2699 &mut LocalExpressionCache::Closed,
2700 None,
2701 )
2702 .expect("unable to parse view");
2703 let commit_ts = catalog.current_upper().await;
2704 catalog
2705 .transact(
2706 None,
2707 commit_ts,
2708 None,
2709 vec![Op::CreateItem {
2710 item,
2711 name: QualifiedItemName {
2712 qualifiers: ItemQualifiers {
2713 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2714 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2715 },
2716 item: "v".to_string(),
2717 },
2718 id,
2719 owner_id: MZ_SYSTEM_ROLE_ID,
2720 }],
2721 )
2722 .await
2723 .expect("failed to transact");
2724 catalog.expire().await;
2725 }
2726 {
2727 let catalog =
2728 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2729 .await
2730 .expect("unable to open debug catalog");
2731 let view = catalog.get_entry(&id);
2732 assert_eq!("v", view.name.item);
2733 match &view.item {
2734 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2735 item => panic!("expected view, got {}", item.typ()),
2736 }
2737 catalog.expire().await;
2738 }
2739 }
2740
2741 #[mz_ore::test(tokio::test)]
2742 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2744 Catalog::with_debug(|catalog| async move {
2745 let conn_catalog = catalog.for_system_session();
2746
2747 assert_eq!(
2748 mz_sql::catalog::ObjectType::ClusterReplica,
2749 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2750 ClusterId::user(1).expect("1 is a valid ID"),
2751 ReplicaId::User(1)
2752 )))
2753 );
2754 assert_eq!(
2755 mz_sql::catalog::ObjectType::Role,
2756 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2757 );
2758 catalog.expire().await;
2759 })
2760 .await;
2761 }
2762
2763 #[mz_ore::test(tokio::test)]
2764 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2766 Catalog::with_debug(|catalog| async move {
2767 let conn_catalog = catalog.for_system_session();
2768
2769 assert_eq!(
2770 None,
2771 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2772 ClusterId::user(1).expect("1 is a valid ID"),
2773 ReplicaId::User(1),
2774 ))))
2775 );
2776 assert_eq!(
2777 None,
2778 conn_catalog
2779 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2780 );
2781 catalog.expire().await;
2782 })
2783 .await;
2784 }
2785
2786 #[mz_ore::test(tokio::test)]
2787 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2789 Catalog::with_debug(|catalog| async move {
2790 let conn_catalog = catalog.for_system_session();
2791
2792 let builtins_cfg = BuiltinsConfig {
2793 include_continual_tasks: true,
2794 };
2795 for builtin in BUILTINS::iter(&builtins_cfg) {
2796 let (schema, name, expected_desc) = match builtin {
2797 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2798 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2799 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2800 Builtin::Log(_)
2801 | Builtin::Type(_)
2802 | Builtin::Func(_)
2803 | Builtin::ContinualTask(_)
2804 | Builtin::Index(_)
2805 | Builtin::Connection(_) => continue,
2806 };
2807 let item = conn_catalog
2808 .resolve_item(&PartialItemName {
2809 database: None,
2810 schema: Some(schema.to_string()),
2811 item: name.to_string(),
2812 })
2813 .expect("unable to resolve item")
2814 .at_version(RelationVersionSelector::Latest);
2815
2816 let full_name = conn_catalog.resolve_full_name(item.name());
2817 let actual_desc = item.desc(&full_name).expect("invalid item type");
2818 assert_eq!(
2819 &*actual_desc, expected_desc,
2820 "item {schema}.{name} did not match its expected RelationDesc"
2821 );
2822 }
2823 catalog.expire().await;
2824 })
2825 .await
2826 }
2827
2828 #[mz_ore::test(tokio::test)]
2831 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2833 async fn inner(catalog: Catalog) {
2834 let (client, connection) = tokio_postgres::connect(
2838 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2839 NoTls,
2840 )
2841 .await
2842 .expect("failed to connect to Postgres");
2843
2844 task::spawn(|| "compare_builtin_postgres", async move {
2845 if let Err(e) = connection.await {
2846 panic!("connection error: {}", e);
2847 }
2848 });
2849
2850 struct PgProc {
2851 name: String,
2852 arg_oids: Vec<u32>,
2853 ret_oid: Option<u32>,
2854 ret_set: bool,
2855 }
2856
2857 struct PgType {
2858 name: String,
2859 ty: String,
2860 elem: u32,
2861 array: u32,
2862 input: u32,
2863 receive: u32,
2864 }
2865
2866 struct PgOper {
2867 oprresult: u32,
2868 name: String,
2869 }
2870
2871 let pg_proc: BTreeMap<_, _> = client
2872 .query(
2873 "SELECT
2874 p.oid,
2875 proname,
2876 proargtypes,
2877 prorettype,
2878 proretset
2879 FROM pg_proc p
2880 JOIN pg_namespace n ON p.pronamespace = n.oid",
2881 &[],
2882 )
2883 .await
2884 .expect("pg query failed")
2885 .into_iter()
2886 .map(|row| {
2887 let oid: u32 = row.get("oid");
2888 let pg_proc = PgProc {
2889 name: row.get("proname"),
2890 arg_oids: row.get("proargtypes"),
2891 ret_oid: row.get("prorettype"),
2892 ret_set: row.get("proretset"),
2893 };
2894 (oid, pg_proc)
2895 })
2896 .collect();
2897
2898 let pg_type: BTreeMap<_, _> = client
2899 .query(
2900 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2901 &[],
2902 )
2903 .await
2904 .expect("pg query failed")
2905 .into_iter()
2906 .map(|row| {
2907 let oid: u32 = row.get("oid");
2908 let pg_type = PgType {
2909 name: row.get("typname"),
2910 ty: row.get("typtype"),
2911 elem: row.get("typelem"),
2912 array: row.get("typarray"),
2913 input: row.get("typinput"),
2914 receive: row.get("typreceive"),
2915 };
2916 (oid, pg_type)
2917 })
2918 .collect();
2919
2920 let pg_oper: BTreeMap<_, _> = client
2921 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2922 .await
2923 .expect("pg query failed")
2924 .into_iter()
2925 .map(|row| {
2926 let oid: u32 = row.get("oid");
2927 let pg_oper = PgOper {
2928 name: row.get("oprname"),
2929 oprresult: row.get("oprresult"),
2930 };
2931 (oid, pg_oper)
2932 })
2933 .collect();
2934
2935 let conn_catalog = catalog.for_system_session();
2936 let resolve_type_oid = |item: &str| {
2937 conn_catalog
2938 .resolve_type(&PartialItemName {
2939 database: None,
2940 schema: Some(PG_CATALOG_SCHEMA.into()),
2943 item: item.to_string(),
2944 })
2945 .expect("unable to resolve type")
2946 .oid()
2947 };
2948
2949 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2950 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2951 .collect();
2952
2953 let mut all_oids = BTreeSet::new();
2954
2955 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2958 [
2959 (Type::NAME, Type::TEXT),
2961 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2962 (Type::TIME, Type::TIMETZ),
2964 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2965 ]
2966 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2967 );
2968 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2969 1619, ]);
2971 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2972 if ignore_return_types.contains(&fn_oid) {
2973 return true;
2974 }
2975 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2976 return true;
2977 }
2978 a == b
2979 };
2980
2981 let builtins_cfg = BuiltinsConfig {
2982 include_continual_tasks: true,
2983 };
2984 for builtin in BUILTINS::iter(&builtins_cfg) {
2985 match builtin {
2986 Builtin::Type(ty) => {
2987 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2988
2989 if ty.oid >= FIRST_MATERIALIZE_OID {
2990 continue;
2993 }
2994
2995 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2998 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2999 });
3000 assert_eq!(
3001 ty.name, pg_ty.name,
3002 "oid {} has name {} in postgres; expected {}",
3003 ty.oid, pg_ty.name, ty.name,
3004 );
3005
3006 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3007 None => (0, 0),
3008 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3009 };
3010 assert_eq!(
3011 typinput_oid, pg_ty.input,
3012 "type {} has typinput OID {:?} in mz but {:?} in pg",
3013 ty.name, typinput_oid, pg_ty.input,
3014 );
3015 assert_eq!(
3016 typreceive_oid, pg_ty.receive,
3017 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3018 ty.name, typreceive_oid, pg_ty.receive,
3019 );
3020 if typinput_oid != 0 {
3021 assert!(
3022 func_oids.contains(&typinput_oid),
3023 "type {} has typinput OID {} that does not exist in pg_proc",
3024 ty.name,
3025 typinput_oid,
3026 );
3027 }
3028 if typreceive_oid != 0 {
3029 assert!(
3030 func_oids.contains(&typreceive_oid),
3031 "type {} has typreceive OID {} that does not exist in pg_proc",
3032 ty.name,
3033 typreceive_oid,
3034 );
3035 }
3036
3037 match &ty.details.typ {
3039 CatalogType::Array { element_reference } => {
3040 let elem_ty = BUILTINS::iter(&builtins_cfg)
3041 .filter_map(|builtin| match builtin {
3042 Builtin::Type(ty @ BuiltinType { name, .. })
3043 if element_reference == name =>
3044 {
3045 Some(ty)
3046 }
3047 _ => None,
3048 })
3049 .next();
3050 let elem_ty = match elem_ty {
3051 Some(ty) => ty,
3052 None => {
3053 panic!("{} is unexpectedly not a type", element_reference)
3054 }
3055 };
3056 assert_eq!(
3057 pg_ty.elem, elem_ty.oid,
3058 "type {} has mismatched element OIDs",
3059 ty.name
3060 )
3061 }
3062 CatalogType::Pseudo => {
3063 assert_eq!(
3064 pg_ty.ty, "p",
3065 "type {} is not a pseudo type as expected",
3066 ty.name
3067 )
3068 }
3069 CatalogType::Range { .. } => {
3070 assert_eq!(
3071 pg_ty.ty, "r",
3072 "type {} is not a range type as expected",
3073 ty.name
3074 );
3075 }
3076 _ => {
3077 assert_eq!(
3078 pg_ty.ty, "b",
3079 "type {} is not a base type as expected",
3080 ty.name
3081 )
3082 }
3083 }
3084
3085 let schema = catalog
3087 .resolve_schema_in_database(
3088 &ResolvedDatabaseSpecifier::Ambient,
3089 ty.schema,
3090 &SYSTEM_CONN_ID,
3091 )
3092 .expect("unable to resolve schema");
3093 let allocated_type = catalog
3094 .resolve_type(
3095 None,
3096 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3097 &PartialItemName {
3098 database: None,
3099 schema: Some(schema.name().schema.clone()),
3100 item: ty.name.to_string(),
3101 },
3102 &SYSTEM_CONN_ID,
3103 )
3104 .expect("unable to resolve type");
3105 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3106 ty
3107 } else {
3108 panic!("unexpectedly not a type")
3109 };
3110 match ty.details.array_id {
3111 Some(array_id) => {
3112 let array_ty = catalog.get_entry(&array_id);
3113 assert_eq!(
3114 pg_ty.array, array_ty.oid,
3115 "type {} has mismatched array OIDs",
3116 allocated_type.name.item,
3117 );
3118 }
3119 None => assert_eq!(
3120 pg_ty.array, 0,
3121 "type {} does not have an array type in mz but does in pg",
3122 allocated_type.name.item,
3123 ),
3124 }
3125 }
3126 Builtin::Func(func) => {
3127 for imp in func.inner.func_impls() {
3128 assert!(
3129 all_oids.insert(imp.oid),
3130 "{} reused oid {}",
3131 func.name,
3132 imp.oid
3133 );
3134
3135 assert!(
3136 imp.oid < FIRST_USER_OID,
3137 "built-in function {} erroneously has OID in user space ({})",
3138 func.name,
3139 imp.oid,
3140 );
3141
3142 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3145 continue;
3146 } else {
3147 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3148 panic!(
3149 "pg_proc missing function {}: oid {}",
3150 func.name, imp.oid
3151 )
3152 })
3153 };
3154 assert_eq!(
3155 func.name, pg_fn.name,
3156 "funcs with oid {} don't match names: {} in mz, {} in pg",
3157 imp.oid, func.name, pg_fn.name
3158 );
3159
3160 let imp_arg_oids = imp
3163 .arg_typs
3164 .iter()
3165 .map(|item| resolve_type_oid(item))
3166 .collect::<Vec<_>>();
3167
3168 if imp_arg_oids != pg_fn.arg_oids {
3169 println!(
3170 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3171 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3172 );
3173 }
3174
3175 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3176
3177 assert!(
3178 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3179 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3180 imp.oid,
3181 func.name,
3182 imp_return_oid,
3183 pg_fn.ret_oid
3184 );
3185
3186 assert_eq!(
3187 imp.return_is_set, pg_fn.ret_set,
3188 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3189 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3190 );
3191 }
3192 }
3193 _ => (),
3194 }
3195 }
3196
3197 for (op, func) in OP_IMPLS.iter() {
3198 for imp in func.func_impls() {
3199 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3200
3201 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3203 continue;
3204 } else {
3205 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3206 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3207 })
3208 };
3209
3210 assert_eq!(*op, pg_op.name);
3211
3212 let imp_return_oid =
3213 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3214 if imp_return_oid != pg_op.oprresult {
3215 panic!(
3216 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3217 imp.oid, op, imp_return_oid, pg_op.oprresult
3218 );
3219 }
3220 }
3221 }
3222 catalog.expire().await;
3223 }
3224
3225 Catalog::with_debug(inner).await
3226 }
3227
3228 #[mz_ore::test(tokio::test)]
3230 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3232 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3233 let catalog = Arc::new(catalog);
3234 let conn_catalog = catalog.for_system_session();
3235
3236 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3237 let mut handles = Vec::new();
3238
3239 let ignore_names = BTreeSet::from([
3241 "avg",
3242 "avg_internal_v1",
3243 "bool_and",
3244 "bool_or",
3245 "mod",
3246 "mz_panic",
3247 "mz_sleep",
3248 "pow",
3249 "stddev_pop",
3250 "stddev_samp",
3251 "stddev",
3252 "var_pop",
3253 "var_samp",
3254 "variance",
3255 ]);
3256
3257 let fns = BUILTINS::funcs()
3258 .map(|func| (&func.name, func.inner))
3259 .chain(OP_IMPLS.iter());
3260
3261 for (name, func) in fns {
3262 if ignore_names.contains(name) {
3263 continue;
3264 }
3265 let Func::Scalar(impls) = func else {
3266 continue;
3267 };
3268
3269 'outer: for imp in impls {
3270 let details = imp.details();
3271 let mut styps = Vec::new();
3272 for item in details.arg_typs.iter() {
3273 let oid = resolve_type_oid(item);
3274 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3275 continue 'outer;
3276 };
3277 styps.push(ScalarType::try_from(&pgtyp).expect("must exist"));
3278 }
3279 let datums = styps
3280 .iter()
3281 .map(|styp| {
3282 let mut datums = vec![Datum::Null];
3283 datums.extend(styp.interesting_datums());
3284 datums
3285 })
3286 .collect::<Vec<_>>();
3287 if datums.is_empty() {
3289 continue;
3290 }
3291
3292 let return_oid = details
3293 .return_typ
3294 .map(resolve_type_oid)
3295 .expect("must exist");
3296 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3297 .ok()
3298 .map(|typ| ScalarType::try_from(&typ).expect("must exist"));
3299
3300 let mut idxs = vec![0; datums.len()];
3301 while idxs[0] < datums[0].len() {
3302 let mut args = Vec::with_capacity(idxs.len());
3303 for i in 0..(datums.len()) {
3304 args.push(datums[i][idxs[i]]);
3305 }
3306
3307 let op = &imp.op;
3308 let scalars = args
3309 .iter()
3310 .enumerate()
3311 .map(|(i, datum)| {
3312 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3313 datum.clone(),
3314 styps[i].clone(),
3315 ))
3316 })
3317 .collect();
3318
3319 let call_name = format!(
3320 "{name}({}) (oid: {})",
3321 args.iter()
3322 .map(|d| d.to_string())
3323 .collect::<Vec<_>>()
3324 .join(", "),
3325 imp.oid
3326 );
3327 let catalog = Arc::clone(&catalog);
3328 let call_name_fn = call_name.clone();
3329 let return_styp = return_styp.clone();
3330 let handle = task::spawn_blocking(
3331 || call_name,
3332 move || {
3333 smoketest_fn(
3334 name,
3335 call_name_fn,
3336 op,
3337 imp,
3338 args,
3339 catalog,
3340 scalars,
3341 return_styp,
3342 )
3343 },
3344 );
3345 handles.push(handle);
3346
3347 for i in (0..datums.len()).rev() {
3349 idxs[i] += 1;
3350 if idxs[i] >= datums[i].len() {
3351 if i == 0 {
3352 break;
3353 }
3354 idxs[i] = 0;
3355 continue;
3356 } else {
3357 break;
3358 }
3359 }
3360 }
3361 }
3362 }
3363 handles
3364 }
3365
3366 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3367 for handle in handles {
3368 handle.await.expect("must succeed");
3369 }
3370 }
3371
3372 fn smoketest_fn(
3373 name: &&str,
3374 call_name: String,
3375 op: &Operation<HirScalarExpr>,
3376 imp: &FuncImpl<HirScalarExpr>,
3377 args: Vec<Datum<'_>>,
3378 catalog: Arc<Catalog>,
3379 scalars: Vec<CoercibleScalarExpr>,
3380 return_styp: Option<ScalarType>,
3381 ) {
3382 let conn_catalog = catalog.for_system_session();
3383 let pcx = PlanContext::zero();
3384 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3385 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3386 let ecx = ExprContext {
3387 qcx: &qcx,
3388 name: "smoketest",
3389 scope: &Scope::empty(),
3390 relation_type: &RelationType::empty(),
3391 allow_aggregates: false,
3392 allow_subqueries: false,
3393 allow_parameters: false,
3394 allow_windows: false,
3395 };
3396 let arena = RowArena::new();
3397 let mut session = Session::<Timestamp>::dummy();
3398 session
3399 .start_transaction(to_datetime(0), None, None)
3400 .expect("must succeed");
3401 let prep_style = ExprPrepStyle::OneShot {
3402 logical_time: EvalTime::Time(Timestamp::MIN),
3403 session: &session,
3404 catalog_state: &catalog.state,
3405 };
3406
3407 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3410 if let Ok(hir) = res {
3411 if let Ok(mut mir) = hir.lower_uncorrelated() {
3412 prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3414
3415 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3416 if let Some(return_styp) = return_styp {
3417 let mir_typ = mir.typ(&[]);
3418 assert_eq!(mir_typ.scalar_type, return_styp);
3421 if !eval_result_datum.is_instance_of(&mir_typ) {
3425 panic!(
3426 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3427 );
3428 }
3429 if let Some((introduces_nulls, propagates_nulls)) =
3432 call_introduces_propagates_nulls(&mir)
3433 {
3434 if introduces_nulls {
3435 assert!(
3439 mir_typ.nullable,
3440 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3441 name, args, mir, mir_typ.nullable
3442 );
3443 } else {
3444 let any_input_null = args.iter().any(|arg| arg.is_null());
3445 if !any_input_null {
3446 assert!(
3447 !mir_typ.nullable,
3448 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3449 name, args, mir, mir_typ.nullable
3450 );
3451 } else {
3452 assert_eq!(
3453 mir_typ.nullable, propagates_nulls,
3454 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3455 name, args, mir, mir_typ.nullable
3456 );
3457 }
3458 }
3459 }
3460 let mut reduced = mir.clone();
3463 reduced.reduce(&[]);
3464 match reduced {
3465 MirScalarExpr::Literal(reduce_result, ctyp) => {
3466 match reduce_result {
3467 Ok(reduce_result_row) => {
3468 let reduce_result_datum = reduce_result_row.unpack_first();
3469 assert_eq!(
3470 reduce_result_datum,
3471 eval_result_datum,
3472 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3473 name,
3474 args,
3475 mir,
3476 eval_result_datum,
3477 mir_typ.scalar_type,
3478 reduce_result_datum,
3479 ctyp.scalar_type
3480 );
3481 assert_eq!(
3487 ctyp.scalar_type,
3488 mir_typ.scalar_type,
3489 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3490 name,
3491 args,
3492 mir,
3493 eval_result_datum,
3494 mir_typ.scalar_type,
3495 reduce_result_datum,
3496 ctyp.scalar_type
3497 );
3498 }
3499 Err(..) => {} }
3501 }
3502 _ => unreachable!(
3503 "all args are literals, so should have reduced to a literal"
3504 ),
3505 }
3506 }
3507 }
3508 }
3509 }
3510 }
3511
3512 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3517 match mir_func_call {
3518 MirScalarExpr::CallUnary { func, expr } => {
3519 if expr.is_literal() {
3520 Some((func.introduces_nulls(), func.propagates_nulls()))
3521 } else {
3522 None
3523 }
3524 }
3525 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3526 if expr1.is_literal() && expr2.is_literal() {
3527 Some((func.introduces_nulls(), func.propagates_nulls()))
3528 } else {
3529 None
3530 }
3531 }
3532 MirScalarExpr::CallVariadic { func, exprs } => {
3533 if exprs.iter().all(|arg| arg.is_literal()) {
3534 Some((func.introduces_nulls(), func.propagates_nulls()))
3535 } else {
3536 None
3537 }
3538 }
3539 _ => None,
3540 }
3541 }
3542
3543 #[mz_ore::test(tokio::test)]
3545 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3547 Catalog::with_debug(|catalog| async move {
3548 let conn_catalog = catalog.for_system_session();
3549
3550 for view in BUILTINS::views().filter(|view| {
3551 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3552 }) {
3553 let item = conn_catalog
3554 .resolve_item(&PartialItemName {
3555 database: None,
3556 schema: Some(view.schema.to_string()),
3557 item: view.name.to_string(),
3558 })
3559 .expect("unable to resolve view")
3560 .at_version(RelationVersionSelector::Latest);
3562 let full_name = conn_catalog.resolve_full_name(item.name());
3563 for col_type in item
3564 .desc(&full_name)
3565 .expect("invalid item type")
3566 .iter_types()
3567 {
3568 match &col_type.scalar_type {
3569 typ @ ScalarType::UInt16
3570 | typ @ ScalarType::UInt32
3571 | typ @ ScalarType::UInt64
3572 | typ @ ScalarType::MzTimestamp
3573 | typ @ ScalarType::List { .. }
3574 | typ @ ScalarType::Map { .. }
3575 | typ @ ScalarType::MzAclItem => {
3576 panic!("{typ:?} type found in {full_name}");
3577 }
3578 ScalarType::AclItem
3579 | ScalarType::Bool
3580 | ScalarType::Int16
3581 | ScalarType::Int32
3582 | ScalarType::Int64
3583 | ScalarType::Float32
3584 | ScalarType::Float64
3585 | ScalarType::Numeric { .. }
3586 | ScalarType::Date
3587 | ScalarType::Time
3588 | ScalarType::Timestamp { .. }
3589 | ScalarType::TimestampTz { .. }
3590 | ScalarType::Interval
3591 | ScalarType::PgLegacyChar
3592 | ScalarType::Bytes
3593 | ScalarType::String
3594 | ScalarType::Char { .. }
3595 | ScalarType::VarChar { .. }
3596 | ScalarType::Jsonb
3597 | ScalarType::Uuid
3598 | ScalarType::Array(_)
3599 | ScalarType::Record { .. }
3600 | ScalarType::Oid
3601 | ScalarType::RegProc
3602 | ScalarType::RegType
3603 | ScalarType::RegClass
3604 | ScalarType::Int2Vector
3605 | ScalarType::Range { .. }
3606 | ScalarType::PgLegacyName => {}
3607 }
3608 }
3609 }
3610 catalog.expire().await;
3611 })
3612 .await
3613 }
3614
3615 #[mz_ore::test(tokio::test)]
3618 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3620 Catalog::with_debug(|catalog| async move {
3621 let conn_catalog = catalog.for_system_session();
3622
3623 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3624 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3625
3626 for entry in catalog.entries() {
3627 let schema_spec = entry.name().qualifiers.schema_spec;
3628 let introspection_deps = catalog.introspection_dependencies(entry.id);
3629 if introspection_deps.is_empty() {
3630 assert!(
3631 schema_spec != introspection_schema_spec,
3632 "entry does not depend on introspection sources but is in \
3633 `mz_introspection`: {}",
3634 conn_catalog.resolve_full_name(entry.name()),
3635 );
3636 } else {
3637 assert!(
3638 schema_spec == introspection_schema_spec,
3639 "entry depends on introspection sources but is not in \
3640 `mz_introspection`: {}",
3641 conn_catalog.resolve_full_name(entry.name()),
3642 );
3643 }
3644 }
3645 })
3646 .await
3647 }
3648
3649 #[mz_ore::test(tokio::test)]
3650 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3652 let persist_client = PersistClient::new_for_tests().await;
3653 let bootstrap_args = test_bootstrap_args();
3654 let organization_id = Uuid::new_v4();
3655 let db_name = "DB";
3656
3657 let mut writer_catalog = Catalog::open_debug_catalog(
3658 persist_client.clone(),
3659 organization_id.clone(),
3660 &bootstrap_args,
3661 )
3662 .await
3663 .expect("open_debug_catalog");
3664 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3665 persist_client.clone(),
3666 organization_id.clone(),
3667 &bootstrap_args,
3668 )
3669 .await
3670 .expect("open_debug_read_only_catalog");
3671 assert_err!(writer_catalog.resolve_database(db_name));
3672 assert_err!(read_only_catalog.resolve_database(db_name));
3673
3674 let commit_ts = writer_catalog.current_upper().await;
3675 writer_catalog
3676 .transact(
3677 None,
3678 commit_ts,
3679 None,
3680 vec![Op::CreateDatabase {
3681 name: db_name.to_string(),
3682 owner_id: MZ_SYSTEM_ROLE_ID,
3683 }],
3684 )
3685 .await
3686 .expect("failed to transact");
3687
3688 let write_db = writer_catalog
3689 .resolve_database(db_name)
3690 .expect("resolve_database");
3691 read_only_catalog
3692 .sync_to_current_updates()
3693 .await
3694 .expect("sync_to_current_updates");
3695 let read_db = read_only_catalog
3696 .resolve_database(db_name)
3697 .expect("resolve_database");
3698
3699 assert_eq!(write_db, read_db);
3700
3701 let writer_catalog_fencer =
3702 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3703 .await
3704 .expect("open_debug_catalog for fencer");
3705 let fencer_db = writer_catalog_fencer
3706 .resolve_database(db_name)
3707 .expect("resolve_database for fencer");
3708 assert_eq!(fencer_db, read_db);
3709
3710 let write_fence_err = writer_catalog
3711 .sync_to_current_updates()
3712 .await
3713 .expect_err("sync_to_current_updates for fencer");
3714 assert!(matches!(
3715 write_fence_err,
3716 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3717 ));
3718 let read_fence_err = read_only_catalog
3719 .sync_to_current_updates()
3720 .await
3721 .expect_err("sync_to_current_updates after fencer");
3722 assert!(matches!(
3723 read_fence_err,
3724 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3725 ));
3726
3727 writer_catalog.expire().await;
3728 read_only_catalog.expire().await;
3729 writer_catalog_fencer.expire().await;
3730 }
3731}