1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67 CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68 DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100use crate::session::{Portal, PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
/// In-memory catalog handle.
///
/// Bundles the in-memory [`CatalogState`], the cached plans and optimizer
/// metadata held in [`CatalogPlans`], an optional expression cache handle, and
/// a handle to the durable catalog storage.
#[derive(Debug)]
pub struct Catalog {
    /// In-memory catalog state; most accessors on `Catalog` delegate to it.
    state: CatalogState,
    /// Cached plans, dataflow metainfos, and optimizer notices keyed by `GlobalId`.
    plans: CatalogPlans,
    /// Handle to the expression cache, if any.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Durable catalog storage behind an async mutex. The `Arc` is what gets
    /// cloned by `Catalog::clone`, so clones share the same storage handle.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// Revision number returned by `transient_revision()`.
    // NOTE(review): presumably bumped on every catalog transaction and never persisted — confirm.
    transient_revision: u64,
}
144
145impl Clone for Catalog {
148 fn clone(&self) -> Self {
149 Self {
150 state: self.state.clone(),
151 plans: self.plans.clone(),
152 expr_cache_handle: self.expr_cache_handle.clone(),
153 storage: Arc::clone(&self.storage),
154 transient_revision: self.transient_revision,
155 }
156 }
157}
158
/// Plans and optimizer metadata cached by the [`Catalog`], all keyed by
/// [`GlobalId`].
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized (MIR) dataflow plans, stored by `set_optimized_plan`.
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical dataflow plans, stored by `set_physical_plan`.
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// Dataflow metainfo (including optimizer notices), stored by
    /// `set_dataflow_metainfo`.
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Reverse lookup: for each dependency id, the notices that list it in
    /// their `dependencies`. Maintained by `set_dataflow_metainfo` and
    /// `drop_plans_and_metainfos`.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
166
167impl Catalog {
168 #[mz_ore::instrument(level = "trace")]
170 pub fn set_optimized_plan(
171 &mut self,
172 id: GlobalId,
173 plan: DataflowDescription<OptimizedMirRelationExpr>,
174 ) {
175 self.plans.optimized_plan_by_id.insert(id, plan.into());
176 }
177
178 #[mz_ore::instrument(level = "trace")]
180 pub fn set_physical_plan(
181 &mut self,
182 id: GlobalId,
183 plan: DataflowDescription<mz_compute_types::plan::Plan>,
184 ) {
185 self.plans.physical_plan_by_id.insert(id, plan.into());
186 }
187
188 #[mz_ore::instrument(level = "trace")]
190 pub fn try_get_optimized_plan(
191 &self,
192 id: &GlobalId,
193 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
194 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
195 }
196
197 #[mz_ore::instrument(level = "trace")]
199 pub fn try_get_physical_plan(
200 &self,
201 id: &GlobalId,
202 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
203 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
204 }
205
206 #[mz_ore::instrument(level = "trace")]
208 pub fn set_dataflow_metainfo(
209 &mut self,
210 id: GlobalId,
211 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
212 ) {
213 for notice in metainfo.optimizer_notices.iter() {
215 for dep_id in notice.dependencies.iter() {
216 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
217 entry.push(Arc::clone(notice))
218 }
219 if let Some(item_id) = notice.item_id {
220 soft_assert_eq_or_log!(
221 item_id,
222 id,
223 "notice.item_id should match the id for whom we are saving the notice"
224 );
225 }
226 }
227 self.plans.dataflow_metainfos.insert(id, metainfo);
229 }
230
231 #[mz_ore::instrument(level = "trace")]
233 pub fn try_get_dataflow_metainfo(
234 &self,
235 id: &GlobalId,
236 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
237 self.plans.dataflow_metainfos.get(id)
238 }
239
    /// Drops the optimized plans, physical plans, and dataflow metainfos stored
    /// for every id in `drop_ids`, and removes the affected optimizer notices
    /// from both `dataflow_metainfos` and `notices_by_dep_id`.
    ///
    /// Ids without a stored plan or metainfo are silently skipped.
    ///
    /// Returns the set of all notices that were dropped. Because the result is
    /// a set, a notice reached through several paths appears only once.
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Collects every notice dropped by any of the phases below.
        let mut dropped_notices = BTreeSet::new();

        // Phase 1: remove plans and metainfos saved *for* the dropped ids.
        for id in drop_ids {
            // Remove the optimized and physical plans.
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    // Remove the notice from the reverse lookup of each of its
                    // dependencies.
                    for dep_id in n.dependencies.iter() {
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Phase 2: remove the reverse-lookup entries keyed by the dropped ids,
        // i.e. notices that *depend on* a dropped id.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    // Also remove the notice from the metainfo of the item it
                    // was emitted for, if that metainfo is still present.
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Phase 3: dropped notices may still be referenced from reverse-lookup
        // entries of dependencies that are NOT being dropped. Collect those
        // dependency ids...
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        // ...and purge the dropped notices from their entries.
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        // Sanity check: after the cleanup above, `dropped_notices` should hold
        // the only remaining reference to each notice. If not, log where the
        // extra references still live in `self.plans`.
        if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
            use mz_ore::str::{bracketed, separated};
            let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
            let bad_notices = bad_notices.map(|n| {
                // Ids whose metainfo still contains the notice.
                let mut dataflow_metainfo_occurrences = Vec::new();
                for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
                    if meta_info.optimizer_notices.contains(n) {
                        dataflow_metainfo_occurrences.push(id);
                    }
                }
                // Dependency ids whose reverse-lookup entry still contains the notice.
                let mut notices_by_dep_id_occurrences = Vec::new();
                for (id, notices) in self.plans.notices_by_dep_id.iter() {
                    if notices.iter().contains(n) {
                        notices_by_dep_id_occurrences.push(id);
                    }
                }
                format!(
                    "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
                    dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
                    n.id,
                    n.kind,
                    n.dependencies,
                    Arc::strong_count(n),
                    dataflow_metainfo_occurrences,
                    notices_by_dep_id_occurrences
                )
            });
            let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
            error!(
                "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
                 bad_notices = {bad_notices}; \
                 drop_ids = {drop_ids:?}"
            );
        }

        dropped_notices
    }
359
360 pub(crate) fn invalidate_for_index(
371 &self,
372 ons: impl Iterator<Item = GlobalId>,
373 ) -> BTreeSet<GlobalId> {
374 let mut dependencies = BTreeSet::new();
375 let mut queue = VecDeque::new();
376 let mut seen = HashSet::new();
377 for on in ons {
378 let entry = self.get_entry_by_global_id(&on);
379 dependencies.insert(on);
380 seen.insert(entry.id);
381 let uses = entry.uses();
382 queue.extend(uses.clone());
383 }
384
385 while let Some(cur) = queue.pop_front() {
386 let entry = self.get_entry(&cur);
387 if seen.insert(cur) {
388 let global_ids = entry.global_ids();
389 match entry.item_type() {
390 CatalogItemType::Table
391 | CatalogItemType::Source
392 | CatalogItemType::MaterializedView
393 | CatalogItemType::Sink
394 | CatalogItemType::Index
395 | CatalogItemType::Type
396 | CatalogItemType::Func
397 | CatalogItemType::Secret
398 | CatalogItemType::Connection
399 | CatalogItemType::ContinualTask => {
400 dependencies.extend(global_ids);
401 }
402 CatalogItemType::View => {
403 dependencies.extend(global_ids);
404 queue.extend(entry.uses());
405 }
406 }
407 }
408 }
409 dependencies
410 }
411}
412
/// A view of the catalog state scoped to a single connection/role.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The underlying catalog state, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Item ids registered via `mark_id_unresolvable_for_replanning`.
    // NOTE(review): presumably name resolution skips these ids — confirm where the set is read.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// Id of the connection this view belongs to.
    conn_id: ConnectionId,
    // NOTE(review): presumably the name of the connection's active cluster — confirm.
    cluster: String,
    // NOTE(review): presumably the connection's current database, if any — confirm.
    database: Option<DatabaseId>,
    /// Search path that `effective_search_path` passes to the catalog state.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// Role this view acts as; checked by `mark_id_unresolvable_for_replanning`.
    role_id: RoleId,
    /// Prepared statements by name, when available.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Portals by name, when available.
    portals: Option<&'a BTreeMap<String, Portal>>,
    /// Channel on which `AdapterNotice`s can be sent.
    notices_tx: UnboundedSender<AdapterNotice>,
}
435
436impl ConnCatalog<'_> {
437 pub fn conn_id(&self) -> &ConnectionId {
438 &self.conn_id
439 }
440
441 pub fn state(&self) -> &CatalogState {
442 &*self.state
443 }
444
445 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
455 assert_eq!(
456 self.role_id, MZ_SYSTEM_ROLE_ID,
457 "only the system role can mark IDs unresolvable",
458 );
459 self.unresolvable_ids.insert(id);
460 }
461
462 pub fn effective_search_path(
468 &self,
469 include_temp_schema: bool,
470 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
471 self.state
472 .effective_search_path(&self.search_path, include_temp_schema)
473 }
474}
475
476impl ConnectionResolver for ConnCatalog<'_> {
477 fn resolve_connection(
478 &self,
479 id: CatalogItemId,
480 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
481 self.state().resolve_connection(id)
482 }
483}
484
485impl Catalog {
486 pub fn transient_revision(&self) -> u64 {
490 self.transient_revision
491 }
492
493 pub async fn with_debug<F, Fut, T>(f: F) -> T
505 where
506 F: FnOnce(Catalog) -> Fut,
507 Fut: Future<Output = T>,
508 {
509 let persist_client = PersistClient::new_for_tests().await;
510 let organization_id = Uuid::new_v4();
511 let bootstrap_args = test_bootstrap_args();
512 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
513 .await
514 .expect("can open debug catalog");
515 f(catalog).await
516 }
517
518 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
521 where
522 F: FnOnce(Catalog) -> Fut,
523 Fut: Future<Output = T>,
524 {
525 let persist_client = PersistClient::new_for_tests().await;
526 let organization_id = Uuid::new_v4();
527 let bootstrap_args = test_bootstrap_args();
528 let mut catalog =
529 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
530 .await
531 .expect("can open debug catalog");
532
533 let now = SYSTEM_TIME.clone();
535 let openable_storage = TestCatalogStateBuilder::new(persist_client)
536 .with_organization_id(organization_id)
537 .with_default_deploy_generation()
538 .build()
539 .await
540 .expect("can create durable catalog");
541 let mut storage = openable_storage
542 .open(now().into(), &bootstrap_args)
543 .await
544 .expect("can open durable catalog")
545 .0;
546 let _ = storage
548 .sync_to_current_updates()
549 .await
550 .expect("can sync to current updates");
551 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
552
553 f(catalog).await
554 }
555
556 pub async fn open_debug_catalog(
560 persist_client: PersistClient,
561 organization_id: Uuid,
562 bootstrap_args: &BootstrapArgs,
563 ) -> Result<Catalog, anyhow::Error> {
564 let now = SYSTEM_TIME.clone();
565 let environment_id = None;
566 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
567 .with_organization_id(organization_id)
568 .with_default_deploy_generation()
569 .build()
570 .await?;
571 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
572 let system_parameter_defaults = BTreeMap::default();
573 Self::open_debug_catalog_inner(
574 persist_client,
575 storage,
576 now,
577 environment_id,
578 &DUMMY_BUILD_INFO,
579 system_parameter_defaults,
580 bootstrap_args,
581 None,
582 )
583 .await
584 }
585
586 pub async fn open_debug_read_only_catalog(
591 persist_client: PersistClient,
592 organization_id: Uuid,
593 bootstrap_args: &BootstrapArgs,
594 ) -> Result<Catalog, anyhow::Error> {
595 let now = SYSTEM_TIME.clone();
596 let environment_id = None;
597 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
598 .with_organization_id(organization_id)
599 .build()
600 .await?;
601 let storage = openable_storage
602 .open_read_only(&test_bootstrap_args())
603 .await?;
604 let system_parameter_defaults = BTreeMap::default();
605 Self::open_debug_catalog_inner(
606 persist_client,
607 storage,
608 now,
609 environment_id,
610 &DUMMY_BUILD_INFO,
611 system_parameter_defaults,
612 bootstrap_args,
613 None,
614 )
615 .await
616 }
617
618 pub async fn open_debug_read_only_persist_catalog_config(
623 persist_client: PersistClient,
624 now: NowFn,
625 environment_id: EnvironmentId,
626 system_parameter_defaults: BTreeMap<String, String>,
627 build_info: &'static BuildInfo,
628 bootstrap_args: &BootstrapArgs,
629 enable_expression_cache_override: Option<bool>,
630 ) -> Result<Catalog, anyhow::Error> {
631 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
632 .with_organization_id(environment_id.organization_id())
633 .with_version(
634 build_info
635 .version
636 .parse()
637 .expect("build version is parseable"),
638 )
639 .build()
640 .await?;
641 let storage = openable_storage.open_read_only(bootstrap_args).await?;
642 Self::open_debug_catalog_inner(
643 persist_client,
644 storage,
645 now,
646 Some(environment_id),
647 build_info,
648 system_parameter_defaults,
649 bootstrap_args,
650 enable_expression_cache_override,
651 )
652 .await
653 }
654
655 async fn open_debug_catalog_inner(
656 persist_client: PersistClient,
657 storage: Box<dyn DurableCatalogState>,
658 now: NowFn,
659 environment_id: Option<EnvironmentId>,
660 build_info: &'static BuildInfo,
661 system_parameter_defaults: BTreeMap<String, String>,
662 bootstrap_args: &BootstrapArgs,
663 enable_expression_cache_override: Option<bool>,
664 ) -> Result<Catalog, anyhow::Error> {
665 let metrics_registry = &MetricsRegistry::new();
666 let secrets_reader = Arc::new(InMemorySecretsController::new());
667 let previous_ts = now().into();
670 let replica_size = &bootstrap_args.default_cluster_replica_size;
671 let read_only = false;
672
673 let OpenCatalogResult {
674 catalog,
675 migrated_storage_collections_0dt: _,
676 new_builtin_collections: _,
677 builtin_table_updates: _,
678 cached_global_exprs: _,
679 uncached_local_exprs: _,
680 } = Catalog::open(Config {
681 storage,
682 metrics_registry,
683 state: StateConfig {
684 unsafe_mode: true,
685 all_features: false,
686 build_info,
687 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
688 read_only,
689 now,
690 boot_ts: previous_ts,
691 skip_migrations: true,
692 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
693 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
694 size: replica_size.clone(),
695 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
696 },
697 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
698 size: replica_size.clone(),
699 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
700 },
701 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
702 size: replica_size.clone(),
703 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
704 },
705 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
706 size: replica_size.clone(),
707 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
708 },
709 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
710 size: replica_size.clone(),
711 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
712 },
713 system_parameter_defaults,
714 remote_system_parameters: None,
715 availability_zones: vec![],
716 egress_addresses: vec![],
717 aws_principal_context: None,
718 aws_privatelink_availability_zones: None,
719 http_host_name: None,
720 connection_context: ConnectionContext::for_tests(secrets_reader),
721 builtin_item_migration_config: BuiltinItemMigrationConfig {
722 persist_client: persist_client.clone(),
723 read_only,
724 },
725 persist_client,
726 enable_expression_cache_override,
727 helm_chart_version: None,
728 external_login_password_mz_system: None,
729 license_key: ValidatedLicenseKey::for_tests(),
730 },
731 })
732 .await?;
733 Ok(catalog)
734 }
735
736 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
737 self.state.for_session(session)
738 }
739
740 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
741 self.state.for_sessionless_user(role_id)
742 }
743
744 pub fn for_system_session(&self) -> ConnCatalog<'_> {
745 self.state.for_system_session()
746 }
747
748 async fn storage<'a>(
749 &'a self,
750 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
751 self.storage.lock().await
752 }
753
754 pub async fn current_upper(&self) -> mz_repr::Timestamp {
755 self.storage().await.current_upper().await
756 }
757
758 pub async fn allocate_user_id(
759 &self,
760 commit_ts: mz_repr::Timestamp,
761 ) -> Result<(CatalogItemId, GlobalId), Error> {
762 self.storage()
763 .await
764 .allocate_user_id(commit_ts)
765 .await
766 .maybe_terminate("allocating user ids")
767 .err_into()
768 }
769
770 pub async fn allocate_user_ids(
772 &self,
773 amount: u64,
774 commit_ts: mz_repr::Timestamp,
775 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
776 self.storage()
777 .await
778 .allocate_user_ids(amount, commit_ts)
779 .await
780 .maybe_terminate("allocating user ids")
781 .err_into()
782 }
783
784 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
785 let commit_ts = self.storage().await.current_upper().await;
786 self.allocate_user_id(commit_ts).await
787 }
788
789 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
791 self.storage()
792 .await
793 .get_next_user_item_id()
794 .await
795 .err_into()
796 }
797
798 #[cfg(test)]
799 pub async fn allocate_system_id(
800 &self,
801 commit_ts: mz_repr::Timestamp,
802 ) -> Result<(CatalogItemId, GlobalId), Error> {
803 use mz_ore::collections::CollectionExt;
804
805 let mut storage = self.storage().await;
806 let mut txn = storage.transaction().await?;
807 let id = txn
808 .allocate_system_item_ids(1)
809 .maybe_terminate("allocating system ids")?
810 .into_element();
811 let _ = txn.get_and_commit_op_updates();
813 txn.commit(commit_ts).await?;
814 Ok(id)
815 }
816
817 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
819 self.storage()
820 .await
821 .get_next_system_item_id()
822 .await
823 .err_into()
824 }
825
826 pub async fn allocate_user_cluster_id(
827 &self,
828 commit_ts: mz_repr::Timestamp,
829 ) -> Result<ClusterId, Error> {
830 self.storage()
831 .await
832 .allocate_user_cluster_id(commit_ts)
833 .await
834 .maybe_terminate("allocating user cluster ids")
835 .err_into()
836 }
837
838 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
840 self.storage()
841 .await
842 .get_next_system_replica_id()
843 .await
844 .err_into()
845 }
846
847 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
849 self.storage()
850 .await
851 .get_next_user_replica_id()
852 .await
853 .err_into()
854 }
855
856 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
857 self.state.resolve_database(database_name)
858 }
859
860 pub fn resolve_schema(
861 &self,
862 current_database: Option<&DatabaseId>,
863 database_name: Option<&str>,
864 schema_name: &str,
865 conn_id: &ConnectionId,
866 ) -> Result<&Schema, SqlCatalogError> {
867 self.state
868 .resolve_schema(current_database, database_name, schema_name, conn_id)
869 }
870
871 pub fn resolve_schema_in_database(
872 &self,
873 database_spec: &ResolvedDatabaseSpecifier,
874 schema_name: &str,
875 conn_id: &ConnectionId,
876 ) -> Result<&Schema, SqlCatalogError> {
877 self.state
878 .resolve_schema_in_database(database_spec, schema_name, conn_id)
879 }
880
881 pub fn resolve_replica_in_cluster(
882 &self,
883 cluster_id: &ClusterId,
884 replica_name: &str,
885 ) -> Result<&ClusterReplica, SqlCatalogError> {
886 self.state
887 .resolve_replica_in_cluster(cluster_id, replica_name)
888 }
889
890 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
891 self.state.resolve_system_schema(name)
892 }
893
894 pub fn resolve_search_path(
895 &self,
896 session: &Session,
897 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
898 self.state.resolve_search_path(session)
899 }
900
901 pub fn resolve_entry(
903 &self,
904 current_database: Option<&DatabaseId>,
905 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
906 name: &PartialItemName,
907 conn_id: &ConnectionId,
908 ) -> Result<&CatalogEntry, SqlCatalogError> {
909 self.state
910 .resolve_entry(current_database, search_path, name, conn_id)
911 }
912
913 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
915 self.state.resolve_builtin_table(builtin)
916 }
917
918 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
920 self.state.resolve_builtin_log(builtin).0
921 }
922
923 pub fn resolve_builtin_storage_collection(
925 &self,
926 builtin: &'static BuiltinSource,
927 ) -> CatalogItemId {
928 self.state.resolve_builtin_source(builtin)
929 }
930
931 pub fn resolve_function(
933 &self,
934 current_database: Option<&DatabaseId>,
935 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
936 name: &PartialItemName,
937 conn_id: &ConnectionId,
938 ) -> Result<&CatalogEntry, SqlCatalogError> {
939 self.state
940 .resolve_function(current_database, search_path, name, conn_id)
941 }
942
943 pub fn resolve_type(
945 &self,
946 current_database: Option<&DatabaseId>,
947 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
948 name: &PartialItemName,
949 conn_id: &ConnectionId,
950 ) -> Result<&CatalogEntry, SqlCatalogError> {
951 self.state
952 .resolve_type(current_database, search_path, name, conn_id)
953 }
954
955 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
956 self.state.resolve_cluster(name)
957 }
958
959 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
965 self.state.resolve_builtin_cluster(cluster)
966 }
967
968 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
969 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
970 }
971
972 pub fn resolve_target_cluster(
974 &self,
975 target_cluster: TargetCluster,
976 session: &Session,
977 ) -> Result<&Cluster, AdapterError> {
978 match target_cluster {
979 TargetCluster::CatalogServer => {
980 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
981 }
982 TargetCluster::Active => self.active_cluster(session),
983 TargetCluster::Transaction(cluster_id) => self
984 .try_get_cluster(cluster_id)
985 .ok_or(AdapterError::ConcurrentClusterDrop),
986 }
987 }
988
989 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
990 if session.user().name != SYSTEM_USER.name
994 && session.user().name != SUPPORT_USER.name
995 && session.vars().cluster() == SYSTEM_USER.name
996 {
997 coord_bail!(
998 "system cluster '{}' cannot execute user queries",
999 SYSTEM_USER.name
1000 );
1001 }
1002 let cluster = self.resolve_cluster(session.vars().cluster())?;
1003 Ok(cluster)
1004 }
1005
1006 pub fn state(&self) -> &CatalogState {
1007 &self.state
1008 }
1009
1010 pub fn resolve_full_name(
1011 &self,
1012 name: &QualifiedItemName,
1013 conn_id: Option<&ConnectionId>,
1014 ) -> FullItemName {
1015 self.state.resolve_full_name(name, conn_id)
1016 }
1017
1018 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1019 self.state.try_get_entry(id)
1020 }
1021
1022 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1023 self.state.try_get_entry_by_global_id(id)
1024 }
1025
1026 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1027 self.state.get_entry(id)
1028 }
1029
1030 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1031 self.state.get_entry_by_global_id(id)
1032 }
1033
1034 pub fn get_global_ids<'a>(
1035 &'a self,
1036 id: &CatalogItemId,
1037 ) -> impl Iterator<Item = GlobalId> + use<'a> {
1038 self.get_entry(id).global_ids()
1039 }
1040
1041 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1042 self.get_entry_by_global_id(id).id()
1043 }
1044
1045 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1046 let item = self.try_get_entry_by_global_id(id)?;
1047 Some(item.id())
1048 }
1049
1050 pub fn get_schema(
1051 &self,
1052 database_spec: &ResolvedDatabaseSpecifier,
1053 schema_spec: &SchemaSpecifier,
1054 conn_id: &ConnectionId,
1055 ) -> &Schema {
1056 self.state.get_schema(database_spec, schema_spec, conn_id)
1057 }
1058
1059 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1060 self.state.get_mz_catalog_schema_id()
1061 }
1062
1063 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1064 self.state.get_pg_catalog_schema_id()
1065 }
1066
1067 pub fn get_information_schema_id(&self) -> SchemaId {
1068 self.state.get_information_schema_id()
1069 }
1070
1071 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1072 self.state.get_mz_internal_schema_id()
1073 }
1074
1075 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1076 self.state.get_mz_introspection_schema_id()
1077 }
1078
1079 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1080 self.state.get_mz_unsafe_schema_id()
1081 }
1082
1083 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1084 self.state.system_schema_ids()
1085 }
1086
1087 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1088 self.state.get_database(id)
1089 }
1090
1091 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1092 self.state.try_get_role(id)
1093 }
1094
1095 pub fn get_role(&self, id: &RoleId) -> &Role {
1096 self.state.get_role(id)
1097 }
1098
1099 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1100 self.state.try_get_role_by_name(role_name)
1101 }
1102
1103 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1104 self.state.try_get_role_auth_by_id(id)
1105 }
1106
1107 pub fn create_temporary_schema(
1110 &mut self,
1111 conn_id: &ConnectionId,
1112 owner_id: RoleId,
1113 ) -> Result<(), Error> {
1114 self.state.create_temporary_schema(conn_id, owner_id)
1115 }
1116
1117 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1118 self.state.temporary_schemas[conn_id]
1119 .items
1120 .contains_key(item_name)
1121 }
1122
1123 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1126 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1127 return Ok(());
1128 };
1129 if !schema.items.is_empty() {
1130 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1131 }
1132 Ok(())
1133 }
1134
1135 pub(crate) fn object_dependents(
1136 &self,
1137 object_ids: &Vec<ObjectId>,
1138 conn_id: &ConnectionId,
1139 ) -> Vec<ObjectId> {
1140 let mut seen = BTreeSet::new();
1141 self.state.object_dependents(object_ids, conn_id, &mut seen)
1142 }
1143
1144 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1145 FullNameV1 {
1146 database: name.database.to_string(),
1147 schema: name.schema.clone(),
1148 item: name.item.clone(),
1149 }
1150 }
1151
1152 pub fn find_available_cluster_name(&self, name: &str) -> String {
1153 let mut i = 0;
1154 let mut candidate = name.to_string();
1155 while self.state.clusters_by_name.contains_key(&candidate) {
1156 i += 1;
1157 candidate = format!("{}{}", name, i);
1158 }
1159 candidate
1160 }
1161
1162 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1163 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1164 self.cluster_replica_sizes()
1165 .enabled_allocations()
1166 .map(|a| a.0.to_owned())
1167 .collect::<Vec<_>>()
1168 } else {
1169 self.system_config().allowed_cluster_replica_sizes()
1170 }
1171 }
1172
1173 pub fn concretize_replica_location(
1174 &self,
1175 location: mz_catalog::durable::ReplicaLocation,
1176 allowed_sizes: &Vec<String>,
1177 allowed_availability_zones: Option<&[String]>,
1178 ) -> Result<ReplicaLocation, Error> {
1179 self.state
1180 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1181 }
1182
 /// Checks `size` against `allowed_sizes` by delegating to the state's
 /// `ensure_valid_replica_size`; its error, if any, is returned unchanged.
 pub(crate) fn ensure_valid_replica_size(
     &self,
     allowed_sizes: &[String],
     size: &String,
 ) -> Result<(), Error> {
     self.state.ensure_valid_replica_size(allowed_sizes, size)
 }
