1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67 CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68 DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100#[cfg(test)]
101use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
102use crate::session::{Portal, PreparedStatement, Session};
103use crate::util::ResultExt;
104use crate::{AdapterError, AdapterNotice, ExecuteResponse};
105
106mod builtin_table_updates;
107pub(crate) mod consistency;
108mod migrate;
109
110mod apply;
111mod open;
112mod state;
113mod timeline;
114mod transact;
115
116#[derive(Debug)]
139pub struct Catalog {
140 state: CatalogState,
141 plans: CatalogPlans,
142 expr_cache_handle: Option<ExpressionCacheHandle>,
143 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
144 transient_revision: u64,
145}
146
147impl Clone for Catalog {
150 fn clone(&self) -> Self {
151 Self {
152 state: self.state.clone(),
153 plans: self.plans.clone(),
154 expr_cache_handle: self.expr_cache_handle.clone(),
155 storage: Arc::clone(&self.storage),
156 transient_revision: self.transient_revision,
157 }
158 }
159}
160
161#[derive(Default, Debug, Clone)]
162pub struct CatalogPlans {
163 optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
164 physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
165 dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
166 notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
167}
168
169impl Catalog {
170 #[mz_ore::instrument(level = "trace")]
172 pub fn set_optimized_plan(
173 &mut self,
174 id: GlobalId,
175 plan: DataflowDescription<OptimizedMirRelationExpr>,
176 ) {
177 self.plans.optimized_plan_by_id.insert(id, plan.into());
178 }
179
180 #[mz_ore::instrument(level = "trace")]
182 pub fn set_physical_plan(
183 &mut self,
184 id: GlobalId,
185 plan: DataflowDescription<mz_compute_types::plan::Plan>,
186 ) {
187 self.plans.physical_plan_by_id.insert(id, plan.into());
188 }
189
190 #[mz_ore::instrument(level = "trace")]
192 pub fn try_get_optimized_plan(
193 &self,
194 id: &GlobalId,
195 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
196 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
197 }
198
199 #[mz_ore::instrument(level = "trace")]
201 pub fn try_get_physical_plan(
202 &self,
203 id: &GlobalId,
204 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
205 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
206 }
207
208 #[mz_ore::instrument(level = "trace")]
210 pub fn set_dataflow_metainfo(
211 &mut self,
212 id: GlobalId,
213 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
214 ) {
215 for notice in metainfo.optimizer_notices.iter() {
217 for dep_id in notice.dependencies.iter() {
218 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
219 entry.push(Arc::clone(notice))
220 }
221 if let Some(item_id) = notice.item_id {
222 soft_assert_eq_or_log!(
223 item_id,
224 id,
225 "notice.item_id should match the id for whom we are saving the notice"
226 );
227 }
228 }
229 self.plans.dataflow_metainfos.insert(id, metainfo);
231 }
232
233 #[mz_ore::instrument(level = "trace")]
235 pub fn try_get_dataflow_metainfo(
236 &self,
237 id: &GlobalId,
238 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
239 self.plans.dataflow_metainfos.get(id)
240 }
241
242 #[mz_ore::instrument(level = "trace")]
251 pub fn drop_plans_and_metainfos(
252 &mut self,
253 drop_ids: &BTreeSet<GlobalId>,
254 ) -> BTreeSet<Arc<OptimizerNotice>> {
255 let mut dropped_notices = BTreeSet::new();
257
258 for id in drop_ids {
260 self.plans.optimized_plan_by_id.remove(id);
261 self.plans.physical_plan_by_id.remove(id);
262 if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
263 soft_assert_or_log!(
264 metainfo.optimizer_notices.iter().all_unique(),
265 "should have been pushed there by `push_optimizer_notice_dedup`"
266 );
267 for n in metainfo.optimizer_notices.drain(..) {
268 for dep_id in n.dependencies.iter() {
270 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
271 soft_assert_or_log!(
272 notices.iter().any(|x| &n == x),
273 "corrupt notices_by_dep_id"
274 );
275 notices.retain(|x| &n != x)
276 }
277 }
278 dropped_notices.insert(n);
279 }
280 }
281 }
282
283 for id in drop_ids {
285 if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
286 for n in notices.drain(..) {
287 if let Some(item_id) = n.item_id.as_ref() {
289 if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
290 metainfo.optimizer_notices.iter().for_each(|n2| {
291 if let Some(item_id_2) = n2.item_id {
292 soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
293 }
294 });
295 metainfo.optimizer_notices.retain(|x| &n != x);
296 }
297 }
298 dropped_notices.insert(n);
299 }
300 }
301 }
302
303 let mut todo_dep_ids = BTreeSet::new();
306 for notice in dropped_notices.iter() {
307 for dep_id in notice.dependencies.iter() {
308 if !drop_ids.contains(dep_id) {
309 todo_dep_ids.insert(*dep_id);
310 }
311 }
312 }
313 for id in todo_dep_ids {
316 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
317 notices.retain(|n| !dropped_notices.contains(n))
318 }
319 }
320
321 if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
322 use mz_ore::str::{bracketed, separated};
323 let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
324 let bad_notices = bad_notices.map(|n| {
325 let mut dataflow_metainfo_occurrences = Vec::new();
328 for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
329 if meta_info.optimizer_notices.contains(n) {
330 dataflow_metainfo_occurrences.push(id);
331 }
332 }
333 let mut notices_by_dep_id_occurrences = Vec::new();
335 for (id, notices) in self.plans.notices_by_dep_id.iter() {
336 if notices.iter().contains(n) {
337 notices_by_dep_id_occurrences.push(id);
338 }
339 }
340 format!(
341 "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
342 dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
343 n.id,
344 n.kind,
345 n.dependencies,
346 Arc::strong_count(n),
347 dataflow_metainfo_occurrences,
348 notices_by_dep_id_occurrences
349 )
350 });
351 let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
352 error!(
353 "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
354 bad_notices = {bad_notices}; \
355 drop_ids = {drop_ids:?}"
356 );
357 }
358
359 dropped_notices
360 }
361
362 pub(crate) fn invalidate_for_index(
373 &self,
374 ons: impl Iterator<Item = GlobalId>,
375 ) -> BTreeSet<GlobalId> {
376 let mut dependencies = BTreeSet::new();
377 let mut queue = VecDeque::new();
378 let mut seen = HashSet::new();
379 for on in ons {
380 let entry = self.get_entry_by_global_id(&on);
381 dependencies.insert(on);
382 seen.insert(entry.id);
383 let uses = entry.uses();
384 queue.extend(uses.clone());
385 }
386
387 while let Some(cur) = queue.pop_front() {
388 let entry = self.get_entry(&cur);
389 if seen.insert(cur) {
390 let global_ids = entry.global_ids();
391 match entry.item_type() {
392 CatalogItemType::Table
393 | CatalogItemType::Source
394 | CatalogItemType::MaterializedView
395 | CatalogItemType::Sink
396 | CatalogItemType::Index
397 | CatalogItemType::Type
398 | CatalogItemType::Func
399 | CatalogItemType::Secret
400 | CatalogItemType::Connection
401 | CatalogItemType::ContinualTask => {
402 dependencies.extend(global_ids);
403 }
404 CatalogItemType::View => {
405 dependencies.extend(global_ids);
406 queue.extend(entry.uses());
407 }
408 }
409 }
410 }
411 dependencies
412 }
413}
414
415#[derive(Debug)]
416pub struct ConnCatalog<'a> {
417 state: Cow<'a, CatalogState>,
418 unresolvable_ids: BTreeSet<CatalogItemId>,
428 conn_id: ConnectionId,
429 cluster: String,
430 database: Option<DatabaseId>,
431 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
432 role_id: RoleId,
433 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
434 portals: Option<&'a BTreeMap<String, Portal>>,
435 notices_tx: UnboundedSender<AdapterNotice>,
436}
437
438impl ConnCatalog<'_> {
439 pub fn conn_id(&self) -> &ConnectionId {
440 &self.conn_id
441 }
442
443 pub fn state(&self) -> &CatalogState {
444 &*self.state
445 }
446
447 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
457 assert_eq!(
458 self.role_id, MZ_SYSTEM_ROLE_ID,
459 "only the system role can mark IDs unresolvable",
460 );
461 self.unresolvable_ids.insert(id);
462 }
463
464 pub fn effective_search_path(
470 &self,
471 include_temp_schema: bool,
472 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
473 self.state
474 .effective_search_path(&self.search_path, include_temp_schema)
475 }
476}
477
478impl ConnectionResolver for ConnCatalog<'_> {
479 fn resolve_connection(
480 &self,
481 id: CatalogItemId,
482 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
483 self.state().resolve_connection(id)
484 }
485}
486
487impl Catalog {
488 pub fn transient_revision(&self) -> u64 {
492 self.transient_revision
493 }
494
495 pub async fn with_debug<F, Fut, T>(f: F) -> T
507 where
508 F: FnOnce(Catalog) -> Fut,
509 Fut: Future<Output = T>,
510 {
511 let persist_client = PersistClient::new_for_tests().await;
512 let organization_id = Uuid::new_v4();
513 let bootstrap_args = test_bootstrap_args();
514 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
515 .await
516 .expect("can open debug catalog");
517 f(catalog).await
518 }
519
520 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
523 where
524 F: FnOnce(Catalog) -> Fut,
525 Fut: Future<Output = T>,
526 {
527 let persist_client = PersistClient::new_for_tests().await;
528 let organization_id = Uuid::new_v4();
529 let bootstrap_args = test_bootstrap_args();
530 let mut catalog =
531 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
532 .await
533 .expect("can open debug catalog");
534
535 let now = SYSTEM_TIME.clone();
537 let openable_storage = TestCatalogStateBuilder::new(persist_client)
538 .with_organization_id(organization_id)
539 .with_default_deploy_generation()
540 .build()
541 .await
542 .expect("can create durable catalog");
543 let mut storage = openable_storage
544 .open(now().into(), &bootstrap_args)
545 .await
546 .expect("can open durable catalog")
547 .0;
548 let _ = storage
550 .sync_to_current_updates()
551 .await
552 .expect("can sync to current updates");
553 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
554
555 f(catalog).await
556 }
557
558 pub async fn open_debug_catalog(
562 persist_client: PersistClient,
563 organization_id: Uuid,
564 bootstrap_args: &BootstrapArgs,
565 ) -> Result<Catalog, anyhow::Error> {
566 let now = SYSTEM_TIME.clone();
567 let environment_id = None;
568 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
569 .with_organization_id(organization_id)
570 .with_default_deploy_generation()
571 .build()
572 .await?;
573 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
574 let system_parameter_defaults = BTreeMap::default();
575 Self::open_debug_catalog_inner(
576 persist_client,
577 storage,
578 now,
579 environment_id,
580 &DUMMY_BUILD_INFO,
581 system_parameter_defaults,
582 bootstrap_args,
583 None,
584 )
585 .await
586 }
587
588 pub async fn open_debug_read_only_catalog(
593 persist_client: PersistClient,
594 organization_id: Uuid,
595 bootstrap_args: &BootstrapArgs,
596 ) -> Result<Catalog, anyhow::Error> {
597 let now = SYSTEM_TIME.clone();
598 let environment_id = None;
599 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
600 .with_organization_id(organization_id)
601 .build()
602 .await?;
603 let storage = openable_storage
604 .open_read_only(&test_bootstrap_args())
605 .await?;
606 let system_parameter_defaults = BTreeMap::default();
607 Self::open_debug_catalog_inner(
608 persist_client,
609 storage,
610 now,
611 environment_id,
612 &DUMMY_BUILD_INFO,
613 system_parameter_defaults,
614 bootstrap_args,
615 None,
616 )
617 .await
618 }
619
620 pub async fn open_debug_read_only_persist_catalog_config(
625 persist_client: PersistClient,
626 now: NowFn,
627 environment_id: EnvironmentId,
628 system_parameter_defaults: BTreeMap<String, String>,
629 build_info: &'static BuildInfo,
630 bootstrap_args: &BootstrapArgs,
631 enable_expression_cache_override: Option<bool>,
632 ) -> Result<Catalog, anyhow::Error> {
633 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
634 .with_organization_id(environment_id.organization_id())
635 .with_version(
636 build_info
637 .version
638 .parse()
639 .expect("build version is parseable"),
640 )
641 .build()
642 .await?;
643 let storage = openable_storage.open_read_only(bootstrap_args).await?;
644 Self::open_debug_catalog_inner(
645 persist_client,
646 storage,
647 now,
648 Some(environment_id),
649 build_info,
650 system_parameter_defaults,
651 bootstrap_args,
652 enable_expression_cache_override,
653 )
654 .await
655 }
656
657 async fn open_debug_catalog_inner(
658 persist_client: PersistClient,
659 storage: Box<dyn DurableCatalogState>,
660 now: NowFn,
661 environment_id: Option<EnvironmentId>,
662 build_info: &'static BuildInfo,
663 system_parameter_defaults: BTreeMap<String, String>,
664 bootstrap_args: &BootstrapArgs,
665 enable_expression_cache_override: Option<bool>,
666 ) -> Result<Catalog, anyhow::Error> {
667 let metrics_registry = &MetricsRegistry::new();
668 let secrets_reader = Arc::new(InMemorySecretsController::new());
669 let previous_ts = now().into();
672 let replica_size = &bootstrap_args.default_cluster_replica_size;
673 let read_only = false;
674
675 let OpenCatalogResult {
676 catalog,
677 migrated_storage_collections_0dt: _,
678 new_builtin_collections: _,
679 builtin_table_updates: _,
680 cached_global_exprs: _,
681 uncached_local_exprs: _,
682 } = Catalog::open(Config {
683 storage,
684 metrics_registry,
685 state: StateConfig {
686 unsafe_mode: true,
687 all_features: false,
688 build_info,
689 deploy_generation: 0,
690 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
691 read_only,
692 now,
693 boot_ts: previous_ts,
694 skip_migrations: true,
695 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
696 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
697 size: replica_size.clone(),
698 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
699 },
700 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
701 size: replica_size.clone(),
702 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
703 },
704 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
705 size: replica_size.clone(),
706 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
707 },
708 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
709 size: replica_size.clone(),
710 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711 },
712 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
713 size: replica_size.clone(),
714 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
715 },
716 system_parameter_defaults,
717 remote_system_parameters: None,
718 availability_zones: vec![],
719 egress_addresses: vec![],
720 aws_principal_context: None,
721 aws_privatelink_availability_zones: None,
722 http_host_name: None,
723 connection_context: ConnectionContext::for_tests(secrets_reader),
724 builtin_item_migration_config: BuiltinItemMigrationConfig {
725 persist_client: persist_client.clone(),
726 read_only,
727 force_migration: None,
728 },
729 persist_client,
730 enable_expression_cache_override,
731 helm_chart_version: None,
732 external_login_password_mz_system: None,
733 license_key: ValidatedLicenseKey::for_tests(),
734 },
735 })
736 .await?;
737 Ok(catalog)
738 }
739
740 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
741 self.state.for_session(session)
742 }
743
744 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
745 self.state.for_sessionless_user(role_id)
746 }
747
748 pub fn for_system_session(&self) -> ConnCatalog<'_> {
749 self.state.for_system_session()
750 }
751
752 async fn storage<'a>(
753 &'a self,
754 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
755 self.storage.lock().await
756 }
757
758 pub async fn current_upper(&self) -> mz_repr::Timestamp {
759 self.storage().await.current_upper().await
760 }
761
762 pub async fn allocate_user_id(
763 &self,
764 commit_ts: mz_repr::Timestamp,
765 ) -> Result<(CatalogItemId, GlobalId), Error> {
766 self.storage()
767 .await
768 .allocate_user_id(commit_ts)
769 .await
770 .maybe_terminate("allocating user ids")
771 .err_into()
772 }
773
774 pub async fn allocate_user_ids(
776 &self,
777 amount: u64,
778 commit_ts: mz_repr::Timestamp,
779 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
780 self.storage()
781 .await
782 .allocate_user_ids(amount, commit_ts)
783 .await
784 .maybe_terminate("allocating user ids")
785 .err_into()
786 }
787
788 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
789 let commit_ts = self.storage().await.current_upper().await;
790 self.allocate_user_id(commit_ts).await
791 }
792
793 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
795 self.storage()
796 .await
797 .get_next_user_item_id()
798 .await
799 .err_into()
800 }
801
802 #[cfg(test)]
803 pub async fn allocate_system_id(
804 &self,
805 commit_ts: mz_repr::Timestamp,
806 ) -> Result<(CatalogItemId, GlobalId), Error> {
807 use mz_ore::collections::CollectionExt;
808
809 let mut storage = self.storage().await;
810 let mut txn = storage.transaction().await?;
811 let id = txn
812 .allocate_system_item_ids(1)
813 .maybe_terminate("allocating system ids")?
814 .into_element();
815 let _ = txn.get_and_commit_op_updates();
817 txn.commit(commit_ts).await?;
818 Ok(id)
819 }
820
821 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
823 self.storage()
824 .await
825 .get_next_system_item_id()
826 .await
827 .err_into()
828 }
829
830 pub async fn allocate_user_cluster_id(
831 &self,
832 commit_ts: mz_repr::Timestamp,
833 ) -> Result<ClusterId, Error> {
834 self.storage()
835 .await
836 .allocate_user_cluster_id(commit_ts)
837 .await
838 .maybe_terminate("allocating user cluster ids")
839 .err_into()
840 }
841
842 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
844 self.storage()
845 .await
846 .get_next_system_replica_id()
847 .await
848 .err_into()
849 }
850
851 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
853 self.storage()
854 .await
855 .get_next_user_replica_id()
856 .await
857 .err_into()
858 }
859
860 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
861 self.state.resolve_database(database_name)
862 }
863
864 pub fn resolve_schema(
865 &self,
866 current_database: Option<&DatabaseId>,
867 database_name: Option<&str>,
868 schema_name: &str,
869 conn_id: &ConnectionId,
870 ) -> Result<&Schema, SqlCatalogError> {
871 self.state
872 .resolve_schema(current_database, database_name, schema_name, conn_id)
873 }
874
875 pub fn resolve_schema_in_database(
876 &self,
877 database_spec: &ResolvedDatabaseSpecifier,
878 schema_name: &str,
879 conn_id: &ConnectionId,
880 ) -> Result<&Schema, SqlCatalogError> {
881 self.state
882 .resolve_schema_in_database(database_spec, schema_name, conn_id)
883 }
884
885 pub fn resolve_replica_in_cluster(
886 &self,
887 cluster_id: &ClusterId,
888 replica_name: &str,
889 ) -> Result<&ClusterReplica, SqlCatalogError> {
890 self.state
891 .resolve_replica_in_cluster(cluster_id, replica_name)
892 }
893
894 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
895 self.state.resolve_system_schema(name)
896 }
897
898 pub fn resolve_search_path(
899 &self,
900 session: &Session,
901 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
902 self.state.resolve_search_path(session)
903 }
904
905 pub fn resolve_entry(
907 &self,
908 current_database: Option<&DatabaseId>,
909 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
910 name: &PartialItemName,
911 conn_id: &ConnectionId,
912 ) -> Result<&CatalogEntry, SqlCatalogError> {
913 self.state
914 .resolve_entry(current_database, search_path, name, conn_id)
915 }
916
917 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
919 self.state.resolve_builtin_table(builtin)
920 }
921
922 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
924 self.state.resolve_builtin_log(builtin).0
925 }
926
927 pub fn resolve_builtin_storage_collection(
929 &self,
930 builtin: &'static BuiltinSource,
931 ) -> CatalogItemId {
932 self.state.resolve_builtin_source(builtin)
933 }
934
935 pub fn resolve_function(
937 &self,
938 current_database: Option<&DatabaseId>,
939 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
940 name: &PartialItemName,
941 conn_id: &ConnectionId,
942 ) -> Result<&CatalogEntry, SqlCatalogError> {
943 self.state
944 .resolve_function(current_database, search_path, name, conn_id)
945 }
946
947 pub fn resolve_type(
949 &self,
950 current_database: Option<&DatabaseId>,
951 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
952 name: &PartialItemName,
953 conn_id: &ConnectionId,
954 ) -> Result<&CatalogEntry, SqlCatalogError> {
955 self.state
956 .resolve_type(current_database, search_path, name, conn_id)
957 }
958
959 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
960 self.state.resolve_cluster(name)
961 }
962
963 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
969 self.state.resolve_builtin_cluster(cluster)
970 }
971
972 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
973 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
974 }
975
976 pub fn resolve_target_cluster(
978 &self,
979 target_cluster: TargetCluster,
980 session: &Session,
981 ) -> Result<&Cluster, AdapterError> {
982 match target_cluster {
983 TargetCluster::CatalogServer => {
984 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
985 }
986 TargetCluster::Active => self.active_cluster(session),
987 TargetCluster::Transaction(cluster_id) => self
988 .try_get_cluster(cluster_id)
989 .ok_or(AdapterError::ConcurrentClusterDrop),
990 }
991 }
992
993 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
994 if session.user().name != SYSTEM_USER.name
998 && session.user().name != SUPPORT_USER.name
999 && session.vars().cluster() == SYSTEM_USER.name
1000 {
1001 coord_bail!(
1002 "system cluster '{}' cannot execute user queries",
1003 SYSTEM_USER.name
1004 );
1005 }
1006 let cluster = self.resolve_cluster(session.vars().cluster())?;
1007 Ok(cluster)
1008 }
1009
1010 pub fn state(&self) -> &CatalogState {
1011 &self.state
1012 }
1013
1014 pub fn resolve_full_name(
1015 &self,
1016 name: &QualifiedItemName,
1017 conn_id: Option<&ConnectionId>,
1018 ) -> FullItemName {
1019 self.state.resolve_full_name(name, conn_id)
1020 }
1021
1022 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1023 self.state.try_get_entry(id)
1024 }
1025
1026 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1027 self.state.try_get_entry_by_global_id(id)
1028 }
1029
1030 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1031 self.state.get_entry(id)
1032 }
1033
1034 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1035 self.state.get_entry_by_global_id(id)
1036 }
1037
1038 pub fn get_global_ids<'a>(
1039 &'a self,
1040 id: &CatalogItemId,
1041 ) -> impl Iterator<Item = GlobalId> + use<'a> {
1042 self.get_entry(id).global_ids()
1043 }
1044
1045 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1046 self.get_entry_by_global_id(id).id()
1047 }
1048
1049 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1050 let item = self.try_get_entry_by_global_id(id)?;
1051 Some(item.id())
1052 }
1053
1054 pub fn get_schema(
1055 &self,
1056 database_spec: &ResolvedDatabaseSpecifier,
1057 schema_spec: &SchemaSpecifier,
1058 conn_id: &ConnectionId,
1059 ) -> &Schema {
1060 self.state.get_schema(database_spec, schema_spec, conn_id)
1061 }
1062
1063 pub fn try_get_schema(
1064 &self,
1065 database_spec: &ResolvedDatabaseSpecifier,
1066 schema_spec: &SchemaSpecifier,
1067 conn_id: &ConnectionId,
1068 ) -> Option<&Schema> {
1069 self.state
1070 .try_get_schema(database_spec, schema_spec, conn_id)
1071 }
1072
1073 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1074 self.state.get_mz_catalog_schema_id()
1075 }
1076
1077 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1078 self.state.get_pg_catalog_schema_id()
1079 }
1080
1081 pub fn get_information_schema_id(&self) -> SchemaId {
1082 self.state.get_information_schema_id()
1083 }
1084
1085 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1086 self.state.get_mz_internal_schema_id()
1087 }
1088
1089 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1090 self.state.get_mz_introspection_schema_id()
1091 }
1092
1093 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1094 self.state.get_mz_unsafe_schema_id()
1095 }
1096
1097 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1098 self.state.system_schema_ids()
1099 }
1100
1101 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1102 self.state.get_database(id)
1103 }
1104
1105 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1106 self.state.try_get_role(id)
1107 }
1108
1109 pub fn get_role(&self, id: &RoleId) -> &Role {
1110 self.state.get_role(id)
1111 }
1112
1113 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1114 self.state.try_get_role_by_name(role_name)
1115 }
1116
1117 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1118 self.state.try_get_role_auth_by_id(id)
1119 }
1120
1121 pub fn create_temporary_schema(
1124 &mut self,
1125 conn_id: &ConnectionId,
1126 owner_id: RoleId,
1127 ) -> Result<(), Error> {
1128 self.state.create_temporary_schema(conn_id, owner_id)
1129 }
1130
1131 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1132 self.state
1134 .temporary_schemas
1135 .get(conn_id)
1136 .map(|schema| schema.items.contains_key(item_name))
1137 .unwrap_or(false)
1138 }
1139
1140 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1143 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1144 return Ok(());
1145 };
1146 if !schema.items.is_empty() {
1147 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1148 }
1149 Ok(())
1150 }
1151
1152 pub(crate) fn object_dependents(
1153 &self,
1154 object_ids: &Vec<ObjectId>,
1155 conn_id: &ConnectionId,
1156 ) -> Vec<ObjectId> {
1157 let mut seen = BTreeSet::new();
1158 self.state.object_dependents(object_ids, conn_id, &mut seen)
1159 }
1160
1161 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1162 FullNameV1 {
1163 database: name.database.to_string(),
1164 schema: name.schema.clone(),
1165 item: name.item.clone(),
1166 }
1167 }
1168
1169 pub fn find_available_cluster_name(&self, name: &str) -> String {
1170 let mut i = 0;
1171 let mut candidate = name.to_string();
1172 while self.state.clusters_by_name.contains_key(&candidate) {
1173 i += 1;
1174 candidate = format!("{}{}", name, i);
1175 }
1176 candidate
1177 }
1178
1179 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1180 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1181 self.cluster_replica_sizes()
1182 .enabled_allocations()
1183 .map(|a| a.0.to_owned())
1184 .collect::<Vec<_>>()
1185 } else {
1186 self.system_config().allowed_cluster_replica_sizes()
1187 }
1188 }
1189
    /// Turns a durable replica `location` into a controller [`ReplicaLocation`].
    ///
    /// `allowed_sizes` and `allowed_availability_zones` are passed through
    /// unchanged to the catalog state's method of the same name, which does
    /// the work and produces any error.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        self.state
            .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
    }
