1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67 CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68 DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
114#[derive(Debug)]
137pub struct Catalog {
138 state: CatalogState,
139 plans: CatalogPlans,
140 expr_cache_handle: Option<ExpressionCacheHandle>,
141 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
142 transient_revision: u64,
143}
144
145impl Clone for Catalog {
148 fn clone(&self) -> Self {
149 Self {
150 state: self.state.clone(),
151 plans: self.plans.clone(),
152 expr_cache_handle: self.expr_cache_handle.clone(),
153 storage: Arc::clone(&self.storage),
154 transient_revision: self.transient_revision,
155 }
156 }
157}
158
159#[derive(Default, Debug, Clone)]
160pub struct CatalogPlans {
161 optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
162 physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
163 dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
164 notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
165}
166
167impl Catalog {
168 #[mz_ore::instrument(level = "trace")]
170 pub fn set_optimized_plan(
171 &mut self,
172 id: GlobalId,
173 plan: DataflowDescription<OptimizedMirRelationExpr>,
174 ) {
175 self.plans.optimized_plan_by_id.insert(id, plan.into());
176 }
177
178 #[mz_ore::instrument(level = "trace")]
180 pub fn set_physical_plan(
181 &mut self,
182 id: GlobalId,
183 plan: DataflowDescription<mz_compute_types::plan::Plan>,
184 ) {
185 self.plans.physical_plan_by_id.insert(id, plan.into());
186 }
187
188 #[mz_ore::instrument(level = "trace")]
190 pub fn try_get_optimized_plan(
191 &self,
192 id: &GlobalId,
193 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
194 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
195 }
196
197 #[mz_ore::instrument(level = "trace")]
199 pub fn try_get_physical_plan(
200 &self,
201 id: &GlobalId,
202 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
203 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
204 }
205
206 #[mz_ore::instrument(level = "trace")]
208 pub fn set_dataflow_metainfo(
209 &mut self,
210 id: GlobalId,
211 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
212 ) {
213 for notice in metainfo.optimizer_notices.iter() {
215 for dep_id in notice.dependencies.iter() {
216 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
217 entry.push(Arc::clone(notice))
218 }
219 if let Some(item_id) = notice.item_id {
220 soft_assert_eq_or_log!(
221 item_id,
222 id,
223 "notice.item_id should match the id for whom we are saving the notice"
224 );
225 }
226 }
227 self.plans.dataflow_metainfos.insert(id, metainfo);
229 }
230
231 #[mz_ore::instrument(level = "trace")]
233 pub fn try_get_dataflow_metainfo(
234 &self,
235 id: &GlobalId,
236 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
237 self.plans.dataflow_metainfos.get(id)
238 }
239
240 #[mz_ore::instrument(level = "trace")]
249 pub fn drop_plans_and_metainfos(
250 &mut self,
251 drop_ids: &BTreeSet<GlobalId>,
252 ) -> BTreeSet<Arc<OptimizerNotice>> {
253 let mut dropped_notices = BTreeSet::new();
255
256 for id in drop_ids {
258 self.plans.optimized_plan_by_id.remove(id);
259 self.plans.physical_plan_by_id.remove(id);
260 if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
261 soft_assert_or_log!(
262 metainfo.optimizer_notices.iter().all_unique(),
263 "should have been pushed there by `push_optimizer_notice_dedup`"
264 );
265 for n in metainfo.optimizer_notices.drain(..) {
266 for dep_id in n.dependencies.iter() {
268 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
269 soft_assert_or_log!(
270 notices.iter().any(|x| &n == x),
271 "corrupt notices_by_dep_id"
272 );
273 notices.retain(|x| &n != x)
274 }
275 }
276 dropped_notices.insert(n);
277 }
278 }
279 }
280
281 for id in drop_ids {
283 if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
284 for n in notices.drain(..) {
285 if let Some(item_id) = n.item_id.as_ref() {
287 if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
288 metainfo.optimizer_notices.iter().for_each(|n2| {
289 if let Some(item_id_2) = n2.item_id {
290 soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
291 }
292 });
293 metainfo.optimizer_notices.retain(|x| &n != x);
294 }
295 }
296 dropped_notices.insert(n);
297 }
298 }
299 }
300
301 let mut todo_dep_ids = BTreeSet::new();
304 for notice in dropped_notices.iter() {
305 for dep_id in notice.dependencies.iter() {
306 if !drop_ids.contains(dep_id) {
307 todo_dep_ids.insert(*dep_id);
308 }
309 }
310 }
311 for id in todo_dep_ids {
314 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
315 notices.retain(|n| !dropped_notices.contains(n))
316 }
317 }
318
319 if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
320 use mz_ore::str::{bracketed, separated};
321 let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
322 let bad_notices = bad_notices.map(|n| {
323 let mut dataflow_metainfo_occurrences = Vec::new();
326 for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
327 if meta_info.optimizer_notices.contains(n) {
328 dataflow_metainfo_occurrences.push(id);
329 }
330 }
331 let mut notices_by_dep_id_occurrences = Vec::new();
333 for (id, notices) in self.plans.notices_by_dep_id.iter() {
334 if notices.iter().contains(n) {
335 notices_by_dep_id_occurrences.push(id);
336 }
337 }
338 format!(
339 "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
340 dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
341 n.id,
342 n.kind,
343 n.dependencies,
344 Arc::strong_count(n),
345 dataflow_metainfo_occurrences,
346 notices_by_dep_id_occurrences
347 )
348 });
349 let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
350 error!(
351 "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
352 bad_notices = {bad_notices}; \
353 drop_ids = {drop_ids:?}"
354 );
355 }
356
357 dropped_notices
358 }
359
360 pub(crate) fn invalidate_for_index(
371 &self,
372 ons: impl Iterator<Item = GlobalId>,
373 ) -> BTreeSet<GlobalId> {
374 let mut dependencies = BTreeSet::new();
375 let mut queue = VecDeque::new();
376 let mut seen = HashSet::new();
377 for on in ons {
378 let entry = self.get_entry_by_global_id(&on);
379 dependencies.insert(on);
380 seen.insert(entry.id);
381 let uses = entry.uses();
382 queue.extend(uses.clone());
383 }
384
385 while let Some(cur) = queue.pop_front() {
386 let entry = self.get_entry(&cur);
387 if seen.insert(cur) {
388 let global_ids = entry.global_ids();
389 match entry.item_type() {
390 CatalogItemType::Table
391 | CatalogItemType::Source
392 | CatalogItemType::MaterializedView
393 | CatalogItemType::Sink
394 | CatalogItemType::Index
395 | CatalogItemType::Type
396 | CatalogItemType::Func
397 | CatalogItemType::Secret
398 | CatalogItemType::Connection
399 | CatalogItemType::ContinualTask => {
400 dependencies.extend(global_ids);
401 }
402 CatalogItemType::View => {
403 dependencies.extend(global_ids);
404 queue.extend(entry.uses());
405 }
406 }
407 }
408 }
409 dependencies
410 }
411}
412
413#[derive(Debug)]
414pub struct ConnCatalog<'a> {
415 state: Cow<'a, CatalogState>,
416 unresolvable_ids: BTreeSet<CatalogItemId>,
426 conn_id: ConnectionId,
427 cluster: String,
428 database: Option<DatabaseId>,
429 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
430 role_id: RoleId,
431 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
432 portals: Option<&'a BTreeMap<String, Portal>>,
433 notices_tx: UnboundedSender<AdapterNotice>,
434}
435
436impl ConnCatalog<'_> {
437 pub fn conn_id(&self) -> &ConnectionId {
438 &self.conn_id
439 }
440
441 pub fn state(&self) -> &CatalogState {
442 &*self.state
443 }
444
445 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
455 assert_eq!(
456 self.role_id, MZ_SYSTEM_ROLE_ID,
457 "only the system role can mark IDs unresolvable",
458 );
459 self.unresolvable_ids.insert(id);
460 }
461
462 pub fn effective_search_path(
468 &self,
469 include_temp_schema: bool,
470 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
471 self.state
472 .effective_search_path(&self.search_path, include_temp_schema)
473 }
474}
475
476impl ConnectionResolver for ConnCatalog<'_> {
477 fn resolve_connection(
478 &self,
479 id: CatalogItemId,
480 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
481 self.state().resolve_connection(id)
482 }
483}
484
485impl Catalog {
486 pub fn transient_revision(&self) -> u64 {
490 self.transient_revision
491 }
492
493 pub async fn with_debug<F, Fut, T>(f: F) -> T
505 where
506 F: FnOnce(Catalog) -> Fut,
507 Fut: Future<Output = T>,
508 {
509 let persist_client = PersistClient::new_for_tests().await;
510 let organization_id = Uuid::new_v4();
511 let bootstrap_args = test_bootstrap_args();
512 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
513 .await
514 .expect("can open debug catalog");
515 f(catalog).await
516 }
517
518 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
521 where
522 F: FnOnce(Catalog) -> Fut,
523 Fut: Future<Output = T>,
524 {
525 let persist_client = PersistClient::new_for_tests().await;
526 let organization_id = Uuid::new_v4();
527 let bootstrap_args = test_bootstrap_args();
528 let mut catalog =
529 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
530 .await
531 .expect("can open debug catalog");
532
533 let now = SYSTEM_TIME.clone();
535 let openable_storage = TestCatalogStateBuilder::new(persist_client)
536 .with_organization_id(organization_id)
537 .with_default_deploy_generation()
538 .build()
539 .await
540 .expect("can create durable catalog");
541 let mut storage = openable_storage
542 .open(now().into(), &bootstrap_args)
543 .await
544 .expect("can open durable catalog")
545 .0;
546 let _ = storage
548 .sync_to_current_updates()
549 .await
550 .expect("can sync to current updates");
551 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
552
553 f(catalog).await
554 }
555
556 pub async fn open_debug_catalog(
560 persist_client: PersistClient,
561 organization_id: Uuid,
562 bootstrap_args: &BootstrapArgs,
563 ) -> Result<Catalog, anyhow::Error> {
564 let now = SYSTEM_TIME.clone();
565 let environment_id = None;
566 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
567 .with_organization_id(organization_id)
568 .with_default_deploy_generation()
569 .build()
570 .await?;
571 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
572 let system_parameter_defaults = BTreeMap::default();
573 Self::open_debug_catalog_inner(
574 persist_client,
575 storage,
576 now,
577 environment_id,
578 &DUMMY_BUILD_INFO,
579 system_parameter_defaults,
580 bootstrap_args,
581 None,
582 )
583 .await
584 }
585
586 pub async fn open_debug_read_only_catalog(
591 persist_client: PersistClient,
592 organization_id: Uuid,
593 bootstrap_args: &BootstrapArgs,
594 ) -> Result<Catalog, anyhow::Error> {
595 let now = SYSTEM_TIME.clone();
596 let environment_id = None;
597 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
598 .with_organization_id(organization_id)
599 .build()
600 .await?;
601 let storage = openable_storage
602 .open_read_only(&test_bootstrap_args())
603 .await?;
604 let system_parameter_defaults = BTreeMap::default();
605 Self::open_debug_catalog_inner(
606 persist_client,
607 storage,
608 now,
609 environment_id,
610 &DUMMY_BUILD_INFO,
611 system_parameter_defaults,
612 bootstrap_args,
613 None,
614 )
615 .await
616 }
617
618 pub async fn open_debug_read_only_persist_catalog_config(
623 persist_client: PersistClient,
624 now: NowFn,
625 environment_id: EnvironmentId,
626 system_parameter_defaults: BTreeMap<String, String>,
627 build_info: &'static BuildInfo,
628 bootstrap_args: &BootstrapArgs,
629 enable_expression_cache_override: Option<bool>,
630 ) -> Result<Catalog, anyhow::Error> {
631 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
632 .with_organization_id(environment_id.organization_id())
633 .with_version(
634 build_info
635 .version
636 .parse()
637 .expect("build version is parseable"),
638 )
639 .build()
640 .await?;
641 let storage = openable_storage.open_read_only(bootstrap_args).await?;
642 Self::open_debug_catalog_inner(
643 persist_client,
644 storage,
645 now,
646 Some(environment_id),
647 build_info,
648 system_parameter_defaults,
649 bootstrap_args,
650 enable_expression_cache_override,
651 )
652 .await
653 }
654
655 async fn open_debug_catalog_inner(
656 persist_client: PersistClient,
657 storage: Box<dyn DurableCatalogState>,
658 now: NowFn,
659 environment_id: Option<EnvironmentId>,
660 build_info: &'static BuildInfo,
661 system_parameter_defaults: BTreeMap<String, String>,
662 bootstrap_args: &BootstrapArgs,
663 enable_expression_cache_override: Option<bool>,
664 ) -> Result<Catalog, anyhow::Error> {
665 let metrics_registry = &MetricsRegistry::new();
666 let secrets_reader = Arc::new(InMemorySecretsController::new());
667 let previous_ts = now().into();
670 let replica_size = &bootstrap_args.default_cluster_replica_size;
671 let read_only = false;
672
673 let OpenCatalogResult {
674 catalog,
675 migrated_storage_collections_0dt: _,
676 new_builtin_collections: _,
677 builtin_table_updates: _,
678 cached_global_exprs: _,
679 uncached_local_exprs: _,
680 } = Catalog::open(Config {
681 storage,
682 metrics_registry,
683 state: StateConfig {
684 unsafe_mode: true,
685 all_features: false,
686 build_info,
687 deploy_generation: 0,
688 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
689 read_only,
690 now,
691 boot_ts: previous_ts,
692 skip_migrations: true,
693 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
694 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
695 size: replica_size.clone(),
696 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
697 },
698 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
699 size: replica_size.clone(),
700 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
701 },
702 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
703 size: replica_size.clone(),
704 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
705 },
706 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
707 size: replica_size.clone(),
708 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
709 },
710 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
711 size: replica_size.clone(),
712 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
713 },
714 system_parameter_defaults,
715 remote_system_parameters: None,
716 availability_zones: vec![],
717 egress_addresses: vec![],
718 aws_principal_context: None,
719 aws_privatelink_availability_zones: None,
720 http_host_name: None,
721 connection_context: ConnectionContext::for_tests(secrets_reader),
722 builtin_item_migration_config: BuiltinItemMigrationConfig {
723 persist_client: persist_client.clone(),
724 read_only,
725 force_migration: None,
726 },
727 persist_client,
728 enable_expression_cache_override,
729 helm_chart_version: None,
730 external_login_password_mz_system: None,
731 license_key: ValidatedLicenseKey::for_tests(),
732 },
733 })
734 .await?;
735 Ok(catalog)
736 }
737
738 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
739 self.state.for_session(session)
740 }
741
742 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
743 self.state.for_sessionless_user(role_id)
744 }
745
746 pub fn for_system_session(&self) -> ConnCatalog<'_> {
747 self.state.for_system_session()
748 }
749
750 async fn storage<'a>(
751 &'a self,
752 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
753 self.storage.lock().await
754 }
755
756 pub async fn current_upper(&self) -> mz_repr::Timestamp {
757 self.storage().await.current_upper().await
758 }
759
760 pub async fn allocate_user_id(
761 &self,
762 commit_ts: mz_repr::Timestamp,
763 ) -> Result<(CatalogItemId, GlobalId), Error> {
764 self.storage()
765 .await
766 .allocate_user_id(commit_ts)
767 .await
768 .maybe_terminate("allocating user ids")
769 .err_into()
770 }
771
772 pub async fn allocate_user_ids(
774 &self,
775 amount: u64,
776 commit_ts: mz_repr::Timestamp,
777 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
778 self.storage()
779 .await
780 .allocate_user_ids(amount, commit_ts)
781 .await
782 .maybe_terminate("allocating user ids")
783 .err_into()
784 }
785
786 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
787 let commit_ts = self.storage().await.current_upper().await;
788 self.allocate_user_id(commit_ts).await
789 }
790
791 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
793 self.storage()
794 .await
795 .get_next_user_item_id()
796 .await
797 .err_into()
798 }
799
800 #[cfg(test)]
801 pub async fn allocate_system_id(
802 &self,
803 commit_ts: mz_repr::Timestamp,
804 ) -> Result<(CatalogItemId, GlobalId), Error> {
805 use mz_ore::collections::CollectionExt;
806
807 let mut storage = self.storage().await;
808 let mut txn = storage.transaction().await?;
809 let id = txn
810 .allocate_system_item_ids(1)
811 .maybe_terminate("allocating system ids")?
812 .into_element();
813 let _ = txn.get_and_commit_op_updates();
815 txn.commit(commit_ts).await?;
816 Ok(id)
817 }
818
819 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
821 self.storage()
822 .await
823 .get_next_system_item_id()
824 .await
825 .err_into()
826 }
827
828 pub async fn allocate_user_cluster_id(
829 &self,
830 commit_ts: mz_repr::Timestamp,
831 ) -> Result<ClusterId, Error> {
832 self.storage()
833 .await
834 .allocate_user_cluster_id(commit_ts)
835 .await
836 .maybe_terminate("allocating user cluster ids")
837 .err_into()
838 }
839
840 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
842 self.storage()
843 .await
844 .get_next_system_replica_id()
845 .await
846 .err_into()
847 }
848
849 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
851 self.storage()
852 .await
853 .get_next_user_replica_id()
854 .await
855 .err_into()
856 }
857
858 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
859 self.state.resolve_database(database_name)
860 }
861
862 pub fn resolve_schema(
863 &self,
864 current_database: Option<&DatabaseId>,
865 database_name: Option<&str>,
866 schema_name: &str,
867 conn_id: &ConnectionId,
868 ) -> Result<&Schema, SqlCatalogError> {
869 self.state
870 .resolve_schema(current_database, database_name, schema_name, conn_id)
871 }
872
873 pub fn resolve_schema_in_database(
874 &self,
875 database_spec: &ResolvedDatabaseSpecifier,
876 schema_name: &str,
877 conn_id: &ConnectionId,
878 ) -> Result<&Schema, SqlCatalogError> {
879 self.state
880 .resolve_schema_in_database(database_spec, schema_name, conn_id)
881 }
882
883 pub fn resolve_replica_in_cluster(
884 &self,
885 cluster_id: &ClusterId,
886 replica_name: &str,
887 ) -> Result<&ClusterReplica, SqlCatalogError> {
888 self.state
889 .resolve_replica_in_cluster(cluster_id, replica_name)
890 }
891
892 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
893 self.state.resolve_system_schema(name)
894 }
895
896 pub fn resolve_search_path(
897 &self,
898 session: &Session,
899 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
900 self.state.resolve_search_path(session)
901 }
902
903 pub fn resolve_entry(
905 &self,
906 current_database: Option<&DatabaseId>,
907 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
908 name: &PartialItemName,
909 conn_id: &ConnectionId,
910 ) -> Result<&CatalogEntry, SqlCatalogError> {
911 self.state
912 .resolve_entry(current_database, search_path, name, conn_id)
913 }
914
915 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
917 self.state.resolve_builtin_table(builtin)
918 }
919
920 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
922 self.state.resolve_builtin_log(builtin).0
923 }
924
925 pub fn resolve_builtin_storage_collection(
927 &self,
928 builtin: &'static BuiltinSource,
929 ) -> CatalogItemId {
930 self.state.resolve_builtin_source(builtin)
931 }
932
933 pub fn resolve_function(
935 &self,
936 current_database: Option<&DatabaseId>,
937 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
938 name: &PartialItemName,
939 conn_id: &ConnectionId,
940 ) -> Result<&CatalogEntry, SqlCatalogError> {
941 self.state
942 .resolve_function(current_database, search_path, name, conn_id)
943 }
944
945 pub fn resolve_type(
947 &self,
948 current_database: Option<&DatabaseId>,
949 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
950 name: &PartialItemName,
951 conn_id: &ConnectionId,
952 ) -> Result<&CatalogEntry, SqlCatalogError> {
953 self.state
954 .resolve_type(current_database, search_path, name, conn_id)
955 }
956
957 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
958 self.state.resolve_cluster(name)
959 }
960
961 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
967 self.state.resolve_builtin_cluster(cluster)
968 }
969
970 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
971 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
972 }
973
974 pub fn resolve_target_cluster(
976 &self,
977 target_cluster: TargetCluster,
978 session: &Session,
979 ) -> Result<&Cluster, AdapterError> {
980 match target_cluster {
981 TargetCluster::CatalogServer => {
982 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
983 }
984 TargetCluster::Active => self.active_cluster(session),
985 TargetCluster::Transaction(cluster_id) => self
986 .try_get_cluster(cluster_id)
987 .ok_or(AdapterError::ConcurrentClusterDrop),
988 }
989 }
990
991 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
992 if session.user().name != SYSTEM_USER.name
996 && session.user().name != SUPPORT_USER.name
997 && session.vars().cluster() == SYSTEM_USER.name
998 {
999 coord_bail!(
1000 "system cluster '{}' cannot execute user queries",
1001 SYSTEM_USER.name
1002 );
1003 }
1004 let cluster = self.resolve_cluster(session.vars().cluster())?;
1005 Ok(cluster)
1006 }
1007
1008 pub fn state(&self) -> &CatalogState {
1009 &self.state
1010 }
1011
1012 pub fn resolve_full_name(
1013 &self,
1014 name: &QualifiedItemName,
1015 conn_id: Option<&ConnectionId>,
1016 ) -> FullItemName {
1017 self.state.resolve_full_name(name, conn_id)
1018 }
1019
1020 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1021 self.state.try_get_entry(id)
1022 }
1023
1024 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1025 self.state.try_get_entry_by_global_id(id)
1026 }
1027
1028 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1029 self.state.get_entry(id)
1030 }
1031
1032 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1033 self.state.get_entry_by_global_id(id)
1034 }
1035
1036 pub fn get_global_ids<'a>(
1037 &'a self,
1038 id: &CatalogItemId,
1039 ) -> impl Iterator<Item = GlobalId> + use<'a> {
1040 self.get_entry(id).global_ids()
1041 }
1042
1043 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1044 self.get_entry_by_global_id(id).id()
1045 }
1046
1047 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1048 let item = self.try_get_entry_by_global_id(id)?;
1049 Some(item.id())
1050 }
1051
1052 pub fn get_schema(
1053 &self,
1054 database_spec: &ResolvedDatabaseSpecifier,
1055 schema_spec: &SchemaSpecifier,
1056 conn_id: &ConnectionId,
1057 ) -> &Schema {
1058 self.state.get_schema(database_spec, schema_spec, conn_id)
1059 }
1060
1061 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1062 self.state.get_mz_catalog_schema_id()
1063 }
1064
1065 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1066 self.state.get_pg_catalog_schema_id()
1067 }
1068
1069 pub fn get_information_schema_id(&self) -> SchemaId {
1070 self.state.get_information_schema_id()
1071 }
1072
1073 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1074 self.state.get_mz_internal_schema_id()
1075 }
1076
1077 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1078 self.state.get_mz_introspection_schema_id()
1079 }
1080
1081 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1082 self.state.get_mz_unsafe_schema_id()
1083 }
1084
1085 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1086 self.state.system_schema_ids()
1087 }
1088
1089 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1090 self.state.get_database(id)
1091 }
1092
1093 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1094 self.state.try_get_role(id)
1095 }
1096
1097 pub fn get_role(&self, id: &RoleId) -> &Role {
1098 self.state.get_role(id)
1099 }
1100
1101 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1102 self.state.try_get_role_by_name(role_name)
1103 }
1104
1105 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1106 self.state.try_get_role_auth_by_id(id)
1107 }
1108
1109 pub fn create_temporary_schema(
1112 &mut self,
1113 conn_id: &ConnectionId,
1114 owner_id: RoleId,
1115 ) -> Result<(), Error> {
1116 self.state.create_temporary_schema(conn_id, owner_id)
1117 }
1118
1119 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1120 self.state.temporary_schemas[conn_id]
1121 .items
1122 .contains_key(item_name)
1123 }
1124
1125 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1128 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1129 return Ok(());
1130 };
1131 if !schema.items.is_empty() {
1132 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1133 }
1134 Ok(())
1135 }
1136
1137 pub(crate) fn object_dependents(
1138 &self,
1139 object_ids: &Vec<ObjectId>,
1140 conn_id: &ConnectionId,
1141 ) -> Vec<ObjectId> {
1142 let mut seen = BTreeSet::new();
1143 self.state.object_dependents(object_ids, conn_id, &mut seen)
1144 }
1145
1146 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1147 FullNameV1 {
1148 database: name.database.to_string(),
1149 schema: name.schema.clone(),
1150 item: name.item.clone(),
1151 }
1152 }
1153
1154 pub fn find_available_cluster_name(&self, name: &str) -> String {
1155 let mut i = 0;
1156 let mut candidate = name.to_string();
1157 while self.state.clusters_by_name.contains_key(&candidate) {
1158 i += 1;
1159 candidate = format!("{}{}", name, i);
1160 }
1161 candidate
1162 }
1163
1164 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1165 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1166 self.cluster_replica_sizes()
1167 .enabled_allocations()
1168 .map(|a| a.0.to_owned())
1169 .collect::<Vec<_>>()
1170 } else {
1171 self.system_config().allowed_cluster_replica_sizes()
1172 }
1173 }
1174
1175 pub fn concretize_replica_location(
1176 &self,
1177 location: mz_catalog::durable::ReplicaLocation,
1178 allowed_sizes: &Vec<String>,
1179 allowed_availability_zones: Option<&[String]>,
1180 ) -> Result<ReplicaLocation, Error> {
1181 self.state
1182 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1183 }
1184
1185 pub(crate) fn ensure_valid_replica_size(
1186 &self,
1187 allowed_sizes: &[String],
1188 size: &String,
1189 ) -> Result<(), Error> {
1190 self.state.ensure_valid_replica_size(allowed_sizes, size)
1191 }
1192
1193 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1194 &self.state.cluster_replica_sizes
1195 }
1196
1197 pub fn get_privileges(
1199 &self,
1200 id: &SystemObjectId,
1201 conn_id: &ConnectionId,
1202 ) -> Option<&PrivilegeMap> {
1203 match id {
1204 SystemObjectId::Object(id) => match id {
1205 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1206 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1207 ObjectId::Schema((database_spec, schema_spec)) => Some(
1208 self.get_schema(database_spec, schema_spec, conn_id)
1209 .privileges(),
1210 ),
1211 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1212 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1213 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1214 },
1215 SystemObjectId::System => Some(&self.state.system_privileges),
1216 }
1217 }
1218
1219 #[mz_ore::instrument(level = "debug")]
1220 pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1221 Ok(self.storage().await.confirm_leadership().await?)
1222 }
1223
1224 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1226 self.state.introspection_dependencies(id)
1227 }
1228
1229 pub fn dump(&self) -> Result<CatalogDump, Error> {
1235 Ok(CatalogDump::new(self.state.dump(None)?))
1236 }
1237
1238 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1242 self.state.check_consistency().map_err(|inconsistencies| {
1243 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1244 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1245 })
1246 })
1247 }
1248
1249 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1250 self.state.config()
1251 }
1252
1253 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1254 self.state.entry_by_id.values()
1255 }
1256
1257 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1258 self.entries()
1259 .filter(|entry| entry.is_connection() && entry.id().is_user())
1260 }
1261
1262 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1263 self.entries()
1264 .filter(|entry| entry.is_table() && entry.id().is_user())
1265 }
1266
1267 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1268 self.entries()
1269 .filter(|entry| entry.is_source() && entry.id().is_user())
1270 }
1271
1272 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1273 self.entries()
1274 .filter(|entry| entry.is_sink() && entry.id().is_user())
1275 }
1276
1277 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1278 self.entries()
1279 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1280 }
1281
1282 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1283 self.entries()
1284 .filter(|entry| entry.is_secret() && entry.id().is_user())
1285 }
1286
1287 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1288 self.state.get_network_policy(&network_policy_id)
1289 }
1290
1291 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1292 self.state.try_get_network_policy_by_name(name)
1293 }
1294
1295 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1296 self.state.clusters_by_id.values()
1297 }
1298
1299 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1300 self.state.get_cluster(cluster_id)
1301 }
1302
1303 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1304 self.state.try_get_cluster(cluster_id)
1305 }
1306
1307 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1308 self.clusters().filter(|cluster| cluster.id.is_user())
1309 }
1310
1311 pub fn get_cluster_replica(
1312 &self,
1313 cluster_id: ClusterId,
1314 replica_id: ReplicaId,
1315 ) -> &ClusterReplica {
1316 self.state.get_cluster_replica(cluster_id, replica_id)
1317 }
1318
1319 pub fn try_get_cluster_replica(
1320 &self,
1321 cluster_id: ClusterId,
1322 replica_id: ReplicaId,
1323 ) -> Option<&ClusterReplica> {
1324 self.state.try_get_cluster_replica(cluster_id, replica_id)
1325 }
1326
1327 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1328 self.user_clusters()
1329 .flat_map(|cluster| cluster.user_replicas())
1330 }
1331
1332 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1333 self.state.database_by_id.values()
1334 }
1335
1336 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1337 self.state
1338 .roles_by_id
1339 .values()
1340 .filter(|role| role.is_user())
1341 }
1342
1343 pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1344 self.entries()
1345 .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1346 }
1347
1348 pub fn system_privileges(&self) -> &PrivilegeMap {
1349 &self.state.system_privileges
1350 }
1351
1352 pub fn default_privileges(
1353 &self,
1354 ) -> impl Iterator<
1355 Item = (
1356 &DefaultPrivilegeObject,
1357 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1358 ),
1359 > {
1360 self.state.default_privileges.iter()
1361 }
1362
1363 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1364 self.state
1365 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1366 }
1367
1368 pub fn pack_storage_usage_update(
1369 &self,
1370 event: VersionedStorageUsage,
1371 diff: Diff,
1372 ) -> BuiltinTableUpdate {
1373 self.state
1374 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1375 }
1376
1377 pub fn system_config(&self) -> &SystemVars {
1378 self.state.system_config()
1379 }
1380
1381 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1382 self.state.system_config_mut()
1383 }
1384
1385 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1386 self.state.ensure_not_reserved_role(role_id)
1387 }
1388
1389 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1390 self.state.ensure_grantable_role(role_id)
1391 }
1392
1393 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1394 self.state.ensure_not_system_role(role_id)
1395 }
1396
1397 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1398 self.state.ensure_not_predefined_role(role_id)
1399 }
1400
1401 pub fn ensure_not_reserved_network_policy(
1402 &self,
1403 network_policy_id: &NetworkPolicyId,
1404 ) -> Result<(), Error> {
1405 self.state
1406 .ensure_not_reserved_network_policy(network_policy_id)
1407 }
1408
1409 pub fn ensure_not_reserved_object(
1410 &self,
1411 object_id: &ObjectId,
1412 conn_id: &ConnectionId,
1413 ) -> Result<(), Error> {
1414 match object_id {
1415 ObjectId::Cluster(cluster_id) => {
1416 if cluster_id.is_system() {
1417 let cluster = self.get_cluster(*cluster_id);
1418 Err(Error::new(ErrorKind::ReadOnlyCluster(
1419 cluster.name().to_string(),
1420 )))
1421 } else {
1422 Ok(())
1423 }
1424 }
1425 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1426 if replica_id.is_system() {
1427 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1428 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1429 replica.name().to_string(),
1430 )))
1431 } else {
1432 Ok(())
1433 }
1434 }
1435 ObjectId::Database(database_id) => {
1436 if database_id.is_system() {
1437 let database = self.get_database(database_id);
1438 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1439 database.name().to_string(),
1440 )))
1441 } else {
1442 Ok(())
1443 }
1444 }
1445 ObjectId::Schema((database_spec, schema_spec)) => {
1446 if schema_spec.is_system() {
1447 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1448 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1449 schema.name().schema.clone(),
1450 )))
1451 } else {
1452 Ok(())
1453 }
1454 }
1455 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1456 ObjectId::Item(item_id) => {
1457 if item_id.is_system() {
1458 let item = self.get_entry(item_id);
1459 let name = self.resolve_full_name(item.name(), Some(conn_id));
1460 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1461 } else {
1462 Ok(())
1463 }
1464 }
1465 ObjectId::NetworkPolicy(network_policy_id) => {
1466 self.ensure_not_reserved_network_policy(network_policy_id)
1467 }
1468 }
1469 }
1470
1471 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1473 &mut self,
1474 create_sql: &str,
1475 force_if_exists_skip: bool,
1476 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1477 self.state
1478 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1479 }
1480
1481 pub(crate) fn update_expression_cache<'a, 'b>(
1482 &'a self,
1483 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1484 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1485 ) -> BoxFuture<'b, ()> {
1486 if let Some(expr_cache) = &self.expr_cache_handle {
1487 let ons = new_local_expressions
1488 .iter()
1489 .map(|(id, _)| id)
1490 .chain(new_global_expressions.iter().map(|(id, _)| id))
1491 .map(|id| self.get_entry_by_global_id(id))
1492 .filter_map(|entry| entry.index().map(|index| index.on));
1493 let invalidate_ids = self.invalidate_for_index(ons);
1494 expr_cache
1495 .update(
1496 new_local_expressions,
1497 new_global_expressions,
1498 invalidate_ids,
1499 )
1500 .boxed()
1501 } else {
1502 async {}.boxed()
1503 }
1504 }
1505
1506 #[cfg(test)]
1510 async fn sync_to_current_updates(
1511 &mut self,
1512 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
1513 let updates = self.storage().await.sync_to_current_updates().await?;
1514 let builtin_table_updates = self.state.apply_updates(updates)?;
1515 Ok(builtin_table_updates)
1516 }
1517}
1518
1519pub fn is_reserved_name(name: &str) -> bool {
1520 BUILTIN_PREFIXES
1521 .iter()
1522 .any(|prefix| name.starts_with(prefix))
1523}
1524
1525pub fn is_reserved_role_name(name: &str) -> bool {
1526 is_reserved_name(name) || is_public_role(name)
1527}
1528
1529pub fn is_public_role(name: &str) -> bool {
1530 name == &*PUBLIC_ROLE_NAME
1531}
1532
1533pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1534 object_type_to_audit_object_type(sql_type.into())
1535}
1536
1537pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1538 match id {
1539 CommentObjectId::Table(_) => ObjectType::Table,
1540 CommentObjectId::View(_) => ObjectType::View,
1541 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1542 CommentObjectId::Source(_) => ObjectType::Source,
1543 CommentObjectId::Sink(_) => ObjectType::Sink,
1544 CommentObjectId::Index(_) => ObjectType::Index,
1545 CommentObjectId::Func(_) => ObjectType::Func,
1546 CommentObjectId::Connection(_) => ObjectType::Connection,
1547 CommentObjectId::Type(_) => ObjectType::Type,
1548 CommentObjectId::Secret(_) => ObjectType::Secret,
1549 CommentObjectId::Role(_) => ObjectType::Role,
1550 CommentObjectId::Database(_) => ObjectType::Database,
1551 CommentObjectId::Schema(_) => ObjectType::Schema,
1552 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1553 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1554 CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1555 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1556 }
1557}
1558
1559pub(crate) fn object_type_to_audit_object_type(
1560 object_type: mz_sql::catalog::ObjectType,
1561) -> ObjectType {
1562 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1563}
1564
1565pub(crate) fn system_object_type_to_audit_object_type(
1566 system_type: &SystemObjectType,
1567) -> ObjectType {
1568 match system_type {
1569 SystemObjectType::Object(object_type) => match object_type {
1570 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1571 mz_sql::catalog::ObjectType::View => ObjectType::View,
1572 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1573 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1574 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1575 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1576 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1577 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1578 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1579 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1580 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1581 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1582 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1583 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1584 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1585 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1586 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1587 },
1588 SystemObjectType::System => ObjectType::System,
1589 }
1590}
1591
1592#[derive(Debug, Copy, Clone)]
1593pub enum UpdatePrivilegeVariant {
1594 Grant,
1595 Revoke,
1596}
1597
1598impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1599 fn from(variant: UpdatePrivilegeVariant) -> Self {
1600 match variant {
1601 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1602 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1603 }
1604 }
1605}
1606
1607impl From<UpdatePrivilegeVariant> for EventType {
1608 fn from(variant: UpdatePrivilegeVariant) -> Self {
1609 match variant {
1610 UpdatePrivilegeVariant::Grant => EventType::Grant,
1611 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1612 }
1613 }
1614}
1615
1616impl ConnCatalog<'_> {
1617 fn resolve_item_name(
1618 &self,
1619 name: &PartialItemName,
1620 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1621 self.resolve_item(name).map(|entry| entry.name())
1622 }
1623
1624 fn resolve_function_name(
1625 &self,
1626 name: &PartialItemName,
1627 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1628 self.resolve_function(name).map(|entry| entry.name())
1629 }
1630
1631 fn resolve_type_name(
1632 &self,
1633 name: &PartialItemName,
1634 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1635 self.resolve_type(name).map(|entry| entry.name())
1636 }
1637}
1638
1639impl ExprHumanizer for ConnCatalog<'_> {
1640 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1641 let entry = self.state.try_get_entry_by_global_id(&id)?;
1642 Some(self.resolve_full_name(entry.name()).to_string())
1643 }
1644
1645 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1646 let entry = self.state.try_get_entry_by_global_id(&id)?;
1647 Some(entry.name().item.clone())
1648 }
1649
1650 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1651 let entry = self.state.try_get_entry_by_global_id(&id)?;
1652 Some(self.resolve_full_name(entry.name()).into_parts())
1653 }
1654
1655 fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1656 use SqlScalarType::*;
1657
1658 match typ {
1659 Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1660 List {
1661 custom_id: Some(item_id),
1662 ..
1663 }
1664 | Map {
1665 custom_id: Some(item_id),
1666 ..
1667 } => {
1668 let item = self.get_item(item_id);
1669 self.minimal_qualification(item.name()).to_string()
1670 }
1671 List { element_type, .. } => {
1672 format!(
1673 "{} list",
1674 self.humanize_scalar_type(element_type, postgres_compat)
1675 )
1676 }
1677 Map { value_type, .. } => format!(
1678 "map[{}=>{}]",
1679 self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1680 self.humanize_scalar_type(value_type, postgres_compat)
1681 ),
1682 Record {
1683 custom_id: Some(item_id),
1684 ..
1685 } => {
1686 let item = self.get_item(item_id);
1687 self.minimal_qualification(item.name()).to_string()
1688 }
1689 Record { fields, .. } => format!(
1690 "record({})",
1691 fields
1692 .iter()
1693 .map(|f| format!(
1694 "{}: {}",
1695 f.0,
1696 self.humanize_column_type(&f.1, postgres_compat)
1697 ))
1698 .join(",")
1699 ),
1700 PgLegacyChar => "\"char\"".into(),
1701 Char { length } if !postgres_compat => match length {
1702 None => "char".into(),
1703 Some(length) => format!("char({})", length.into_u32()),
1704 },
1705 VarChar { max_length } if !postgres_compat => match max_length {
1706 None => "varchar".into(),
1707 Some(length) => format!("varchar({})", length.into_u32()),
1708 },
1709 UInt16 => "uint2".into(),
1710 UInt32 => "uint4".into(),
1711 UInt64 => "uint8".into(),
1712 ty => {
1713 let pgrepr_type = mz_pgrepr::Type::from(ty);
1714 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1715
1716 let res = if self
1717 .effective_search_path(true)
1718 .iter()
1719 .any(|(_, schema)| schema == &pg_catalog_schema)
1720 {
1721 pgrepr_type.name().to_string()
1722 } else {
1723 let name = QualifiedItemName {
1726 qualifiers: ItemQualifiers {
1727 database_spec: ResolvedDatabaseSpecifier::Ambient,
1728 schema_spec: pg_catalog_schema,
1729 },
1730 item: pgrepr_type.name().to_string(),
1731 };
1732 self.resolve_full_name(&name).to_string()
1733 };
1734 res
1735 }
1736 }
1737 }
1738
1739 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1740 let entry = self.state.try_get_entry_by_global_id(&id)?;
1741
1742 match entry.index() {
1743 Some(index) => {
1744 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1745 let mut on_names = on_desc
1746 .iter_names()
1747 .map(|col_name| col_name.to_string())
1748 .collect::<Vec<_>>();
1749
1750 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1751
1752 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1756 let mut ix_names = vec![String::new(); ix_arity];
1757
1758 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1760 let on_name = on_names.get_mut(on_pos).expect("on_name");
1761 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1762 std::mem::swap(on_name, ix_name);
1763 }
1764
1765 Some(ix_names) }
1767 None => {
1768 let desc = self.state.try_get_desc_by_global_id(&id)?;
1769 let column_names = desc
1770 .iter_names()
1771 .map(|col_name| col_name.to_string())
1772 .collect();
1773
1774 Some(column_names)
1775 }
1776 }
1777 }
1778
1779 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1780 let desc = self.state.try_get_desc_by_global_id(&id)?;
1781 Some(desc.get_name(column).to_string())
1782 }
1783
1784 fn id_exists(&self, id: GlobalId) -> bool {
1785 self.state.entry_by_global_id.contains_key(&id)
1786 }
1787}
1788
1789impl SessionCatalog for ConnCatalog<'_> {
1790 fn active_role_id(&self) -> &RoleId {
1791 &self.role_id
1792 }
1793
1794 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1795 self.prepared_statements
1796 .as_ref()
1797 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1798 .flatten()
1799 }
1800
1801 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1802 self.portals
1803 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1804 }
1805
1806 fn active_database(&self) -> Option<&DatabaseId> {
1807 self.database.as_ref()
1808 }
1809
1810 fn active_cluster(&self) -> &str {
1811 &self.cluster
1812 }
1813
1814 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1815 &self.search_path
1816 }
1817
1818 fn resolve_database(
1819 &self,
1820 database_name: &str,
1821 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1822 Ok(self.state.resolve_database(database_name)?)
1823 }
1824
1825 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1826 self.state
1827 .database_by_id
1828 .get(id)
1829 .expect("database doesn't exist")
1830 }
1831
1832 #[allow(clippy::as_conversions)]
1834 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1835 self.state
1836 .database_by_id
1837 .values()
1838 .map(|database| database as &dyn CatalogDatabase)
1839 .collect()
1840 }
1841
1842 fn resolve_schema(
1843 &self,
1844 database_name: Option<&str>,
1845 schema_name: &str,
1846 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1847 Ok(self.state.resolve_schema(
1848 self.database.as_ref(),
1849 database_name,
1850 schema_name,
1851 &self.conn_id,
1852 )?)
1853 }
1854
1855 fn resolve_schema_in_database(
1856 &self,
1857 database_spec: &ResolvedDatabaseSpecifier,
1858 schema_name: &str,
1859 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1860 Ok(self
1861 .state
1862 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1863 }
1864
1865 fn get_schema(
1866 &self,
1867 database_spec: &ResolvedDatabaseSpecifier,
1868 schema_spec: &SchemaSpecifier,
1869 ) -> &dyn CatalogSchema {
1870 self.state
1871 .get_schema(database_spec, schema_spec, &self.conn_id)
1872 }
1873
1874 #[allow(clippy::as_conversions)]
1876 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1877 self.get_databases()
1878 .into_iter()
1879 .flat_map(|database| database.schemas().into_iter())
1880 .chain(
1881 self.state
1882 .ambient_schemas_by_id
1883 .values()
1884 .chain(self.state.temporary_schemas.values())
1885 .map(|schema| schema as &dyn CatalogSchema),
1886 )
1887 .collect()
1888 }
1889
1890 fn get_mz_internal_schema_id(&self) -> SchemaId {
1891 self.state().get_mz_internal_schema_id()
1892 }
1893
1894 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1895 self.state().get_mz_unsafe_schema_id()
1896 }
1897
1898 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1899 self.state.is_system_schema_specifier(schema)
1900 }
1901
1902 fn resolve_role(
1903 &self,
1904 role_name: &str,
1905 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1906 match self.state.try_get_role_by_name(role_name) {
1907 Some(role) => Ok(role),
1908 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1909 }
1910 }
1911
1912 fn resolve_network_policy(
1913 &self,
1914 policy_name: &str,
1915 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1916 match self.state.try_get_network_policy_by_name(policy_name) {
1917 Some(policy) => Ok(policy),
1918 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1919 }
1920 }
1921
1922 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1923 Some(self.state.roles_by_id.get(id)?)
1924 }
1925
1926 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1927 self.state.get_role(id)
1928 }
1929
1930 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1931 #[allow(clippy::as_conversions)]
1933 self.state
1934 .roles_by_id
1935 .values()
1936 .map(|role| role as &dyn CatalogRole)
1937 .collect()
1938 }
1939
1940 fn mz_system_role_id(&self) -> RoleId {
1941 MZ_SYSTEM_ROLE_ID
1942 }
1943
1944 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1945 self.state.collect_role_membership(id)
1946 }
1947
1948 fn get_network_policy(
1949 &self,
1950 id: &NetworkPolicyId,
1951 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1952 self.state.get_network_policy(id)
1953 }
1954
1955 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1956 #[allow(clippy::as_conversions)]
1958 self.state
1959 .network_policies_by_id
1960 .values()
1961 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1962 .collect()
1963 }
1964
1965 fn resolve_cluster(
1966 &self,
1967 cluster_name: Option<&str>,
1968 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1969 Ok(self
1970 .state
1971 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1972 }
1973
1974 fn resolve_cluster_replica(
1975 &self,
1976 cluster_replica_name: &QualifiedReplica,
1977 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1978 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1979 }
1980
1981 fn resolve_item(
1982 &self,
1983 name: &PartialItemName,
1984 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1985 let r = self.state.resolve_entry(
1986 self.database.as_ref(),
1987 &self.effective_search_path(true),
1988 name,
1989 &self.conn_id,
1990 )?;
1991 if self.unresolvable_ids.contains(&r.id()) {
1992 Err(SqlCatalogError::UnknownItem(name.to_string()))
1993 } else {
1994 Ok(r)
1995 }
1996 }
1997
1998 fn resolve_function(
1999 &self,
2000 name: &PartialItemName,
2001 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2002 let r = self.state.resolve_function(
2003 self.database.as_ref(),
2004 &self.effective_search_path(false),
2005 name,
2006 &self.conn_id,
2007 )?;
2008
2009 if self.unresolvable_ids.contains(&r.id()) {
2010 Err(SqlCatalogError::UnknownFunction {
2011 name: name.to_string(),
2012 alternative: None,
2013 })
2014 } else {
2015 Ok(r)
2016 }
2017 }
2018
2019 fn resolve_type(
2020 &self,
2021 name: &PartialItemName,
2022 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2023 let r = self.state.resolve_type(
2024 self.database.as_ref(),
2025 &self.effective_search_path(false),
2026 name,
2027 &self.conn_id,
2028 )?;
2029
2030 if self.unresolvable_ids.contains(&r.id()) {
2031 Err(SqlCatalogError::UnknownType {
2032 name: name.to_string(),
2033 })
2034 } else {
2035 Ok(r)
2036 }
2037 }
2038
2039 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2040 self.state.get_system_type(name)
2041 }
2042
2043 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2044 Some(self.state.try_get_entry(id)?)
2045 }
2046
2047 fn try_get_item_by_global_id(
2048 &self,
2049 id: &GlobalId,
2050 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2051 let entry = self.state.try_get_entry_by_global_id(id)?;
2052 let entry = match &entry.item {
2053 CatalogItem::Table(table) => {
2054 let (version, _gid) = table
2055 .collections
2056 .iter()
2057 .find(|(_version, gid)| *gid == id)
2058 .expect("catalog out of sync, mismatched GlobalId");
2059 entry.at_version(RelationVersionSelector::Specific(*version))
2060 }
2061 _ => entry.at_version(RelationVersionSelector::Latest),
2062 };
2063 Some(entry)
2064 }
2065
2066 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2067 self.state.get_entry(id)
2068 }
2069
2070 fn get_item_by_global_id(
2071 &self,
2072 id: &GlobalId,
2073 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2074 let entry = self.state.get_entry_by_global_id(id);
2075 let entry = match &entry.item {
2076 CatalogItem::Table(table) => {
2077 let (version, _gid) = table
2078 .collections
2079 .iter()
2080 .find(|(_version, gid)| *gid == id)
2081 .expect("catalog out of sync, mismatched GlobalId");
2082 entry.at_version(RelationVersionSelector::Specific(*version))
2083 }
2084 _ => entry.at_version(RelationVersionSelector::Latest),
2085 };
2086 entry
2087 }
2088
2089 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2090 self.get_schemas()
2091 .into_iter()
2092 .flat_map(|schema| schema.item_ids())
2093 .map(|id| self.get_item(&id))
2094 .collect()
2095 }
2096
2097 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2098 self.state
2099 .get_item_by_name(name, &self.conn_id)
2100 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2101 }
2102
2103 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2104 self.state
2105 .get_type_by_name(name, &self.conn_id)
2106 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2107 }
2108
2109 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2110 &self.state.clusters_by_id[&id]
2111 }
2112
2113 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2114 self.state
2115 .clusters_by_id
2116 .values()
2117 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2118 .collect()
2119 }
2120
2121 fn get_cluster_replica(
2122 &self,
2123 cluster_id: ClusterId,
2124 replica_id: ReplicaId,
2125 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2126 let cluster = self.get_cluster(cluster_id);
2127 cluster.replica(replica_id)
2128 }
2129
2130 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2131 self.get_clusters()
2132 .into_iter()
2133 .flat_map(|cluster| cluster.replicas().into_iter())
2134 .collect()
2135 }
2136
2137 fn get_system_privileges(&self) -> &PrivilegeMap {
2138 &self.state.system_privileges
2139 }
2140
2141 fn get_default_privileges(
2142 &self,
2143 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2144 self.state
2145 .default_privileges
2146 .iter()
2147 .map(|(object, acl_items)| (object, acl_items.collect()))
2148 .collect()
2149 }
2150
2151 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2152 self.state.find_available_name(name, &self.conn_id)
2153 }
2154
2155 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2156 self.state.resolve_full_name(name, Some(&self.conn_id))
2157 }
2158
2159 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2160 self.state.resolve_full_schema_name(name)
2161 }
2162
2163 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2164 self.state.get_entry_by_global_id(global_id).id()
2165 }
2166
2167 fn resolve_global_id(
2168 &self,
2169 item_id: &CatalogItemId,
2170 version: RelationVersionSelector,
2171 ) -> GlobalId {
2172 self.state
2173 .get_entry(item_id)
2174 .at_version(version)
2175 .global_id()
2176 }
2177
2178 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2179 self.state.config()
2180 }
2181
2182 fn now(&self) -> EpochMillis {
2183 (self.state.config().now)()
2184 }
2185
2186 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2187 self.state.aws_privatelink_availability_zones.clone()
2188 }
2189
2190 fn system_vars(&self) -> &SystemVars {
2191 &self.state.system_configuration
2192 }
2193
2194 fn system_vars_mut(&mut self) -> &mut SystemVars {
2195 &mut self.state.to_mut().system_configuration
2196 }
2197
2198 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2199 self.state().get_owner_id(id, self.conn_id())
2200 }
2201
2202 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2203 match id {
2204 SystemObjectId::System => Some(&self.state.system_privileges),
2205 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2206 Some(self.get_cluster(*id).privileges())
2207 }
2208 SystemObjectId::Object(ObjectId::Database(id)) => {
2209 Some(self.get_database(id).privileges())
2210 }
2211 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2212 Some(self.get_schema(database_spec, schema_spec).privileges())
2213 }
2214 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2215 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2216 Some(self.get_network_policy(id).privileges())
2217 }
2218 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2219 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2220 }
2221 }
2222
2223 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2224 let mut seen = BTreeSet::new();
2225 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2226 }
2227
2228 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2229 let mut seen = BTreeSet::new();
2230 self.state.item_dependents(id, &mut seen)
2231 }
2232
2233 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2234 rbac::all_object_privileges(object_type)
2235 }
2236
2237 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2238 self.state.get_object_type(object_id)
2239 }
2240
2241 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2242 self.state.get_system_object_type(id)
2243 }
2244
2245 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2248 let database_id = match &qualified_name.qualifiers.database_spec {
2249 ResolvedDatabaseSpecifier::Ambient => None,
2250 ResolvedDatabaseSpecifier::Id(id)
2251 if self.database.is_some() && self.database == Some(*id) =>
2252 {
2253 None
2254 }
2255 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2256 };
2257
2258 let schema_spec = if database_id.is_none()
2259 && self.resolve_item_name(&PartialItemName {
2260 database: None,
2261 schema: None,
2262 item: qualified_name.item.clone(),
2263 }) == Ok(qualified_name)
2264 || self.resolve_function_name(&PartialItemName {
2265 database: None,
2266 schema: None,
2267 item: qualified_name.item.clone(),
2268 }) == Ok(qualified_name)
2269 || self.resolve_type_name(&PartialItemName {
2270 database: None,
2271 schema: None,
2272 item: qualified_name.item.clone(),
2273 }) == Ok(qualified_name)
2274 {
2275 None
2276 } else {
2277 Some(qualified_name.qualifiers.schema_spec.clone())
2280 };
2281
2282 let res = PartialItemName {
2283 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2284 schema: schema_spec.map(|spec| {
2285 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2286 .name()
2287 .schema
2288 .clone()
2289 }),
2290 item: qualified_name.item.clone(),
2291 };
2292 assert!(
2293 self.resolve_item_name(&res) == Ok(qualified_name)
2294 || self.resolve_function_name(&res) == Ok(qualified_name)
2295 || self.resolve_type_name(&res) == Ok(qualified_name)
2296 );
2297 res
2298 }
2299
2300 fn add_notice(&self, notice: PlanNotice) {
2301 let _ = self.notices_tx.send(notice.into());
2302 }
2303
2304 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2305 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2306 self.state.comments.get_object_comments(comment_id)
2307 }
2308
2309 fn is_cluster_size_cc(&self, size: &str) -> bool {
2310 self.state
2311 .cluster_replica_sizes
2312 .0
2313 .get(size)
2314 .map_or(false, |a| a.is_cc)
2315 }
2316}
2317
2318#[cfg(test)]
2319mod tests {
2320 use std::collections::{BTreeMap, BTreeSet};
2321 use std::sync::Arc;
2322 use std::{env, iter};
2323
2324 use itertools::Itertools;
2325 use mz_catalog::memory::objects::CatalogItem;
2326 use tokio_postgres::NoTls;
2327 use tokio_postgres::types::Type;
2328 use uuid::Uuid;
2329
2330 use mz_catalog::SYSTEM_CONN_ID;
2331 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2332 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2333 use mz_controller_types::{ClusterId, ReplicaId};
2334 use mz_expr::MirScalarExpr;
2335 use mz_ore::now::to_datetime;
2336 use mz_ore::{assert_err, assert_ok, task};
2337 use mz_persist_client::PersistClient;
2338 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2339 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2340 use mz_repr::role_id::RoleId;
2341 use mz_repr::{
2342 CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2343 SqlScalarType, Timestamp,
2344 };
2345 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2346 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2347 use mz_sql::names::{
2348 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2349 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2350 };
2351 use mz_sql::plan::{
2352 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2353 Scope, StatementContext,
2354 };
2355 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2356 use mz_sql::session::vars::{SystemVars, VarInput};
2357
2358 use crate::catalog::state::LocalExpressionCache;
2359 use crate::catalog::{Catalog, Op};
2360 use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2361 use crate::session::Session;
2362
2363 #[mz_ore::test(tokio::test)]
2370 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2372 Catalog::with_debug(|catalog| async move {
2373 struct TestCase {
2374 input: QualifiedItemName,
2375 system_output: PartialItemName,
2376 normal_output: PartialItemName,
2377 }
2378
2379 let test_cases = vec![
2380 TestCase {
2381 input: QualifiedItemName {
2382 qualifiers: ItemQualifiers {
2383 database_spec: ResolvedDatabaseSpecifier::Ambient,
2384 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2385 },
2386 item: "numeric".to_string(),
2387 },
2388 system_output: PartialItemName {
2389 database: None,
2390 schema: None,
2391 item: "numeric".to_string(),
2392 },
2393 normal_output: PartialItemName {
2394 database: None,
2395 schema: None,
2396 item: "numeric".to_string(),
2397 },
2398 },
2399 TestCase {
2400 input: QualifiedItemName {
2401 qualifiers: ItemQualifiers {
2402 database_spec: ResolvedDatabaseSpecifier::Ambient,
2403 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2404 },
2405 item: "mz_array_types".to_string(),
2406 },
2407 system_output: PartialItemName {
2408 database: None,
2409 schema: None,
2410 item: "mz_array_types".to_string(),
2411 },
2412 normal_output: PartialItemName {
2413 database: None,
2414 schema: None,
2415 item: "mz_array_types".to_string(),
2416 },
2417 },
2418 ];
2419
2420 for tc in test_cases {
2421 assert_eq!(
2422 catalog
2423 .for_system_session()
2424 .minimal_qualification(&tc.input),
2425 tc.system_output
2426 );
2427 assert_eq!(
2428 catalog
2429 .for_session(&Session::dummy())
2430 .minimal_qualification(&tc.input),
2431 tc.normal_output
2432 );
2433 }
2434 catalog.expire().await;
2435 })
2436 .await
2437 }
2438
2439 #[mz_ore::test(tokio::test)]
2440 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2442 let persist_client = PersistClient::new_for_tests().await;
2443 let organization_id = Uuid::new_v4();
2444 let bootstrap_args = test_bootstrap_args();
2445 {
2446 let mut catalog = Catalog::open_debug_catalog(
2447 persist_client.clone(),
2448 organization_id.clone(),
2449 &bootstrap_args,
2450 )
2451 .await
2452 .expect("unable to open debug catalog");
2453 assert_eq!(catalog.transient_revision(), 1);
2454 let commit_ts = catalog.current_upper().await;
2455 catalog
2456 .transact(
2457 None,
2458 commit_ts,
2459 None,
2460 vec![Op::CreateDatabase {
2461 name: "test".to_string(),
2462 owner_id: MZ_SYSTEM_ROLE_ID,
2463 }],
2464 )
2465 .await
2466 .expect("failed to transact");
2467 assert_eq!(catalog.transient_revision(), 2);
2468 catalog.expire().await;
2469 }
2470 {
2471 let catalog =
2472 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2473 .await
2474 .expect("unable to open debug catalog");
2475 assert_eq!(catalog.transient_revision(), 1);
2477 catalog.expire().await;
2478 }
2479 }
2480
2481 #[mz_ore::test(tokio::test)]
2482 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2484 Catalog::with_debug(|catalog| async move {
2485 let mz_catalog_schema = (
2486 ResolvedDatabaseSpecifier::Ambient,
2487 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2488 );
2489 let pg_catalog_schema = (
2490 ResolvedDatabaseSpecifier::Ambient,
2491 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2492 );
2493 let mz_temp_schema = (
2494 ResolvedDatabaseSpecifier::Ambient,
2495 SchemaSpecifier::Temporary,
2496 );
2497
2498 let session = Session::dummy();
2500 let conn_catalog = catalog.for_session(&session);
2501 assert_ne!(
2502 conn_catalog.effective_search_path(false),
2503 conn_catalog.search_path
2504 );
2505 assert_ne!(
2506 conn_catalog.effective_search_path(true),
2507 conn_catalog.search_path
2508 );
2509 assert_eq!(
2510 conn_catalog.effective_search_path(false),
2511 vec![
2512 mz_catalog_schema.clone(),
2513 pg_catalog_schema.clone(),
2514 conn_catalog.search_path[0].clone()
2515 ]
2516 );
2517 assert_eq!(
2518 conn_catalog.effective_search_path(true),
2519 vec![
2520 mz_temp_schema.clone(),
2521 mz_catalog_schema.clone(),
2522 pg_catalog_schema.clone(),
2523 conn_catalog.search_path[0].clone()
2524 ]
2525 );
2526
2527 let mut session = Session::dummy();
2529 session
2530 .vars_mut()
2531 .set(
2532 &SystemVars::new(),
2533 "search_path",
2534 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2535 false,
2536 )
2537 .expect("failed to set search_path");
2538 let conn_catalog = catalog.for_session(&session);
2539 assert_ne!(
2540 conn_catalog.effective_search_path(false),
2541 conn_catalog.search_path
2542 );
2543 assert_ne!(
2544 conn_catalog.effective_search_path(true),
2545 conn_catalog.search_path
2546 );
2547 assert_eq!(
2548 conn_catalog.effective_search_path(false),
2549 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2550 );
2551 assert_eq!(
2552 conn_catalog.effective_search_path(true),
2553 vec![
2554 mz_temp_schema.clone(),
2555 mz_catalog_schema.clone(),
2556 pg_catalog_schema.clone()
2557 ]
2558 );
2559
2560 let mut session = Session::dummy();
2561 session
2562 .vars_mut()
2563 .set(
2564 &SystemVars::new(),
2565 "search_path",
2566 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2567 false,
2568 )
2569 .expect("failed to set search_path");
2570 let conn_catalog = catalog.for_session(&session);
2571 assert_ne!(
2572 conn_catalog.effective_search_path(false),
2573 conn_catalog.search_path
2574 );
2575 assert_ne!(
2576 conn_catalog.effective_search_path(true),
2577 conn_catalog.search_path
2578 );
2579 assert_eq!(
2580 conn_catalog.effective_search_path(false),
2581 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2582 );
2583 assert_eq!(
2584 conn_catalog.effective_search_path(true),
2585 vec![
2586 mz_temp_schema.clone(),
2587 pg_catalog_schema.clone(),
2588 mz_catalog_schema.clone()
2589 ]
2590 );
2591
2592 let mut session = Session::dummy();
2593 session
2594 .vars_mut()
2595 .set(
2596 &SystemVars::new(),
2597 "search_path",
2598 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2599 false,
2600 )
2601 .expect("failed to set search_path");
2602 let conn_catalog = catalog.for_session(&session);
2603 assert_ne!(
2604 conn_catalog.effective_search_path(false),
2605 conn_catalog.search_path
2606 );
2607 assert_ne!(
2608 conn_catalog.effective_search_path(true),
2609 conn_catalog.search_path
2610 );
2611 assert_eq!(
2612 conn_catalog.effective_search_path(false),
2613 vec![
2614 mz_catalog_schema.clone(),
2615 pg_catalog_schema.clone(),
2616 mz_temp_schema.clone()
2617 ]
2618 );
2619 assert_eq!(
2620 conn_catalog.effective_search_path(true),
2621 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2622 );
2623 catalog.expire().await;
2624 })
2625 .await
2626 }
2627
2628 #[mz_ore::test(tokio::test)]
2629 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2631 use mz_ore::collections::CollectionExt;
2632 Catalog::with_debug(|catalog| async move {
2633 let conn_catalog = catalog.for_system_session();
2634 let scx = &mut StatementContext::new(None, &conn_catalog);
2635
2636 let parsed = mz_sql_parser::parser::parse_statements(
2637 "create view public.foo as select 1 as bar",
2638 )
2639 .expect("")
2640 .into_element()
2641 .ast;
2642
2643 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2644
2645 assert_eq!(
2647 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2648 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2649 );
2650 catalog.expire().await;
2651 })
2652 .await;
2653 }
2654
2655 #[mz_ore::test(tokio::test)]
2657 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2659 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2660 let column = ", 1";
2661 let view_def_size = view_def.bytes().count();
2662 let column_size = column.bytes().count();
2663 let column_count =
2664 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2665 let columns = iter::repeat(column).take(column_count).join("");
2666 let create_sql = format!("{view_def}{columns})");
2667 let create_sql_check = create_sql.clone();
2668 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2669 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2670 &create_sql
2671 ));
2672
2673 let persist_client = PersistClient::new_for_tests().await;
2674 let organization_id = Uuid::new_v4();
2675 let id = CatalogItemId::User(1);
2676 let gid = GlobalId::User(1);
2677 let bootstrap_args = test_bootstrap_args();
2678 {
2679 let mut catalog = Catalog::open_debug_catalog(
2680 persist_client.clone(),
2681 organization_id.clone(),
2682 &bootstrap_args,
2683 )
2684 .await
2685 .expect("unable to open debug catalog");
2686 let item = catalog
2687 .state()
2688 .deserialize_item(
2689 gid,
2690 &create_sql,
2691 &BTreeMap::new(),
2692 &mut LocalExpressionCache::Closed,
2693 None,
2694 )
2695 .expect("unable to parse view");
2696 let commit_ts = catalog.current_upper().await;
2697 catalog
2698 .transact(
2699 None,
2700 commit_ts,
2701 None,
2702 vec![Op::CreateItem {
2703 item,
2704 name: QualifiedItemName {
2705 qualifiers: ItemQualifiers {
2706 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2707 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2708 },
2709 item: "v".to_string(),
2710 },
2711 id,
2712 owner_id: MZ_SYSTEM_ROLE_ID,
2713 }],
2714 )
2715 .await
2716 .expect("failed to transact");
2717 catalog.expire().await;
2718 }
2719 {
2720 let catalog =
2721 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2722 .await
2723 .expect("unable to open debug catalog");
2724 let view = catalog.get_entry(&id);
2725 assert_eq!("v", view.name.item);
2726 match &view.item {
2727 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2728 item => panic!("expected view, got {}", item.typ()),
2729 }
2730 catalog.expire().await;
2731 }
2732 }
2733
2734 #[mz_ore::test(tokio::test)]
2735 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2737 Catalog::with_debug(|catalog| async move {
2738 let conn_catalog = catalog.for_system_session();
2739
2740 assert_eq!(
2741 mz_sql::catalog::ObjectType::ClusterReplica,
2742 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2743 ClusterId::user(1).expect("1 is a valid ID"),
2744 ReplicaId::User(1)
2745 )))
2746 );
2747 assert_eq!(
2748 mz_sql::catalog::ObjectType::Role,
2749 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2750 );
2751 catalog.expire().await;
2752 })
2753 .await;
2754 }
2755
2756 #[mz_ore::test(tokio::test)]
2757 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2759 Catalog::with_debug(|catalog| async move {
2760 let conn_catalog = catalog.for_system_session();
2761
2762 assert_eq!(
2763 None,
2764 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2765 ClusterId::user(1).expect("1 is a valid ID"),
2766 ReplicaId::User(1),
2767 ))))
2768 );
2769 assert_eq!(
2770 None,
2771 conn_catalog
2772 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2773 );
2774 catalog.expire().await;
2775 })
2776 .await;
2777 }
2778
2779 #[mz_ore::test(tokio::test)]
2780 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2782 Catalog::with_debug(|catalog| async move {
2783 let conn_catalog = catalog.for_system_session();
2784
2785 let builtins_cfg = BuiltinsConfig {
2786 include_continual_tasks: true,
2787 };
2788 for builtin in BUILTINS::iter(&builtins_cfg) {
2789 let (schema, name, expected_desc) = match builtin {
2790 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2791 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2792 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2793 Builtin::Log(_)
2794 | Builtin::Type(_)
2795 | Builtin::Func(_)
2796 | Builtin::ContinualTask(_)
2797 | Builtin::Index(_)
2798 | Builtin::Connection(_) => continue,
2799 };
2800 let item = conn_catalog
2801 .resolve_item(&PartialItemName {
2802 database: None,
2803 schema: Some(schema.to_string()),
2804 item: name.to_string(),
2805 })
2806 .expect("unable to resolve item")
2807 .at_version(RelationVersionSelector::Latest);
2808
2809 let full_name = conn_catalog.resolve_full_name(item.name());
2810 let actual_desc = item.desc(&full_name).expect("invalid item type");
2811 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2812 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2813 {
2814 assert_eq!(
2815 actual_name, expected_name,
2816 "item {schema}.{name} column {index} name did not match its expected name"
2817 );
2818 assert_eq!(
2819 actual_typ, expected_typ,
2820 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2821 );
2822 }
2823 assert_eq!(
2824 &*actual_desc, expected_desc,
2825 "item {schema}.{name} did not match its expected RelationDesc"
2826 );
2827 }
2828 catalog.expire().await;
2829 })
2830 .await
2831 }
2832
2833 #[mz_ore::test(tokio::test)]
2836 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2838 async fn inner(catalog: Catalog) {
2839 let (client, connection) = tokio_postgres::connect(
2843 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2844 NoTls,
2845 )
2846 .await
2847 .expect("failed to connect to Postgres");
2848
2849 task::spawn(|| "compare_builtin_postgres", async move {
2850 if let Err(e) = connection.await {
2851 panic!("connection error: {}", e);
2852 }
2853 });
2854
2855 struct PgProc {
2856 name: String,
2857 arg_oids: Vec<u32>,
2858 ret_oid: Option<u32>,
2859 ret_set: bool,
2860 }
2861
2862 struct PgType {
2863 name: String,
2864 ty: String,
2865 elem: u32,
2866 array: u32,
2867 input: u32,
2868 receive: u32,
2869 }
2870
2871 struct PgOper {
2872 oprresult: u32,
2873 name: String,
2874 }
2875
2876 let pg_proc: BTreeMap<_, _> = client
2877 .query(
2878 "SELECT
2879 p.oid,
2880 proname,
2881 proargtypes,
2882 prorettype,
2883 proretset
2884 FROM pg_proc p
2885 JOIN pg_namespace n ON p.pronamespace = n.oid",
2886 &[],
2887 )
2888 .await
2889 .expect("pg query failed")
2890 .into_iter()
2891 .map(|row| {
2892 let oid: u32 = row.get("oid");
2893 let pg_proc = PgProc {
2894 name: row.get("proname"),
2895 arg_oids: row.get("proargtypes"),
2896 ret_oid: row.get("prorettype"),
2897 ret_set: row.get("proretset"),
2898 };
2899 (oid, pg_proc)
2900 })
2901 .collect();
2902
2903 let pg_type: BTreeMap<_, _> = client
2904 .query(
2905 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2906 &[],
2907 )
2908 .await
2909 .expect("pg query failed")
2910 .into_iter()
2911 .map(|row| {
2912 let oid: u32 = row.get("oid");
2913 let pg_type = PgType {
2914 name: row.get("typname"),
2915 ty: row.get("typtype"),
2916 elem: row.get("typelem"),
2917 array: row.get("typarray"),
2918 input: row.get("typinput"),
2919 receive: row.get("typreceive"),
2920 };
2921 (oid, pg_type)
2922 })
2923 .collect();
2924
2925 let pg_oper: BTreeMap<_, _> = client
2926 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2927 .await
2928 .expect("pg query failed")
2929 .into_iter()
2930 .map(|row| {
2931 let oid: u32 = row.get("oid");
2932 let pg_oper = PgOper {
2933 name: row.get("oprname"),
2934 oprresult: row.get("oprresult"),
2935 };
2936 (oid, pg_oper)
2937 })
2938 .collect();
2939
2940 let conn_catalog = catalog.for_system_session();
2941 let resolve_type_oid = |item: &str| {
2942 conn_catalog
2943 .resolve_type(&PartialItemName {
2944 database: None,
2945 schema: Some(PG_CATALOG_SCHEMA.into()),
2948 item: item.to_string(),
2949 })
2950 .expect("unable to resolve type")
2951 .oid()
2952 };
2953
2954 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2955 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2956 .collect();
2957
2958 let mut all_oids = BTreeSet::new();
2959
2960 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2963 [
2964 (Type::NAME, Type::TEXT),
2966 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2967 (Type::TIME, Type::TIMETZ),
2969 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2970 ]
2971 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2972 );
2973 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2974 1619, ]);
2976 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2977 if ignore_return_types.contains(&fn_oid) {
2978 return true;
2979 }
2980 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2981 return true;
2982 }
2983 a == b
2984 };
2985
2986 let builtins_cfg = BuiltinsConfig {
2987 include_continual_tasks: true,
2988 };
2989 for builtin in BUILTINS::iter(&builtins_cfg) {
2990 match builtin {
2991 Builtin::Type(ty) => {
2992 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2993
2994 if ty.oid >= FIRST_MATERIALIZE_OID {
2995 continue;
2998 }
2999
3000 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3003 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3004 });
3005 assert_eq!(
3006 ty.name, pg_ty.name,
3007 "oid {} has name {} in postgres; expected {}",
3008 ty.oid, pg_ty.name, ty.name,
3009 );
3010
3011 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3012 None => (0, 0),
3013 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3014 };
3015 assert_eq!(
3016 typinput_oid, pg_ty.input,
3017 "type {} has typinput OID {:?} in mz but {:?} in pg",
3018 ty.name, typinput_oid, pg_ty.input,
3019 );
3020 assert_eq!(
3021 typreceive_oid, pg_ty.receive,
3022 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3023 ty.name, typreceive_oid, pg_ty.receive,
3024 );
3025 if typinput_oid != 0 {
3026 assert!(
3027 func_oids.contains(&typinput_oid),
3028 "type {} has typinput OID {} that does not exist in pg_proc",
3029 ty.name,
3030 typinput_oid,
3031 );
3032 }
3033 if typreceive_oid != 0 {
3034 assert!(
3035 func_oids.contains(&typreceive_oid),
3036 "type {} has typreceive OID {} that does not exist in pg_proc",
3037 ty.name,
3038 typreceive_oid,
3039 );
3040 }
3041
3042 match &ty.details.typ {
3044 CatalogType::Array { element_reference } => {
3045 let elem_ty = BUILTINS::iter(&builtins_cfg)
3046 .filter_map(|builtin| match builtin {
3047 Builtin::Type(ty @ BuiltinType { name, .. })
3048 if element_reference == name =>
3049 {
3050 Some(ty)
3051 }
3052 _ => None,
3053 })
3054 .next();
3055 let elem_ty = match elem_ty {
3056 Some(ty) => ty,
3057 None => {
3058 panic!("{} is unexpectedly not a type", element_reference)
3059 }
3060 };
3061 assert_eq!(
3062 pg_ty.elem, elem_ty.oid,
3063 "type {} has mismatched element OIDs",
3064 ty.name
3065 )
3066 }
3067 CatalogType::Pseudo => {
3068 assert_eq!(
3069 pg_ty.ty, "p",
3070 "type {} is not a pseudo type as expected",
3071 ty.name
3072 )
3073 }
3074 CatalogType::Range { .. } => {
3075 assert_eq!(
3076 pg_ty.ty, "r",
3077 "type {} is not a range type as expected",
3078 ty.name
3079 );
3080 }
3081 _ => {
3082 assert_eq!(
3083 pg_ty.ty, "b",
3084 "type {} is not a base type as expected",
3085 ty.name
3086 )
3087 }
3088 }
3089
3090 let schema = catalog
3092 .resolve_schema_in_database(
3093 &ResolvedDatabaseSpecifier::Ambient,
3094 ty.schema,
3095 &SYSTEM_CONN_ID,
3096 )
3097 .expect("unable to resolve schema");
3098 let allocated_type = catalog
3099 .resolve_type(
3100 None,
3101 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3102 &PartialItemName {
3103 database: None,
3104 schema: Some(schema.name().schema.clone()),
3105 item: ty.name.to_string(),
3106 },
3107 &SYSTEM_CONN_ID,
3108 )
3109 .expect("unable to resolve type");
3110 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3111 ty
3112 } else {
3113 panic!("unexpectedly not a type")
3114 };
3115 match ty.details.array_id {
3116 Some(array_id) => {
3117 let array_ty = catalog.get_entry(&array_id);
3118 assert_eq!(
3119 pg_ty.array, array_ty.oid,
3120 "type {} has mismatched array OIDs",
3121 allocated_type.name.item,
3122 );
3123 }
3124 None => assert_eq!(
3125 pg_ty.array, 0,
3126 "type {} does not have an array type in mz but does in pg",
3127 allocated_type.name.item,
3128 ),
3129 }
3130 }
3131 Builtin::Func(func) => {
3132 for imp in func.inner.func_impls() {
3133 assert!(
3134 all_oids.insert(imp.oid),
3135 "{} reused oid {}",
3136 func.name,
3137 imp.oid
3138 );
3139
3140 assert!(
3141 imp.oid < FIRST_USER_OID,
3142 "built-in function {} erroneously has OID in user space ({})",
3143 func.name,
3144 imp.oid,
3145 );
3146
3147 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3150 continue;
3151 } else {
3152 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3153 panic!(
3154 "pg_proc missing function {}: oid {}",
3155 func.name, imp.oid
3156 )
3157 })
3158 };
3159 assert_eq!(
3160 func.name, pg_fn.name,
3161 "funcs with oid {} don't match names: {} in mz, {} in pg",
3162 imp.oid, func.name, pg_fn.name
3163 );
3164
3165 let imp_arg_oids = imp
3168 .arg_typs
3169 .iter()
3170 .map(|item| resolve_type_oid(item))
3171 .collect::<Vec<_>>();
3172
3173 if imp_arg_oids != pg_fn.arg_oids {
3174 println!(
3175 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3176 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3177 );
3178 }
3179
3180 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3181
3182 assert!(
3183 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3184 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3185 imp.oid,
3186 func.name,
3187 imp_return_oid,
3188 pg_fn.ret_oid
3189 );
3190
3191 assert_eq!(
3192 imp.return_is_set, pg_fn.ret_set,
3193 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3194 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3195 );
3196 }
3197 }
3198 _ => (),
3199 }
3200 }
3201
3202 for (op, func) in OP_IMPLS.iter() {
3203 for imp in func.func_impls() {
3204 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3205
3206 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3208 continue;
3209 } else {
3210 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3211 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3212 })
3213 };
3214
3215 assert_eq!(*op, pg_op.name);
3216
3217 let imp_return_oid =
3218 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3219 if imp_return_oid != pg_op.oprresult {
3220 panic!(
3221 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3222 imp.oid, op, imp_return_oid, pg_op.oprresult
3223 );
3224 }
3225 }
3226 }
3227 catalog.expire().await;
3228 }
3229
3230 Catalog::with_debug(inner).await
3231 }
3232
3233 #[mz_ore::test(tokio::test)]
3235 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3237 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3238 let catalog = Arc::new(catalog);
3239 let conn_catalog = catalog.for_system_session();
3240
3241 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3242 let mut handles = Vec::new();
3243
3244 let ignore_names = BTreeSet::from([
3246 "avg",
3247 "avg_internal_v1",
3248 "bool_and",
3249 "bool_or",
3250 "has_table_privilege", "has_type_privilege", "mod",
3253 "mz_panic",
3254 "mz_sleep",
3255 "pow",
3256 "stddev_pop",
3257 "stddev_samp",
3258 "stddev",
3259 "var_pop",
3260 "var_samp",
3261 "variance",
3262 ]);
3263
3264 let fns = BUILTINS::funcs()
3265 .map(|func| (&func.name, func.inner))
3266 .chain(OP_IMPLS.iter());
3267
3268 for (name, func) in fns {
3269 if ignore_names.contains(name) {
3270 continue;
3271 }
3272 let Func::Scalar(impls) = func else {
3273 continue;
3274 };
3275
3276 'outer: for imp in impls {
3277 let details = imp.details();
3278 let mut styps = Vec::new();
3279 for item in details.arg_typs.iter() {
3280 let oid = resolve_type_oid(item);
3281 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3282 continue 'outer;
3283 };
3284 styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3285 }
3286 let datums = styps
3287 .iter()
3288 .map(|styp| {
3289 let mut datums = vec![Datum::Null];
3290 datums.extend(styp.interesting_datums());
3291 datums
3292 })
3293 .collect::<Vec<_>>();
3294 if datums.is_empty() {
3296 continue;
3297 }
3298
3299 let return_oid = details
3300 .return_typ
3301 .map(resolve_type_oid)
3302 .expect("must exist");
3303 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3304 .ok()
3305 .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3306
3307 let mut idxs = vec![0; datums.len()];
3308 while idxs[0] < datums[0].len() {
3309 let mut args = Vec::with_capacity(idxs.len());
3310 for i in 0..(datums.len()) {
3311 args.push(datums[i][idxs[i]]);
3312 }
3313
3314 let op = &imp.op;
3315 let scalars = args
3316 .iter()
3317 .enumerate()
3318 .map(|(i, datum)| {
3319 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3320 datum.clone(),
3321 styps[i].clone(),
3322 ))
3323 })
3324 .collect();
3325
3326 let call_name = format!(
3327 "{name}({}) (oid: {})",
3328 args.iter()
3329 .map(|d| d.to_string())
3330 .collect::<Vec<_>>()
3331 .join(", "),
3332 imp.oid
3333 );
3334 let catalog = Arc::clone(&catalog);
3335 let call_name_fn = call_name.clone();
3336 let return_styp = return_styp.clone();
3337 let handle = task::spawn_blocking(
3338 || call_name,
3339 move || {
3340 smoketest_fn(
3341 name,
3342 call_name_fn,
3343 op,
3344 imp,
3345 args,
3346 catalog,
3347 scalars,
3348 return_styp,
3349 )
3350 },
3351 );
3352 handles.push(handle);
3353
3354 for i in (0..datums.len()).rev() {
3356 idxs[i] += 1;
3357 if idxs[i] >= datums[i].len() {
3358 if i == 0 {
3359 break;
3360 }
3361 idxs[i] = 0;
3362 continue;
3363 } else {
3364 break;
3365 }
3366 }
3367 }
3368 }
3369 }
3370 handles
3371 }
3372
3373 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3374 for handle in handles {
3375 handle.await.expect("must succeed");
3376 }
3377 }
3378
3379 fn smoketest_fn(
3380 name: &&str,
3381 call_name: String,
3382 op: &Operation<HirScalarExpr>,
3383 imp: &FuncImpl<HirScalarExpr>,
3384 args: Vec<Datum<'_>>,
3385 catalog: Arc<Catalog>,
3386 scalars: Vec<CoercibleScalarExpr>,
3387 return_styp: Option<SqlScalarType>,
3388 ) {
3389 let conn_catalog = catalog.for_system_session();
3390 let pcx = PlanContext::zero();
3391 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3392 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3393 let ecx = ExprContext {
3394 qcx: &qcx,
3395 name: "smoketest",
3396 scope: &Scope::empty(),
3397 relation_type: &SqlRelationType::empty(),
3398 allow_aggregates: false,
3399 allow_subqueries: false,
3400 allow_parameters: false,
3401 allow_windows: false,
3402 };
3403 let arena = RowArena::new();
3404 let mut session = Session::<Timestamp>::dummy();
3405 session
3406 .start_transaction(to_datetime(0), None, None)
3407 .expect("must succeed");
3408 let prep_style = ExprPrepStyle::OneShot {
3409 logical_time: EvalTime::Time(Timestamp::MIN),
3410 session: &session,
3411 catalog_state: &catalog.state,
3412 };
3413
3414 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3417 if let Ok(hir) = res {
3418 if let Ok(mut mir) = hir.lower_uncorrelated() {
3419 prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3421
3422 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3423 if let Some(return_styp) = return_styp {
3424 let mir_typ = mir.typ(&[]);
3425 assert_eq!(mir_typ.scalar_type, return_styp);
3428 if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3432 panic!(
3433 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3434 );
3435 }
3436 if let Some((introduces_nulls, propagates_nulls)) =
3439 call_introduces_propagates_nulls(&mir)
3440 {
3441 if introduces_nulls {
3442 assert!(
3446 mir_typ.nullable,
3447 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3448 name, args, mir, mir_typ.nullable
3449 );
3450 } else {
3451 let any_input_null = args.iter().any(|arg| arg.is_null());
3452 if !any_input_null {
3453 assert!(
3454 !mir_typ.nullable,
3455 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3456 name, args, mir, mir_typ.nullable
3457 );
3458 } else {
3459 assert_eq!(
3460 mir_typ.nullable, propagates_nulls,
3461 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3462 name, args, mir, mir_typ.nullable
3463 );
3464 }
3465 }
3466 }
3467 let mut reduced = mir.clone();
3470 reduced.reduce(&[]);
3471 match reduced {
3472 MirScalarExpr::Literal(reduce_result, ctyp) => {
3473 match reduce_result {
3474 Ok(reduce_result_row) => {
3475 let reduce_result_datum = reduce_result_row.unpack_first();
3476 assert_eq!(
3477 reduce_result_datum,
3478 eval_result_datum,
3479 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3480 name,
3481 args,
3482 mir,
3483 eval_result_datum,
3484 mir_typ.scalar_type,
3485 reduce_result_datum,
3486 ctyp.scalar_type
3487 );
3488 assert_eq!(
3494 ctyp.scalar_type,
3495 mir_typ.scalar_type,
3496 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3497 name,
3498 args,
3499 mir,
3500 eval_result_datum,
3501 mir_typ.scalar_type,
3502 reduce_result_datum,
3503 ctyp.scalar_type
3504 );
3505 }
3506 Err(..) => {} }
3508 }
3509 _ => unreachable!(
3510 "all args are literals, so should have reduced to a literal"
3511 ),
3512 }
3513 }
3514 }
3515 }
3516 }
3517 }
3518
3519 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3524 match mir_func_call {
3525 MirScalarExpr::CallUnary { func, expr } => {
3526 if expr.is_literal() {
3527 Some((func.introduces_nulls(), func.propagates_nulls()))
3528 } else {
3529 None
3530 }
3531 }
3532 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3533 if expr1.is_literal() && expr2.is_literal() {
3534 Some((func.introduces_nulls(), func.propagates_nulls()))
3535 } else {
3536 None
3537 }
3538 }
3539 MirScalarExpr::CallVariadic { func, exprs } => {
3540 if exprs.iter().all(|arg| arg.is_literal()) {
3541 Some((func.introduces_nulls(), func.propagates_nulls()))
3542 } else {
3543 None
3544 }
3545 }
3546 _ => None,
3547 }
3548 }
3549
3550 #[mz_ore::test(tokio::test)]
3552 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3554 Catalog::with_debug(|catalog| async move {
3555 let conn_catalog = catalog.for_system_session();
3556
3557 for view in BUILTINS::views().filter(|view| {
3558 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3559 }) {
3560 let item = conn_catalog
3561 .resolve_item(&PartialItemName {
3562 database: None,
3563 schema: Some(view.schema.to_string()),
3564 item: view.name.to_string(),
3565 })
3566 .expect("unable to resolve view")
3567 .at_version(RelationVersionSelector::Latest);
3569 let full_name = conn_catalog.resolve_full_name(item.name());
3570 for col_type in item
3571 .desc(&full_name)
3572 .expect("invalid item type")
3573 .iter_types()
3574 {
3575 match &col_type.scalar_type {
3576 typ @ SqlScalarType::UInt16
3577 | typ @ SqlScalarType::UInt32
3578 | typ @ SqlScalarType::UInt64
3579 | typ @ SqlScalarType::MzTimestamp
3580 | typ @ SqlScalarType::List { .. }
3581 | typ @ SqlScalarType::Map { .. }
3582 | typ @ SqlScalarType::MzAclItem => {
3583 panic!("{typ:?} type found in {full_name}");
3584 }
3585 SqlScalarType::AclItem
3586 | SqlScalarType::Bool
3587 | SqlScalarType::Int16
3588 | SqlScalarType::Int32
3589 | SqlScalarType::Int64
3590 | SqlScalarType::Float32
3591 | SqlScalarType::Float64
3592 | SqlScalarType::Numeric { .. }
3593 | SqlScalarType::Date
3594 | SqlScalarType::Time
3595 | SqlScalarType::Timestamp { .. }
3596 | SqlScalarType::TimestampTz { .. }
3597 | SqlScalarType::Interval
3598 | SqlScalarType::PgLegacyChar
3599 | SqlScalarType::Bytes
3600 | SqlScalarType::String
3601 | SqlScalarType::Char { .. }
3602 | SqlScalarType::VarChar { .. }
3603 | SqlScalarType::Jsonb
3604 | SqlScalarType::Uuid
3605 | SqlScalarType::Array(_)
3606 | SqlScalarType::Record { .. }
3607 | SqlScalarType::Oid
3608 | SqlScalarType::RegProc
3609 | SqlScalarType::RegType
3610 | SqlScalarType::RegClass
3611 | SqlScalarType::Int2Vector
3612 | SqlScalarType::Range { .. }
3613 | SqlScalarType::PgLegacyName => {}
3614 }
3615 }
3616 }
3617 catalog.expire().await;
3618 })
3619 .await
3620 }
3621
3622 #[mz_ore::test(tokio::test)]
3625 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3627 Catalog::with_debug(|catalog| async move {
3628 let conn_catalog = catalog.for_system_session();
3629
3630 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3631 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3632
3633 for entry in catalog.entries() {
3634 let schema_spec = entry.name().qualifiers.schema_spec;
3635 let introspection_deps = catalog.introspection_dependencies(entry.id);
3636 if introspection_deps.is_empty() {
3637 assert!(
3638 schema_spec != introspection_schema_spec,
3639 "entry does not depend on introspection sources but is in \
3640 `mz_introspection`: {}",
3641 conn_catalog.resolve_full_name(entry.name()),
3642 );
3643 } else {
3644 assert!(
3645 schema_spec == introspection_schema_spec,
3646 "entry depends on introspection sources but is not in \
3647 `mz_introspection`: {}",
3648 conn_catalog.resolve_full_name(entry.name()),
3649 );
3650 }
3651 }
3652 })
3653 .await
3654 }
3655
3656 #[mz_ore::test(tokio::test)]
3657 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3659 let persist_client = PersistClient::new_for_tests().await;
3660 let bootstrap_args = test_bootstrap_args();
3661 let organization_id = Uuid::new_v4();
3662 let db_name = "DB";
3663
3664 let mut writer_catalog = Catalog::open_debug_catalog(
3665 persist_client.clone(),
3666 organization_id.clone(),
3667 &bootstrap_args,
3668 )
3669 .await
3670 .expect("open_debug_catalog");
3671 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3672 persist_client.clone(),
3673 organization_id.clone(),
3674 &bootstrap_args,
3675 )
3676 .await
3677 .expect("open_debug_read_only_catalog");
3678 assert_err!(writer_catalog.resolve_database(db_name));
3679 assert_err!(read_only_catalog.resolve_database(db_name));
3680
3681 let commit_ts = writer_catalog.current_upper().await;
3682 writer_catalog
3683 .transact(
3684 None,
3685 commit_ts,
3686 None,
3687 vec![Op::CreateDatabase {
3688 name: db_name.to_string(),
3689 owner_id: MZ_SYSTEM_ROLE_ID,
3690 }],
3691 )
3692 .await
3693 .expect("failed to transact");
3694
3695 let write_db = writer_catalog
3696 .resolve_database(db_name)
3697 .expect("resolve_database");
3698 read_only_catalog
3699 .sync_to_current_updates()
3700 .await
3701 .expect("sync_to_current_updates");
3702 let read_db = read_only_catalog
3703 .resolve_database(db_name)
3704 .expect("resolve_database");
3705
3706 assert_eq!(write_db, read_db);
3707
3708 let writer_catalog_fencer =
3709 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3710 .await
3711 .expect("open_debug_catalog for fencer");
3712 let fencer_db = writer_catalog_fencer
3713 .resolve_database(db_name)
3714 .expect("resolve_database for fencer");
3715 assert_eq!(fencer_db, read_db);
3716
3717 let write_fence_err = writer_catalog
3718 .sync_to_current_updates()
3719 .await
3720 .expect_err("sync_to_current_updates for fencer");
3721 assert!(matches!(
3722 write_fence_err,
3723 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3724 ));
3725 let read_fence_err = read_only_catalog
3726 .sync_to_current_updates()
3727 .await
3728 .expect_err("sync_to_current_updates after fencer");
3729 assert!(matches!(
3730 read_fence_err,
3731 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3732 ));
3733
3734 writer_catalog.expire().await;
3735 read_only_catalog.expire().await;
3736 writer_catalog_fencer.expire().await;
3737 }
3738}