1190
 /// Returns the cluster replica size map stored in the catalog state.
 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
     &self.state.cluster_replica_sizes
 }
1194
1195 pub fn get_privileges(
1197 &self,
1198 id: &SystemObjectId,
1199 conn_id: &ConnectionId,
1200 ) -> Option<&PrivilegeMap> {
1201 match id {
1202 SystemObjectId::Object(id) => match id {
1203 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1204 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1205 ObjectId::Schema((database_spec, schema_spec)) => Some(
1206 self.get_schema(database_spec, schema_spec, conn_id)
1207 .privileges(),
1208 ),
1209 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1210 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1211 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1212 },
1213 SystemObjectId::System => Some(&self.state.system_privileges),
1214 }
1215 }
1216
1217 #[mz_ore::instrument(level = "debug")]
1218 pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1219 Ok(self.storage().await.confirm_leadership().await?)
1220 }
1221
 /// Returns the introspection dependencies of item `id`, as computed by the
 /// state's `introspection_dependencies`.
 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
     self.state.introspection_dependencies(id)
 }
1226
 /// Produces a `CatalogDump` from the state's `dump(None)` output.
 ///
 /// Any error from the state dump is propagated to the caller.
 pub fn dump(&self) -> Result<CatalogDump, Error> {
     Ok(CatalogDump::new(self.state.dump(None)?))
 }