1199
    /// Checks `size` against `allowed_sizes` by delegating to the catalog
    /// state's method of the same name, returning its result unchanged.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }
1207
    /// Returns the cluster replica size map stored in the catalog state.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1211
1212 pub fn get_privileges(
1214 &self,
1215 id: &SystemObjectId,
1216 conn_id: &ConnectionId,
1217 ) -> Option<&PrivilegeMap> {
1218 match id {
1219 SystemObjectId::Object(id) => match id {
1220 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1221 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1222 ObjectId::Schema((database_spec, schema_spec)) => Some(
1223 self.get_schema(database_spec, schema_spec, conn_id)
1224 .privileges(),
1225 ),
1226 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1227 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1228 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1229 },
1230 SystemObjectId::System => Some(&self.state.system_privileges),
1231 }
1232 }
1233
    /// Asks the durable catalog storage to confirm leadership.
    ///
    /// Acquires the storage handle via `self.storage()` and forwards to its
    /// `confirm_leadership`; a storage error is converted into an
    /// [`AdapterError`] by `?`.
    #[mz_ore::instrument(level = "debug")]
    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
        Ok(self.storage().await.confirm_leadership().await?)
    }
1238
    /// Returns the introspection dependencies of item `id`, as computed by the
    /// catalog state's method of the same name.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }
1243
    /// Dumps the in-memory catalog state into a [`CatalogDump`].
    ///
    /// Calls the state's `dump` with `None` as its argument and wraps the
    /// result; a failure of that call is returned as the error.
    pub fn dump(&self) -> Result<CatalogDump, Error> {
        Ok(CatalogDump::new(self.state.dump(None)?))
    }
1252
1253 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1257 self.state.check_consistency().map_err(|inconsistencies| {
1258 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1259 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1260 })
1261 })
1262 }
1263
    /// Returns the catalog configuration held by the catalog state.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
1267
    /// Iterates over every catalog entry in the state's `entry_by_id` map.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }
1271
    /// Iterates over the entries that are connections and have a user ID.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }
1276
    /// Iterates over the entries that are tables and have a user ID.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }
1281
    /// Iterates over the entries that are sources and have a user ID.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }
1286
    /// Iterates over the entries that are sinks and have a user ID.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }
1291
    /// Iterates over the entries that are materialized views and have a user ID.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }
1296
    /// Iterates over the entries that are secrets and have a user ID.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1301
    /// Returns the network policy with ID `network_policy_id`.
    ///
    /// Delegates to the catalog state's `get_network_policy`.
    // NOTE(review): the non-`try_` state getter presumably panics on an unknown ID — confirm.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }
1305
    /// Looks up a network policy by `name`.
    ///
    /// Despite the `get_` prefix this is a fallible lookup: it forwards to the
    /// state's `try_get_network_policy_by_name` and returns its `Option`.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }
1309
    /// Iterates over every cluster in the state's `clusters_by_id` map.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }
1313
    /// Returns the cluster with ID `cluster_id`.
    ///
    /// Delegates to the catalog state's `get_cluster`; use
    /// [`Self::try_get_cluster`] when the cluster may not exist.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }
1317
    /// Fallible variant of [`Self::get_cluster`]: returns the `Option` produced
    /// by the catalog state's `try_get_cluster`.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }
1321
    /// Iterates over the clusters whose ID is a user ID.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }
1325
    /// Returns replica `replica_id` of cluster `cluster_id`.
    ///
    /// Delegates to the catalog state's `get_cluster_replica`; use
    /// [`Self::try_get_cluster_replica`] when the replica may not exist.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }
1333
    /// Fallible variant of [`Self::get_cluster_replica`]: returns the `Option`
    /// produced by the catalog state's `try_get_cluster_replica`.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }
1341
    /// Iterates over the user replicas of every user cluster.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }
1346
    /// Iterates over every database in the state's `database_by_id` map.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1350
    /// Iterates over the roles for which `Role::is_user` returns `true`.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }
1357
    /// Iterates over the entries that are continual tasks and have a user ID.
    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
    }
1362
    /// Returns the system-level privilege map stored in the catalog state.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1366
    /// Iterates over the default privileges stored in the catalog state.
    ///
    /// Each element pairs a [`DefaultPrivilegeObject`] with an iterator over
    /// its [`DefaultPrivilegeAclItem`]s.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1377
    /// Builds the builtin table updates describing item `id` with the given
    /// `diff`.
    ///
    /// The state first packs the updates, then resolves them via
    /// `resolve_builtin_table_updates`.
    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
        self.state
            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
    }
1382
    /// Builds the builtin table update for a storage usage `event` with the
    /// given `diff`.
    ///
    /// The state first packs the update, then resolves it via
    /// `resolve_builtin_table_update`.
    pub fn pack_storage_usage_update(
        &self,
        event: VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate {
        self.state
            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
    }
1391
    /// Returns a shared reference to the system variables held by the catalog state.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1395
    /// Returns a mutable reference to the system variables held by the catalog state.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1399
    /// Delegates to the catalog state's `ensure_not_reserved_role` check for
    /// `role_id` and returns its result unchanged.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1403
    /// Delegates to the catalog state's `ensure_grantable_role` check for
    /// `role_id` and returns its result unchanged.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1407
    /// Delegates to the catalog state's `ensure_not_system_role` check for
    /// `role_id` and returns its result unchanged.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1411
    /// Delegates to the catalog state's `ensure_not_predefined_role` check for
    /// `role_id` and returns its result unchanged.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1415
    /// Delegates to the catalog state's `ensure_not_reserved_network_policy`
    /// check for `network_policy_id` and returns its result unchanged.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1423
1424 pub fn ensure_not_reserved_object(
1425 &self,
1426 object_id: &ObjectId,
1427 conn_id: &ConnectionId,
1428 ) -> Result<(), Error> {
1429 match object_id {
1430 ObjectId::Cluster(cluster_id) => {
1431 if cluster_id.is_system() {
1432 let cluster = self.get_cluster(*cluster_id);
1433 Err(Error::new(ErrorKind::ReadOnlyCluster(
1434 cluster.name().to_string(),
1435 )))
1436 } else {
1437 Ok(())
1438 }
1439 }
1440 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1441 if replica_id.is_system() {
1442 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1443 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1444 replica.name().to_string(),
1445 )))
1446 } else {
1447 Ok(())
1448 }
1449 }
1450 ObjectId::Database(database_id) => {
1451 if database_id.is_system() {
1452 let database = self.get_database(database_id);
1453 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1454 database.name().to_string(),
1455 )))
1456 } else {
1457 Ok(())
1458 }
1459 }
1460 ObjectId::Schema((database_spec, schema_spec)) => {
1461 if schema_spec.is_system() {
1462 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1463 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1464 schema.name().schema.clone(),
1465 )))
1466 } else {
1467 Ok(())
1468 }
1469 }
1470 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1471 ObjectId::Item(item_id) => {
1472 if item_id.is_system() {
1473 let item = self.get_entry(item_id);
1474 let name = self.resolve_full_name(item.name(), Some(conn_id));
1475 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1476 } else {
1477 Ok(())
1478 }
1479 }
1480 ObjectId::NetworkPolicy(network_policy_id) => {
1481 self.ensure_not_reserved_network_policy(network_policy_id)
1482 }
1483 }
1484 }
1485
    /// Plans `create_sql` by delegating to the catalog state's method of the
    /// same name, returning the plan together with its resolved IDs.
    ///
    /// `force_if_exists_skip` is forwarded unchanged.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1495
    /// Returns a future that writes the given local and global expressions to
    /// the expression cache.
    ///
    /// When no cache handle is configured (`self.expr_cache_handle` is `None`)
    /// the returned future does nothing.
    pub(crate) fn update_expression_cache<'a, 'b>(
        &'a self,
        new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
        new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
    ) -> BoxFuture<'b, ()> {
        if let Some(expr_cache) = &self.expr_cache_handle {
            // For every new expression that belongs to an index, collect the ID
            // of the object the index is defined on.
            let ons = new_local_expressions
                .iter()
                .map(|(id, _)| id)
                .chain(new_global_expressions.iter().map(|(id, _)| id))
                .map(|id| self.get_entry_by_global_id(id))
                .filter_map(|entry| entry.index().map(|index| index.on));
            // NOTE(review): presumably the cached entries made stale by those
            // indexes — confirm against `invalidate_for_index`.
            let invalidate_ids = self.invalidate_for_index(ons);
            expr_cache
                .update(
                    new_local_expressions,
                    new_global_expressions,
                    invalidate_ids,
                )
                .boxed()
        } else {
            async {}.boxed()
        }
    }
1520
    /// Test-only helper: fetches the current updates from durable storage and
    /// applies them to the in-memory state.
    ///
    /// Updates are applied with a closed local expression cache. Returns the
    /// builtin table updates and parsed state updates produced by applying
    /// them; a storage failure is returned as a [`CatalogError`].
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<
        (
            Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
            Vec<ParsedStateUpdate>,
        ),
        CatalogError,
    > {
        let updates = self.storage().await.sync_to_current_updates().await?;
        let (builtin_table_updates, catalog_updates) = self
            .state
            .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
            .await;
        Ok((builtin_table_updates, catalog_updates))
    }