1235
1236 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1240 self.state.check_consistency().map_err(|inconsistencies| {
1241 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1242 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1243 })
1244 })
1245 }
1246
 /// Returns the catalog configuration held by the state.
 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
     self.state.config()
 }
1250
 /// Iterates over every catalog entry stored in `entry_by_id`.
 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.state.entry_by_id.values()
 }
1254
 /// Iterates over the entries that are connections and have a user ID.
 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.entries()
         .filter(|entry| entry.is_connection() && entry.id().is_user())
 }
1259
 /// Iterates over the entries that are tables and have a user ID.
 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.entries()
         .filter(|entry| entry.is_table() && entry.id().is_user())
 }
1264
 /// Iterates over the entries that are sources and have a user ID.
 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.entries()
         .filter(|entry| entry.is_source() && entry.id().is_user())
 }
1269
 /// Iterates over the entries that are sinks and have a user ID.
 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.entries()
         .filter(|entry| entry.is_sink() && entry.id().is_user())
 }
1274
 /// Iterates over the entries that are materialized views and have a user ID.
 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.entries()
         .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
 }
1279
 /// Iterates over the entries that are secrets and have a user ID.
 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.entries()
         .filter(|entry| entry.is_secret() && entry.id().is_user())
 }
1284
 /// Returns the network policy with ID `network_policy_id` via the state's
 /// `get_network_policy`.
 // NOTE(review): presumably panics when the policy is unknown, like other
 // non-`try_` getters — confirm against the state implementation.
 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
     self.state.get_network_policy(&network_policy_id)
 }
1288
 /// Looks up a network policy by `name`, returning `None` when the state's
 /// `try_get_network_policy_by_name` finds nothing.
 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
     self.state.try_get_network_policy_by_name(name)
 }
1292
 /// Iterates over every cluster stored in `clusters_by_id`.
 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
     self.state.clusters_by_id.values()
 }
1296
 /// Returns the cluster with ID `cluster_id` via the state's `get_cluster`.
 // NOTE(review): presumably panics for an unknown ID, since a `try_` variant
 // returning `Option` exists alongside — confirm.
 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
     self.state.get_cluster(cluster_id)
 }
1300
 /// Returns the cluster with ID `cluster_id`, or `None` when the state's
 /// `try_get_cluster` finds nothing.
 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
     self.state.try_get_cluster(cluster_id)
 }
1304
 /// Iterates over the clusters whose ID is a user ID.
 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
     self.clusters().filter(|cluster| cluster.id.is_user())
 }
1308
 /// Returns replica `replica_id` of cluster `cluster_id` via the state's
 /// `get_cluster_replica`.
 // NOTE(review): presumably panics when either ID is unknown, since a `try_`
 // variant returning `Option` exists alongside — confirm.
 pub fn get_cluster_replica(
     &self,
     cluster_id: ClusterId,
     replica_id: ReplicaId,
 ) -> &ClusterReplica {
     self.state.get_cluster_replica(cluster_id, replica_id)
 }
1316
 /// Returns replica `replica_id` of cluster `cluster_id`, or `None` when the
 /// state's `try_get_cluster_replica` finds nothing.
 pub fn try_get_cluster_replica(
     &self,
     cluster_id: ClusterId,
     replica_id: ReplicaId,
 ) -> Option<&ClusterReplica> {
     self.state.try_get_cluster_replica(cluster_id, replica_id)
 }
1324
 /// Iterates over the user replicas (`Cluster::user_replicas`) of every user
 /// cluster.
 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
     self.user_clusters()
         .flat_map(|cluster| cluster.user_replicas())
 }
1329
 /// Iterates over every database stored in `database_by_id`.
 pub fn databases(&self) -> impl Iterator<Item = &Database> {
     self.state.database_by_id.values()
 }
1333
 /// Iterates over the roles in `roles_by_id` for which `is_user()` holds.
 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
     self.state
         .roles_by_id
         .values()
         .filter(|role| role.is_user())
 }
1340
 /// Iterates over the entries that are continual tasks and have a user ID.
 pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
     self.entries()
         .filter(|entry| entry.is_continual_task() && entry.id().is_user())
 }
1345
 /// Returns the system-level privilege map stored in the state.
 pub fn system_privileges(&self) -> &PrivilegeMap {
     &self.state.system_privileges
 }
1349
 /// Iterates over the default privileges stored in the state.
 ///
 /// Each element pairs a `DefaultPrivilegeObject` with an iterator over the
 /// `DefaultPrivilegeAclItem`s that belong to it.
 pub fn default_privileges(
     &self,
 ) -> impl Iterator<
     Item = (
         &DefaultPrivilegeObject,
         impl Iterator<Item = &DefaultPrivilegeAclItem>,
     ),
 > {
     self.state.default_privileges.iter()
 }
1360
1361 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1362 self.state
1363 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1364 }
1365
1366 pub fn pack_storage_usage_update(
1367 &self,
1368 event: VersionedStorageUsage,
1369 diff: Diff,
1370 ) -> BuiltinTableUpdate {
1371 self.state
1372 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1373 }
1374
 /// Returns a shared reference to the system variables held by the state.
 pub fn system_config(&self) -> &SystemVars {
     self.state.system_config()
 }
1378
 /// Returns a mutable reference to the system variables held by the state.
 pub fn system_config_mut(&mut self) -> &mut SystemVars {
     self.state.system_config_mut()
 }
1382
 /// Delegates to the state's `ensure_not_reserved_role` check for `role_id`
 /// and returns its result.
 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
     self.state.ensure_not_reserved_role(role_id)
 }
1386
 /// Delegates to the state's `ensure_grantable_role` check for `role_id` and
 /// returns its result.
 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
     self.state.ensure_grantable_role(role_id)
 }
1390
 /// Delegates to the state's `ensure_not_system_role` check for `role_id`
 /// and returns its result.
 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
     self.state.ensure_not_system_role(role_id)
 }
1394
 /// Delegates to the state's `ensure_not_predefined_role` check for
 /// `role_id` and returns its result.
 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
     self.state.ensure_not_predefined_role(role_id)
 }
1398
 /// Delegates to the state's `ensure_not_reserved_network_policy` check for
 /// `network_policy_id` and returns its result.
 pub fn ensure_not_reserved_network_policy(
     &self,
     network_policy_id: &NetworkPolicyId,
 ) -> Result<(), Error> {
     self.state
         .ensure_not_reserved_network_policy(network_policy_id)
 }
1406
1407 pub fn ensure_not_reserved_object(
1408 &self,
1409 object_id: &ObjectId,
1410 conn_id: &ConnectionId,
1411 ) -> Result<(), Error> {
1412 match object_id {
1413 ObjectId::Cluster(cluster_id) => {
1414 if cluster_id.is_system() {
1415 let cluster = self.get_cluster(*cluster_id);
1416 Err(Error::new(ErrorKind::ReadOnlyCluster(
1417 cluster.name().to_string(),
1418 )))
1419 } else {
1420 Ok(())
1421 }
1422 }
1423 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1424 if replica_id.is_system() {
1425 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1426 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1427 replica.name().to_string(),
1428 )))
1429 } else {
1430 Ok(())
1431 }
1432 }
1433 ObjectId::Database(database_id) => {
1434 if database_id.is_system() {
1435 let database = self.get_database(database_id);
1436 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1437 database.name().to_string(),
1438 )))
1439 } else {
1440 Ok(())
1441 }
1442 }
1443 ObjectId::Schema((database_spec, schema_spec)) => {
1444 if schema_spec.is_system() {
1445 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1446 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1447 schema.name().schema.clone(),
1448 )))
1449 } else {
1450 Ok(())
1451 }
1452 }
1453 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1454 ObjectId::Item(item_id) => {
1455 if item_id.is_system() {
1456 let item = self.get_entry(item_id);
1457 let name = self.resolve_full_name(item.name(), Some(conn_id));
1458 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1459 } else {
1460 Ok(())
1461 }
1462 }
1463 ObjectId::NetworkPolicy(network_policy_id) => {
1464 self.ensure_not_reserved_network_policy(network_policy_id)
1465 }
1466 }
1467 }
1468
 /// Parses and plans `create_sql` by delegating to the state's
 /// `deserialize_plan_with_enable_for_item_parsing`.
 ///
 /// `force_if_exists_skip` is forwarded unchanged. Returns the resulting
 /// `Plan` together with its `ResolvedIds`, or the state's error.
 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
     &mut self,
     create_sql: &str,
     force_if_exists_skip: bool,
 ) -> Result<(Plan, ResolvedIds), AdapterError> {
     self.state
         .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
 }
1478
1479 pub(crate) fn update_expression_cache<'a, 'b>(
1480 &'a self,
1481 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1482 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1483 ) -> BoxFuture<'b, ()> {
1484 if let Some(expr_cache) = &self.expr_cache_handle {
1485 let ons = new_local_expressions
1486 .iter()
1487 .map(|(id, _)| id)
1488 .chain(new_global_expressions.iter().map(|(id, _)| id))
1489 .map(|id| self.get_entry_by_global_id(id))
1490 .filter_map(|entry| entry.index().map(|index| index.on));
1491 let invalidate_ids = self.invalidate_for_index(ons);
1492 expr_cache
1493 .update(
1494 new_local_expressions,
1495 new_global_expressions,
1496 invalidate_ids,
1497 )
1498 .boxed()
1499 } else {
1500 async {}.boxed()
1501 }
1502 }
1503
1504 #[cfg(test)]
1508 async fn sync_to_current_updates(
1509 &mut self,
1510 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
1511 let updates = self.storage().await.sync_to_current_updates().await?;
1512 let builtin_table_updates = self.state.apply_updates(updates)?;
1513 Ok(builtin_table_updates)
1514 }
1515}
1516
1517pub fn is_reserved_name(name: &str) -> bool {
1518 BUILTIN_PREFIXES
1519 .iter()
1520 .any(|prefix| name.starts_with(prefix))
1521}
1522
/// Reports whether `name` is reserved for roles: either it is a reserved
/// name (see `is_reserved_name`) or it is the public role's name.
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1526
/// Reports whether `name` equals `PUBLIC_ROLE_NAME`.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1530
/// Maps a SQL catalog item type to its audit-log `ObjectType` by converting
/// it into a SQL object type and reusing `object_type_to_audit_object_type`.
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1534
/// Maps the kind of object a comment is attached to onto the audit-log
/// `ObjectType` of the same name; the wrapped ID is ignored.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1556
/// Maps a SQL object type to its audit-log `ObjectType` by wrapping it in
/// `SystemObjectType::Object` and reusing
/// `system_object_type_to_audit_object_type`.
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1562
1563pub(crate) fn system_object_type_to_audit_object_type(
1564 system_type: &SystemObjectType,
1565) -> ObjectType {
1566 match system_type {
1567 SystemObjectType::Object(object_type) => match object_type {
1568 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1569 mz_sql::catalog::ObjectType::View => ObjectType::View,
1570 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1571 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1572 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1573 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1574 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1575 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1576 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1577 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1578 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1579 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1580 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1581 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1582 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1583 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1584 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1585 },
1586 SystemObjectType::System => ObjectType::System,
1587 }
1588}
1589
/// Direction of a privilege update: granting or revoking.
///
/// Converts into the matching `ExecuteResponse` and audit-log `EventType`
/// through the `From` impls in this file.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1595
/// Picks the `ExecuteResponse` reported for a privilege update.
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    /// `Grant` maps to `GrantedPrivilege`, `Revoke` to `RevokedPrivilege`.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}
1604
/// Picks the audit-log `EventType` recorded for a privilege update.
impl From<UpdatePrivilegeVariant> for EventType {
    /// `Grant` maps to `EventType::Grant`, `Revoke` to `EventType::Revoke`.
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1613
1614impl ConnCatalog<'_> {
1615 fn resolve_item_name(
1616 &self,
1617 name: &PartialItemName,
1618 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1619 self.resolve_item(name).map(|entry| entry.name())
1620 }
1621
1622 fn resolve_function_name(
1623 &self,
1624 name: &PartialItemName,
1625 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1626 self.resolve_function(name).map(|entry| entry.name())
1627 }
1628
1629 fn resolve_type_name(
1630 &self,
1631 name: &PartialItemName,
1632 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1633 self.resolve_type(name).map(|entry| entry.name())
1634 }
1635}
1636
1637impl ExprHumanizer for ConnCatalog<'_> {
1638 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1639 let entry = self.state.try_get_entry_by_global_id(&id)?;
1640 Some(self.resolve_full_name(entry.name()).to_string())
1641 }
1642
1643 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1644 let entry = self.state.try_get_entry_by_global_id(&id)?;
1645 Some(entry.name().item.clone())
1646 }
1647
1648 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1649 let entry = self.state.try_get_entry_by_global_id(&id)?;
1650 Some(self.resolve_full_name(entry.name()).into_parts())
1651 }
1652
1653 fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1654 use SqlScalarType::*;
1655
1656 match typ {
1657 Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1658 List {
1659 custom_id: Some(item_id),
1660 ..
1661 }
1662 | Map {
1663 custom_id: Some(item_id),
1664 ..
1665 } => {
1666 let item = self.get_item(item_id);
1667 self.minimal_qualification(item.name()).to_string()
1668 }
1669 List { element_type, .. } => {
1670 format!(
1671 "{} list",
1672 self.humanize_scalar_type(element_type, postgres_compat)
1673 )
1674 }
1675 Map { value_type, .. } => format!(
1676 "map[{}=>{}]",
1677 self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1678 self.humanize_scalar_type(value_type, postgres_compat)
1679 ),
1680 Record {
1681 custom_id: Some(item_id),
1682 ..
1683 } => {
1684 let item = self.get_item(item_id);
1685 self.minimal_qualification(item.name()).to_string()
1686 }
1687 Record { fields, .. } => format!(
1688 "record({})",
1689 fields
1690 .iter()
1691 .map(|f| format!(
1692 "{}: {}",
1693 f.0,
1694 self.humanize_column_type(&f.1, postgres_compat)
1695 ))
1696 .join(",")
1697 ),
1698 PgLegacyChar => "\"char\"".into(),
1699 Char { length } if !postgres_compat => match length {
1700 None => "char".into(),
1701 Some(length) => format!("char({})", length.into_u32()),
1702 },
1703 VarChar { max_length } if !postgres_compat => match max_length {
1704 None => "varchar".into(),
1705 Some(length) => format!("varchar({})", length.into_u32()),
1706 },
1707 UInt16 => "uint2".into(),
1708 UInt32 => "uint4".into(),
1709 UInt64 => "uint8".into(),
1710 ty => {
1711 let pgrepr_type = mz_pgrepr::Type::from(ty);
1712 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1713
1714 let res = if self
1715 .effective_search_path(true)
1716 .iter()
1717 .any(|(_, schema)| schema == &pg_catalog_schema)
1718 {
1719 pgrepr_type.name().to_string()
1720 } else {
1721 let name = QualifiedItemName {
1724 qualifiers: ItemQualifiers {
1725 database_spec: ResolvedDatabaseSpecifier::Ambient,
1726 schema_spec: pg_catalog_schema,
1727 },
1728 item: pgrepr_type.name().to_string(),
1729 };
1730 self.resolve_full_name(&name).to_string()
1731 };
1732 res
1733 }
1734 }
1735 }
1736
1737 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1738 let entry = self.state.try_get_entry_by_global_id(&id)?;
1739
1740 match entry.index() {
1741 Some(index) => {
1742 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1743 let mut on_names = on_desc
1744 .iter_names()
1745 .map(|col_name| col_name.to_string())
1746 .collect::<Vec<_>>();
1747
1748 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1749
1750 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1754 let mut ix_names = vec![String::new(); ix_arity];
1755
1756 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1758 let on_name = on_names.get_mut(on_pos).expect("on_name");
1759 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1760 std::mem::swap(on_name, ix_name);
1761 }
1762
1763 Some(ix_names) }
1765 None => {
1766 let desc = self.state.try_get_desc_by_global_id(&id)?;
1767 let column_names = desc
1768 .iter_names()
1769 .map(|col_name| col_name.to_string())
1770 .collect();
1771
1772 Some(column_names)
1773 }
1774 }
1775 }
1776
1777 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1778 let desc = self.state.try_get_desc_by_global_id(&id)?;
1779 Some(desc.get_name(column).to_string())
1780 }
1781
1782 fn id_exists(&self, id: GlobalId) -> bool {
1783 self.state.entry_by_global_id.contains_key(&id)
1784 }
1785}
1786
1787impl SessionCatalog for ConnCatalog<'_> {
 /// Returns the role ID stored on this connection catalog.
 fn active_role_id(&self) -> &RoleId {
     &self.role_id
 }
1791
1792 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1793 self.prepared_statements
1794 .as_ref()
1795 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1796 .flatten()
1797 }
1798
1799 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1800 self.portals
1801 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1802 }
1803
 /// Returns the ID of the database stored on this connection catalog, if
 /// one is set.
 fn active_database(&self) -> Option<&DatabaseId> {
     self.database.as_ref()
 }
1807
 /// Returns the name of the cluster stored on this connection catalog.
 fn active_cluster(&self) -> &str {
     &self.cluster
 }
1811
 /// Returns the search path stored on this connection catalog as
 /// (database, schema) specifier pairs.
 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
     &self.search_path
 }
1815
 /// Resolves `database_name` through the state's `resolve_database`,
 /// converting its error into the returned `SqlCatalogError`.
 fn resolve_database(
     &self,
     database_name: &str,
 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
     Ok(self.state.resolve_database(database_name)?)
 }
1822
 /// Returns the database with ID `id`.
 ///
 /// # Panics
 ///
 /// Panics if `database_by_id` has no entry for `id`.
 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
     self.state
         .database_by_id
         .get(id)
         .expect("database doesn't exist")
 }
1829
 /// Returns every database in `database_by_id` as a trait object.
 // The `as` cast below turns each concrete database reference into a
 // `&dyn CatalogDatabase`, hence the lint exemption.
 #[allow(clippy::as_conversions)]
 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
     self.state
         .database_by_id
         .values()
         .map(|database| database as &dyn CatalogDatabase)
         .collect()
 }
1839
 /// Resolves `schema_name`, optionally qualified by `database_name`.
 ///
 /// Delegates to the state's `resolve_schema`, additionally passing this
 /// catalog's database and connection ID, and converts its error into the
 /// returned `SqlCatalogError`.
 fn resolve_schema(
     &self,
     database_name: Option<&str>,
     schema_name: &str,
 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
     Ok(self.state.resolve_schema(
         self.database.as_ref(),
         database_name,
         schema_name,
         &self.conn_id,
     )?)
 }
1852
 /// Resolves `schema_name` within the database given by `database_spec`.
 ///
 /// Delegates to the state's `resolve_schema_in_database` with this
 /// catalog's connection ID and converts its error into the returned
 /// `SqlCatalogError`.
 fn resolve_schema_in_database(
     &self,
     database_spec: &ResolvedDatabaseSpecifier,
     schema_name: &str,
 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
     Ok(self
         .state
         .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
 }
1862
 /// Returns the schema identified by `database_spec` and `schema_spec`,
 /// looked up through the state with this catalog's connection ID.
 fn get_schema(
     &self,
     database_spec: &ResolvedDatabaseSpecifier,
     schema_spec: &SchemaSpecifier,
 ) -> &dyn CatalogSchema {
     self.state
         .get_schema(database_spec, schema_spec, &self.conn_id)
 }
1871
 /// Returns all schemas known to the catalog as trait objects.
 ///
 /// The result contains the schemas of every database, followed by the
 /// ambient schemas and then all entries of `temporary_schemas`.
 // The `as` cast below turns each concrete schema reference into a
 // `&dyn CatalogSchema`, hence the lint exemption.
 #[allow(clippy::as_conversions)]
 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
     self.get_databases()
         .into_iter()
         .flat_map(|database| database.schemas().into_iter())
         .chain(
             self.state
                 .ambient_schemas_by_id
                 .values()
                 .chain(self.state.temporary_schemas.values())
                 .map(|schema| schema as &dyn CatalogSchema),
         )
         .collect()
 }
1887
 /// Returns the schema ID reported by the state's
 /// `get_mz_internal_schema_id`.
 fn get_mz_internal_schema_id(&self) -> SchemaId {
     self.state().get_mz_internal_schema_id()
 }
1891
 /// Returns the schema ID reported by the state's
 /// `get_mz_unsafe_schema_id`.
 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
     self.state().get_mz_unsafe_schema_id()
 }
1895
 /// Reports whether `schema` is a system schema, as decided by the state's
 /// `is_system_schema_specifier`.
 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
     self.state.is_system_schema_specifier(schema)
 }
1899
1900 fn resolve_role(
1901 &self,
1902 role_name: &str,
1903 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1904 match self.state.try_get_role_by_name(role_name) {
1905 Some(role) => Ok(role),
1906 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1907 }
1908 }
1909
1910 fn resolve_network_policy(
1911 &self,
1912 policy_name: &str,
1913 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1914 match self.state.try_get_network_policy_by_name(policy_name) {
1915 Some(policy) => Ok(policy),
1916 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1917 }
1918 }
1919
1920 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1921 Some(self.state.roles_by_id.get(id)?)
1922 }
1923
 /// Returns the role with ID `id` via the state's `get_role`.
 // NOTE(review): presumably panics for an unknown role, since `try_get_role`
 // exists for the fallible lookup — confirm.
 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
     self.state.get_role(id)
 }
1927
 /// Returns every role in `roles_by_id` as a trait object.
 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
     // The `as` cast turns each concrete role reference into a
     // `&dyn CatalogRole`, hence the lint exemption.
     #[allow(clippy::as_conversions)]
     self.state
         .roles_by_id
         .values()
         .map(|role| role as &dyn CatalogRole)
         .collect()
 }
1937
 /// Returns the constant `MZ_SYSTEM_ROLE_ID`.
 fn mz_system_role_id(&self) -> RoleId {
     MZ_SYSTEM_ROLE_ID
 }
1941
 /// Returns the set of role IDs computed by the state's
 /// `collect_role_membership` for role `id`.
 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
     self.state.collect_role_membership(id)
 }
1945
 /// Returns the network policy with ID `id` via the state's
 /// `get_network_policy`.
 fn get_network_policy(
     &self,
     id: &NetworkPolicyId,
 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
     self.state.get_network_policy(id)
 }