1541}
1542
1543pub fn is_reserved_name(name: &str) -> bool {
1544 BUILTIN_PREFIXES
1545 .iter()
1546 .any(|prefix| name.starts_with(prefix))
1547}
1548
/// Returns `true` if `name` may not be used for a user-created role: it is
/// either a reserved name (see [`is_reserved_name`]) or the public role's name.
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1552
/// Returns `true` if `name` is exactly `PUBLIC_ROLE_NAME`.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1556
/// Maps a SQL catalog item type to its audit-log [`ObjectType`].
///
/// The item type is first converted into a SQL object type via `Into`, then
/// mapped by [`object_type_to_audit_object_type`].
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1560
/// Maps the target of a comment to its audit-log [`ObjectType`].
///
/// The mapping is one-to-one on the variant name; the ID carried by the
/// variant is ignored. The match is deliberately exhaustive so that adding a
/// `CommentObjectId` variant forces this function to be updated.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1582
/// Maps a SQL object type to its audit-log [`ObjectType`].
///
/// Wraps `object_type` in `SystemObjectType::Object` and defers to
/// [`system_object_type_to_audit_object_type`].
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1588
1589pub(crate) fn system_object_type_to_audit_object_type(
1590 system_type: &SystemObjectType,
1591) -> ObjectType {
1592 match system_type {
1593 SystemObjectType::Object(object_type) => match object_type {
1594 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1595 mz_sql::catalog::ObjectType::View => ObjectType::View,
1596 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1597 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1598 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1599 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1600 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1601 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1602 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1603 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1604 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1605 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1606 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1607 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1608 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1609 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1610 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1611 },
1612 SystemObjectType::System => ObjectType::System,
1613 }
1614}
1615
/// The direction of a privilege update.
///
/// Converts into the matching [`ExecuteResponse`] and audit-log [`EventType`].
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1621
/// Picks the response reported for a privilege update.
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    /// `Grant` becomes `GrantedPrivilege`, `Revoke` becomes `RevokedPrivilege`.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}
1630
/// Picks the audit-log event type recorded for a privilege update.
impl From<UpdatePrivilegeVariant> for EventType {
    /// `Grant` becomes `EventType::Grant`, `Revoke` becomes `EventType::Revoke`.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1639
/// Name-resolution helpers that return only the qualified name of the resolved
/// entry rather than the entry itself.
impl ConnCatalog<'_> {
    /// Resolves `name` as an item and returns the qualified name of the match.
    fn resolve_item_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_item(name).map(|entry| entry.name())
    }

    /// Resolves `name` as a function and returns the qualified name of the match.
    fn resolve_function_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_function(name).map(|entry| entry.name())
    }

    /// Resolves `name` as a type and returns the qualified name of the match.
    fn resolve_type_name(
        &self,
        name: &PartialItemName,
    ) -> Result<&QualifiedItemName, SqlCatalogError> {
        self.resolve_type(name).map(|entry| entry.name())
    }
}
1662
1663impl ExprHumanizer for ConnCatalog<'_> {
1664 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1665 let entry = self.state.try_get_entry_by_global_id(&id)?;
1666 Some(self.resolve_full_name(entry.name()).to_string())
1667 }
1668
1669 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1670 let entry = self.state.try_get_entry_by_global_id(&id)?;
1671 Some(entry.name().item.clone())
1672 }
1673
1674 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1675 let entry = self.state.try_get_entry_by_global_id(&id)?;
1676 Some(self.resolve_full_name(entry.name()).into_parts())
1677 }
1678
1679 fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1680 use SqlScalarType::*;
1681
1682 match typ {
1683 Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1684 List {
1685 custom_id: Some(item_id),
1686 ..
1687 }
1688 | Map {
1689 custom_id: Some(item_id),
1690 ..
1691 } => {
1692 let item = self.get_item(item_id);
1693 self.minimal_qualification(item.name()).to_string()
1694 }
1695 List { element_type, .. } => {
1696 format!(
1697 "{} list",
1698 self.humanize_scalar_type(element_type, postgres_compat)
1699 )
1700 }
1701 Map { value_type, .. } => format!(
1702 "map[{}=>{}]",
1703 self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1704 self.humanize_scalar_type(value_type, postgres_compat)
1705 ),
1706 Record {
1707 custom_id: Some(item_id),
1708 ..
1709 } => {
1710 let item = self.get_item(item_id);
1711 self.minimal_qualification(item.name()).to_string()
1712 }
1713 Record { fields, .. } => format!(
1714 "record({})",
1715 fields
1716 .iter()
1717 .map(|f| format!(
1718 "{}: {}",
1719 f.0,
1720 self.humanize_column_type(&f.1, postgres_compat)
1721 ))
1722 .join(",")
1723 ),
1724 PgLegacyChar => "\"char\"".into(),
1725 Char { length } if !postgres_compat => match length {
1726 None => "char".into(),
1727 Some(length) => format!("char({})", length.into_u32()),
1728 },
1729 VarChar { max_length } if !postgres_compat => match max_length {
1730 None => "varchar".into(),
1731 Some(length) => format!("varchar({})", length.into_u32()),
1732 },
1733 UInt16 => "uint2".into(),
1734 UInt32 => "uint4".into(),
1735 UInt64 => "uint8".into(),
1736 ty => {
1737 let pgrepr_type = mz_pgrepr::Type::from(ty);
1738 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1739
1740 let res = if self
1741 .effective_search_path(true)
1742 .iter()
1743 .any(|(_, schema)| schema == &pg_catalog_schema)
1744 {
1745 pgrepr_type.name().to_string()
1746 } else {
1747 let name = QualifiedItemName {
1750 qualifiers: ItemQualifiers {
1751 database_spec: ResolvedDatabaseSpecifier::Ambient,
1752 schema_spec: pg_catalog_schema,
1753 },
1754 item: pgrepr_type.name().to_string(),
1755 };
1756 self.resolve_full_name(&name).to_string()
1757 };
1758 res
1759 }
1760 }
1761 }
1762
1763 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1764 let entry = self.state.try_get_entry_by_global_id(&id)?;
1765
1766 match entry.index() {
1767 Some(index) => {
1768 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1769 let mut on_names = on_desc
1770 .iter_names()
1771 .map(|col_name| col_name.to_string())
1772 .collect::<Vec<_>>();
1773
1774 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1775
1776 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1780 let mut ix_names = vec![String::new(); ix_arity];
1781
1782 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1784 let on_name = on_names.get_mut(on_pos).expect("on_name");
1785 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1786 std::mem::swap(on_name, ix_name);
1787 }
1788
1789 Some(ix_names) }
1791 None => {
1792 let desc = self.state.try_get_desc_by_global_id(&id)?;
1793 let column_names = desc
1794 .iter_names()
1795 .map(|col_name| col_name.to_string())
1796 .collect();
1797
1798 Some(column_names)
1799 }
1800 }
1801 }
1802
1803 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1804 let desc = self.state.try_get_desc_by_global_id(&id)?;
1805 Some(desc.get_name(column).to_string())
1806 }
1807
1808 fn id_exists(&self, id: GlobalId) -> bool {
1809 self.state.entry_by_global_id.contains_key(&id)
1810 }
1811}
1812
1813impl SessionCatalog for ConnCatalog<'_> {
    /// Returns the role ID stored in this connection catalog's `role_id` field.
    fn active_role_id(&self) -> &RoleId {
        &self.role_id
    }
1817
1818 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1819 self.prepared_statements
1820 .as_ref()
1821 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1822 .flatten()
1823 }
1824
    /// Returns the description of the portal called `portal_name`.
    ///
    /// Yields `None` when this catalog carries no portals or when no portal
    /// with that name exists. The description is returned as stored; nothing
    /// is checked here.
    fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
        self.portals
            .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
    }
1829
    /// Returns the database ID stored in this catalog's `database` field, if any.
    fn active_database(&self) -> Option<&DatabaseId> {
        self.database.as_ref()
    }
1833
    /// Returns the cluster name stored in this catalog's `cluster` field.
    fn active_cluster(&self) -> &str {
        &self.cluster
    }
1837
    /// Returns the search path stored in this catalog's `search_path` field,
    /// as (database, schema) specifier pairs.
    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
        &self.search_path
    }
1841
    /// Resolves `database_name` via the catalog state, converting a failure
    /// into a [`SqlCatalogError`].
    fn resolve_database(
        &self,
        database_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
        Ok(self.state.resolve_database(database_name)?)
    }
1848
    /// Returns the database with ID `id`.
    ///
    /// # Panics
    ///
    /// Panics with "database doesn't exist" if `id` is not in the state's
    /// `database_by_id` map.
    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
        self.state
            .database_by_id
            .get(id)
            .expect("database doesn't exist")
    }
1855
    /// Returns every database in the state as a trait object.
    // The `as` cast below only turns the concrete reference into a trait
    // object, hence the lint exemption.
    #[allow(clippy::as_conversions)]
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
        self.state
            .database_by_id
            .values()
            .map(|database| database as &dyn CatalogDatabase)
            .collect()
    }
1865
    /// Resolves `schema_name`, optionally qualified by `database_name`.
    ///
    /// This connection's active database and connection ID are supplied to the
    /// catalog state's `resolve_schema`; a failure is converted into a
    /// [`SqlCatalogError`].
    fn resolve_schema(
        &self,
        database_name: Option<&str>,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self.state.resolve_schema(
            self.database.as_ref(),
            database_name,
            schema_name,
            &self.conn_id,
        )?)
    }
1878
    /// Resolves `schema_name` within the database given by `database_spec`,
    /// using this connection's ID; a failure is converted into a
    /// [`SqlCatalogError`].
    fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self
            .state
            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
    }
1888
    /// Returns the schema identified by `database_spec` and `schema_spec`,
    /// looked up in the catalog state with this connection's ID.
    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema {
        self.state
            .get_schema(database_spec, schema_spec, &self.conn_id)
    }
1897
    /// Returns all schemas known to the catalog.
    ///
    /// The result lists the schemas of every database first, followed by the
    /// ambient schemas and then all temporary schemas held by the state.
    // The `as` cast below only turns the concrete reference into a trait
    // object, hence the lint exemption.
    #[allow(clippy::as_conversions)]
    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.get_databases()
            .into_iter()
            .flat_map(|database| database.schemas().into_iter())
            .chain(
                self.state
                    .ambient_schemas_by_id
                    .values()
                    .chain(self.state.temporary_schemas.values())
                    .map(|schema| schema as &dyn CatalogSchema),
            )
            .collect()
    }
1913
    /// Returns the schema ID the catalog state reports for the `mz_internal` schema.
    fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state().get_mz_internal_schema_id()
    }
1917
    /// Returns the schema ID the catalog state reports for the `mz_unsafe` schema.
    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state().get_mz_unsafe_schema_id()
    }
1921
    /// Reports whether `schema` is a system schema, as decided by the catalog state.
    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
        self.state.is_system_schema_specifier(schema)
    }
1925
1926 fn resolve_role(
1927 &self,
1928 role_name: &str,
1929 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1930 match self.state.try_get_role_by_name(role_name) {
1931 Some(role) => Ok(role),
1932 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1933 }
1934 }
1935
1936 fn resolve_network_policy(
1937 &self,
1938 policy_name: &str,
1939 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1940 match self.state.try_get_network_policy_by_name(policy_name) {
1941 Some(policy) => Ok(policy),
1942 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1943 }
1944 }
1945
    /// Returns the role with ID `id`, or `None` if it is not in the state's
    /// `roles_by_id` map.
    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
        // `?` plus re-wrapping in `Some` coerces the concrete role to a trait object.
        Some(self.state.roles_by_id.get(id)?)
    }
1949
    /// Returns the role with ID `id` via the catalog state's `get_role`.
    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
        self.state.get_role(id)
    }
1953
    /// Returns every role in the state as a trait object.
    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
        // The `as` cast only turns the concrete reference into a trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .roles_by_id
            .values()
            .map(|role| role as &dyn CatalogRole)
            .collect()
    }
1963
    /// Returns the constant `MZ_SYSTEM_ROLE_ID`.
    fn mz_system_role_id(&self) -> RoleId {
        MZ_SYSTEM_ROLE_ID
    }
1967
    /// Returns the set of role IDs the catalog state computes as the
    /// membership of role `id`.
    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        self.state.collect_role_membership(id)
    }
1971
    /// Returns the network policy with ID `id` via the catalog state's
    /// `get_network_policy`.
    fn get_network_policy(
        &self,
        id: &NetworkPolicyId,
    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
        self.state.get_network_policy(id)
    }
1978
    /// Returns every network policy in the state as a trait object.
    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
        // The `as` cast only turns the concrete reference into a trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .network_policies_by_id
            .values()
            .map(|policy| policy as &dyn CatalogNetworkPolicy)
            .collect()
    }
1988
    /// Resolves a cluster by name, falling back to the active cluster when
    /// `cluster_name` is `None`; a failure is converted into a
    /// [`SqlCatalogError`].
    fn resolve_cluster(
        &self,
        cluster_name: Option<&str>,
    ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
        Ok(self
            .state
            .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
    }
1997
    /// Resolves the replica named by `cluster_replica_name` via the catalog
    /// state; a failure is converted into a [`SqlCatalogError`].
    fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
    }