1952
 /// Returns every network policy in `network_policies_by_id` as a trait
 /// object.
 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
     // The `as` cast turns each concrete policy reference into a
     // `&dyn CatalogNetworkPolicy`, hence the lint exemption.
     #[allow(clippy::as_conversions)]
     self.state
         .network_policies_by_id
         .values()
         .map(|policy| policy as &dyn CatalogNetworkPolicy)
         .collect()
 }
1962
1963 fn resolve_cluster(
1964 &self,
1965 cluster_name: Option<&str>,
1966 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1967 Ok(self
1968 .state
1969 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1970 }
1971
 /// Resolves the replica named by `cluster_replica_name` through the state's
 /// `resolve_cluster_replica`, converting its error into the returned
 /// `SqlCatalogError`.
 fn resolve_cluster_replica(
     &self,
     cluster_replica_name: &QualifiedReplica,
 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
     Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
 }
1978
1979 fn resolve_item(
1980 &self,
1981 name: &PartialItemName,
1982 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1983 let r = self.state.resolve_entry(
1984 self.database.as_ref(),
1985 &self.effective_search_path(true),
1986 name,
1987 &self.conn_id,
1988 )?;
1989 if self.unresolvable_ids.contains(&r.id()) {
1990 Err(SqlCatalogError::UnknownItem(name.to_string()))
1991 } else {
1992 Ok(r)
1993 }
1994 }
1995
1996 fn resolve_function(
1997 &self,
1998 name: &PartialItemName,
1999 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2000 let r = self.state.resolve_function(
2001 self.database.as_ref(),
2002 &self.effective_search_path(false),
2003 name,
2004 &self.conn_id,
2005 )?;
2006
2007 if self.unresolvable_ids.contains(&r.id()) {
2008 Err(SqlCatalogError::UnknownFunction {
2009 name: name.to_string(),
2010 alternative: None,
2011 })
2012 } else {
2013 Ok(r)
2014 }
2015 }
2016
2017 fn resolve_type(
2018 &self,
2019 name: &PartialItemName,
2020 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2021 let r = self.state.resolve_type(
2022 self.database.as_ref(),
2023 &self.effective_search_path(false),
2024 name,
2025 &self.conn_id,
2026 )?;
2027
2028 if self.unresolvable_ids.contains(&r.id()) {
2029 Err(SqlCatalogError::UnknownType {
2030 name: name.to_string(),
2031 })
2032 } else {
2033 Ok(r)
2034 }
2035 }
2036
 /// Returns the system type called `name` via the state's `get_system_type`.
 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
     self.state.get_system_type(name)
 }
2040
2041 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2042 Some(self.state.try_get_entry(id)?)
2043 }
2044
2045 fn try_get_item_by_global_id(
2046 &self,
2047 id: &GlobalId,
2048 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2049 let entry = self.state.try_get_entry_by_global_id(id)?;
2050 let entry = match &entry.item {
2051 CatalogItem::Table(table) => {
2052 let (version, _gid) = table
2053 .collections
2054 .iter()
2055 .find(|(_version, gid)| *gid == id)
2056 .expect("catalog out of sync, mismatched GlobalId");
2057 entry.at_version(RelationVersionSelector::Specific(*version))
2058 }
2059 _ => entry.at_version(RelationVersionSelector::Latest),
2060 };
2061 Some(entry)
2062 }
2063
 /// Returns the item with ID `id` via the state's `get_entry`.
 // NOTE(review): presumably panics for an unknown ID, since `try_get_item`
 // exists for the fallible lookup — confirm.
 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
     self.state.get_entry(id)
 }
2067
2068 fn get_item_by_global_id(
2069 &self,
2070 id: &GlobalId,
2071 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2072 let entry = self.state.get_entry_by_global_id(id);
2073 let entry = match &entry.item {
2074 CatalogItem::Table(table) => {
2075 let (version, _gid) = table
2076 .collections
2077 .iter()
2078 .find(|(_version, gid)| *gid == id)
2079 .expect("catalog out of sync, mismatched GlobalId");
2080 entry.at_version(RelationVersionSelector::Specific(*version))
2081 }
2082 _ => entry.at_version(RelationVersionSelector::Latest),
2083 };
2084 entry
2085 }
2086
2087 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2088 self.get_schemas()
2089 .into_iter()
2090 .flat_map(|schema| schema.item_ids())
2091 .map(|id| self.get_item(&id))
2092 .collect()
2093 }
2094
2095 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2096 self.state
2097 .get_item_by_name(name, &self.conn_id)
2098 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2099 }
2100
2101 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2102 self.state
2103 .get_type_by_name(name, &self.conn_id)
2104 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2105 }
2106
 /// Returns the cluster with ID `id`.
 ///
 /// # Panics
 ///
 /// Panics if `clusters_by_id` has no entry for `id`, because the map is
 /// indexed directly.
 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
     &self.state.clusters_by_id[&id]
 }
2110
 /// Returns every cluster in `clusters_by_id` as a trait object.
 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
     self.state
         .clusters_by_id
         .values()
         // `convert::identity` with an explicit type argument coerces each
         // concrete cluster reference to the trait object type.
         .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
         .collect()
 }
2118
 /// Returns replica `replica_id` of cluster `cluster_id`.
 ///
 /// The cluster is fetched with `get_cluster`, which panics for an unknown
 /// cluster ID; the replica lookup is delegated to the cluster's `replica`.
 fn get_cluster_replica(
     &self,
     cluster_id: ClusterId,
     replica_id: ReplicaId,
 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
     let cluster = self.get_cluster(cluster_id);
     cluster.replica(replica_id)
 }
2127
 /// Returns the replicas of every cluster reported by `get_clusters`,
 /// flattened into one list.
 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
     self.get_clusters()
         .into_iter()
         .flat_map(|cluster| cluster.replicas().into_iter())
         .collect()
 }
2134
 /// Returns the system-level privilege map stored in the state.
 fn get_system_privileges(&self) -> &PrivilegeMap {
     &self.state.system_privileges
 }
2138
 /// Returns the default privileges stored in the state, with the ACL items
 /// of each `DefaultPrivilegeObject` collected into a vector.
 fn get_default_privileges(
     &self,
 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
     self.state
         .default_privileges
         .iter()
         .map(|(object, acl_items)| (object, acl_items.collect()))
         .collect()
 }
2148
 /// Returns the name produced by the state's `find_available_name` for
 /// `name` and this catalog's connection ID.
 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
     self.state.find_available_name(name, &self.conn_id)
 }
2152
 /// Converts the qualified `name` into a `FullItemName` through the state's
 /// `resolve_full_name`, passing this catalog's connection ID.
 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
     self.state.resolve_full_name(name, Some(&self.conn_id))
 }
2156
 /// Converts the qualified schema `name` into a `FullSchemaName` through
 /// the state's `resolve_full_schema_name`.
 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
     self.state.resolve_full_schema_name(name)
 }
2160
 /// Returns the catalog item ID of the entry that the state's
 /// `get_entry_by_global_id` reports for `global_id`.
 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
     self.state.get_entry_by_global_id(global_id).id()
 }
2164
 /// Returns the global ID of item `item_id` when viewed at `version`.
 ///
 /// The entry is fetched with the state's `get_entry`, narrowed with
 /// `at_version`, and asked for its `global_id`.
 fn resolve_global_id(
     &self,
     item_id: &CatalogItemId,
     version: RelationVersionSelector,
 ) -> GlobalId {
     self.state
         .get_entry(item_id)
         .at_version(version)
         .global_id()
 }
2175
 /// Returns the catalog configuration held by the state.
 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
     self.state.config()
 }
2179
 /// Returns the current time by invoking the `now` function stored in the
 /// catalog configuration.
 fn now(&self) -> EpochMillis {
     (self.state.config().now)()
 }
2183
 /// Returns a clone of the state's `aws_privatelink_availability_zones`,
 /// which is `None` when the state holds no such set.
 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
     self.state.aws_privatelink_availability_zones.clone()
 }
2187
 /// Returns a shared reference to the state's system configuration.
 fn system_vars(&self) -> &SystemVars {
     &self.state.system_configuration
 }
2191
 /// Returns a mutable reference to the state's system configuration.
 // Mutable access goes through `to_mut()` on the state handle.
 // NOTE(review): presumably the state is held in a `Cow`, so this may clone
 // a borrowed state before mutating — confirm.
 fn system_vars_mut(&mut self) -> &mut SystemVars {
     &mut self.state.to_mut().system_configuration
 }
2195
 /// Returns the owner of object `id` as reported by the state's
 /// `get_owner_id` for this catalog's connection; `None` when the state
 /// reports no owner.
 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
     self.state().get_owner_id(id, self.conn_id())
 }
2199
2200 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2201 match id {
2202 SystemObjectId::System => Some(&self.state.system_privileges),
2203 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2204 Some(self.get_cluster(*id).privileges())
2205 }
2206 SystemObjectId::Object(ObjectId::Database(id)) => {
2207 Some(self.get_database(id).privileges())
2208 }
2209 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2210 Some(self.get_schema(database_spec, schema_spec).privileges())
2211 }
2212 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2213 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2214 Some(self.get_network_policy(id).privileges())
2215 }
2216 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2217 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2218 }
2219 }
2220
2221 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2222 let mut seen = BTreeSet::new();
2223 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2224 }
2225
2226 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2227 let mut seen = BTreeSet::new();
2228 self.state.item_dependents(id, &mut seen)
2229 }
2230
 /// Returns the ACL mode that `rbac::all_object_privileges` reports for
 /// `object_type`.
 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
     rbac::all_object_privileges(object_type)
 }
2234
 /// Returns the object type of `object_id` as reported by the state's
 /// `get_object_type`.
 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
     self.state.get_object_type(object_id)
 }
2238
    /// Returns the system object type of `id`, as determined by the catalog
    /// state.
    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
        self.state.get_system_object_type(id)
    }
2242
2243 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2246 let database_id = match &qualified_name.qualifiers.database_spec {
2247 ResolvedDatabaseSpecifier::Ambient => None,
2248 ResolvedDatabaseSpecifier::Id(id)
2249 if self.database.is_some() && self.database == Some(*id) =>
2250 {
2251 None
2252 }
2253 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2254 };
2255
2256 let schema_spec = if database_id.is_none()
2257 && self.resolve_item_name(&PartialItemName {
2258 database: None,
2259 schema: None,
2260 item: qualified_name.item.clone(),
2261 }) == Ok(qualified_name)
2262 || self.resolve_function_name(&PartialItemName {
2263 database: None,
2264 schema: None,
2265 item: qualified_name.item.clone(),
2266 }) == Ok(qualified_name)
2267 || self.resolve_type_name(&PartialItemName {
2268 database: None,
2269 schema: None,
2270 item: qualified_name.item.clone(),
2271 }) == Ok(qualified_name)
2272 {
2273 None
2274 } else {
2275 Some(qualified_name.qualifiers.schema_spec.clone())
2278 };
2279
2280 let res = PartialItemName {
2281 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2282 schema: schema_spec.map(|spec| {
2283 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2284 .name()
2285 .schema
2286 .clone()
2287 }),
2288 item: qualified_name.item.clone(),
2289 };
2290 assert!(
2291 self.resolve_item_name(&res) == Ok(qualified_name)
2292 || self.resolve_function_name(&res) == Ok(qualified_name)
2293 || self.resolve_type_name(&res) == Ok(qualified_name)
2294 );
2295 res
2296 }
2297
2298 fn add_notice(&self, notice: PlanNotice) {
2299 let _ = self.notices_tx.send(notice.into());
2300 }
2301
2302 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2303 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2304 self.state.comments.get_object_comments(comment_id)
2305 }
2306
2307 fn is_cluster_size_cc(&self, size: &str) -> bool {
2308 self.state
2309 .cluster_replica_sizes
2310 .0
2311 .get(size)
2312 .map_or(false, |a| a.is_cc)
2313 }
2314}
2315
2316#[cfg(test)]
2317mod tests {
2318 use std::collections::{BTreeMap, BTreeSet};
2319 use std::sync::Arc;
2320 use std::{env, iter};
2321
2322 use itertools::Itertools;
2323 use mz_catalog::memory::objects::CatalogItem;
2324 use tokio_postgres::NoTls;
2325 use tokio_postgres::types::Type;
2326 use uuid::Uuid;
2327
2328 use mz_catalog::SYSTEM_CONN_ID;
2329 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2330 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2331 use mz_controller_types::{ClusterId, ReplicaId};
2332 use mz_expr::MirScalarExpr;
2333 use mz_ore::now::to_datetime;
2334 use mz_ore::{assert_err, assert_ok, task};
2335 use mz_persist_client::PersistClient;
2336 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2337 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2338 use mz_repr::role_id::RoleId;
2339 use mz_repr::{
2340 CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2341 SqlScalarType, Timestamp,
2342 };
2343 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2344 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2345 use mz_sql::names::{
2346 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2347 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2348 };
2349 use mz_sql::plan::{
2350 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2351 Scope, StatementContext,
2352 };
2353 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2354 use mz_sql::session::vars::{SystemVars, VarInput};
2355
2356 use crate::catalog::state::LocalExpressionCache;
2357 use crate::catalog::{Catalog, Op};
2358 use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2359 use crate::session::Session;
2360
2361 #[mz_ore::test(tokio::test)]
2368 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2370 Catalog::with_debug(|catalog| async move {
2371 struct TestCase {
2372 input: QualifiedItemName,
2373 system_output: PartialItemName,
2374 normal_output: PartialItemName,
2375 }
2376
2377 let test_cases = vec![
2378 TestCase {
2379 input: QualifiedItemName {
2380 qualifiers: ItemQualifiers {
2381 database_spec: ResolvedDatabaseSpecifier::Ambient,
2382 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2383 },
2384 item: "numeric".to_string(),
2385 },
2386 system_output: PartialItemName {
2387 database: None,
2388 schema: None,
2389 item: "numeric".to_string(),
2390 },
2391 normal_output: PartialItemName {
2392 database: None,
2393 schema: None,
2394 item: "numeric".to_string(),
2395 },
2396 },
2397 TestCase {
2398 input: QualifiedItemName {
2399 qualifiers: ItemQualifiers {
2400 database_spec: ResolvedDatabaseSpecifier::Ambient,
2401 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2402 },
2403 item: "mz_array_types".to_string(),
2404 },
2405 system_output: PartialItemName {
2406 database: None,
2407 schema: None,
2408 item: "mz_array_types".to_string(),
2409 },
2410 normal_output: PartialItemName {
2411 database: None,
2412 schema: None,
2413 item: "mz_array_types".to_string(),
2414 },
2415 },
2416 ];
2417
2418 for tc in test_cases {
2419 assert_eq!(
2420 catalog
2421 .for_system_session()
2422 .minimal_qualification(&tc.input),
2423 tc.system_output
2424 );
2425 assert_eq!(
2426 catalog
2427 .for_session(&Session::dummy())
2428 .minimal_qualification(&tc.input),
2429 tc.normal_output
2430 );
2431 }
2432 catalog.expire().await;
2433 })
2434 .await
2435 }
2436
2437 #[mz_ore::test(tokio::test)]
2438 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2440 let persist_client = PersistClient::new_for_tests().await;
2441 let organization_id = Uuid::new_v4();
2442 let bootstrap_args = test_bootstrap_args();
2443 {
2444 let mut catalog = Catalog::open_debug_catalog(
2445 persist_client.clone(),
2446 organization_id.clone(),
2447 &bootstrap_args,
2448 )
2449 .await
2450 .expect("unable to open debug catalog");
2451 assert_eq!(catalog.transient_revision(), 1);
2452 let commit_ts = catalog.current_upper().await;
2453 catalog
2454 .transact(
2455 None,
2456 commit_ts,
2457 None,
2458 vec![Op::CreateDatabase {
2459 name: "test".to_string(),
2460 owner_id: MZ_SYSTEM_ROLE_ID,
2461 }],
2462 )
2463 .await
2464 .expect("failed to transact");
2465 assert_eq!(catalog.transient_revision(), 2);
2466 catalog.expire().await;
2467 }
2468 {
2469 let catalog =
2470 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2471 .await
2472 .expect("unable to open debug catalog");
2473 assert_eq!(catalog.transient_revision(), 1);
2475 catalog.expire().await;
2476 }
2477 }
2478
2479 #[mz_ore::test(tokio::test)]
2480 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2482 Catalog::with_debug(|catalog| async move {
2483 let mz_catalog_schema = (
2484 ResolvedDatabaseSpecifier::Ambient,
2485 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2486 );
2487 let pg_catalog_schema = (
2488 ResolvedDatabaseSpecifier::Ambient,
2489 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2490 );
2491 let mz_temp_schema = (
2492 ResolvedDatabaseSpecifier::Ambient,
2493 SchemaSpecifier::Temporary,
2494 );
2495
2496 let session = Session::dummy();
2498 let conn_catalog = catalog.for_session(&session);
2499 assert_ne!(
2500 conn_catalog.effective_search_path(false),
2501 conn_catalog.search_path
2502 );
2503 assert_ne!(
2504 conn_catalog.effective_search_path(true),
2505 conn_catalog.search_path
2506 );
2507 assert_eq!(
2508 conn_catalog.effective_search_path(false),
2509 vec![
2510 mz_catalog_schema.clone(),
2511 pg_catalog_schema.clone(),
2512 conn_catalog.search_path[0].clone()
2513 ]
2514 );
2515 assert_eq!(
2516 conn_catalog.effective_search_path(true),
2517 vec![
2518 mz_temp_schema.clone(),
2519 mz_catalog_schema.clone(),
2520 pg_catalog_schema.clone(),
2521 conn_catalog.search_path[0].clone()
2522 ]
2523 );
2524
2525 let mut session = Session::dummy();
2527 session
2528 .vars_mut()
2529 .set(
2530 &SystemVars::new(),
2531 "search_path",
2532 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2533 false,
2534 )
2535 .expect("failed to set search_path");
2536 let conn_catalog = catalog.for_session(&session);
2537 assert_ne!(
2538 conn_catalog.effective_search_path(false),
2539 conn_catalog.search_path
2540 );
2541 assert_ne!(
2542 conn_catalog.effective_search_path(true),
2543 conn_catalog.search_path
2544 );
2545 assert_eq!(
2546 conn_catalog.effective_search_path(false),
2547 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2548 );
2549 assert_eq!(
2550 conn_catalog.effective_search_path(true),
2551 vec![
2552 mz_temp_schema.clone(),
2553 mz_catalog_schema.clone(),
2554 pg_catalog_schema.clone()
2555 ]
2556 );
2557
2558 let mut session = Session::dummy();
2559 session
2560 .vars_mut()
2561 .set(
2562 &SystemVars::new(),
2563 "search_path",
2564 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2565 false,
2566 )
2567 .expect("failed to set search_path");
2568 let conn_catalog = catalog.for_session(&session);
2569 assert_ne!(
2570 conn_catalog.effective_search_path(false),
2571 conn_catalog.search_path
2572 );
2573 assert_ne!(
2574 conn_catalog.effective_search_path(true),
2575 conn_catalog.search_path
2576 );
2577 assert_eq!(
2578 conn_catalog.effective_search_path(false),
2579 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2580 );
2581 assert_eq!(
2582 conn_catalog.effective_search_path(true),
2583 vec![
2584 mz_temp_schema.clone(),
2585 pg_catalog_schema.clone(),
2586 mz_catalog_schema.clone()
2587 ]
2588 );
2589
2590 let mut session = Session::dummy();
2591 session
2592 .vars_mut()
2593 .set(
2594 &SystemVars::new(),
2595 "search_path",
2596 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2597 false,
2598 )
2599 .expect("failed to set search_path");
2600 let conn_catalog = catalog.for_session(&session);
2601 assert_ne!(
2602 conn_catalog.effective_search_path(false),
2603 conn_catalog.search_path
2604 );
2605 assert_ne!(
2606 conn_catalog.effective_search_path(true),
2607 conn_catalog.search_path
2608 );
2609 assert_eq!(
2610 conn_catalog.effective_search_path(false),
2611 vec![
2612 mz_catalog_schema.clone(),
2613 pg_catalog_schema.clone(),
2614 mz_temp_schema.clone()
2615 ]
2616 );
2617 assert_eq!(
2618 conn_catalog.effective_search_path(true),
2619 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2620 );
2621 catalog.expire().await;
2622 })
2623 .await
2624 }
2625
2626 #[mz_ore::test(tokio::test)]
2627 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2629 use mz_ore::collections::CollectionExt;
2630 Catalog::with_debug(|catalog| async move {
2631 let conn_catalog = catalog.for_system_session();
2632 let scx = &mut StatementContext::new(None, &conn_catalog);
2633
2634 let parsed = mz_sql_parser::parser::parse_statements(
2635 "create view public.foo as select 1 as bar",
2636 )
2637 .expect("")
2638 .into_element()
2639 .ast;
2640
2641 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2642
2643 assert_eq!(
2645 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2646 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2647 );
2648 catalog.expire().await;
2649 })
2650 .await;
2651 }
2652
2653 #[mz_ore::test(tokio::test)]
2655 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2657 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2658 let column = ", 1";
2659 let view_def_size = view_def.bytes().count();
2660 let column_size = column.bytes().count();
2661 let column_count =
2662 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2663 let columns = iter::repeat(column).take(column_count).join("");
2664 let create_sql = format!("{view_def}{columns})");
2665 let create_sql_check = create_sql.clone();
2666 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2667 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2668 &create_sql
2669 ));
2670
2671 let persist_client = PersistClient::new_for_tests().await;
2672 let organization_id = Uuid::new_v4();
2673 let id = CatalogItemId::User(1);
2674 let gid = GlobalId::User(1);
2675 let bootstrap_args = test_bootstrap_args();
2676 {
2677 let mut catalog = Catalog::open_debug_catalog(
2678 persist_client.clone(),
2679 organization_id.clone(),
2680 &bootstrap_args,
2681 )
2682 .await
2683 .expect("unable to open debug catalog");
2684 let item = catalog
2685 .state()
2686 .deserialize_item(
2687 gid,
2688 &create_sql,
2689 &BTreeMap::new(),
2690 &mut LocalExpressionCache::Closed,
2691 None,
2692 )
2693 .expect("unable to parse view");
2694 let commit_ts = catalog.current_upper().await;
2695 catalog
2696 .transact(
2697 None,
2698 commit_ts,
2699 None,
2700 vec![Op::CreateItem {
2701 item,
2702 name: QualifiedItemName {
2703 qualifiers: ItemQualifiers {
2704 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2705 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2706 },
2707 item: "v".to_string(),
2708 },
2709 id,
2710 owner_id: MZ_SYSTEM_ROLE_ID,
2711 }],
2712 )
2713 .await
2714 .expect("failed to transact");
2715 catalog.expire().await;
2716 }
2717 {
2718 let catalog =
2719 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2720 .await
2721 .expect("unable to open debug catalog");
2722 let view = catalog.get_entry(&id);
2723 assert_eq!("v", view.name.item);
2724 match &view.item {
2725 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2726 item => panic!("expected view, got {}", item.typ()),
2727 }
2728 catalog.expire().await;
2729 }
2730 }
2731
2732 #[mz_ore::test(tokio::test)]
2733 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2735 Catalog::with_debug(|catalog| async move {
2736 let conn_catalog = catalog.for_system_session();
2737
2738 assert_eq!(
2739 mz_sql::catalog::ObjectType::ClusterReplica,
2740 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2741 ClusterId::user(1).expect("1 is a valid ID"),
2742 ReplicaId::User(1)
2743 )))
2744 );
2745 assert_eq!(
2746 mz_sql::catalog::ObjectType::Role,
2747 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2748 );
2749 catalog.expire().await;
2750 })
2751 .await;
2752 }
2753
2754 #[mz_ore::test(tokio::test)]
2755 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2757 Catalog::with_debug(|catalog| async move {
2758 let conn_catalog = catalog.for_system_session();
2759
2760 assert_eq!(
2761 None,
2762 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2763 ClusterId::user(1).expect("1 is a valid ID"),
2764 ReplicaId::User(1),
2765 ))))
2766 );
2767 assert_eq!(
2768 None,
2769 conn_catalog
2770 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2771 );
2772 catalog.expire().await;
2773 })
2774 .await;
2775 }
2776
2777 #[mz_ore::test(tokio::test)]
2778 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2780 Catalog::with_debug(|catalog| async move {
2781 let conn_catalog = catalog.for_system_session();
2782
2783 let builtins_cfg = BuiltinsConfig {
2784 include_continual_tasks: true,
2785 };
2786 for builtin in BUILTINS::iter(&builtins_cfg) {
2787 let (schema, name, expected_desc) = match builtin {
2788 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2789 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2790 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2791 Builtin::Log(_)
2792 | Builtin::Type(_)
2793 | Builtin::Func(_)
2794 | Builtin::ContinualTask(_)
2795 | Builtin::Index(_)
2796 | Builtin::Connection(_) => continue,
2797 };
2798 let item = conn_catalog
2799 .resolve_item(&PartialItemName {
2800 database: None,
2801 schema: Some(schema.to_string()),
2802 item: name.to_string(),
2803 })
2804 .expect("unable to resolve item")
2805 .at_version(RelationVersionSelector::Latest);
2806
2807 let full_name = conn_catalog.resolve_full_name(item.name());
2808 let actual_desc = item.desc(&full_name).expect("invalid item type");
2809 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2810 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2811 {
2812 assert_eq!(
2813 actual_name, expected_name,
2814 "item {schema}.{name} column {index} name did not match its expected name"
2815 );
2816 assert_eq!(
2817 actual_typ, expected_typ,
2818 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2819 );
2820 }
2821 assert_eq!(
2822 &*actual_desc, expected_desc,
2823 "item {schema}.{name} did not match its expected RelationDesc"
2824 );
2825 }
2826 catalog.expire().await;
2827 })
2828 .await
2829 }
2830
2831 #[mz_ore::test(tokio::test)]
2834 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2836 async fn inner(catalog: Catalog) {
2837 let (client, connection) = tokio_postgres::connect(
2841 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2842 NoTls,
2843 )
2844 .await
2845 .expect("failed to connect to Postgres");
2846
2847 task::spawn(|| "compare_builtin_postgres", async move {
2848 if let Err(e) = connection.await {
2849 panic!("connection error: {}", e);
2850 }
2851 });
2852
2853 struct PgProc {
2854 name: String,
2855 arg_oids: Vec<u32>,
2856 ret_oid: Option<u32>,
2857 ret_set: bool,
2858 }
2859
2860 struct PgType {
2861 name: String,
2862 ty: String,
2863 elem: u32,
2864 array: u32,
2865 input: u32,
2866 receive: u32,
2867 }
2868
2869 struct PgOper {
2870 oprresult: u32,
2871 name: String,
2872 }
2873
2874 let pg_proc: BTreeMap<_, _> = client
2875 .query(
2876 "SELECT
2877 p.oid,
2878 proname,
2879 proargtypes,
2880 prorettype,
2881 proretset
2882 FROM pg_proc p
2883 JOIN pg_namespace n ON p.pronamespace = n.oid",
2884 &[],
2885 )
2886 .await
2887 .expect("pg query failed")
2888 .into_iter()
2889 .map(|row| {
2890 let oid: u32 = row.get("oid");
2891 let pg_proc = PgProc {
2892 name: row.get("proname"),
2893 arg_oids: row.get("proargtypes"),
2894 ret_oid: row.get("prorettype"),
2895 ret_set: row.get("proretset"),
2896 };
2897 (oid, pg_proc)
2898 })
2899 .collect();
2900
2901 let pg_type: BTreeMap<_, _> = client
2902 .query(
2903 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2904 &[],
2905 )
2906 .await
2907 .expect("pg query failed")
2908 .into_iter()
2909 .map(|row| {
2910 let oid: u32 = row.get("oid");
2911 let pg_type = PgType {
2912 name: row.get("typname"),
2913 ty: row.get("typtype"),
2914 elem: row.get("typelem"),
2915 array: row.get("typarray"),
2916 input: row.get("typinput"),
2917 receive: row.get("typreceive"),
2918 };
2919 (oid, pg_type)
2920 })
2921 .collect();
2922
2923 let pg_oper: BTreeMap<_, _> = client
2924 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2925 .await
2926 .expect("pg query failed")
2927 .into_iter()
2928 .map(|row| {
2929 let oid: u32 = row.get("oid");
2930 let pg_oper = PgOper {
2931 name: row.get("oprname"),
2932 oprresult: row.get("oprresult"),
2933 };
2934 (oid, pg_oper)
2935 })
2936 .collect();
2937
2938 let conn_catalog = catalog.for_system_session();
2939 let resolve_type_oid = |item: &str| {
2940 conn_catalog
2941 .resolve_type(&PartialItemName {
2942 database: None,
2943 schema: Some(PG_CATALOG_SCHEMA.into()),
2946 item: item.to_string(),
2947 })
2948 .expect("unable to resolve type")
2949 .oid()
2950 };
2951
2952 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2953 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2954 .collect();
2955
2956 let mut all_oids = BTreeSet::new();
2957
2958 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2961 [
2962 (Type::NAME, Type::TEXT),
2964 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2965 (Type::TIME, Type::TIMETZ),
2967 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2968 ]
2969 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2970 );
2971 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2972 1619, ]);
2974 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2975 if ignore_return_types.contains(&fn_oid) {
2976 return true;
2977 }
2978 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2979 return true;
2980 }
2981 a == b
2982 };
2983
2984 let builtins_cfg = BuiltinsConfig {
2985 include_continual_tasks: true,
2986 };
2987 for builtin in BUILTINS::iter(&builtins_cfg) {
2988 match builtin {
2989 Builtin::Type(ty) => {
2990 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2991
2992 if ty.oid >= FIRST_MATERIALIZE_OID {
2993 continue;
2996 }
2997
2998 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3001 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3002 });
3003 assert_eq!(
3004 ty.name, pg_ty.name,
3005 "oid {} has name {} in postgres; expected {}",
3006 ty.oid, pg_ty.name, ty.name,
3007 );
3008
3009 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3010 None => (0, 0),
3011 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3012 };
3013 assert_eq!(
3014 typinput_oid, pg_ty.input,
3015 "type {} has typinput OID {:?} in mz but {:?} in pg",
3016 ty.name, typinput_oid, pg_ty.input,
3017 );
3018 assert_eq!(
3019 typreceive_oid, pg_ty.receive,
3020 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3021 ty.name, typreceive_oid, pg_ty.receive,
3022 );
3023 if typinput_oid != 0 {
3024 assert!(
3025 func_oids.contains(&typinput_oid),
3026 "type {} has typinput OID {} that does not exist in pg_proc",
3027 ty.name,
3028 typinput_oid,
3029 );
3030 }
3031 if typreceive_oid != 0 {
3032 assert!(
3033 func_oids.contains(&typreceive_oid),
3034 "type {} has typreceive OID {} that does not exist in pg_proc",
3035 ty.name,
3036 typreceive_oid,
3037 );
3038 }
3039
3040 match &ty.details.typ {
3042 CatalogType::Array { element_reference } => {
3043 let elem_ty = BUILTINS::iter(&builtins_cfg)
3044 .filter_map(|builtin| match builtin {
3045 Builtin::Type(ty @ BuiltinType { name, .. })
3046 if element_reference == name =>
3047 {
3048 Some(ty)
3049 }
3050 _ => None,
3051 })
3052 .next();
3053 let elem_ty = match elem_ty {
3054 Some(ty) => ty,
3055 None => {
3056 panic!("{} is unexpectedly not a type", element_reference)
3057 }
3058 };
3059 assert_eq!(
3060 pg_ty.elem, elem_ty.oid,
3061 "type {} has mismatched element OIDs",
3062 ty.name
3063 )
3064 }
3065 CatalogType::Pseudo => {
3066 assert_eq!(
3067 pg_ty.ty, "p",
3068 "type {} is not a pseudo type as expected",
3069 ty.name
3070 )
3071 }
3072 CatalogType::Range { .. } => {
3073 assert_eq!(
3074 pg_ty.ty, "r",
3075 "type {} is not a range type as expected",
3076 ty.name
3077 );
3078 }
3079 _ => {
3080 assert_eq!(
3081 pg_ty.ty, "b",
3082 "type {} is not a base type as expected",
3083 ty.name
3084 )
3085 }
3086 }
3087
3088 let schema = catalog
3090 .resolve_schema_in_database(
3091 &ResolvedDatabaseSpecifier::Ambient,
3092 ty.schema,
3093 &SYSTEM_CONN_ID,
3094 )
3095 .expect("unable to resolve schema");
3096 let allocated_type = catalog
3097 .resolve_type(
3098 None,
3099 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3100 &PartialItemName {
3101 database: None,
3102 schema: Some(schema.name().schema.clone()),
3103 item: ty.name.to_string(),
3104 },
3105 &SYSTEM_CONN_ID,
3106 )
3107 .expect("unable to resolve type");
3108 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3109 ty
3110 } else {
3111 panic!("unexpectedly not a type")
3112 };
3113 match ty.details.array_id {
3114 Some(array_id) => {
3115 let array_ty = catalog.get_entry(&array_id);
3116 assert_eq!(
3117 pg_ty.array, array_ty.oid,
3118 "type {} has mismatched array OIDs",
3119 allocated_type.name.item,
3120 );
3121 }
3122 None => assert_eq!(
3123 pg_ty.array, 0,
3124 "type {} does not have an array type in mz but does in pg",
3125 allocated_type.name.item,
3126 ),
3127 }
3128 }
3129 Builtin::Func(func) => {
3130 for imp in func.inner.func_impls() {
3131 assert!(
3132 all_oids.insert(imp.oid),
3133 "{} reused oid {}",
3134 func.name,
3135 imp.oid
3136 );
3137
3138 assert!(
3139 imp.oid < FIRST_USER_OID,
3140 "built-in function {} erroneously has OID in user space ({})",
3141 func.name,
3142 imp.oid,
3143 );
3144
3145 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3148 continue;
3149 } else {
3150 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3151 panic!(
3152 "pg_proc missing function {}: oid {}",
3153 func.name, imp.oid
3154 )
3155 })
3156 };
3157 assert_eq!(
3158 func.name, pg_fn.name,
3159 "funcs with oid {} don't match names: {} in mz, {} in pg",
3160 imp.oid, func.name, pg_fn.name
3161 );
3162
3163 let imp_arg_oids = imp
3166 .arg_typs
3167 .iter()
3168 .map(|item| resolve_type_oid(item))
3169 .collect::<Vec<_>>();
3170
3171 if imp_arg_oids != pg_fn.arg_oids {
3172 println!(
3173 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3174 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3175 );
3176 }
3177
3178 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3179
3180 assert!(
3181 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3182 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3183 imp.oid,
3184 func.name,
3185 imp_return_oid,
3186 pg_fn.ret_oid
3187 );
3188
3189 assert_eq!(
3190 imp.return_is_set, pg_fn.ret_set,
3191 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3192 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3193 );
3194 }
3195 }
3196 _ => (),
3197 }
3198 }
3199
3200 for (op, func) in OP_IMPLS.iter() {
3201 for imp in func.func_impls() {
3202 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3203
3204 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3206 continue;
3207 } else {
3208 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3209 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3210 })
3211 };
3212
3213 assert_eq!(*op, pg_op.name);
3214
3215 let imp_return_oid =
3216 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3217 if imp_return_oid != pg_op.oprresult {
3218 panic!(
3219 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3220 imp.oid, op, imp_return_oid, pg_op.oprresult
3221 );
3222 }
3223 }
3224 }
3225 catalog.expire().await;
3226 }
3227
3228 Catalog::with_debug(inner).await
3229 }
3230
    // Smoke-tests the scalar builtin functions and operators: for every
    // implementation whose argument types can be mapped to `SqlScalarType`s,
    // every combination of "interesting" datums (plus NULL) for its arguments
    // is run through `smoketest_fn` on a blocking task.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_smoketest_all_builtins() {
        // Spawns one blocking task per function call to test and returns the
        // join handles.
        fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
            let catalog = Arc::new(catalog);
            let conn_catalog = catalog.for_system_session();

            // Maps a system type name to its OID.
            let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
            let mut handles = Vec::new();

            // Functions with these names are skipped entirely.
            let ignore_names = BTreeSet::from([
                "avg",
                "avg_internal_v1",
                "bool_and",
                "bool_or",
                "has_table_privilege",
                "has_type_privilege",
                "mod",
                "mz_panic",
                "mz_sleep",
                "pow",
                "stddev_pop",
                "stddev_samp",
                "stddev",
                "var_pop",
                "var_samp",
                "variance",
            ]);

            // All builtin functions followed by all operator implementations.
            let fns = BUILTINS::funcs()
                .map(|func| (&func.name, func.inner))
                .chain(OP_IMPLS.iter());

            for (name, func) in fns {
                if ignore_names.contains(name) {
                    continue;
                }
                // Only scalar functions are exercised.
                let Func::Scalar(impls) = func else {
                    continue;
                };

                'outer: for imp in impls {
                    let details = imp.details();
                    // Scalar types of the arguments; the whole implementation
                    // is skipped if any argument OID has no `mz_pgrepr::Type`.
                    let mut styps = Vec::new();
                    for item in details.arg_typs.iter() {
                        let oid = resolve_type_oid(item);
                        let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
                            continue 'outer;
                        };
                        styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
                    }
                    // For each argument: NULL followed by the type's
                    // interesting datums.
                    let datums = styps
                        .iter()
                        .map(|styp| {
                            let mut datums = vec![Datum::Null];
                            datums.extend(styp.interesting_datums());
                            datums
                        })
                        .collect::<Vec<_>>();
                    // Implementations without arguments are skipped.
                    if datums.is_empty() {
                        continue;
                    }

                    let return_oid = details
                        .return_typ
                        .map(resolve_type_oid)
                        .expect("must exist");
                    // `None` when the return OID has no `mz_pgrepr::Type`.
                    let return_styp = mz_pgrepr::Type::from_oid(return_oid)
                        .ok()
                        .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));

                    // `idxs[i]` selects the current datum for argument `i`.
                    // The loop enumerates the cartesian product of all
                    // argument datums and ends once position 0 runs past its
                    // last datum.
                    let mut idxs = vec![0; datums.len()];
                    while idxs[0] < datums[0].len() {
                        let mut args = Vec::with_capacity(idxs.len());
                        for i in 0..(datums.len()) {
                            args.push(datums[i][idxs[i]]);
                        }

                        let op = &imp.op;
                        // Wrap each argument as an already-coerced literal of
                        // the corresponding argument type.
                        let scalars = args
                            .iter()
                            .enumerate()
                            .map(|(i, datum)| {
                                CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
                                    datum.clone(),
                                    styps[i].clone(),
                                ))
                            })
                            .collect();

                        // Human-readable description of this call; used both
                        // as the task name and in failure messages.
                        let call_name = format!(
                            "{name}({}) (oid: {})",
                            args.iter()
                                .map(|d| d.to_string())
                                .collect::<Vec<_>>()
                                .join(", "),
                            imp.oid
                        );
                        let catalog = Arc::clone(&catalog);
                        let call_name_fn = call_name.clone();
                        let return_styp = return_styp.clone();
                        let handle = task::spawn_blocking(
                            || call_name,
                            move || {
                                smoketest_fn(
                                    name,
                                    call_name_fn,
                                    op,
                                    imp,
                                    args,
                                    catalog,
                                    scalars,
                                    return_styp,
                                )
                            },
                        );
                        handles.push(handle);

                        // Advance `idxs` like an odometer, last position
                        // fastest. A position that overflows is reset to 0 and
                        // carries into the previous one; position 0 is left
                        // overflowed so that the `while` condition ends the
                        // loop.
                        for i in (0..datums.len()).rev() {
                            idxs[i] += 1;
                            if idxs[i] >= datums[i].len() {
                                if i == 0 {
                                    break;
                                }
                                idxs[i] = 0;
                                continue;
                            } else {
                                break;
                            }
                        }
                    }
                }
            }
            handles
        }

        let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
        // Wait for every spawned check; a failed task fails the test.
        for handle in handles {
            handle.await.expect("must succeed");
        }
    }