2004
2005 fn resolve_item(
2006 &self,
2007 name: &PartialItemName,
2008 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2009 let r = self.state.resolve_entry(
2010 self.database.as_ref(),
2011 &self.effective_search_path(true),
2012 name,
2013 &self.conn_id,
2014 )?;
2015 if self.unresolvable_ids.contains(&r.id()) {
2016 Err(SqlCatalogError::UnknownItem(name.to_string()))
2017 } else {
2018 Ok(r)
2019 }
2020 }
2021
2022 fn resolve_function(
2023 &self,
2024 name: &PartialItemName,
2025 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2026 let r = self.state.resolve_function(
2027 self.database.as_ref(),
2028 &self.effective_search_path(false),
2029 name,
2030 &self.conn_id,
2031 )?;
2032
2033 if self.unresolvable_ids.contains(&r.id()) {
2034 Err(SqlCatalogError::UnknownFunction {
2035 name: name.to_string(),
2036 alternative: None,
2037 })
2038 } else {
2039 Ok(r)
2040 }
2041 }
2042
2043 fn resolve_type(
2044 &self,
2045 name: &PartialItemName,
2046 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2047 let r = self.state.resolve_type(
2048 self.database.as_ref(),
2049 &self.effective_search_path(false),
2050 name,
2051 &self.conn_id,
2052 )?;
2053
2054 if self.unresolvable_ids.contains(&r.id()) {
2055 Err(SqlCatalogError::UnknownType {
2056 name: name.to_string(),
2057 })
2058 } else {
2059 Ok(r)
2060 }
2061 }
2062
    /// Returns the system type called `name` via the catalog state's
    /// `get_system_type`.
    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_system_type(name)
    }
2066
    /// Returns the item with ID `id`, or `None` if the state has no such entry.
    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
        // `?` plus re-wrapping in `Some` coerces the entry to a trait object.
        Some(self.state.try_get_entry(id)?)
    }
2070
2071 fn try_get_item_by_global_id(
2072 &self,
2073 id: &GlobalId,
2074 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2075 let entry = self.state.try_get_entry_by_global_id(id)?;
2076 let entry = match &entry.item {
2077 CatalogItem::Table(table) => {
2078 let (version, _gid) = table
2079 .collections
2080 .iter()
2081 .find(|(_version, gid)| *gid == id)
2082 .expect("catalog out of sync, mismatched GlobalId");
2083 entry.at_version(RelationVersionSelector::Specific(*version))
2084 }
2085 _ => entry.at_version(RelationVersionSelector::Latest),
2086 };
2087 Some(entry)
2088 }
2089
    /// Returns the item with ID `id` via the catalog state's `get_entry`.
    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_entry(id)
    }
2093
2094 fn get_item_by_global_id(
2095 &self,
2096 id: &GlobalId,
2097 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2098 let entry = self.state.get_entry_by_global_id(id);
2099 let entry = match &entry.item {
2100 CatalogItem::Table(table) => {
2101 let (version, _gid) = table
2102 .collections
2103 .iter()
2104 .find(|(_version, gid)| *gid == id)
2105 .expect("catalog out of sync, mismatched GlobalId");
2106 entry.at_version(RelationVersionSelector::Specific(*version))
2107 }
2108 _ => entry.at_version(RelationVersionSelector::Latest),
2109 };
2110 entry
2111 }
2112
    /// Returns every item of every schema returned by `get_schemas`, in that
    /// schema order.
    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
        self.get_schemas()
            .into_iter()
            .flat_map(|schema| schema.item_ids())
            .map(|id| self.get_item(&id))
            .collect()
    }
2120
    /// Looks up an item by its qualified `name`, using this connection's ID.
    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_item_by_name(name, &self.conn_id)
            // `identity` with an explicit type coerces the entry to a trait object.
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }
2126
    /// Looks up a type by its qualified `name`, using this connection's ID.
    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_type_by_name(name, &self.conn_id)
            // `identity` with an explicit type coerces the entry to a trait object.
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }
2132
    /// Returns the cluster with ID `id`.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not in the state's `clusters_by_id` map, because the
    /// map is indexed directly.
    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
        &self.state.clusters_by_id[&id]
    }
2136
    /// Returns every cluster in the state as a trait object.
    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
        self.state
            .clusters_by_id
            .values()
            // `identity` with an explicit type coerces the cluster to a trait object.
            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
            .collect()
    }
2144
    /// Returns replica `replica_id` of cluster `cluster_id`.
    ///
    /// The cluster is fetched with `get_cluster`, so an unknown `cluster_id`
    /// panics there.
    fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
        let cluster = self.get_cluster(cluster_id);
        cluster.replica(replica_id)
    }
2153
    /// Returns the replicas of every cluster, grouped in `get_clusters` order.
    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
        self.get_clusters()
            .into_iter()
            .flat_map(|cluster| cluster.replicas().into_iter())
            .collect()
    }
2160
    /// Returns the system-level privilege map stored in the catalog state.
    fn get_system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
2164
    /// Returns the default privileges stored in the catalog state, with the ACL
    /// items of each object collected into a `Vec`.
    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
        self.state
            .default_privileges
            .iter()
            .map(|(object, acl_items)| (object, acl_items.collect()))
            .collect()
    }
2174
    /// Returns an available item name derived from `name`, as chosen by the
    /// catalog state for this connection.
    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
        self.state.find_available_name(name, &self.conn_id)
    }
2178
    /// Converts a qualified item name into a [`FullItemName`], resolving it in
    /// the context of this connection.
    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
        self.state.resolve_full_name(name, Some(&self.conn_id))
    }
2182
    /// Converts a qualified schema name into a [`FullSchemaName`] via the
    /// catalog state.
    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        self.state.resolve_full_schema_name(name)
    }
2186
    /// Returns the catalog item ID of the entry that owns `global_id`.
    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
        self.state.get_entry_by_global_id(global_id).id()
    }
2190
    /// Returns the global ID of item `item_id` at the relation version chosen
    /// by `version`.
    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId {
        self.state
            .get_entry(item_id)
            .at_version(version)
            .global_id()
    }
2201
    /// Returns the catalog configuration held by the catalog state.
    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
2205
    /// Returns the current time by invoking the `now` function stored in the
    /// catalog configuration.
    fn now(&self) -> EpochMillis {
        (self.state.config().now)()
    }
2209
    /// Returns a clone of the AWS PrivateLink availability zones stored in the
    /// catalog state, if any are set.
    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
        self.state.aws_privatelink_availability_zones.clone()
    }
2213
    /// Returns a shared reference to the state's `system_configuration`.
    fn system_vars(&self) -> &SystemVars {
        &self.state.system_configuration
    }
2217
    /// Returns a mutable reference to the state's `system_configuration`.
    // NOTE(review): `to_mut()` suggests `self.state` is a `Cow`, in which case
    // this may clone the state on first use — confirm.
    fn system_vars_mut(&mut self) -> &mut SystemVars {
        &mut self.state.to_mut().system_configuration
    }
2221
    /// Returns the owner of object `id`, as reported by the catalog state for
    /// this connection; the state's `Option` is returned unchanged.
    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
        self.state().get_owner_id(id, self.conn_id())
    }
2225
    /// Returns the privileges attached to the object identified by `id`.
    ///
    /// Cluster replicas and roles carry no privileges, so `None` is returned
    /// for them. `None` is also returned for a schema that cannot be found.
    fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
        match id {
            SystemObjectId::System => Some(&self.state.system_privileges),
            SystemObjectId::Object(ObjectId::Cluster(id)) => {
                Some(self.get_cluster(*id).privileges())
            }
            SystemObjectId::Object(ObjectId::Database(id)) => {
                Some(self.get_database(id).privileges())
            }
            SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
                // Unlike the other arms this uses the fallible lookup, so a
                // missing schema yields `None` instead of panicking.
                self.state
                    .try_get_schema(database_spec, schema_spec, &self.conn_id)
                    .map(|schema| schema.privileges())
            }
            SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
            SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
                Some(self.get_network_policy(id).privileges())
            }
            SystemObjectId::Object(ObjectId::ClusterReplica(_))
            | SystemObjectId::Object(ObjectId::Role(_)) => None,
        }
    }