3376
3377 fn smoketest_fn(
3378 name: &&str,
3379 call_name: String,
3380 op: &Operation<HirScalarExpr>,
3381 imp: &FuncImpl<HirScalarExpr>,
3382 args: Vec<Datum<'_>>,
3383 catalog: Arc<Catalog>,
3384 scalars: Vec<CoercibleScalarExpr>,
3385 return_styp: Option<SqlScalarType>,
3386 ) {
3387 let conn_catalog = catalog.for_system_session();
3388 let pcx = PlanContext::zero();
3389 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3390 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3391 let ecx = ExprContext {
3392 qcx: &qcx,
3393 name: "smoketest",
3394 scope: &Scope::empty(),
3395 relation_type: &SqlRelationType::empty(),
3396 allow_aggregates: false,
3397 allow_subqueries: false,
3398 allow_parameters: false,
3399 allow_windows: false,
3400 };
3401 let arena = RowArena::new();
3402 let mut session = Session::<Timestamp>::dummy();
3403 session
3404 .start_transaction(to_datetime(0), None, None)
3405 .expect("must succeed");
3406 let prep_style = ExprPrepStyle::OneShot {
3407 logical_time: EvalTime::Time(Timestamp::MIN),
3408 session: &session,
3409 catalog_state: &catalog.state,
3410 };
3411
3412 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3415 if let Ok(hir) = res {
3416 if let Ok(mut mir) = hir.lower_uncorrelated() {
3417 prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3419
3420 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3421 if let Some(return_styp) = return_styp {
3422 let mir_typ = mir.typ(&[]);
3423 assert_eq!(mir_typ.scalar_type, return_styp);
3426 if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3430 panic!(
3431 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3432 );
3433 }
3434 if let Some((introduces_nulls, propagates_nulls)) =
3437 call_introduces_propagates_nulls(&mir)
3438 {
3439 if introduces_nulls {
3440 assert!(
3444 mir_typ.nullable,
3445 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3446 name, args, mir, mir_typ.nullable
3447 );
3448 } else {
3449 let any_input_null = args.iter().any(|arg| arg.is_null());
3450 if !any_input_null {
3451 assert!(
3452 !mir_typ.nullable,
3453 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3454 name, args, mir, mir_typ.nullable
3455 );
3456 } else {
3457 assert_eq!(
3458 mir_typ.nullable, propagates_nulls,
3459 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3460 name, args, mir, mir_typ.nullable
3461 );
3462 }
3463 }
3464 }
3465 let mut reduced = mir.clone();
3468 reduced.reduce(&[]);
3469 match reduced {
3470 MirScalarExpr::Literal(reduce_result, ctyp) => {
3471 match reduce_result {
3472 Ok(reduce_result_row) => {
3473 let reduce_result_datum = reduce_result_row.unpack_first();
3474 assert_eq!(
3475 reduce_result_datum,
3476 eval_result_datum,
3477 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3478 name,
3479 args,
3480 mir,
3481 eval_result_datum,
3482 mir_typ.scalar_type,
3483 reduce_result_datum,
3484 ctyp.scalar_type
3485 );
3486 assert_eq!(
3492 ctyp.scalar_type,
3493 mir_typ.scalar_type,
3494 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3495 name,
3496 args,
3497 mir,
3498 eval_result_datum,
3499 mir_typ.scalar_type,
3500 reduce_result_datum,
3501 ctyp.scalar_type
3502 );
3503 }
3504 Err(..) => {} }
3506 }
3507 _ => unreachable!(
3508 "all args are literals, so should have reduced to a literal"
3509 ),
3510 }
3511 }
3512 }
3513 }
3514 }
3515 }
3516
3517 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3522 match mir_func_call {
3523 MirScalarExpr::CallUnary { func, expr } => {
3524 if expr.is_literal() {
3525 Some((func.introduces_nulls(), func.propagates_nulls()))
3526 } else {
3527 None
3528 }
3529 }
3530 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3531 if expr1.is_literal() && expr2.is_literal() {
3532 Some((func.introduces_nulls(), func.propagates_nulls()))
3533 } else {
3534 None
3535 }
3536 }
3537 MirScalarExpr::CallVariadic { func, exprs } => {
3538 if exprs.iter().all(|arg| arg.is_literal()) {
3539 Some((func.introduces_nulls(), func.propagates_nulls()))
3540 } else {
3541 None
3542 }
3543 }
3544 _ => None,
3545 }
3546 }
3547
3548 #[mz_ore::test(tokio::test)]
3550 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3552 Catalog::with_debug(|catalog| async move {
3553 let conn_catalog = catalog.for_system_session();
3554
3555 for view in BUILTINS::views().filter(|view| {
3556 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3557 }) {
3558 let item = conn_catalog
3559 .resolve_item(&PartialItemName {
3560 database: None,
3561 schema: Some(view.schema.to_string()),
3562 item: view.name.to_string(),
3563 })
3564 .expect("unable to resolve view")
3565 .at_version(RelationVersionSelector::Latest);
3567 let full_name = conn_catalog.resolve_full_name(item.name());
3568 for col_type in item
3569 .desc(&full_name)
3570 .expect("invalid item type")
3571 .iter_types()
3572 {
3573 match &col_type.scalar_type {
3574 typ @ SqlScalarType::UInt16
3575 | typ @ SqlScalarType::UInt32
3576 | typ @ SqlScalarType::UInt64
3577 | typ @ SqlScalarType::MzTimestamp
3578 | typ @ SqlScalarType::List { .. }
3579 | typ @ SqlScalarType::Map { .. }
3580 | typ @ SqlScalarType::MzAclItem => {
3581 panic!("{typ:?} type found in {full_name}");
3582 }
3583 SqlScalarType::AclItem
3584 | SqlScalarType::Bool
3585 | SqlScalarType::Int16
3586 | SqlScalarType::Int32
3587 | SqlScalarType::Int64
3588 | SqlScalarType::Float32
3589 | SqlScalarType::Float64
3590 | SqlScalarType::Numeric { .. }
3591 | SqlScalarType::Date
3592 | SqlScalarType::Time
3593 | SqlScalarType::Timestamp { .. }
3594 | SqlScalarType::TimestampTz { .. }
3595 | SqlScalarType::Interval
3596 | SqlScalarType::PgLegacyChar
3597 | SqlScalarType::Bytes
3598 | SqlScalarType::String
3599 | SqlScalarType::Char { .. }
3600 | SqlScalarType::VarChar { .. }
3601 | SqlScalarType::Jsonb
3602 | SqlScalarType::Uuid
3603 | SqlScalarType::Array(_)
3604 | SqlScalarType::Record { .. }
3605 | SqlScalarType::Oid
3606 | SqlScalarType::RegProc
3607 | SqlScalarType::RegType
3608 | SqlScalarType::RegClass
3609 | SqlScalarType::Int2Vector
3610 | SqlScalarType::Range { .. }
3611 | SqlScalarType::PgLegacyName => {}
3612 }
3613 }
3614 }
3615 catalog.expire().await;
3616 })
3617 .await
3618 }
3619
3620 #[mz_ore::test(tokio::test)]
3623 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3625 Catalog::with_debug(|catalog| async move {
3626 let conn_catalog = catalog.for_system_session();
3627
3628 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3629 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3630
3631 for entry in catalog.entries() {
3632 let schema_spec = entry.name().qualifiers.schema_spec;
3633 let introspection_deps = catalog.introspection_dependencies(entry.id);
3634 if introspection_deps.is_empty() {
3635 assert!(
3636 schema_spec != introspection_schema_spec,
3637 "entry does not depend on introspection sources but is in \
3638 `mz_introspection`: {}",
3639 conn_catalog.resolve_full_name(entry.name()),
3640 );
3641 } else {
3642 assert!(
3643 schema_spec == introspection_schema_spec,
3644 "entry depends on introspection sources but is not in \
3645 `mz_introspection`: {}",
3646 conn_catalog.resolve_full_name(entry.name()),
3647 );
3648 }
3649 }
3650 })
3651 .await
3652 }
3653
3654 #[mz_ore::test(tokio::test)]
3655 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3657 let persist_client = PersistClient::new_for_tests().await;
3658 let bootstrap_args = test_bootstrap_args();
3659 let organization_id = Uuid::new_v4();
3660 let db_name = "DB";
3661
3662 let mut writer_catalog = Catalog::open_debug_catalog(
3663 persist_client.clone(),
3664 organization_id.clone(),
3665 &bootstrap_args,
3666 )
3667 .await
3668 .expect("open_debug_catalog");
3669 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3670 persist_client.clone(),
3671 organization_id.clone(),
3672 &bootstrap_args,
3673 )
3674 .await
3675 .expect("open_debug_read_only_catalog");
3676 assert_err!(writer_catalog.resolve_database(db_name));
3677 assert_err!(read_only_catalog.resolve_database(db_name));
3678
3679 let commit_ts = writer_catalog.current_upper().await;
3680 writer_catalog
3681 .transact(
3682 None,
3683 commit_ts,
3684 None,
3685 vec![Op::CreateDatabase {
3686 name: db_name.to_string(),
3687 owner_id: MZ_SYSTEM_ROLE_ID,
3688 }],
3689 )
3690 .await
3691 .expect("failed to transact");
3692
3693 let write_db = writer_catalog
3694 .resolve_database(db_name)
3695 .expect("resolve_database");
3696 read_only_catalog
3697 .sync_to_current_updates()
3698 .await
3699 .expect("sync_to_current_updates");
3700 let read_db = read_only_catalog
3701 .resolve_database(db_name)
3702 .expect("resolve_database");
3703
3704 assert_eq!(write_db, read_db);
3705
3706 let writer_catalog_fencer =
3707 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3708 .await
3709 .expect("open_debug_catalog for fencer");
3710 let fencer_db = writer_catalog_fencer
3711 .resolve_database(db_name)
3712 .expect("resolve_database for fencer");
3713 assert_eq!(fencer_db, read_db);
3714
3715 let write_fence_err = writer_catalog
3716 .sync_to_current_updates()
3717 .await
3718 .expect_err("sync_to_current_updates for fencer");
3719 assert!(matches!(
3720 write_fence_err,
3721 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3722 ));
3723 let read_fence_err = read_only_catalog
3724 .sync_to_current_updates()
3725 .await
3726 .expect_err("sync_to_current_updates after fencer");
3727 assert!(matches!(
3728 read_fence_err,
3729 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3730 ));
3731
3732 writer_catalog.expire().await;
3733 read_only_catalog.expire().await;
3734 writer_catalog_fencer.expire().await;
3735 }
3736}