2250
2251 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2252 let mut seen = BTreeSet::new();
2253 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2254 }
2255
2256 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2257 let mut seen = BTreeSet::new();
2258 self.state.item_dependents(id, &mut seen)
2259 }
2260
/// Returns the privileges reported by `rbac::all_object_privileges` for `object_type`.
///
/// This does not consult the catalog state; it forwards directly to the `rbac` module.
fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
    rbac::all_object_privileges(object_type)
}
2264
2265 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2266 self.state.get_object_type(object_id)
2267 }
2268
2269 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2270 self.state.get_system_object_type(id)
2271 }
2272
2273 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2280 if qualified_name.qualifiers.schema_spec.is_temporary() {
2281 return qualified_name.item.clone().into();
2289 }
2290
2291 let database_id = match &qualified_name.qualifiers.database_spec {
2292 ResolvedDatabaseSpecifier::Ambient => None,
2293 ResolvedDatabaseSpecifier::Id(id)
2294 if self.database.is_some() && self.database == Some(*id) =>
2295 {
2296 None
2297 }
2298 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2299 };
2300
2301 let schema_spec = if database_id.is_none()
2302 && self.resolve_item_name(&PartialItemName {
2303 database: None,
2304 schema: None,
2305 item: qualified_name.item.clone(),
2306 }) == Ok(qualified_name)
2307 || self.resolve_function_name(&PartialItemName {
2308 database: None,
2309 schema: None,
2310 item: qualified_name.item.clone(),
2311 }) == Ok(qualified_name)
2312 || self.resolve_type_name(&PartialItemName {
2313 database: None,
2314 schema: None,
2315 item: qualified_name.item.clone(),
2316 }) == Ok(qualified_name)
2317 {
2318 None
2319 } else {
2320 Some(qualified_name.qualifiers.schema_spec.clone())
2323 };
2324
2325 let res = PartialItemName {
2326 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2327 schema: schema_spec.map(|spec| {
2328 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2329 .name()
2330 .schema
2331 .clone()
2332 }),
2333 item: qualified_name.item.clone(),
2334 };
2335 assert!(
2336 self.resolve_item_name(&res) == Ok(qualified_name)
2337 || self.resolve_function_name(&res) == Ok(qualified_name)
2338 || self.resolve_type_name(&res) == Ok(qualified_name)
2339 );
2340 res
2341 }
2342
2343 fn add_notice(&self, notice: PlanNotice) {
2344 let _ = self.notices_tx.send(notice.into());
2345 }
2346
2347 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2348 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2349 self.state.comments.get_object_comments(comment_id)
2350 }
2351
2352 fn is_cluster_size_cc(&self, size: &str) -> bool {
2353 self.state
2354 .cluster_replica_sizes
2355 .0
2356 .get(size)
2357 .map_or(false, |a| a.is_cc)
2358 }
2359}
2360
2361#[cfg(test)]
2362mod tests {
2363 use std::collections::{BTreeMap, BTreeSet};
2364 use std::sync::Arc;
2365 use std::{env, iter};
2366
2367 use itertools::Itertools;
2368 use mz_catalog::memory::objects::CatalogItem;
2369 use tokio_postgres::NoTls;
2370 use tokio_postgres::types::Type;
2371 use uuid::Uuid;
2372
2373 use mz_catalog::SYSTEM_CONN_ID;
2374 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2375 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2376 use mz_controller_types::{ClusterId, ReplicaId};
2377 use mz_expr::MirScalarExpr;
2378 use mz_ore::now::to_datetime;
2379 use mz_ore::{assert_err, assert_ok, task};
2380 use mz_persist_client::PersistClient;
2381 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2382 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2383 use mz_repr::role_id::RoleId;
2384 use mz_repr::{
2385 CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2386 SqlScalarType, Timestamp,
2387 };
2388 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2389 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2390 use mz_sql::names::{
2391 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2392 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2393 };
2394 use mz_sql::plan::{
2395 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2396 Scope, StatementContext,
2397 };
2398 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2399 use mz_sql::session::vars::{SystemVars, VarInput};
2400
2401 use crate::catalog::state::LocalExpressionCache;
2402 use crate::catalog::{Catalog, Op};
2403 use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2404 use crate::session::Session;
2405
2406 #[mz_ore::test(tokio::test)]
2413 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2415 Catalog::with_debug(|catalog| async move {
2416 struct TestCase {
2417 input: QualifiedItemName,
2418 system_output: PartialItemName,
2419 normal_output: PartialItemName,
2420 }
2421
2422 let test_cases = vec![
2423 TestCase {
2424 input: QualifiedItemName {
2425 qualifiers: ItemQualifiers {
2426 database_spec: ResolvedDatabaseSpecifier::Ambient,
2427 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2428 },
2429 item: "numeric".to_string(),
2430 },
2431 system_output: PartialItemName {
2432 database: None,
2433 schema: None,
2434 item: "numeric".to_string(),
2435 },
2436 normal_output: PartialItemName {
2437 database: None,
2438 schema: None,
2439 item: "numeric".to_string(),
2440 },
2441 },
2442 TestCase {
2443 input: QualifiedItemName {
2444 qualifiers: ItemQualifiers {
2445 database_spec: ResolvedDatabaseSpecifier::Ambient,
2446 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2447 },
2448 item: "mz_array_types".to_string(),
2449 },
2450 system_output: PartialItemName {
2451 database: None,
2452 schema: None,
2453 item: "mz_array_types".to_string(),
2454 },
2455 normal_output: PartialItemName {
2456 database: None,
2457 schema: None,
2458 item: "mz_array_types".to_string(),
2459 },
2460 },
2461 ];
2462
2463 for tc in test_cases {
2464 assert_eq!(
2465 catalog
2466 .for_system_session()
2467 .minimal_qualification(&tc.input),
2468 tc.system_output
2469 );
2470 assert_eq!(
2471 catalog
2472 .for_session(&Session::dummy())
2473 .minimal_qualification(&tc.input),
2474 tc.normal_output
2475 );
2476 }
2477 catalog.expire().await;
2478 })
2479 .await
2480 }
2481
2482 #[mz_ore::test(tokio::test)]
2483 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2485 let persist_client = PersistClient::new_for_tests().await;
2486 let organization_id = Uuid::new_v4();
2487 let bootstrap_args = test_bootstrap_args();
2488 {
2489 let mut catalog = Catalog::open_debug_catalog(
2490 persist_client.clone(),
2491 organization_id.clone(),
2492 &bootstrap_args,
2493 )
2494 .await
2495 .expect("unable to open debug catalog");
2496 assert_eq!(catalog.transient_revision(), 1);
2497 let commit_ts = catalog.current_upper().await;
2498 catalog
2499 .transact(
2500 None,
2501 commit_ts,
2502 None,
2503 vec![Op::CreateDatabase {
2504 name: "test".to_string(),
2505 owner_id: MZ_SYSTEM_ROLE_ID,
2506 }],
2507 )
2508 .await
2509 .expect("failed to transact");
2510 assert_eq!(catalog.transient_revision(), 2);
2511 catalog.expire().await;
2512 }
2513 {
2514 let catalog =
2515 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2516 .await
2517 .expect("unable to open debug catalog");
2518 assert_eq!(catalog.transient_revision(), 1);
2520 catalog.expire().await;
2521 }
2522 }
2523
2524 #[mz_ore::test(tokio::test)]
2525 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2527 Catalog::with_debug(|catalog| async move {
2528 let mz_catalog_schema = (
2529 ResolvedDatabaseSpecifier::Ambient,
2530 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2531 );
2532 let pg_catalog_schema = (
2533 ResolvedDatabaseSpecifier::Ambient,
2534 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2535 );
2536 let mz_temp_schema = (
2537 ResolvedDatabaseSpecifier::Ambient,
2538 SchemaSpecifier::Temporary,
2539 );
2540
2541 let session = Session::dummy();
2543 let conn_catalog = catalog.for_session(&session);
2544 assert_ne!(
2545 conn_catalog.effective_search_path(false),
2546 conn_catalog.search_path
2547 );
2548 assert_ne!(
2549 conn_catalog.effective_search_path(true),
2550 conn_catalog.search_path
2551 );
2552 assert_eq!(
2553 conn_catalog.effective_search_path(false),
2554 vec![
2555 mz_catalog_schema.clone(),
2556 pg_catalog_schema.clone(),
2557 conn_catalog.search_path[0].clone()
2558 ]
2559 );
2560 assert_eq!(
2561 conn_catalog.effective_search_path(true),
2562 vec![
2563 mz_temp_schema.clone(),
2564 mz_catalog_schema.clone(),
2565 pg_catalog_schema.clone(),
2566 conn_catalog.search_path[0].clone()
2567 ]
2568 );
2569
2570 let mut session = Session::dummy();
2572 session
2573 .vars_mut()
2574 .set(
2575 &SystemVars::new(),
2576 "search_path",
2577 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2578 false,
2579 )
2580 .expect("failed to set search_path");
2581 let conn_catalog = catalog.for_session(&session);
2582 assert_ne!(
2583 conn_catalog.effective_search_path(false),
2584 conn_catalog.search_path
2585 );
2586 assert_ne!(
2587 conn_catalog.effective_search_path(true),
2588 conn_catalog.search_path
2589 );
2590 assert_eq!(
2591 conn_catalog.effective_search_path(false),
2592 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2593 );
2594 assert_eq!(
2595 conn_catalog.effective_search_path(true),
2596 vec![
2597 mz_temp_schema.clone(),
2598 mz_catalog_schema.clone(),
2599 pg_catalog_schema.clone()
2600 ]
2601 );
2602
2603 let mut session = Session::dummy();
2604 session
2605 .vars_mut()
2606 .set(
2607 &SystemVars::new(),
2608 "search_path",
2609 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2610 false,
2611 )
2612 .expect("failed to set search_path");
2613 let conn_catalog = catalog.for_session(&session);
2614 assert_ne!(
2615 conn_catalog.effective_search_path(false),
2616 conn_catalog.search_path
2617 );
2618 assert_ne!(
2619 conn_catalog.effective_search_path(true),
2620 conn_catalog.search_path
2621 );
2622 assert_eq!(
2623 conn_catalog.effective_search_path(false),
2624 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2625 );
2626 assert_eq!(
2627 conn_catalog.effective_search_path(true),
2628 vec![
2629 mz_temp_schema.clone(),
2630 pg_catalog_schema.clone(),
2631 mz_catalog_schema.clone()
2632 ]
2633 );
2634
2635 let mut session = Session::dummy();
2636 session
2637 .vars_mut()
2638 .set(
2639 &SystemVars::new(),
2640 "search_path",
2641 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2642 false,
2643 )
2644 .expect("failed to set search_path");
2645 let conn_catalog = catalog.for_session(&session);
2646 assert_ne!(
2647 conn_catalog.effective_search_path(false),
2648 conn_catalog.search_path
2649 );
2650 assert_ne!(
2651 conn_catalog.effective_search_path(true),
2652 conn_catalog.search_path
2653 );
2654 assert_eq!(
2655 conn_catalog.effective_search_path(false),
2656 vec![
2657 mz_catalog_schema.clone(),
2658 pg_catalog_schema.clone(),
2659 mz_temp_schema.clone()
2660 ]
2661 );
2662 assert_eq!(
2663 conn_catalog.effective_search_path(true),
2664 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2665 );
2666 catalog.expire().await;
2667 })
2668 .await
2669 }
2670
2671 #[mz_ore::test(tokio::test)]
2672 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2674 use mz_ore::collections::CollectionExt;
2675 Catalog::with_debug(|catalog| async move {
2676 let conn_catalog = catalog.for_system_session();
2677 let scx = &mut StatementContext::new(None, &conn_catalog);
2678
2679 let parsed = mz_sql_parser::parser::parse_statements(
2680 "create view public.foo as select 1 as bar",
2681 )
2682 .expect("")
2683 .into_element()
2684 .ast;
2685
2686 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2687
2688 assert_eq!(
2690 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2691 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2692 );
2693 catalog.expire().await;
2694 })
2695 .await;
2696 }
2697
2698 #[mz_ore::test(tokio::test)]
2700 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2702 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2703 let column = ", 1";
2704 let view_def_size = view_def.bytes().count();
2705 let column_size = column.bytes().count();
2706 let column_count =
2707 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2708 let columns = iter::repeat(column).take(column_count).join("");
2709 let create_sql = format!("{view_def}{columns})");
2710 let create_sql_check = create_sql.clone();
2711 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2712 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2713 &create_sql
2714 ));
2715
2716 let persist_client = PersistClient::new_for_tests().await;
2717 let organization_id = Uuid::new_v4();
2718 let id = CatalogItemId::User(1);
2719 let gid = GlobalId::User(1);
2720 let bootstrap_args = test_bootstrap_args();
2721 {
2722 let mut catalog = Catalog::open_debug_catalog(
2723 persist_client.clone(),
2724 organization_id.clone(),
2725 &bootstrap_args,
2726 )
2727 .await
2728 .expect("unable to open debug catalog");
2729 let item = catalog
2730 .state()
2731 .deserialize_item(
2732 gid,
2733 &create_sql,
2734 &BTreeMap::new(),
2735 &mut LocalExpressionCache::Closed,
2736 None,
2737 )
2738 .expect("unable to parse view");
2739 let commit_ts = catalog.current_upper().await;
2740 catalog
2741 .transact(
2742 None,
2743 commit_ts,
2744 None,
2745 vec![Op::CreateItem {
2746 item,
2747 name: QualifiedItemName {
2748 qualifiers: ItemQualifiers {
2749 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2750 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2751 },
2752 item: "v".to_string(),
2753 },
2754 id,
2755 owner_id: MZ_SYSTEM_ROLE_ID,
2756 }],
2757 )
2758 .await
2759 .expect("failed to transact");
2760 catalog.expire().await;
2761 }
2762 {
2763 let catalog =
2764 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2765 .await
2766 .expect("unable to open debug catalog");
2767 let view = catalog.get_entry(&id);
2768 assert_eq!("v", view.name.item);
2769 match &view.item {
2770 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2771 item => panic!("expected view, got {}", item.typ()),
2772 }
2773 catalog.expire().await;
2774 }
2775 }
2776
2777 #[mz_ore::test(tokio::test)]
2778 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2780 Catalog::with_debug(|catalog| async move {
2781 let conn_catalog = catalog.for_system_session();
2782
2783 assert_eq!(
2784 mz_sql::catalog::ObjectType::ClusterReplica,
2785 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2786 ClusterId::user(1).expect("1 is a valid ID"),
2787 ReplicaId::User(1)
2788 )))
2789 );
2790 assert_eq!(
2791 mz_sql::catalog::ObjectType::Role,
2792 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2793 );
2794 catalog.expire().await;
2795 })
2796 .await;
2797 }
2798
2799 #[mz_ore::test(tokio::test)]
2800 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2802 Catalog::with_debug(|catalog| async move {
2803 let conn_catalog = catalog.for_system_session();
2804
2805 assert_eq!(
2806 None,
2807 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2808 ClusterId::user(1).expect("1 is a valid ID"),
2809 ReplicaId::User(1),
2810 ))))
2811 );
2812 assert_eq!(
2813 None,
2814 conn_catalog
2815 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2816 );
2817 catalog.expire().await;
2818 })
2819 .await;
2820 }
2821
/// Checks that every builtin table, view, and source resolves in a debug catalog to an
/// item whose relation description matches the description declared on the builtin.
///
/// Columns are compared one by one first, so that a mismatch is reported with the
/// offending column's index and name; the descriptions are then compared as a whole.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn verify_builtin_descs() {
    Catalog::with_debug(|catalog| async move {
        let conn_catalog = catalog.for_system_session();

        let builtins_cfg = BuiltinsConfig {
            include_continual_tasks: true,
        };
        for builtin in BUILTINS::iter(&builtins_cfg) {
            // Only tables, views, and sources are checked; every other builtin kind is
            // skipped.
            let (schema, name, expected_desc) = match builtin {
                Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
                Builtin::View(v) => (&v.schema, &v.name, &v.desc),
                Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
                Builtin::Log(_)
                | Builtin::Type(_)
                | Builtin::Func(_)
                | Builtin::ContinualTask(_)
                | Builtin::Index(_)
                | Builtin::Connection(_) => continue,
            };
            // Resolve by schema-qualified name and look at the latest relation version.
            let item = conn_catalog
                .resolve_item(&PartialItemName {
                    database: None,
                    schema: Some(schema.to_string()),
                    item: name.to_string(),
                })
                .expect("unable to resolve item")
                .at_version(RelationVersionSelector::Latest);

            let actual_desc = item.relation_desc().expect("invalid item type");
            // `zip_eq` panics if the two descriptions have a different number of
            // columns.
            for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
                actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
            {
                assert_eq!(
                    actual_name, expected_name,
                    "item {schema}.{name} column {index} name did not match its expected name"
                );
                assert_eq!(
                    actual_typ, expected_typ,
                    "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
                );
            }
            // Whole-description comparison covers whatever the per-column loop does not.
            assert_eq!(
                &*actual_desc, expected_desc,
                "item {schema}.{name} did not match its expected RelationDesc"
            );
        }
        catalog.expire().await;
    })
    .await
}
2874
2875 #[mz_ore::test(tokio::test)]
2878 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2880 async fn inner(catalog: Catalog) {
2881 let (client, connection) = tokio_postgres::connect(
2885 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2886 NoTls,
2887 )
2888 .await
2889 .expect("failed to connect to Postgres");
2890
2891 task::spawn(|| "compare_builtin_postgres", async move {
2892 if let Err(e) = connection.await {
2893 panic!("connection error: {}", e);
2894 }
2895 });
2896
2897 struct PgProc {
2898 name: String,
2899 arg_oids: Vec<u32>,
2900 ret_oid: Option<u32>,
2901 ret_set: bool,
2902 }
2903
2904 struct PgType {
2905 name: String,
2906 ty: String,
2907 elem: u32,
2908 array: u32,
2909 input: u32,
2910 receive: u32,
2911 }
2912
2913 struct PgOper {
2914 oprresult: u32,
2915 name: String,
2916 }
2917
2918 let pg_proc: BTreeMap<_, _> = client
2919 .query(
2920 "SELECT
2921 p.oid,
2922 proname,
2923 proargtypes,
2924 prorettype,
2925 proretset
2926 FROM pg_proc p
2927 JOIN pg_namespace n ON p.pronamespace = n.oid",
2928 &[],
2929 )
2930 .await
2931 .expect("pg query failed")
2932 .into_iter()
2933 .map(|row| {
2934 let oid: u32 = row.get("oid");
2935 let pg_proc = PgProc {
2936 name: row.get("proname"),
2937 arg_oids: row.get("proargtypes"),
2938 ret_oid: row.get("prorettype"),
2939 ret_set: row.get("proretset"),
2940 };
2941 (oid, pg_proc)
2942 })
2943 .collect();
2944
2945 let pg_type: BTreeMap<_, _> = client
2946 .query(
2947 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2948 &[],
2949 )
2950 .await
2951 .expect("pg query failed")
2952 .into_iter()
2953 .map(|row| {
2954 let oid: u32 = row.get("oid");
2955 let pg_type = PgType {
2956 name: row.get("typname"),
2957 ty: row.get("typtype"),
2958 elem: row.get("typelem"),
2959 array: row.get("typarray"),
2960 input: row.get("typinput"),
2961 receive: row.get("typreceive"),
2962 };
2963 (oid, pg_type)
2964 })
2965 .collect();
2966
2967 let pg_oper: BTreeMap<_, _> = client
2968 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2969 .await
2970 .expect("pg query failed")
2971 .into_iter()
2972 .map(|row| {
2973 let oid: u32 = row.get("oid");
2974 let pg_oper = PgOper {
2975 name: row.get("oprname"),
2976 oprresult: row.get("oprresult"),
2977 };
2978 (oid, pg_oper)
2979 })
2980 .collect();
2981
2982 let conn_catalog = catalog.for_system_session();
2983 let resolve_type_oid = |item: &str| {
2984 conn_catalog
2985 .resolve_type(&PartialItemName {
2986 database: None,
2987 schema: Some(PG_CATALOG_SCHEMA.into()),
2990 item: item.to_string(),
2991 })
2992 .expect("unable to resolve type")
2993 .oid()
2994 };
2995
2996 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2997 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2998 .collect();
2999
3000 let mut all_oids = BTreeSet::new();
3001
3002 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
3005 [
3006 (Type::NAME, Type::TEXT),
3008 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
3009 (Type::TIME, Type::TIMETZ),
3011 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
3012 ]
3013 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
3014 );
3015 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
3016 1619, ]);
3018 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
3019 if ignore_return_types.contains(&fn_oid) {
3020 return true;
3021 }
3022 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
3023 return true;
3024 }
3025 a == b
3026 };
3027
3028 let builtins_cfg = BuiltinsConfig {
3029 include_continual_tasks: true,
3030 };
3031 for builtin in BUILTINS::iter(&builtins_cfg) {
3032 match builtin {
3033 Builtin::Type(ty) => {
3034 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3035
3036 if ty.oid >= FIRST_MATERIALIZE_OID {
3037 continue;
3040 }
3041
3042 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3045 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3046 });
3047 assert_eq!(
3048 ty.name, pg_ty.name,
3049 "oid {} has name {} in postgres; expected {}",
3050 ty.oid, pg_ty.name, ty.name,
3051 );
3052
3053 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3054 None => (0, 0),
3055 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3056 };
3057 assert_eq!(
3058 typinput_oid, pg_ty.input,
3059 "type {} has typinput OID {:?} in mz but {:?} in pg",
3060 ty.name, typinput_oid, pg_ty.input,
3061 );
3062 assert_eq!(
3063 typreceive_oid, pg_ty.receive,
3064 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3065 ty.name, typreceive_oid, pg_ty.receive,
3066 );
3067 if typinput_oid != 0 {
3068 assert!(
3069 func_oids.contains(&typinput_oid),
3070 "type {} has typinput OID {} that does not exist in pg_proc",
3071 ty.name,
3072 typinput_oid,
3073 );
3074 }
3075 if typreceive_oid != 0 {
3076 assert!(
3077 func_oids.contains(&typreceive_oid),
3078 "type {} has typreceive OID {} that does not exist in pg_proc",
3079 ty.name,
3080 typreceive_oid,
3081 );
3082 }
3083
3084 match &ty.details.typ {
3086 CatalogType::Array { element_reference } => {
3087 let elem_ty = BUILTINS::iter(&builtins_cfg)
3088 .filter_map(|builtin| match builtin {
3089 Builtin::Type(ty @ BuiltinType { name, .. })
3090 if element_reference == name =>
3091 {
3092 Some(ty)
3093 }
3094 _ => None,
3095 })
3096 .next();
3097 let elem_ty = match elem_ty {
3098 Some(ty) => ty,
3099 None => {
3100 panic!("{} is unexpectedly not a type", element_reference)
3101 }
3102 };
3103 assert_eq!(
3104 pg_ty.elem, elem_ty.oid,
3105 "type {} has mismatched element OIDs",
3106 ty.name
3107 )
3108 }
3109 CatalogType::Pseudo => {
3110 assert_eq!(
3111 pg_ty.ty, "p",
3112 "type {} is not a pseudo type as expected",
3113 ty.name
3114 )
3115 }
3116 CatalogType::Range { .. } => {
3117 assert_eq!(
3118 pg_ty.ty, "r",
3119 "type {} is not a range type as expected",
3120 ty.name
3121 );
3122 }
3123 _ => {
3124 assert_eq!(
3125 pg_ty.ty, "b",
3126 "type {} is not a base type as expected",
3127 ty.name
3128 )
3129 }
3130 }
3131
3132 let schema = catalog
3134 .resolve_schema_in_database(
3135 &ResolvedDatabaseSpecifier::Ambient,
3136 ty.schema,
3137 &SYSTEM_CONN_ID,
3138 )
3139 .expect("unable to resolve schema");
3140 let allocated_type = catalog
3141 .resolve_type(
3142 None,
3143 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3144 &PartialItemName {
3145 database: None,
3146 schema: Some(schema.name().schema.clone()),
3147 item: ty.name.to_string(),
3148 },
3149 &SYSTEM_CONN_ID,
3150 )
3151 .expect("unable to resolve type");
3152 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3153 ty
3154 } else {
3155 panic!("unexpectedly not a type")
3156 };
3157 match ty.details.array_id {
3158 Some(array_id) => {
3159 let array_ty = catalog.get_entry(&array_id);
3160 assert_eq!(
3161 pg_ty.array, array_ty.oid,
3162 "type {} has mismatched array OIDs",
3163 allocated_type.name.item,
3164 );
3165 }
3166 None => assert_eq!(
3167 pg_ty.array, 0,
3168 "type {} does not have an array type in mz but does in pg",
3169 allocated_type.name.item,
3170 ),
3171 }
3172 }
3173 Builtin::Func(func) => {
3174 for imp in func.inner.func_impls() {
3175 assert!(
3176 all_oids.insert(imp.oid),
3177 "{} reused oid {}",
3178 func.name,
3179 imp.oid
3180 );
3181
3182 assert!(
3183 imp.oid < FIRST_USER_OID,
3184 "built-in function {} erroneously has OID in user space ({})",
3185 func.name,
3186 imp.oid,
3187 );
3188
3189 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3192 continue;
3193 } else {
3194 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3195 panic!(
3196 "pg_proc missing function {}: oid {}",
3197 func.name, imp.oid
3198 )
3199 })
3200 };
3201 assert_eq!(
3202 func.name, pg_fn.name,
3203 "funcs with oid {} don't match names: {} in mz, {} in pg",
3204 imp.oid, func.name, pg_fn.name
3205 );
3206
3207 let imp_arg_oids = imp
3210 .arg_typs
3211 .iter()
3212 .map(|item| resolve_type_oid(item))
3213 .collect::<Vec<_>>();
3214
3215 if imp_arg_oids != pg_fn.arg_oids {
3216 println!(
3217 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3218 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3219 );
3220 }
3221
3222 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3223
3224 assert!(
3225 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3226 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3227 imp.oid,
3228 func.name,
3229 imp_return_oid,
3230 pg_fn.ret_oid
3231 );
3232
3233 assert_eq!(
3234 imp.return_is_set, pg_fn.ret_set,
3235 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3236 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3237 );
3238 }
3239 }
3240 _ => (),
3241 }
3242 }
3243
3244 for (op, func) in OP_IMPLS.iter() {
3245 for imp in func.func_impls() {
3246 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3247
3248 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3250 continue;
3251 } else {
3252 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3253 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3254 })
3255 };
3256
3257 assert_eq!(*op, pg_op.name);
3258
3259 let imp_return_oid =
3260 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3261 if imp_return_oid != pg_op.oprresult {
3262 panic!(
3263 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3264 imp.oid, op, imp_return_oid, pg_op.oprresult
3265 );
3266 }
3267 }
3268 }
3269 catalog.expire().await;
3270 }
3271
3272 Catalog::with_debug(inner).await
3273 }
3274
/// Runs `smoketest_fn` for scalar builtin functions and operators over combinations of
/// "interesting" argument datums.
///
/// For every scalar implementation whose argument types can all be mapped through
/// `mz_pgrepr::Type::from_oid`, each argument ranges over `NULL` plus the type's
/// `interesting_datums()`. One blocking task is spawned per argument combination, and
/// all tasks are awaited at the end.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn test_smoketest_all_builtins() {
    // Builds and spawns every smoke-test task, returning the join handles.
    fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
        // Shared so that each spawned task can hold its own handle to the catalog.
        let catalog = Arc::new(catalog);
        let conn_catalog = catalog.for_system_session();

        let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
        let mut handles = Vec::new();

        // Functions that are skipped entirely.
        // NOTE(review): the reason each name is excluded is not visible here.
        let ignore_names = BTreeSet::from([
            "avg",
            "avg_internal_v1",
            "bool_and",
            "bool_or",
            "has_table_privilege",
            "has_type_privilege",
            "mod",
            "mz_panic",
            "mz_sleep",
            "pow",
            "stddev_pop",
            "stddev_samp",
            "stddev",
            "var_pop",
            "var_samp",
            "variance",
        ]);

        // Builtin functions followed by operator implementations.
        let fns = BUILTINS::funcs()
            .map(|func| (&func.name, func.inner))
            .chain(OP_IMPLS.iter());

        for (name, func) in fns {
            if ignore_names.contains(name) {
                continue;
            }
            // Only scalar functions are exercised.
            let Func::Scalar(impls) = func else {
                continue;
            };

            'outer: for imp in impls {
                let details = imp.details();
                let mut styps = Vec::new();
                for item in details.arg_typs.iter() {
                    let oid = resolve_type_oid(item);
                    // Skip the whole implementation if any argument type has no
                    // `mz_pgrepr::Type` for its OID.
                    let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                        continue 'outer;
                    };
                    styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                }
                // Candidate values per argument position: NULL plus the type's
                // interesting datums.
                let datums = styps
                    .iter()
                    .map(|styp| {
                        let mut datums = vec![Datum::Null];
                        datums.extend(styp.interesting_datums());
                        datums
                    })
                    .collect::<Vec<_>>();
                // No argument positions means there is nothing to enumerate.
                if datums.is_empty() {
                    continue;
                }

                let return_oid = details
                    .return_typ
                    .map(resolve_type_oid)
                    .expect("must exist");
                // `None` when the return OID has no `mz_pgrepr::Type`.
                let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                    .ok()
                    .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                // `idxs` is an odometer over `datums`: position `i` selects
                // `datums[i][idxs[i]]`. The loop ends once the most significant digit
                // (position 0) runs past its last value.
                let mut idxs = vec![0; datums.len()];
                while idxs[0] < datums[0].len() {
                    let mut args = Vec::with_capacity(idxs.len());
                    for i in 0..(datums.len()) {
                        args.push(datums[i][idxs[i]]);
                    }

                    let op = &imp.op;
                    // Wrap each datum as an already-coerced literal of its argument type.
                    let scalars = args
                        .iter()
                        .enumerate()
                        .map(|(i, datum)| {
                            CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                datum.clone(),
                                styps[i].clone(),
                            ))
                        })
                        .collect();

                    // Human-readable description of this call; also used as the task
                    // name.
                    let call_name = format!(
                        "{name}({}) (oid: {})",
                        args.iter()
                            .map(|d| d.to_string())
                            .collect::<Vec<_>>()
                            .join(", "),
                        imp.oid
                    );
                    let catalog = Arc::clone(&catalog);
                    let call_name_fn = call_name.clone();
                    let return_styp = return_styp.clone();
                    let handle = task::spawn_blocking(
                        || call_name,
                        move || {
                            smoketest_fn(
                                name,
                                call_name_fn,
                                op,
                                imp,
                                args,
                                catalog,
                                scalars,
                                return_styp,
                            )
                        },
                    );
                    handles.push(handle);

                    // Advance the odometer, least significant position first. A position
                    // that overflows is reset and carries into the next one, except
                    // position 0, which is left overflowed to terminate the `while`.
                    for i in (0..datums.len()).rev() {
                        idxs[i] += 1;
                        if idxs[i] >= datums[i].len() {
                            if i == 0 {
                                break;
                            }
                            idxs[i] = 0;
                            continue;
                        } else {
                            break;
                        }
                    }
                }
            }
        }
        handles
    }

    let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
    for handle in handles {
        handle.await;
    }
}
3420
3421 fn smoketest_fn(
3422 name: &&str,
3423 call_name: String,
3424 op: &Operation<HirScalarExpr>,
3425 imp: &FuncImpl<HirScalarExpr>,
3426 args: Vec<Datum<'_>>,
3427 catalog: Arc<Catalog>,
3428 scalars: Vec<CoercibleScalarExpr>,
3429 return_styp: Option<SqlScalarType>,
3430 ) {
3431 let conn_catalog = catalog.for_system_session();
3432 let pcx = PlanContext::zero();
3433 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3434 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3435 let ecx = ExprContext {
3436 qcx: &qcx,
3437 name: "smoketest",
3438 scope: &Scope::empty(),
3439 relation_type: &SqlRelationType::empty(),
3440 allow_aggregates: false,
3441 allow_subqueries: false,
3442 allow_parameters: false,
3443 allow_windows: false,
3444 };
3445 let arena = RowArena::new();
3446 let mut session = Session::<Timestamp>::dummy();
3447 session
3448 .start_transaction(to_datetime(0), None, None)
3449 .expect("must succeed");
3450 let prep_style = ExprPrepStyle::OneShot {
3451 logical_time: EvalTime::Time(Timestamp::MIN),
3452 session: &session,
3453 catalog_state: &catalog.state,
3454 };
3455
3456 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3459 if let Ok(hir) = res {
3460 if let Ok(mut mir) = hir.lower_uncorrelated() {
3461 prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3463
3464 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3465 if let Some(return_styp) = return_styp {
3466 let mir_typ = mir.typ(&[]);
3467 assert_eq!(mir_typ.scalar_type, return_styp);
3470 if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3474 panic!(
3475 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3476 );
3477 }
3478 if let Some((introduces_nulls, propagates_nulls)) =
3481 call_introduces_propagates_nulls(&mir)
3482 {
3483 if introduces_nulls {
3484 assert!(
3488 mir_typ.nullable,
3489 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3490 name, args, mir, mir_typ.nullable
3491 );
3492 } else {
3493 let any_input_null = args.iter().any(|arg| arg.is_null());
3494 if !any_input_null {
3495 assert!(
3496 !mir_typ.nullable,
3497 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3498 name, args, mir, mir_typ.nullable
3499 );
3500 } else {
3501 assert_eq!(
3502 mir_typ.nullable, propagates_nulls,
3503 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3504 name, args, mir, mir_typ.nullable
3505 );
3506 }
3507 }
3508 }
3509 let mut reduced = mir.clone();
3512 reduced.reduce(&[]);
3513 match reduced {
3514 MirScalarExpr::Literal(reduce_result, ctyp) => {
3515 match reduce_result {
3516 Ok(reduce_result_row) => {
3517 let reduce_result_datum = reduce_result_row.unpack_first();
3518 assert_eq!(
3519 reduce_result_datum,
3520 eval_result_datum,
3521 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3522 name,
3523 args,
3524 mir,
3525 eval_result_datum,
3526 mir_typ.scalar_type,
3527 reduce_result_datum,
3528 ctyp.scalar_type
3529 );
3530 assert_eq!(
3536 ctyp.scalar_type,
3537 mir_typ.scalar_type,
3538 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3539 name,
3540 args,
3541 mir,
3542 eval_result_datum,
3543 mir_typ.scalar_type,
3544 reduce_result_datum,
3545 ctyp.scalar_type
3546 );
3547 }
3548 Err(..) => {} }
3550 }
3551 _ => unreachable!(
3552 "all args are literals, so should have reduced to a literal"
3553 ),
3554 }
3555 }
3556 }
3557 }
3558 }
3559 }
3560
3561 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3566 match mir_func_call {
3567 MirScalarExpr::CallUnary { func, expr } => {
3568 if expr.is_literal() {
3569 Some((func.introduces_nulls(), func.propagates_nulls()))
3570 } else {
3571 None
3572 }
3573 }
3574 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3575 if expr1.is_literal() && expr2.is_literal() {
3576 Some((func.introduces_nulls(), func.propagates_nulls()))
3577 } else {
3578 None
3579 }
3580 }
3581 MirScalarExpr::CallVariadic { func, exprs } => {
3582 if exprs.iter().all(|arg| arg.is_literal()) {
3583 Some((func.introduces_nulls(), func.propagates_nulls()))
3584 } else {
3585 None
3586 }
3587 }
3588 _ => None,
3589 }
3590 }
3591
3592 #[mz_ore::test(tokio::test)]
3594 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3596 Catalog::with_debug(|catalog| async move {
3597 let conn_catalog = catalog.for_system_session();
3598
3599 for view in BUILTINS::views().filter(|view| {
3600 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3601 }) {
3602 let item = conn_catalog
3603 .resolve_item(&PartialItemName {
3604 database: None,
3605 schema: Some(view.schema.to_string()),
3606 item: view.name.to_string(),
3607 })
3608 .expect("unable to resolve view")
3609 .at_version(RelationVersionSelector::Latest);
3611 let full_name = conn_catalog.resolve_full_name(item.name());
3612 let desc = item.relation_desc().expect("invalid item type");
3613 for col_type in desc.iter_types() {
3614 match &col_type.scalar_type {
3615 typ @ SqlScalarType::UInt16
3616 | typ @ SqlScalarType::UInt32
3617 | typ @ SqlScalarType::UInt64
3618 | typ @ SqlScalarType::MzTimestamp
3619 | typ @ SqlScalarType::List { .. }
3620 | typ @ SqlScalarType::Map { .. }
3621 | typ @ SqlScalarType::MzAclItem => {
3622 panic!("{typ:?} type found in {full_name}");
3623 }
3624 SqlScalarType::AclItem
3625 | SqlScalarType::Bool
3626 | SqlScalarType::Int16
3627 | SqlScalarType::Int32
3628 | SqlScalarType::Int64
3629 | SqlScalarType::Float32
3630 | SqlScalarType::Float64
3631 | SqlScalarType::Numeric { .. }
3632 | SqlScalarType::Date
3633 | SqlScalarType::Time
3634 | SqlScalarType::Timestamp { .. }
3635 | SqlScalarType::TimestampTz { .. }
3636 | SqlScalarType::Interval
3637 | SqlScalarType::PgLegacyChar
3638 | SqlScalarType::Bytes
3639 | SqlScalarType::String
3640 | SqlScalarType::Char { .. }
3641 | SqlScalarType::VarChar { .. }
3642 | SqlScalarType::Jsonb
3643 | SqlScalarType::Uuid
3644 | SqlScalarType::Array(_)
3645 | SqlScalarType::Record { .. }
3646 | SqlScalarType::Oid
3647 | SqlScalarType::RegProc
3648 | SqlScalarType::RegType
3649 | SqlScalarType::RegClass
3650 | SqlScalarType::Int2Vector
3651 | SqlScalarType::Range { .. }
3652 | SqlScalarType::PgLegacyName => {}
3653 }
3654 }
3655 }
3656 catalog.expire().await;
3657 })
3658 .await
3659 }
3660
3661 #[mz_ore::test(tokio::test)]
3664 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3666 Catalog::with_debug(|catalog| async move {
3667 let conn_catalog = catalog.for_system_session();
3668
3669 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3670 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3671
3672 for entry in catalog.entries() {
3673 let schema_spec = entry.name().qualifiers.schema_spec;
3674 let introspection_deps = catalog.introspection_dependencies(entry.id);
3675 if introspection_deps.is_empty() {
3676 assert!(
3677 schema_spec != introspection_schema_spec,
3678 "entry does not depend on introspection sources but is in \
3679 `mz_introspection`: {}",
3680 conn_catalog.resolve_full_name(entry.name()),
3681 );
3682 } else {
3683 assert!(
3684 schema_spec == introspection_schema_spec,
3685 "entry depends on introspection sources but is not in \
3686 `mz_introspection`: {}",
3687 conn_catalog.resolve_full_name(entry.name()),
3688 );
3689 }
3690 }
3691 })
3692 .await
3693 }
3694
3695 #[mz_ore::test(tokio::test)]
3696 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3698 let persist_client = PersistClient::new_for_tests().await;
3699 let bootstrap_args = test_bootstrap_args();
3700 let organization_id = Uuid::new_v4();
3701 let db_name = "DB";
3702
3703 let mut writer_catalog = Catalog::open_debug_catalog(
3704 persist_client.clone(),
3705 organization_id.clone(),
3706 &bootstrap_args,
3707 )
3708 .await
3709 .expect("open_debug_catalog");
3710 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3711 persist_client.clone(),
3712 organization_id.clone(),
3713 &bootstrap_args,
3714 )
3715 .await
3716 .expect("open_debug_read_only_catalog");
3717 assert_err!(writer_catalog.resolve_database(db_name));
3718 assert_err!(read_only_catalog.resolve_database(db_name));
3719
3720 let commit_ts = writer_catalog.current_upper().await;
3721 writer_catalog
3722 .transact(
3723 None,
3724 commit_ts,
3725 None,
3726 vec![Op::CreateDatabase {
3727 name: db_name.to_string(),
3728 owner_id: MZ_SYSTEM_ROLE_ID,
3729 }],
3730 )
3731 .await
3732 .expect("failed to transact");
3733
3734 let write_db = writer_catalog
3735 .resolve_database(db_name)
3736 .expect("resolve_database");
3737 read_only_catalog
3738 .sync_to_current_updates()
3739 .await
3740 .expect("sync_to_current_updates");
3741 let read_db = read_only_catalog
3742 .resolve_database(db_name)
3743 .expect("resolve_database");
3744
3745 assert_eq!(write_db, read_db);
3746
3747 let writer_catalog_fencer =
3748 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3749 .await
3750 .expect("open_debug_catalog for fencer");
3751 let fencer_db = writer_catalog_fencer
3752 .resolve_database(db_name)
3753 .expect("resolve_database for fencer");
3754 assert_eq!(fencer_db, read_db);
3755
3756 let write_fence_err = writer_catalog
3757 .sync_to_current_updates()
3758 .await
3759 .expect_err("sync_to_current_updates for fencer");
3760 assert!(matches!(
3761 write_fence_err,
3762 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3763 ));
3764 let read_fence_err = read_only_catalog
3765 .sync_to_current_updates()
3766 .await
3767 .expect_err("sync_to_current_updates after fencer");
3768 assert!(matches!(
3769 read_fence_err,
3770 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3771 ));
3772
3773 writer_catalog.expire().await;
3774 read_only_catalog.expire().await;
3775 writer_catalog_fencer.expire().await;
3776 }
3777}