1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67 CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68 DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100use crate::session::{PreparedStatement, Session};
101use crate::util::ResultExt;
102use crate::{AdapterError, AdapterNotice, ExecuteResponse};
103
104mod builtin_table_updates;
105pub(crate) mod consistency;
106mod migrate;
107
108mod apply;
109mod open;
110mod state;
111mod timeline;
112mod transact;
113
/// The adapter's catalog handle: in-memory catalog state and cached plans,
/// paired with a shared handle to the durable catalog storage.
#[derive(Debug)]
pub struct Catalog {
    /// In-memory catalog state; most accessors on `Catalog` delegate to it.
    state: CatalogState,
    /// Cached plans, dataflow metainfos, and optimizer notices keyed by `GlobalId`.
    plans: CatalogPlans,
    /// Handle to the expression cache, if one is present.
    expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Durable catalog state behind an async mutex. The `Arc` is shared (not
    /// deep-copied) when a `Catalog` is cloned.
    storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
    /// Revision counter reported by `transient_revision()`.
    // NOTE(review): where this counter is advanced is not visible in this part of the file.
    transient_revision: u64,
}
144
145impl Clone for Catalog {
148 fn clone(&self) -> Self {
149 Self {
150 state: self.state.clone(),
151 plans: self.plans.clone(),
152 expr_cache_handle: self.expr_cache_handle.clone(),
153 storage: Arc::clone(&self.storage),
154 transient_revision: self.transient_revision,
155 }
156 }
157}
158
/// Per-`GlobalId` caches of plans and optimizer metadata held by [`Catalog`].
#[derive(Default, Debug, Clone)]
pub struct CatalogPlans {
    /// Optimized (MIR) dataflow descriptions, keyed by the id they were stored under.
    optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical dataflow descriptions, keyed by the id they were stored under.
    physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
    /// Dataflow metainfo (including optimizer notices), keyed by the id it was stored under.
    dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
    /// Reverse index: for each dependency id, the notices that list it in
    /// their `dependencies`.
    notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
}
166
167impl Catalog {
168 #[mz_ore::instrument(level = "trace")]
170 pub fn set_optimized_plan(
171 &mut self,
172 id: GlobalId,
173 plan: DataflowDescription<OptimizedMirRelationExpr>,
174 ) {
175 self.plans.optimized_plan_by_id.insert(id, plan.into());
176 }
177
178 #[mz_ore::instrument(level = "trace")]
180 pub fn set_physical_plan(
181 &mut self,
182 id: GlobalId,
183 plan: DataflowDescription<mz_compute_types::plan::Plan>,
184 ) {
185 self.plans.physical_plan_by_id.insert(id, plan.into());
186 }
187
188 #[mz_ore::instrument(level = "trace")]
190 pub fn try_get_optimized_plan(
191 &self,
192 id: &GlobalId,
193 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
194 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
195 }
196
197 #[mz_ore::instrument(level = "trace")]
199 pub fn try_get_physical_plan(
200 &self,
201 id: &GlobalId,
202 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
203 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
204 }
205
206 #[mz_ore::instrument(level = "trace")]
208 pub fn set_dataflow_metainfo(
209 &mut self,
210 id: GlobalId,
211 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
212 ) {
213 for notice in metainfo.optimizer_notices.iter() {
215 for dep_id in notice.dependencies.iter() {
216 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
217 entry.push(Arc::clone(notice))
218 }
219 if let Some(item_id) = notice.item_id {
220 soft_assert_eq_or_log!(
221 item_id,
222 id,
223 "notice.item_id should match the id for whom we are saving the notice"
224 );
225 }
226 }
227 self.plans.dataflow_metainfos.insert(id, metainfo);
229 }
230
231 #[mz_ore::instrument(level = "trace")]
233 pub fn try_get_dataflow_metainfo(
234 &self,
235 id: &GlobalId,
236 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
237 self.plans.dataflow_metainfos.get(id)
238 }
239
    /// Removes the cached plans and dataflow metainfos for every id in
    /// `drop_ids`, together with all optimizer notices that either belong to
    /// one of those ids or depend on one of them.
    ///
    /// Returns the set of notices that were removed. After removal, each
    /// returned notice is expected to be referenced only by the returned set;
    /// if that does not hold, a diagnostic error is logged (the function still
    /// returns normally).
    #[mz_ore::instrument(level = "trace")]
    pub fn drop_plans_and_metainfos(
        &mut self,
        drop_ids: &BTreeSet<GlobalId>,
    ) -> BTreeSet<Arc<OptimizerNotice>> {
        // Accumulates every notice removed by any of the passes below.
        let mut dropped_notices = BTreeSet::new();

        // Pass 1: drop plans and metainfo stored under each id, and remove the
        // metainfo's notices from the reverse index of their dependencies.
        for id in drop_ids {
            self.plans.optimized_plan_by_id.remove(id);
            self.plans.physical_plan_by_id.remove(id);
            if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
                soft_assert_or_log!(
                    metainfo.optimizer_notices.iter().all_unique(),
                    "should have been pushed there by `push_optimizer_notice_dedup`"
                );
                for n in metainfo.optimizer_notices.drain(..) {
                    for dep_id in n.dependencies.iter() {
                        // Reverse-index entries may already be gone; only
                        // touch the ones that still exist.
                        if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
                            soft_assert_or_log!(
                                notices.iter().any(|x| &n == x),
                                "corrupt notices_by_dep_id"
                            );
                            notices.retain(|x| &n != x)
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 2: drop the reverse-index entries keyed by the dropped ids, and
        // remove each such notice from the metainfo of the item it belongs to.
        for id in drop_ids {
            if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
                for n in notices.drain(..) {
                    if let Some(item_id) = n.item_id.as_ref() {
                        if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
                            // Sanity check: notices saved under `item_id`
                            // should themselves name `item_id`.
                            metainfo.optimizer_notices.iter().for_each(|n2| {
                                if let Some(item_id_2) = n2.item_id {
                                    soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
                                }
                            });
                            metainfo.optimizer_notices.retain(|x| &n != x);
                        }
                    }
                    dropped_notices.insert(n);
                }
            }
        }

        // Pass 3: dropped notices may also be indexed under dependencies that
        // are *not* being dropped; collect those ids and purge the notices
        // from their reverse-index entries.
        let mut todo_dep_ids = BTreeSet::new();
        for notice in dropped_notices.iter() {
            for dep_id in notice.dependencies.iter() {
                if !drop_ids.contains(dep_id) {
                    todo_dep_ids.insert(*dep_id);
                }
            }
        }
        for id in todo_dep_ids {
            if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
                notices.retain(|n| !dropped_notices.contains(n))
            }
        }

        // Diagnostics: every dropped notice should now have exactly one strong
        // reference (the one in `dropped_notices`). If not, log where the
        // remaining references were found.
        if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
            use mz_ore::str::{bracketed, separated};
            let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
            let bad_notices = bad_notices.map(|n| {
                // Ids whose metainfo still lists the notice.
                let mut dataflow_metainfo_occurrences = Vec::new();
                for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
                    if meta_info.optimizer_notices.contains(n) {
                        dataflow_metainfo_occurrences.push(id);
                    }
                }
                // Dependency ids whose reverse-index entry still lists the notice.
                let mut notices_by_dep_id_occurrences = Vec::new();
                for (id, notices) in self.plans.notices_by_dep_id.iter() {
                    if notices.iter().contains(n) {
                        notices_by_dep_id_occurrences.push(id);
                    }
                }
                format!(
                    "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
                    dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
                    n.id,
                    n.kind,
                    n.dependencies,
                    Arc::strong_count(n),
                    dataflow_metainfo_occurrences,
                    notices_by_dep_id_occurrences
                )
            });
            let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
            error!(
                "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
                bad_notices = {bad_notices}; \
                drop_ids = {drop_ids:?}"
            );
        }

        dropped_notices
    }
359
360 pub(crate) fn invalidate_for_index(
371 &self,
372 ons: impl Iterator<Item = GlobalId>,
373 ) -> BTreeSet<GlobalId> {
374 let mut dependencies = BTreeSet::new();
375 let mut queue = VecDeque::new();
376 let mut seen = HashSet::new();
377 for on in ons {
378 let entry = self.get_entry_by_global_id(&on);
379 dependencies.insert(on);
380 seen.insert(entry.id);
381 let uses = entry.uses();
382 queue.extend(uses.clone());
383 }
384
385 while let Some(cur) = queue.pop_front() {
386 let entry = self.get_entry(&cur);
387 if seen.insert(cur) {
388 let global_ids = entry.global_ids();
389 match entry.item_type() {
390 CatalogItemType::Table
391 | CatalogItemType::Source
392 | CatalogItemType::MaterializedView
393 | CatalogItemType::Sink
394 | CatalogItemType::Index
395 | CatalogItemType::Type
396 | CatalogItemType::Func
397 | CatalogItemType::Secret
398 | CatalogItemType::Connection
399 | CatalogItemType::ContinualTask => {
400 dependencies.extend(global_ids);
401 }
402 CatalogItemType::View => {
403 dependencies.extend(global_ids);
404 queue.extend(entry.uses());
405 }
406 }
407 }
408 }
409 dependencies
410 }
411}
412
/// A view of the catalog bound to one connection and role.
#[derive(Debug)]
pub struct ConnCatalog<'a> {
    /// The catalog state backing this view, either borrowed or owned.
    state: Cow<'a, CatalogState>,
    /// Item ids recorded via `mark_id_unresolvable_for_replanning`.
    // NOTE(review): how name resolution consults this set is not visible in this part of the file.
    unresolvable_ids: BTreeSet<CatalogItemId>,
    /// Id of the connection this view belongs to.
    conn_id: ConnectionId,
    /// Name of a cluster. NOTE(review): presumably the connection's active cluster; confirm.
    cluster: String,
    /// A database id, if any. NOTE(review): presumably the connection's current database; confirm.
    database: Option<DatabaseId>,
    /// Search path entries; input to `effective_search_path`.
    search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
    /// Role this view acts as.
    role_id: RoleId,
    /// Borrowed map of prepared statements by name, when available.
    prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
    /// Channel on which `AdapterNotice`s can be sent.
    notices_tx: UnboundedSender<AdapterNotice>,
}
434
435impl ConnCatalog<'_> {
436 pub fn conn_id(&self) -> &ConnectionId {
437 &self.conn_id
438 }
439
440 pub fn state(&self) -> &CatalogState {
441 &*self.state
442 }
443
444 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
454 assert_eq!(
455 self.role_id, MZ_SYSTEM_ROLE_ID,
456 "only the system role can mark IDs unresolvable",
457 );
458 self.unresolvable_ids.insert(id);
459 }
460
461 pub fn effective_search_path(
467 &self,
468 include_temp_schema: bool,
469 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
470 self.state
471 .effective_search_path(&self.search_path, include_temp_schema)
472 }
473}
474
475impl ConnectionResolver for ConnCatalog<'_> {
476 fn resolve_connection(
477 &self,
478 id: CatalogItemId,
479 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
480 self.state().resolve_connection(id)
481 }
482}
483
484impl Catalog {
485 pub fn transient_revision(&self) -> u64 {
489 self.transient_revision
490 }
491
492 pub async fn with_debug<F, Fut, T>(f: F) -> T
504 where
505 F: FnOnce(Catalog) -> Fut,
506 Fut: Future<Output = T>,
507 {
508 let persist_client = PersistClient::new_for_tests().await;
509 let organization_id = Uuid::new_v4();
510 let bootstrap_args = test_bootstrap_args();
511 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
512 .await
513 .expect("can open debug catalog");
514 f(catalog).await
515 }
516
517 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
520 where
521 F: FnOnce(Catalog) -> Fut,
522 Fut: Future<Output = T>,
523 {
524 let persist_client = PersistClient::new_for_tests().await;
525 let organization_id = Uuid::new_v4();
526 let bootstrap_args = test_bootstrap_args();
527 let mut catalog =
528 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
529 .await
530 .expect("can open debug catalog");
531
532 let now = SYSTEM_TIME.clone();
534 let openable_storage = TestCatalogStateBuilder::new(persist_client)
535 .with_organization_id(organization_id)
536 .with_default_deploy_generation()
537 .build()
538 .await
539 .expect("can create durable catalog");
540 let mut storage = openable_storage
541 .open(now().into(), &bootstrap_args)
542 .await
543 .expect("can open durable catalog")
544 .0;
545 let _ = storage
547 .sync_to_current_updates()
548 .await
549 .expect("can sync to current updates");
550 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
551
552 f(catalog).await
553 }
554
555 pub async fn open_debug_catalog(
559 persist_client: PersistClient,
560 organization_id: Uuid,
561 bootstrap_args: &BootstrapArgs,
562 ) -> Result<Catalog, anyhow::Error> {
563 let now = SYSTEM_TIME.clone();
564 let environment_id = None;
565 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
566 .with_organization_id(organization_id)
567 .with_default_deploy_generation()
568 .build()
569 .await?;
570 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
571 let system_parameter_defaults = BTreeMap::default();
572 Self::open_debug_catalog_inner(
573 persist_client,
574 storage,
575 now,
576 environment_id,
577 &DUMMY_BUILD_INFO,
578 system_parameter_defaults,
579 bootstrap_args,
580 None,
581 )
582 .await
583 }
584
585 pub async fn open_debug_read_only_catalog(
590 persist_client: PersistClient,
591 organization_id: Uuid,
592 bootstrap_args: &BootstrapArgs,
593 ) -> Result<Catalog, anyhow::Error> {
594 let now = SYSTEM_TIME.clone();
595 let environment_id = None;
596 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
597 .with_organization_id(organization_id)
598 .build()
599 .await?;
600 let storage = openable_storage
601 .open_read_only(&test_bootstrap_args())
602 .await?;
603 let system_parameter_defaults = BTreeMap::default();
604 Self::open_debug_catalog_inner(
605 persist_client,
606 storage,
607 now,
608 environment_id,
609 &DUMMY_BUILD_INFO,
610 system_parameter_defaults,
611 bootstrap_args,
612 None,
613 )
614 .await
615 }
616
617 pub async fn open_debug_read_only_persist_catalog_config(
622 persist_client: PersistClient,
623 now: NowFn,
624 environment_id: EnvironmentId,
625 system_parameter_defaults: BTreeMap<String, String>,
626 build_info: &'static BuildInfo,
627 bootstrap_args: &BootstrapArgs,
628 enable_expression_cache_override: Option<bool>,
629 ) -> Result<Catalog, anyhow::Error> {
630 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
631 .with_organization_id(environment_id.organization_id())
632 .with_version(
633 build_info
634 .version
635 .parse()
636 .expect("build version is parseable"),
637 )
638 .build()
639 .await?;
640 let storage = openable_storage.open_read_only(bootstrap_args).await?;
641 Self::open_debug_catalog_inner(
642 persist_client,
643 storage,
644 now,
645 Some(environment_id),
646 build_info,
647 system_parameter_defaults,
648 bootstrap_args,
649 enable_expression_cache_override,
650 )
651 .await
652 }
653
654 async fn open_debug_catalog_inner(
655 persist_client: PersistClient,
656 storage: Box<dyn DurableCatalogState>,
657 now: NowFn,
658 environment_id: Option<EnvironmentId>,
659 build_info: &'static BuildInfo,
660 system_parameter_defaults: BTreeMap<String, String>,
661 bootstrap_args: &BootstrapArgs,
662 enable_expression_cache_override: Option<bool>,
663 ) -> Result<Catalog, anyhow::Error> {
664 let metrics_registry = &MetricsRegistry::new();
665 let secrets_reader = Arc::new(InMemorySecretsController::new());
666 let previous_ts = now().into();
669 let replica_size = &bootstrap_args.default_cluster_replica_size;
670 let read_only = false;
671
672 let OpenCatalogResult {
673 catalog,
674 migrated_storage_collections_0dt: _,
675 new_builtin_collections: _,
676 builtin_table_updates: _,
677 cached_global_exprs: _,
678 uncached_local_exprs: _,
679 } = Catalog::open(Config {
680 storage,
681 metrics_registry,
682 state: StateConfig {
683 unsafe_mode: true,
684 all_features: false,
685 build_info,
686 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
687 read_only,
688 now,
689 boot_ts: previous_ts,
690 skip_migrations: true,
691 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
692 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
693 size: replica_size.clone(),
694 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
695 },
696 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
697 size: replica_size.clone(),
698 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
699 },
700 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
701 size: replica_size.clone(),
702 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
703 },
704 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
705 size: replica_size.clone(),
706 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
707 },
708 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
709 size: replica_size.clone(),
710 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711 },
712 system_parameter_defaults,
713 remote_system_parameters: None,
714 availability_zones: vec![],
715 egress_addresses: vec![],
716 aws_principal_context: None,
717 aws_privatelink_availability_zones: None,
718 http_host_name: None,
719 connection_context: ConnectionContext::for_tests(secrets_reader),
720 builtin_item_migration_config: BuiltinItemMigrationConfig {
721 persist_client: persist_client.clone(),
722 read_only,
723 },
724 persist_client,
725 enable_expression_cache_override,
726 enable_0dt_deployment: true,
727 helm_chart_version: None,
728 external_login_password_mz_system: None,
729 license_key: ValidatedLicenseKey::for_tests(),
730 },
731 })
732 .await?;
733 Ok(catalog)
734 }
735
736 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
737 self.state.for_session(session)
738 }
739
740 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
741 self.state.for_sessionless_user(role_id)
742 }
743
744 pub fn for_system_session(&self) -> ConnCatalog<'_> {
745 self.state.for_system_session()
746 }
747
748 async fn storage<'a>(
749 &'a self,
750 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
751 self.storage.lock().await
752 }
753
754 pub async fn current_upper(&self) -> mz_repr::Timestamp {
755 self.storage().await.current_upper().await
756 }
757
758 pub async fn allocate_user_id(
759 &self,
760 commit_ts: mz_repr::Timestamp,
761 ) -> Result<(CatalogItemId, GlobalId), Error> {
762 self.storage()
763 .await
764 .allocate_user_id(commit_ts)
765 .await
766 .maybe_terminate("allocating user ids")
767 .err_into()
768 }
769
770 pub async fn allocate_user_ids(
772 &self,
773 amount: u64,
774 commit_ts: mz_repr::Timestamp,
775 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
776 self.storage()
777 .await
778 .allocate_user_ids(amount, commit_ts)
779 .await
780 .maybe_terminate("allocating user ids")
781 .err_into()
782 }
783
784 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
785 let commit_ts = self.storage().await.current_upper().await;
786 self.allocate_user_id(commit_ts).await
787 }
788
789 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
791 self.storage()
792 .await
793 .get_next_user_item_id()
794 .await
795 .err_into()
796 }
797
798 #[cfg(test)]
799 pub async fn allocate_system_id(
800 &self,
801 commit_ts: mz_repr::Timestamp,
802 ) -> Result<(CatalogItemId, GlobalId), Error> {
803 use mz_ore::collections::CollectionExt;
804
805 let mut storage = self.storage().await;
806 let mut txn = storage.transaction().await?;
807 let id = txn
808 .allocate_system_item_ids(1)
809 .maybe_terminate("allocating system ids")?
810 .into_element();
811 let _ = txn.get_and_commit_op_updates();
813 txn.commit(commit_ts).await?;
814 Ok(id)
815 }
816
817 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
819 self.storage()
820 .await
821 .get_next_system_item_id()
822 .await
823 .err_into()
824 }
825
826 pub async fn allocate_user_cluster_id(
827 &self,
828 commit_ts: mz_repr::Timestamp,
829 ) -> Result<ClusterId, Error> {
830 self.storage()
831 .await
832 .allocate_user_cluster_id(commit_ts)
833 .await
834 .maybe_terminate("allocating user cluster ids")
835 .err_into()
836 }
837
838 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
840 self.storage()
841 .await
842 .get_next_system_replica_id()
843 .await
844 .err_into()
845 }
846
847 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
849 self.storage()
850 .await
851 .get_next_user_replica_id()
852 .await
853 .err_into()
854 }
855
856 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
857 self.state.resolve_database(database_name)
858 }
859
860 pub fn resolve_schema(
861 &self,
862 current_database: Option<&DatabaseId>,
863 database_name: Option<&str>,
864 schema_name: &str,
865 conn_id: &ConnectionId,
866 ) -> Result<&Schema, SqlCatalogError> {
867 self.state
868 .resolve_schema(current_database, database_name, schema_name, conn_id)
869 }
870
871 pub fn resolve_schema_in_database(
872 &self,
873 database_spec: &ResolvedDatabaseSpecifier,
874 schema_name: &str,
875 conn_id: &ConnectionId,
876 ) -> Result<&Schema, SqlCatalogError> {
877 self.state
878 .resolve_schema_in_database(database_spec, schema_name, conn_id)
879 }
880
881 pub fn resolve_replica_in_cluster(
882 &self,
883 cluster_id: &ClusterId,
884 replica_name: &str,
885 ) -> Result<&ClusterReplica, SqlCatalogError> {
886 self.state
887 .resolve_replica_in_cluster(cluster_id, replica_name)
888 }
889
890 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
891 self.state.resolve_system_schema(name)
892 }
893
894 pub fn resolve_search_path(
895 &self,
896 session: &Session,
897 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
898 self.state.resolve_search_path(session)
899 }
900
901 pub fn resolve_entry(
903 &self,
904 current_database: Option<&DatabaseId>,
905 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
906 name: &PartialItemName,
907 conn_id: &ConnectionId,
908 ) -> Result<&CatalogEntry, SqlCatalogError> {
909 self.state
910 .resolve_entry(current_database, search_path, name, conn_id)
911 }
912
913 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
915 self.state.resolve_builtin_table(builtin)
916 }
917
918 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
920 self.state.resolve_builtin_log(builtin).0
921 }
922
923 pub fn resolve_builtin_storage_collection(
925 &self,
926 builtin: &'static BuiltinSource,
927 ) -> CatalogItemId {
928 self.state.resolve_builtin_source(builtin)
929 }
930
931 pub fn resolve_function(
933 &self,
934 current_database: Option<&DatabaseId>,
935 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
936 name: &PartialItemName,
937 conn_id: &ConnectionId,
938 ) -> Result<&CatalogEntry, SqlCatalogError> {
939 self.state
940 .resolve_function(current_database, search_path, name, conn_id)
941 }
942
943 pub fn resolve_type(
945 &self,
946 current_database: Option<&DatabaseId>,
947 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
948 name: &PartialItemName,
949 conn_id: &ConnectionId,
950 ) -> Result<&CatalogEntry, SqlCatalogError> {
951 self.state
952 .resolve_type(current_database, search_path, name, conn_id)
953 }
954
955 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
956 self.state.resolve_cluster(name)
957 }
958
959 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
965 self.state.resolve_builtin_cluster(cluster)
966 }
967
968 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
969 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
970 }
971
972 pub fn resolve_target_cluster(
974 &self,
975 target_cluster: TargetCluster,
976 session: &Session,
977 ) -> Result<&Cluster, AdapterError> {
978 match target_cluster {
979 TargetCluster::CatalogServer => {
980 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
981 }
982 TargetCluster::Active => self.active_cluster(session),
983 TargetCluster::Transaction(cluster_id) => self
984 .try_get_cluster(cluster_id)
985 .ok_or(AdapterError::ConcurrentClusterDrop),
986 }
987 }
988
989 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
990 if session.user().name != SYSTEM_USER.name
994 && session.user().name != SUPPORT_USER.name
995 && session.vars().cluster() == SYSTEM_USER.name
996 {
997 coord_bail!(
998 "system cluster '{}' cannot execute user queries",
999 SYSTEM_USER.name
1000 );
1001 }
1002 let cluster = self.resolve_cluster(session.vars().cluster())?;
1003 Ok(cluster)
1004 }
1005
1006 pub fn state(&self) -> &CatalogState {
1007 &self.state
1008 }
1009
1010 pub fn resolve_full_name(
1011 &self,
1012 name: &QualifiedItemName,
1013 conn_id: Option<&ConnectionId>,
1014 ) -> FullItemName {
1015 self.state.resolve_full_name(name, conn_id)
1016 }
1017
1018 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1019 self.state.try_get_entry(id)
1020 }
1021
1022 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1023 self.state.try_get_entry_by_global_id(id)
1024 }
1025
1026 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1027 self.state.get_entry(id)
1028 }
1029
1030 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1031 self.state.get_entry_by_global_id(id)
1032 }
1033
1034 pub fn get_global_ids<'a>(
1035 &'a self,
1036 id: &CatalogItemId,
1037 ) -> impl Iterator<Item = GlobalId> + use<'a> {
1038 self.get_entry(id).global_ids()
1039 }
1040
1041 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1042 self.get_entry_by_global_id(id).id()
1043 }
1044
1045 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1046 let item = self.try_get_entry_by_global_id(id)?;
1047 Some(item.id())
1048 }
1049
1050 pub fn get_schema(
1051 &self,
1052 database_spec: &ResolvedDatabaseSpecifier,
1053 schema_spec: &SchemaSpecifier,
1054 conn_id: &ConnectionId,
1055 ) -> &Schema {
1056 self.state.get_schema(database_spec, schema_spec, conn_id)
1057 }
1058
1059 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1060 self.state.get_mz_catalog_schema_id()
1061 }
1062
1063 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1064 self.state.get_pg_catalog_schema_id()
1065 }
1066
1067 pub fn get_information_schema_id(&self) -> SchemaId {
1068 self.state.get_information_schema_id()
1069 }
1070
1071 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1072 self.state.get_mz_internal_schema_id()
1073 }
1074
1075 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1076 self.state.get_mz_introspection_schema_id()
1077 }
1078
1079 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1080 self.state.get_mz_unsafe_schema_id()
1081 }
1082
1083 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1084 self.state.system_schema_ids()
1085 }
1086
1087 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1088 self.state.get_database(id)
1089 }
1090
1091 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1092 self.state.try_get_role(id)
1093 }
1094
1095 pub fn get_role(&self, id: &RoleId) -> &Role {
1096 self.state.get_role(id)
1097 }
1098
1099 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1100 self.state.try_get_role_by_name(role_name)
1101 }
1102
1103 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1104 self.state.try_get_role_auth_by_id(id)
1105 }
1106
1107 pub fn create_temporary_schema(
1110 &mut self,
1111 conn_id: &ConnectionId,
1112 owner_id: RoleId,
1113 ) -> Result<(), Error> {
1114 self.state.create_temporary_schema(conn_id, owner_id)
1115 }
1116
1117 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1118 self.state.temporary_schemas[conn_id]
1119 .items
1120 .contains_key(item_name)
1121 }
1122
1123 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1126 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1127 return Ok(());
1128 };
1129 if !schema.items.is_empty() {
1130 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1131 }
1132 Ok(())
1133 }
1134
1135 pub(crate) fn object_dependents(
1136 &self,
1137 object_ids: &Vec<ObjectId>,
1138 conn_id: &ConnectionId,
1139 ) -> Vec<ObjectId> {
1140 let mut seen = BTreeSet::new();
1141 self.state.object_dependents(object_ids, conn_id, &mut seen)
1142 }
1143
1144 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1145 FullNameV1 {
1146 database: name.database.to_string(),
1147 schema: name.schema.clone(),
1148 item: name.item.clone(),
1149 }
1150 }
1151
1152 pub fn find_available_cluster_name(&self, name: &str) -> String {
1153 let mut i = 0;
1154 let mut candidate = name.to_string();
1155 while self.state.clusters_by_name.contains_key(&candidate) {
1156 i += 1;
1157 candidate = format!("{}{}", name, i);
1158 }
1159 candidate
1160 }
1161
1162 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1163 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1164 self.cluster_replica_sizes()
1165 .enabled_allocations()
1166 .map(|a| a.0.to_owned())
1167 .collect::<Vec<_>>()
1168 } else {
1169 self.system_config().allowed_cluster_replica_sizes()
1170 }
1171 }
1172
1173 pub fn concretize_replica_location(
1174 &self,
1175 location: mz_catalog::durable::ReplicaLocation,
1176 allowed_sizes: &Vec<String>,
1177 allowed_availability_zones: Option<&[String]>,
1178 ) -> Result<ReplicaLocation, Error> {
1179 self.state
1180 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1181 }
1182
    /// Validates `size` against `allowed_sizes` by delegating to the catalog
    /// state's check of the same name.
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        self.state.ensure_valid_replica_size(allowed_sizes, size)
    }
1190
    /// Returns the replica size map stored in the catalog state.
    pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
        &self.state.cluster_replica_sizes
    }
1194
1195 pub fn get_privileges(
1197 &self,
1198 id: &SystemObjectId,
1199 conn_id: &ConnectionId,
1200 ) -> Option<&PrivilegeMap> {
1201 match id {
1202 SystemObjectId::Object(id) => match id {
1203 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1204 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1205 ObjectId::Schema((database_spec, schema_spec)) => Some(
1206 self.get_schema(database_spec, schema_spec, conn_id)
1207 .privileges(),
1208 ),
1209 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1210 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1211 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1212 },
1213 SystemObjectId::System => Some(&self.state.system_privileges),
1214 }
1215 }
1216
    /// Asks the storage handle returned by `self.storage()` to confirm
    /// leadership, converting any failure into an `AdapterError`.
    #[mz_ore::instrument(level = "debug")]
    pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
        Ok(self.storage().await.confirm_leadership().await?)
    }
1221
    /// Returns the introspection dependencies of item `id`, as computed by the
    /// catalog state.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        self.state.introspection_dependencies(id)
    }
1226
    /// Produces a `CatalogDump` from the state's dump, passing `None` as the
    /// state dump's argument.
    pub fn dump(&self) -> Result<CatalogDump, Error> {
        Ok(CatalogDump::new(self.state.dump(None)?))
    }
1235
1236 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1240 self.state.check_consistency().map_err(|inconsistencies| {
1241 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1242 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1243 })
1244 })
1245 }
1246
    /// Returns the catalog configuration held by the state.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
1250
    /// Iterates over every catalog entry in the state's `entry_by_id` map.
    pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.state.entry_by_id.values()
    }
1254
    /// Iterates over the entries that are connections and have a user ID.
    pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_connection() && entry.id().is_user())
    }
1259
    /// Iterates over the entries that are tables and have a user ID.
    pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_table() && entry.id().is_user())
    }
1264
    /// Iterates over the entries that are sources and have a user ID.
    pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_source() && entry.id().is_user())
    }
1269
    /// Iterates over the entries that are sinks and have a user ID.
    pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_sink() && entry.id().is_user())
    }
1274
    /// Iterates over the entries that are materialized views and have a user ID.
    pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
    }
1279
    /// Iterates over the entries that are secrets and have a user ID.
    pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_secret() && entry.id().is_user())
    }
1284
    /// Returns the network policy with ID `network_policy_id` from the state.
    // NOTE(review): behavior for an unknown ID is decided by the state's
    // `get_network_policy`, which is not visible here.
    pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
        self.state.get_network_policy(&network_policy_id)
    }
1288
    /// Looks up a network policy by `name`, returning `None` when the state
    /// has no policy under that name.
    pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
        self.state.try_get_network_policy_by_name(name)
    }
1292
    /// Iterates over every cluster in the state's `clusters_by_id` map.
    pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.state.clusters_by_id.values()
    }
1296
    /// Returns the cluster with ID `cluster_id` from the state.
    // NOTE(review): presumably panics for an unknown ID, given the separate
    // `try_get_cluster` variant — confirm against the state implementation.
    pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.state.get_cluster(cluster_id)
    }
1300
    /// Returns the cluster with ID `cluster_id`, or `None` when the state does
    /// not find one.
    pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.state.try_get_cluster(cluster_id)
    }
1304
    /// Iterates over the clusters whose ID is a user ID.
    pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
        self.clusters().filter(|cluster| cluster.id.is_user())
    }
1308
    /// Returns replica `replica_id` of cluster `cluster_id` from the state.
    // NOTE(review): presumably panics when either ID is unknown, given the
    // separate `try_get_cluster_replica` variant — confirm.
    pub fn get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> &ClusterReplica {
        self.state.get_cluster_replica(cluster_id, replica_id)
    }
1316
    /// Returns replica `replica_id` of cluster `cluster_id`, or `None` when the
    /// state does not find it.
    pub fn try_get_cluster_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Option<&ClusterReplica> {
        self.state.try_get_cluster_replica(cluster_id, replica_id)
    }
1324
    /// Iterates over the user replicas of every user cluster.
    pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.user_clusters()
            .flat_map(|cluster| cluster.user_replicas())
    }
1329
    /// Iterates over every database in the state's `database_by_id` map.
    pub fn databases(&self) -> impl Iterator<Item = &Database> {
        self.state.database_by_id.values()
    }
1333
    /// Iterates over the roles in `roles_by_id` for which `is_user()` holds.
    pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
        self.state
            .roles_by_id
            .values()
            .filter(|role| role.is_user())
    }
1340
    /// Iterates over the entries that are continual tasks and have a user ID.
    pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
        self.entries()
            .filter(|entry| entry.is_continual_task() && entry.id().is_user())
    }
1345
    /// Returns the system-level privilege map stored in the state.
    pub fn system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
1349
    /// Iterates over the state's default privileges, pairing each
    /// `DefaultPrivilegeObject` with an iterator over its ACL items.
    pub fn default_privileges(
        &self,
    ) -> impl Iterator<
        Item = (
            &DefaultPrivilegeObject,
            impl Iterator<Item = &DefaultPrivilegeAclItem>,
        ),
    > {
        self.state.default_privileges.iter()
    }
1360
    /// Packs the builtin table updates for item `id` with the given `diff`,
    /// then resolves them through the state before returning.
    pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
        self.state
            .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
    }
1365
    /// Packs a single builtin table update for the storage usage `event` with
    /// the given `diff`, then resolves it through the state before returning.
    pub fn pack_storage_usage_update(
        &self,
        event: VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate {
        self.state
            .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
    }
1374
    /// Returns a shared reference to the system variables held by the state.
    pub fn system_config(&self) -> &SystemVars {
        self.state.system_config()
    }
1378
    /// Returns a mutable reference to the system variables held by the state.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        self.state.system_config_mut()
    }
1382
    /// Delegates to the state's `ensure_not_reserved_role` check for `role_id`,
    /// returning its result unchanged.
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_reserved_role(role_id)
    }
1386
    /// Delegates to the state's `ensure_grantable_role` check for `role_id`,
    /// returning its result unchanged.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_grantable_role(role_id)
    }
1390
    /// Delegates to the state's `ensure_not_system_role` check for `role_id`,
    /// returning its result unchanged.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_system_role(role_id)
    }
1394
    /// Delegates to the state's `ensure_not_predefined_role` check for
    /// `role_id`, returning its result unchanged.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        self.state.ensure_not_predefined_role(role_id)
    }
1398
    /// Delegates to the state's `ensure_not_reserved_network_policy` check for
    /// `network_policy_id`, returning its result unchanged.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        self.state
            .ensure_not_reserved_network_policy(network_policy_id)
    }
1406
1407 pub fn ensure_not_reserved_object(
1408 &self,
1409 object_id: &ObjectId,
1410 conn_id: &ConnectionId,
1411 ) -> Result<(), Error> {
1412 match object_id {
1413 ObjectId::Cluster(cluster_id) => {
1414 if cluster_id.is_system() {
1415 let cluster = self.get_cluster(*cluster_id);
1416 Err(Error::new(ErrorKind::ReadOnlyCluster(
1417 cluster.name().to_string(),
1418 )))
1419 } else {
1420 Ok(())
1421 }
1422 }
1423 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1424 if replica_id.is_system() {
1425 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1426 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1427 replica.name().to_string(),
1428 )))
1429 } else {
1430 Ok(())
1431 }
1432 }
1433 ObjectId::Database(database_id) => {
1434 if database_id.is_system() {
1435 let database = self.get_database(database_id);
1436 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1437 database.name().to_string(),
1438 )))
1439 } else {
1440 Ok(())
1441 }
1442 }
1443 ObjectId::Schema((database_spec, schema_spec)) => {
1444 if schema_spec.is_system() {
1445 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1446 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1447 schema.name().schema.clone(),
1448 )))
1449 } else {
1450 Ok(())
1451 }
1452 }
1453 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1454 ObjectId::Item(item_id) => {
1455 if item_id.is_system() {
1456 let item = self.get_entry(item_id);
1457 let name = self.resolve_full_name(item.name(), Some(conn_id));
1458 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1459 } else {
1460 Ok(())
1461 }
1462 }
1463 ObjectId::NetworkPolicy(network_policy_id) => {
1464 self.ensure_not_reserved_network_policy(network_policy_id)
1465 }
1466 }
1467 }
1468
    /// Delegates to the state's method of the same name to turn `create_sql`
    /// into a `Plan` together with its `ResolvedIds`.
    ///
    /// `force_if_exists_skip` is forwarded unchanged.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.state
            .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
    }
1478
1479 pub(crate) fn update_expression_cache<'a, 'b>(
1480 &'a self,
1481 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1482 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1483 ) -> BoxFuture<'b, ()> {
1484 if let Some(expr_cache) = &self.expr_cache_handle {
1485 let ons = new_local_expressions
1486 .iter()
1487 .map(|(id, _)| id)
1488 .chain(new_global_expressions.iter().map(|(id, _)| id))
1489 .map(|id| self.get_entry_by_global_id(id))
1490 .filter_map(|entry| entry.index().map(|index| index.on));
1491 let invalidate_ids = self.invalidate_for_index(ons);
1492 expr_cache
1493 .update(
1494 new_local_expressions,
1495 new_global_expressions,
1496 invalidate_ids,
1497 )
1498 .boxed()
1499 } else {
1500 async {}.boxed()
1501 }
1502 }
1503
    /// Test-only helper: fetches the current updates from storage, applies
    /// them to the in-memory state, and returns the builtin table updates that
    /// applying them produced.
    #[cfg(test)]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, CatalogError> {
        let updates = self.storage().await.sync_to_current_updates().await?;
        let builtin_table_updates = self.state.apply_updates(updates)?;
        Ok(builtin_table_updates)
    }
1515}
1516
1517pub fn is_reserved_name(name: &str) -> bool {
1518 BUILTIN_PREFIXES
1519 .iter()
1520 .any(|prefix| name.starts_with(prefix))
1521}
1522
/// Returns `true` when `name` is a reserved name or the name of the public
/// role.
pub fn is_reserved_role_name(name: &str) -> bool {
    is_reserved_name(name) || is_public_role(name)
}
1526
/// Returns `true` when `name` equals `PUBLIC_ROLE_NAME`.
pub fn is_public_role(name: &str) -> bool {
    name == &*PUBLIC_ROLE_NAME
}
1530
/// Maps a SQL catalog item type to its audit-log object type by first
/// converting it into a SQL object type.
pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
    object_type_to_audit_object_type(sql_type.into())
}
1534
/// Maps the kind of object a comment is attached to onto the audit-log object
/// type of the same name.
///
/// The match is exhaustive with no wildcard arm, so a new `CommentObjectId`
/// variant must be handled here before the code compiles.
pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
    match id {
        CommentObjectId::Table(_) => ObjectType::Table,
        CommentObjectId::View(_) => ObjectType::View,
        CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
        CommentObjectId::Source(_) => ObjectType::Source,
        CommentObjectId::Sink(_) => ObjectType::Sink,
        CommentObjectId::Index(_) => ObjectType::Index,
        CommentObjectId::Func(_) => ObjectType::Func,
        CommentObjectId::Connection(_) => ObjectType::Connection,
        CommentObjectId::Type(_) => ObjectType::Type,
        CommentObjectId::Secret(_) => ObjectType::Secret,
        CommentObjectId::Role(_) => ObjectType::Role,
        CommentObjectId::Database(_) => ObjectType::Database,
        CommentObjectId::Schema(_) => ObjectType::Schema,
        CommentObjectId::Cluster(_) => ObjectType::Cluster,
        CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
        CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
        CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
    }
}
1556
/// Maps a SQL object type to its audit-log object type by wrapping it in
/// `SystemObjectType::Object` and reusing the system-object mapping.
pub(crate) fn object_type_to_audit_object_type(
    object_type: mz_sql::catalog::ObjectType,
) -> ObjectType {
    system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
}
1562
1563pub(crate) fn system_object_type_to_audit_object_type(
1564 system_type: &SystemObjectType,
1565) -> ObjectType {
1566 match system_type {
1567 SystemObjectType::Object(object_type) => match object_type {
1568 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1569 mz_sql::catalog::ObjectType::View => ObjectType::View,
1570 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1571 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1572 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1573 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1574 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1575 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1576 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1577 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1578 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1579 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1580 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1581 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1582 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1583 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1584 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1585 },
1586 SystemObjectType::System => ObjectType::System,
1587 }
1588}
1589
/// The direction of a privilege update.
///
/// Converts into the matching `ExecuteResponse` and audit-log `EventType`.
#[derive(Debug, Copy, Clone)]
pub enum UpdatePrivilegeVariant {
    /// Privileges are being granted.
    Grant,
    /// Privileges are being revoked.
    Revoke,
}
1595
/// Picks the response that corresponds to the privilege update direction.
impl From<UpdatePrivilegeVariant> for ExecuteResponse {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
            UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
        }
    }
}
1604
/// Picks the audit-log event type that corresponds to the privilege update
/// direction.
impl From<UpdatePrivilegeVariant> for EventType {
    fn from(variant: UpdatePrivilegeVariant) -> Self {
        match variant {
            UpdatePrivilegeVariant::Grant => EventType::Grant,
            UpdatePrivilegeVariant::Revoke => EventType::Revoke,
        }
    }
}
1613
1614impl ConnCatalog<'_> {
1615 fn resolve_item_name(
1616 &self,
1617 name: &PartialItemName,
1618 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1619 self.resolve_item(name).map(|entry| entry.name())
1620 }
1621
1622 fn resolve_function_name(
1623 &self,
1624 name: &PartialItemName,
1625 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1626 self.resolve_function(name).map(|entry| entry.name())
1627 }
1628
1629 fn resolve_type_name(
1630 &self,
1631 name: &PartialItemName,
1632 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1633 self.resolve_type(name).map(|entry| entry.name())
1634 }
1635}
1636
/// Renders IDs, types and columns in human-readable form using the names
/// known to this connection's view of the catalog.
impl ExprHumanizer for ConnCatalog<'_> {
    /// Returns the fully qualified name of the entry with global ID `id`, or
    /// `None` when the state has no such entry.
    fn humanize_id(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).to_string())
    }

    /// Returns only the item part of the name of the entry with global ID
    /// `id`, or `None` when the state has no such entry.
    fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(entry.name().item.clone())
    }

    /// Returns the fully qualified name of the entry with global ID `id` split
    /// into its parts, or `None` when the state has no such entry.
    fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;
        Some(self.resolve_full_name(entry.name()).into_parts())
    }

    /// Renders `typ` as a type name.
    ///
    /// Arm order matters: the `custom_id: Some(..)` arms must precede the
    /// generic `List`/`Map`/`Record` arms, and the guarded `Char`/`VarChar`
    /// arms only apply when `postgres_compat` is false (otherwise those types
    /// fall through to the final catch-all arm).
    fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        use SqlScalarType::*;

        match typ {
            Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
            // Custom list and map types are rendered by their (minimally
            // qualified) catalog name.
            List {
                custom_id: Some(item_id),
                ..
            }
            | Map {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            List { element_type, .. } => {
                format!(
                    "{} list",
                    self.humanize_scalar_type(element_type, postgres_compat)
                )
            }
            // The key type is always rendered as the string type.
            Map { value_type, .. } => format!(
                "map[{}=>{}]",
                self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
                self.humanize_scalar_type(value_type, postgres_compat)
            ),
            // Custom record types are rendered by their (minimally qualified)
            // catalog name.
            Record {
                custom_id: Some(item_id),
                ..
            } => {
                let item = self.get_item(item_id);
                self.minimal_qualification(item.name()).to_string()
            }
            Record { fields, .. } => format!(
                "record({})",
                fields
                    .iter()
                    .map(|f| format!(
                        "{}: {}",
                        f.0,
                        self.humanize_column_type(&f.1, postgres_compat)
                    ))
                    .join(",")
            ),
            PgLegacyChar => "\"char\"".into(),
            Char { length } if !postgres_compat => match length {
                None => "char".into(),
                Some(length) => format!("char({})", length.into_u32()),
            },
            VarChar { max_length } if !postgres_compat => match max_length {
                None => "varchar".into(),
                Some(length) => format!("varchar({})", length.into_u32()),
            },
            UInt16 => "uint2".into(),
            UInt32 => "uint4".into(),
            UInt64 => "uint8".into(),
            // Everything else is named via its `mz_pgrepr::Type`.
            ty => {
                let pgrepr_type = mz_pgrepr::Type::from(ty);
                let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());

                let res = if self
                    .effective_search_path(true)
                    .iter()
                    .any(|(_, schema)| schema == &pg_catalog_schema)
                {
                    // The pg_catalog schema is on the effective search path,
                    // so the bare type name is enough.
                    pgrepr_type.name().to_string()
                } else {
                    // Otherwise render the name qualified with the pg_catalog
                    // schema.
                    let name = QualifiedItemName {
                        qualifiers: ItemQualifiers {
                            database_spec: ResolvedDatabaseSpecifier::Ambient,
                            schema_spec: pg_catalog_schema,
                        },
                        item: pgrepr_type.name().to_string(),
                    };
                    self.resolve_full_name(&name).to_string()
                };
                res
            }
        }
    }

    /// Returns the column names of the collection with global ID `id`.
    ///
    /// For an index, the names of the indexed relation are rearranged
    /// according to the arrangement permutation of the index keys; positions
    /// that receive no name stay as empty strings. Returns `None` when the
    /// entry or the relevant relation description cannot be found.
    fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
        let entry = self.state.try_get_entry_by_global_id(&id)?;

        match entry.index() {
            Some(index) => {
                let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
                let mut on_names = on_desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect::<Vec<_>>();

                let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());

                // One slot per target position: the largest target position
                // plus one, or zero for an empty permutation. Slots start out
                // as empty strings.
                let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
                let mut ix_names = vec![String::new(); ix_arity];

                // Apply the permutation by swapping each source name into its
                // target slot.
                for (on_pos, ix_pos) in p.into_iter().enumerate() {
                    let on_name = on_names.get_mut(on_pos).expect("on_name");
                    let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
                    std::mem::swap(on_name, ix_name);
                }

                Some(ix_names)
            }
            None => {
                let desc = self.state.try_get_desc_by_global_id(&id)?;
                let column_names = desc
                    .iter_names()
                    .map(|col_name| col_name.to_string())
                    .collect();

                Some(column_names)
            }
        }
    }

    /// Returns the name of column number `column` of the collection with
    /// global ID `id`, or `None` when no description is found for `id`.
    fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
        let desc = self.state.try_get_desc_by_global_id(&id)?;
        Some(desc.get_name(column).to_string())
    }

    /// Returns `true` when the state has an entry for global ID `id`.
    fn id_exists(&self, id: GlobalId) -> bool {
        self.state.entry_by_global_id.contains_key(&id)
    }
}
1786
1787impl SessionCatalog for ConnCatalog<'_> {
    /// Returns the role ID this connection catalog was created with.
    fn active_role_id(&self) -> &RoleId {
        &self.role_id
    }
1791
1792 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1793 self.prepared_statements
1794 .as_ref()
1795 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1796 .flatten()
1797 }
1798
    /// Returns the ID of the connection's active database, if one is set.
    fn active_database(&self) -> Option<&DatabaseId> {
        self.database.as_ref()
    }
1802
    /// Returns the name of the connection's active cluster.
    fn active_cluster(&self) -> &str {
        &self.cluster
    }
1806
    /// Returns the connection's search path as stored on this catalog.
    fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
        &self.search_path
    }
1810
    /// Resolves `database_name` through the catalog state, converting the
    /// state's error into a `SqlCatalogError`.
    fn resolve_database(
        &self,
        database_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
        Ok(self.state.resolve_database(database_name)?)
    }
1817
    /// Returns the database with ID `id`.
    ///
    /// # Panics
    ///
    /// Panics with "database doesn't exist" when `id` is not in
    /// `database_by_id`.
    fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
        self.state
            .database_by_id
            .get(id)
            .expect("database doesn't exist")
    }
1824
    /// Returns every database in `database_by_id` as a trait object.
    // The `as` below only converts a reference into a trait object.
    #[allow(clippy::as_conversions)]
    fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
        self.state
            .database_by_id
            .values()
            .map(|database| database as &dyn CatalogDatabase)
            .collect()
    }
1834
    /// Resolves `schema_name`, optionally within `database_name`, through the
    /// catalog state.
    ///
    /// The connection's active database and connection ID are passed along to
    /// the state; its error is converted into a `SqlCatalogError`.
    fn resolve_schema(
        &self,
        database_name: Option<&str>,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self.state.resolve_schema(
            self.database.as_ref(),
            database_name,
            schema_name,
            &self.conn_id,
        )?)
    }
1847
    /// Resolves `schema_name` within the database identified by
    /// `database_spec`, using this connection's ID, and converts the state's
    /// error into a `SqlCatalogError`.
    fn resolve_schema_in_database(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_name: &str,
    ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
        Ok(self
            .state
            .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
    }
1857
    /// Returns the schema identified by `database_spec` and `schema_spec`,
    /// looked up in the state on behalf of this connection.
    fn get_schema(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_spec: &SchemaSpecifier,
    ) -> &dyn CatalogSchema {
        self.state
            .get_schema(database_spec, schema_spec, &self.conn_id)
    }
1866
    /// Returns all schemas: those of every database, followed by the ambient
    /// schemas and then every entry of `temporary_schemas`.
    // The `as` below only converts a reference into a trait object.
    #[allow(clippy::as_conversions)]
    fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.get_databases()
            .into_iter()
            .flat_map(|database| database.schemas().into_iter())
            .chain(
                self.state
                    .ambient_schemas_by_id
                    .values()
                    .chain(self.state.temporary_schemas.values())
                    .map(|schema| schema as &dyn CatalogSchema),
            )
            .collect()
    }
1882
    /// Returns the ID the state reports for the `mz_internal` schema.
    fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.state().get_mz_internal_schema_id()
    }
1886
    /// Returns the ID the state reports for the `mz_unsafe` schema.
    fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.state().get_mz_unsafe_schema_id()
    }
1890
    /// Reports whether the state considers `schema` a system schema.
    fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
        self.state.is_system_schema_specifier(schema)
    }
1894
1895 fn resolve_role(
1896 &self,
1897 role_name: &str,
1898 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1899 match self.state.try_get_role_by_name(role_name) {
1900 Some(role) => Ok(role),
1901 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1902 }
1903 }
1904
1905 fn resolve_network_policy(
1906 &self,
1907 policy_name: &str,
1908 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1909 match self.state.try_get_network_policy_by_name(policy_name) {
1910 Some(policy) => Ok(policy),
1911 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1912 }
1913 }
1914
    /// Returns the role with ID `id`, or `None` when `roles_by_id` has no
    /// entry for it.
    fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
        Some(self.state.roles_by_id.get(id)?)
    }
1918
    /// Returns the role with ID `id` from the state.
    // NOTE(review): presumably panics for an unknown ID, given the separate
    // `try_get_role` variant — confirm against the state implementation.
    fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
        self.state.get_role(id)
    }
1922
    /// Returns every role in `roles_by_id` as a trait object.
    fn get_roles(&self) -> Vec<&dyn CatalogRole> {
        // The `as` below only converts a reference into a trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .roles_by_id
            .values()
            .map(|role| role as &dyn CatalogRole)
            .collect()
    }
1932
    /// Returns the constant `MZ_SYSTEM_ROLE_ID`.
    fn mz_system_role_id(&self) -> RoleId {
        MZ_SYSTEM_ROLE_ID
    }
1936
    /// Returns the set of role IDs the state collects as the membership of
    /// role `id`.
    fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        self.state.collect_role_membership(id)
    }
1940
    /// Returns the network policy with ID `id` from the state, as a trait
    /// object.
    fn get_network_policy(
        &self,
        id: &NetworkPolicyId,
    ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
        self.state.get_network_policy(id)
    }
1947
    /// Returns every network policy in `network_policies_by_id` as a trait
    /// object.
    fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
        // The `as` below only converts a reference into a trait object.
        #[allow(clippy::as_conversions)]
        self.state
            .network_policies_by_id
            .values()
            .map(|policy| policy as &dyn CatalogNetworkPolicy)
            .collect()
    }
1957
1958 fn resolve_cluster(
1959 &self,
1960 cluster_name: Option<&str>,
1961 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1962 Ok(self
1963 .state
1964 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1965 }
1966
    /// Resolves the replica named by `cluster_replica_name` through the
    /// catalog state, converting the state's error into a `SqlCatalogError`.
    fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
        Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
    }
1973
1974 fn resolve_item(
1975 &self,
1976 name: &PartialItemName,
1977 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1978 let r = self.state.resolve_entry(
1979 self.database.as_ref(),
1980 &self.effective_search_path(true),
1981 name,
1982 &self.conn_id,
1983 )?;
1984 if self.unresolvable_ids.contains(&r.id()) {
1985 Err(SqlCatalogError::UnknownItem(name.to_string()))
1986 } else {
1987 Ok(r)
1988 }
1989 }
1990
1991 fn resolve_function(
1992 &self,
1993 name: &PartialItemName,
1994 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1995 let r = self.state.resolve_function(
1996 self.database.as_ref(),
1997 &self.effective_search_path(false),
1998 name,
1999 &self.conn_id,
2000 )?;
2001
2002 if self.unresolvable_ids.contains(&r.id()) {
2003 Err(SqlCatalogError::UnknownFunction {
2004 name: name.to_string(),
2005 alternative: None,
2006 })
2007 } else {
2008 Ok(r)
2009 }
2010 }
2011
2012 fn resolve_type(
2013 &self,
2014 name: &PartialItemName,
2015 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2016 let r = self.state.resolve_type(
2017 self.database.as_ref(),
2018 &self.effective_search_path(false),
2019 name,
2020 &self.conn_id,
2021 )?;
2022
2023 if self.unresolvable_ids.contains(&r.id()) {
2024 Err(SqlCatalogError::UnknownType {
2025 name: name.to_string(),
2026 })
2027 } else {
2028 Ok(r)
2029 }
2030 }
2031
    /// Returns the system type called `name`, as looked up by the state.
    fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_system_type(name)
    }
2035
    /// Returns the item with ID `id`, or `None` when the state has no entry
    /// for it.
    fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
        Some(self.state.try_get_entry(id)?)
    }
2039
2040 fn try_get_item_by_global_id(
2041 &self,
2042 id: &GlobalId,
2043 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2044 let entry = self.state.try_get_entry_by_global_id(id)?;
2045 let entry = match &entry.item {
2046 CatalogItem::Table(table) => {
2047 let (version, _gid) = table
2048 .collections
2049 .iter()
2050 .find(|(_version, gid)| *gid == id)
2051 .expect("catalog out of sync, mismatched GlobalId");
2052 entry.at_version(RelationVersionSelector::Specific(*version))
2053 }
2054 _ => entry.at_version(RelationVersionSelector::Latest),
2055 };
2056 Some(entry)
2057 }
2058
    /// Returns the item with ID `id` from the state.
    // NOTE(review): presumably panics for an unknown ID, given the separate
    // `try_get_item` variant — confirm against the state implementation.
    fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
        self.state.get_entry(id)
    }
2062
2063 fn get_item_by_global_id(
2064 &self,
2065 id: &GlobalId,
2066 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2067 let entry = self.state.get_entry_by_global_id(id);
2068 let entry = match &entry.item {
2069 CatalogItem::Table(table) => {
2070 let (version, _gid) = table
2071 .collections
2072 .iter()
2073 .find(|(_version, gid)| *gid == id)
2074 .expect("catalog out of sync, mismatched GlobalId");
2075 entry.at_version(RelationVersionSelector::Specific(*version))
2076 }
2077 _ => entry.at_version(RelationVersionSelector::Latest),
2078 };
2079 entry
2080 }
2081
    /// Returns every item reachable through `get_schemas`, by looking up each
    /// item ID the schemas list.
    fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
        self.get_schemas()
            .into_iter()
            .flat_map(|schema| schema.item_ids())
            .map(|id| self.get_item(&id))
            .collect()
    }
2089
    /// Looks up an item by its qualified `name` on behalf of this connection,
    /// returning `None` when the state finds nothing.
    fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_item_by_name(name, &self.conn_id)
            // `identity` with an explicit type performs the coercion to a
            // trait object.
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }
2095
    /// Looks up a type by its qualified `name` on behalf of this connection,
    /// returning `None` when the state finds nothing.
    fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
        self.state
            .get_type_by_name(name, &self.conn_id)
            // `identity` with an explicit type performs the coercion to a
            // trait object.
            .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
    }
2101
    /// Returns the cluster with ID `id`.
    ///
    /// # Panics
    ///
    /// Panics when `id` is not a key of `clusters_by_id`, because the map is
    /// indexed directly.
    fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
        &self.state.clusters_by_id[&id]
    }
2105
    /// Returns every cluster in `clusters_by_id` as a trait object.
    fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
        self.state
            .clusters_by_id
            .values()
            // `identity` with an explicit type performs the coercion to a
            // trait object.
            .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
            .collect()
    }
2113
2114 fn get_cluster_replica(
2115 &self,
2116 cluster_id: ClusterId,
2117 replica_id: ReplicaId,
2118 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2119 let cluster = self.get_cluster(cluster_id);
2120 cluster.replica(replica_id)
2121 }
2122
    /// Returns the replicas of every cluster, concatenated in cluster order.
    fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
        self.get_clusters()
            .into_iter()
            .flat_map(|cluster| cluster.replicas().into_iter())
            .collect()
    }
2129
    /// Returns the system-level privilege map stored in the state.
    fn get_system_privileges(&self) -> &PrivilegeMap {
        &self.state.system_privileges
    }
2133
    /// Returns the state's default privileges, with the ACL items of each
    /// `DefaultPrivilegeObject` collected into a vector.
    fn get_default_privileges(
        &self,
    ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
        self.state
            .default_privileges
            .iter()
            .map(|(object, acl_items)| (object, acl_items.collect()))
            .collect()
    }
2143
    /// Asks the state for an available name derived from `name`, on behalf of
    /// this connection.
    fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
        self.state.find_available_name(name, &self.conn_id)
    }
2147
    /// Resolves the qualified item `name` into a `FullItemName`, passing this
    /// connection's ID to the state.
    fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
        self.state.resolve_full_name(name, Some(&self.conn_id))
    }
2151
    /// Resolves the qualified schema `name` into a `FullSchemaName` through
    /// the state.
    fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        self.state.resolve_full_schema_name(name)
    }
2155
    /// Returns the catalog item ID of the entry that owns `global_id`.
    fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
        self.state.get_entry_by_global_id(global_id).id()
    }
2159
    /// Returns the global ID of item `item_id` when viewed at `version`.
    fn resolve_global_id(
        &self,
        item_id: &CatalogItemId,
        version: RelationVersionSelector,
    ) -> GlobalId {
        self.state
            .get_entry(item_id)
            .at_version(version)
            .global_id()
    }
2170
    /// Returns the catalog configuration held by the state.
    fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        self.state.config()
    }
2174
    /// Returns the current time by calling the `now` function stored in the
    /// catalog configuration.
    fn now(&self) -> EpochMillis {
        (self.state.config().now)()
    }
2178
    /// Returns a clone of the state's AWS PrivateLink availability zones, if
    /// any are set.
    fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
        self.state.aws_privatelink_availability_zones.clone()
    }
2182
    /// Returns a shared reference to the state's system configuration.
    fn system_vars(&self) -> &SystemVars {
        &self.state.system_configuration
    }
2186
    /// Returns a mutable reference to the state's system configuration.
    // NOTE(review): `to_mut()` suggests `state` is a `Cow`, in which case this
    // clones a borrowed state on first use — confirm against the field type.
    fn system_vars_mut(&mut self) -> &mut SystemVars {
        &mut self.state.to_mut().system_configuration
    }
2190
    /// Returns the owner of object `id` as reported by the state for this
    /// connection, if the state reports one.
    fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
        self.state().get_owner_id(id, self.conn_id())
    }
2194
2195 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2196 match id {
2197 SystemObjectId::System => Some(&self.state.system_privileges),
2198 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2199 Some(self.get_cluster(*id).privileges())
2200 }
2201 SystemObjectId::Object(ObjectId::Database(id)) => {
2202 Some(self.get_database(id).privileges())
2203 }
2204 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2205 Some(self.get_schema(database_spec, schema_spec).privileges())
2206 }
2207 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2208 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2209 Some(self.get_network_policy(id).privileges())
2210 }
2211 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2212 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2213 }
2214 }
2215
    /// Returns the dependents of `ids` as computed by the state for this
    /// connection, starting from an empty `seen` set.
    fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.object_dependents(ids, &self.conn_id, &mut seen)
    }
2220
    /// Returns the dependents of item `id` as computed by the state, starting
    /// from an empty `seen` set.
    fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
        let mut seen = BTreeSet::new();
        self.state.item_dependents(id, &mut seen)
    }
2225
    /// Returns the ACL mode that `rbac::all_object_privileges` reports for
    /// `object_type`.
    fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
        rbac::all_object_privileges(object_type)
    }
2229
    /// Returns the object type the state reports for `object_id`.
    fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        self.state.get_object_type(object_id)
    }
2233
    /// Returns the system object type the state reports for `id`.
    fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
        self.state.get_system_object_type(id)
    }
2237
    /// Returns a partial name for `qualified_name` with the database and
    /// schema parts dropped where this connection can resolve the name
    /// without them.
    ///
    /// # Panics
    ///
    /// Panics when the computed partial name does not resolve back to
    /// `qualified_name` as an item, a function, or a type.
    fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
        // The database part is only kept when the item lives in a database
        // other than the connection's active one.
        let database_id = match &qualified_name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => None,
            ResolvedDatabaseSpecifier::Id(id)
                if self.database.is_some() && self.database == Some(*id) =>
            {
                None
            }
            ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
        };

        // The schema part is dropped when the bare item name already resolves
        // to `qualified_name`.
        // NOTE(review): `&&` binds tighter than `||`, so `database_id.is_none()`
        // only gates the item lookup; the function and type lookups apply even
        // when a database part is kept — confirm this is intended.
        let schema_spec = if database_id.is_none()
            && self.resolve_item_name(&PartialItemName {
                database: None,
                schema: None,
                item: qualified_name.item.clone(),
            }) == Ok(qualified_name)
            || self.resolve_function_name(&PartialItemName {
                database: None,
                schema: None,
                item: qualified_name.item.clone(),
            }) == Ok(qualified_name)
            || self.resolve_type_name(&PartialItemName {
                database: None,
                schema: None,
                item: qualified_name.item.clone(),
            }) == Ok(qualified_name)
        {
            None
        } else {
            Some(qualified_name.qualifiers.schema_spec.clone())
        };

        let res = PartialItemName {
            database: database_id.map(|id| self.get_database(&id).name().to_string()),
            schema: schema_spec.map(|spec| {
                self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
                    .name()
                    .schema
                    .clone()
            }),
            item: qualified_name.item.clone(),
        };
        // Sanity check: the shortened name must still resolve to the same
        // object.
        assert!(
            self.resolve_item_name(&res) == Ok(qualified_name)
                || self.resolve_function_name(&res) == Ok(qualified_name)
                || self.resolve_type_name(&res) == Ok(qualified_name)
        );
        res
    }
2292
2293 fn add_notice(&self, notice: PlanNotice) {
2294 let _ = self.notices_tx.send(notice.into());
2295 }
2296
2297 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2298 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2299 self.state.comments.get_object_comments(comment_id)
2300 }
2301
2302 fn is_cluster_size_cc(&self, size: &str) -> bool {
2303 self.state
2304 .cluster_replica_sizes
2305 .0
2306 .get(size)
2307 .map_or(false, |a| a.is_cc)
2308 }
2309}
2310
2311#[cfg(test)]
2312mod tests {
2313 use std::collections::{BTreeMap, BTreeSet};
2314 use std::sync::Arc;
2315 use std::{env, iter};
2316
2317 use itertools::Itertools;
2318 use mz_catalog::memory::objects::CatalogItem;
2319 use tokio_postgres::NoTls;
2320 use tokio_postgres::types::Type;
2321 use uuid::Uuid;
2322
2323 use mz_catalog::SYSTEM_CONN_ID;
2324 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2325 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2326 use mz_controller_types::{ClusterId, ReplicaId};
2327 use mz_expr::MirScalarExpr;
2328 use mz_ore::now::to_datetime;
2329 use mz_ore::{assert_err, assert_ok, task};
2330 use mz_persist_client::PersistClient;
2331 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2332 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2333 use mz_repr::role_id::RoleId;
2334 use mz_repr::{
2335 CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2336 SqlScalarType, Timestamp,
2337 };
2338 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2339 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2340 use mz_sql::names::{
2341 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2342 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2343 };
2344 use mz_sql::plan::{
2345 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2346 Scope, StatementContext,
2347 };
2348 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2349 use mz_sql::session::vars::{SystemVars, VarInput};
2350
2351 use crate::catalog::state::LocalExpressionCache;
2352 use crate::catalog::{Catalog, Op};
2353 use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2354 use crate::session::Session;
2355
2356 #[mz_ore::test(tokio::test)]
2363 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2365 Catalog::with_debug(|catalog| async move {
2366 struct TestCase {
2367 input: QualifiedItemName,
2368 system_output: PartialItemName,
2369 normal_output: PartialItemName,
2370 }
2371
2372 let test_cases = vec![
2373 TestCase {
2374 input: QualifiedItemName {
2375 qualifiers: ItemQualifiers {
2376 database_spec: ResolvedDatabaseSpecifier::Ambient,
2377 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2378 },
2379 item: "numeric".to_string(),
2380 },
2381 system_output: PartialItemName {
2382 database: None,
2383 schema: None,
2384 item: "numeric".to_string(),
2385 },
2386 normal_output: PartialItemName {
2387 database: None,
2388 schema: None,
2389 item: "numeric".to_string(),
2390 },
2391 },
2392 TestCase {
2393 input: QualifiedItemName {
2394 qualifiers: ItemQualifiers {
2395 database_spec: ResolvedDatabaseSpecifier::Ambient,
2396 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2397 },
2398 item: "mz_array_types".to_string(),
2399 },
2400 system_output: PartialItemName {
2401 database: None,
2402 schema: None,
2403 item: "mz_array_types".to_string(),
2404 },
2405 normal_output: PartialItemName {
2406 database: None,
2407 schema: None,
2408 item: "mz_array_types".to_string(),
2409 },
2410 },
2411 ];
2412
2413 for tc in test_cases {
2414 assert_eq!(
2415 catalog
2416 .for_system_session()
2417 .minimal_qualification(&tc.input),
2418 tc.system_output
2419 );
2420 assert_eq!(
2421 catalog
2422 .for_session(&Session::dummy())
2423 .minimal_qualification(&tc.input),
2424 tc.normal_output
2425 );
2426 }
2427 catalog.expire().await;
2428 })
2429 .await
2430 }
2431
2432 #[mz_ore::test(tokio::test)]
2433 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2435 let persist_client = PersistClient::new_for_tests().await;
2436 let organization_id = Uuid::new_v4();
2437 let bootstrap_args = test_bootstrap_args();
2438 {
2439 let mut catalog = Catalog::open_debug_catalog(
2440 persist_client.clone(),
2441 organization_id.clone(),
2442 &bootstrap_args,
2443 )
2444 .await
2445 .expect("unable to open debug catalog");
2446 assert_eq!(catalog.transient_revision(), 1);
2447 let commit_ts = catalog.current_upper().await;
2448 catalog
2449 .transact(
2450 None,
2451 commit_ts,
2452 None,
2453 vec![Op::CreateDatabase {
2454 name: "test".to_string(),
2455 owner_id: MZ_SYSTEM_ROLE_ID,
2456 }],
2457 )
2458 .await
2459 .expect("failed to transact");
2460 assert_eq!(catalog.transient_revision(), 2);
2461 catalog.expire().await;
2462 }
2463 {
2464 let catalog =
2465 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2466 .await
2467 .expect("unable to open debug catalog");
2468 assert_eq!(catalog.transient_revision(), 1);
2470 catalog.expire().await;
2471 }
2472 }
2473
2474 #[mz_ore::test(tokio::test)]
2475 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2477 Catalog::with_debug(|catalog| async move {
2478 let mz_catalog_schema = (
2479 ResolvedDatabaseSpecifier::Ambient,
2480 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2481 );
2482 let pg_catalog_schema = (
2483 ResolvedDatabaseSpecifier::Ambient,
2484 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2485 );
2486 let mz_temp_schema = (
2487 ResolvedDatabaseSpecifier::Ambient,
2488 SchemaSpecifier::Temporary,
2489 );
2490
2491 let session = Session::dummy();
2493 let conn_catalog = catalog.for_session(&session);
2494 assert_ne!(
2495 conn_catalog.effective_search_path(false),
2496 conn_catalog.search_path
2497 );
2498 assert_ne!(
2499 conn_catalog.effective_search_path(true),
2500 conn_catalog.search_path
2501 );
2502 assert_eq!(
2503 conn_catalog.effective_search_path(false),
2504 vec![
2505 mz_catalog_schema.clone(),
2506 pg_catalog_schema.clone(),
2507 conn_catalog.search_path[0].clone()
2508 ]
2509 );
2510 assert_eq!(
2511 conn_catalog.effective_search_path(true),
2512 vec![
2513 mz_temp_schema.clone(),
2514 mz_catalog_schema.clone(),
2515 pg_catalog_schema.clone(),
2516 conn_catalog.search_path[0].clone()
2517 ]
2518 );
2519
2520 let mut session = Session::dummy();
2522 session
2523 .vars_mut()
2524 .set(
2525 &SystemVars::new(),
2526 "search_path",
2527 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2528 false,
2529 )
2530 .expect("failed to set search_path");
2531 let conn_catalog = catalog.for_session(&session);
2532 assert_ne!(
2533 conn_catalog.effective_search_path(false),
2534 conn_catalog.search_path
2535 );
2536 assert_ne!(
2537 conn_catalog.effective_search_path(true),
2538 conn_catalog.search_path
2539 );
2540 assert_eq!(
2541 conn_catalog.effective_search_path(false),
2542 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2543 );
2544 assert_eq!(
2545 conn_catalog.effective_search_path(true),
2546 vec![
2547 mz_temp_schema.clone(),
2548 mz_catalog_schema.clone(),
2549 pg_catalog_schema.clone()
2550 ]
2551 );
2552
2553 let mut session = Session::dummy();
2554 session
2555 .vars_mut()
2556 .set(
2557 &SystemVars::new(),
2558 "search_path",
2559 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2560 false,
2561 )
2562 .expect("failed to set search_path");
2563 let conn_catalog = catalog.for_session(&session);
2564 assert_ne!(
2565 conn_catalog.effective_search_path(false),
2566 conn_catalog.search_path
2567 );
2568 assert_ne!(
2569 conn_catalog.effective_search_path(true),
2570 conn_catalog.search_path
2571 );
2572 assert_eq!(
2573 conn_catalog.effective_search_path(false),
2574 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2575 );
2576 assert_eq!(
2577 conn_catalog.effective_search_path(true),
2578 vec![
2579 mz_temp_schema.clone(),
2580 pg_catalog_schema.clone(),
2581 mz_catalog_schema.clone()
2582 ]
2583 );
2584
2585 let mut session = Session::dummy();
2586 session
2587 .vars_mut()
2588 .set(
2589 &SystemVars::new(),
2590 "search_path",
2591 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2592 false,
2593 )
2594 .expect("failed to set search_path");
2595 let conn_catalog = catalog.for_session(&session);
2596 assert_ne!(
2597 conn_catalog.effective_search_path(false),
2598 conn_catalog.search_path
2599 );
2600 assert_ne!(
2601 conn_catalog.effective_search_path(true),
2602 conn_catalog.search_path
2603 );
2604 assert_eq!(
2605 conn_catalog.effective_search_path(false),
2606 vec![
2607 mz_catalog_schema.clone(),
2608 pg_catalog_schema.clone(),
2609 mz_temp_schema.clone()
2610 ]
2611 );
2612 assert_eq!(
2613 conn_catalog.effective_search_path(true),
2614 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2615 );
2616 catalog.expire().await;
2617 })
2618 .await
2619 }
2620
2621 #[mz_ore::test(tokio::test)]
2622 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2624 use mz_ore::collections::CollectionExt;
2625 Catalog::with_debug(|catalog| async move {
2626 let conn_catalog = catalog.for_system_session();
2627 let scx = &mut StatementContext::new(None, &conn_catalog);
2628
2629 let parsed = mz_sql_parser::parser::parse_statements(
2630 "create view public.foo as select 1 as bar",
2631 )
2632 .expect("")
2633 .into_element()
2634 .ast;
2635
2636 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2637
2638 assert_eq!(
2640 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2641 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2642 );
2643 catalog.expire().await;
2644 })
2645 .await;
2646 }
2647
2648 #[mz_ore::test(tokio::test)]
2650 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2652 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2653 let column = ", 1";
2654 let view_def_size = view_def.bytes().count();
2655 let column_size = column.bytes().count();
2656 let column_count =
2657 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2658 let columns = iter::repeat(column).take(column_count).join("");
2659 let create_sql = format!("{view_def}{columns})");
2660 let create_sql_check = create_sql.clone();
2661 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2662 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2663 &create_sql
2664 ));
2665
2666 let persist_client = PersistClient::new_for_tests().await;
2667 let organization_id = Uuid::new_v4();
2668 let id = CatalogItemId::User(1);
2669 let gid = GlobalId::User(1);
2670 let bootstrap_args = test_bootstrap_args();
2671 {
2672 let mut catalog = Catalog::open_debug_catalog(
2673 persist_client.clone(),
2674 organization_id.clone(),
2675 &bootstrap_args,
2676 )
2677 .await
2678 .expect("unable to open debug catalog");
2679 let item = catalog
2680 .state()
2681 .deserialize_item(
2682 gid,
2683 &create_sql,
2684 &BTreeMap::new(),
2685 &mut LocalExpressionCache::Closed,
2686 None,
2687 )
2688 .expect("unable to parse view");
2689 let commit_ts = catalog.current_upper().await;
2690 catalog
2691 .transact(
2692 None,
2693 commit_ts,
2694 None,
2695 vec![Op::CreateItem {
2696 item,
2697 name: QualifiedItemName {
2698 qualifiers: ItemQualifiers {
2699 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2700 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2701 },
2702 item: "v".to_string(),
2703 },
2704 id,
2705 owner_id: MZ_SYSTEM_ROLE_ID,
2706 }],
2707 )
2708 .await
2709 .expect("failed to transact");
2710 catalog.expire().await;
2711 }
2712 {
2713 let catalog =
2714 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2715 .await
2716 .expect("unable to open debug catalog");
2717 let view = catalog.get_entry(&id);
2718 assert_eq!("v", view.name.item);
2719 match &view.item {
2720 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2721 item => panic!("expected view, got {}", item.typ()),
2722 }
2723 catalog.expire().await;
2724 }
2725 }
2726
2727 #[mz_ore::test(tokio::test)]
2728 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2730 Catalog::with_debug(|catalog| async move {
2731 let conn_catalog = catalog.for_system_session();
2732
2733 assert_eq!(
2734 mz_sql::catalog::ObjectType::ClusterReplica,
2735 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2736 ClusterId::user(1).expect("1 is a valid ID"),
2737 ReplicaId::User(1)
2738 )))
2739 );
2740 assert_eq!(
2741 mz_sql::catalog::ObjectType::Role,
2742 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2743 );
2744 catalog.expire().await;
2745 })
2746 .await;
2747 }
2748
2749 #[mz_ore::test(tokio::test)]
2750 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2752 Catalog::with_debug(|catalog| async move {
2753 let conn_catalog = catalog.for_system_session();
2754
2755 assert_eq!(
2756 None,
2757 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2758 ClusterId::user(1).expect("1 is a valid ID"),
2759 ReplicaId::User(1),
2760 ))))
2761 );
2762 assert_eq!(
2763 None,
2764 conn_catalog
2765 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2766 );
2767 catalog.expire().await;
2768 })
2769 .await;
2770 }
2771
2772 #[mz_ore::test(tokio::test)]
2773 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2775 Catalog::with_debug(|catalog| async move {
2776 let conn_catalog = catalog.for_system_session();
2777
2778 let builtins_cfg = BuiltinsConfig {
2779 include_continual_tasks: true,
2780 };
2781 for builtin in BUILTINS::iter(&builtins_cfg) {
2782 let (schema, name, expected_desc) = match builtin {
2783 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2784 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2785 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2786 Builtin::Log(_)
2787 | Builtin::Type(_)
2788 | Builtin::Func(_)
2789 | Builtin::ContinualTask(_)
2790 | Builtin::Index(_)
2791 | Builtin::Connection(_) => continue,
2792 };
2793 let item = conn_catalog
2794 .resolve_item(&PartialItemName {
2795 database: None,
2796 schema: Some(schema.to_string()),
2797 item: name.to_string(),
2798 })
2799 .expect("unable to resolve item")
2800 .at_version(RelationVersionSelector::Latest);
2801
2802 let full_name = conn_catalog.resolve_full_name(item.name());
2803 let actual_desc = item.desc(&full_name).expect("invalid item type");
2804 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2805 actual_desc.iter().zip(expected_desc.iter()).enumerate()
2806 {
2807 assert_eq!(
2808 actual_name, expected_name,
2809 "item {schema}.{name} column {index} name did not match its expected name"
2810 );
2811 assert_eq!(
2812 actual_typ, expected_typ,
2813 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2814 );
2815 }
2816 assert_eq!(
2817 &*actual_desc, expected_desc,
2818 "item {schema}.{name} did not match its expected RelationDesc"
2819 );
2820 }
2821 catalog.expire().await;
2822 })
2823 .await
2824 }
2825
2826 #[mz_ore::test(tokio::test)]
2829 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2831 async fn inner(catalog: Catalog) {
2832 let (client, connection) = tokio_postgres::connect(
2836 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2837 NoTls,
2838 )
2839 .await
2840 .expect("failed to connect to Postgres");
2841
2842 task::spawn(|| "compare_builtin_postgres", async move {
2843 if let Err(e) = connection.await {
2844 panic!("connection error: {}", e);
2845 }
2846 });
2847
2848 struct PgProc {
2849 name: String,
2850 arg_oids: Vec<u32>,
2851 ret_oid: Option<u32>,
2852 ret_set: bool,
2853 }
2854
2855 struct PgType {
2856 name: String,
2857 ty: String,
2858 elem: u32,
2859 array: u32,
2860 input: u32,
2861 receive: u32,
2862 }
2863
2864 struct PgOper {
2865 oprresult: u32,
2866 name: String,
2867 }
2868
2869 let pg_proc: BTreeMap<_, _> = client
2870 .query(
2871 "SELECT
2872 p.oid,
2873 proname,
2874 proargtypes,
2875 prorettype,
2876 proretset
2877 FROM pg_proc p
2878 JOIN pg_namespace n ON p.pronamespace = n.oid",
2879 &[],
2880 )
2881 .await
2882 .expect("pg query failed")
2883 .into_iter()
2884 .map(|row| {
2885 let oid: u32 = row.get("oid");
2886 let pg_proc = PgProc {
2887 name: row.get("proname"),
2888 arg_oids: row.get("proargtypes"),
2889 ret_oid: row.get("prorettype"),
2890 ret_set: row.get("proretset"),
2891 };
2892 (oid, pg_proc)
2893 })
2894 .collect();
2895
2896 let pg_type: BTreeMap<_, _> = client
2897 .query(
2898 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2899 &[],
2900 )
2901 .await
2902 .expect("pg query failed")
2903 .into_iter()
2904 .map(|row| {
2905 let oid: u32 = row.get("oid");
2906 let pg_type = PgType {
2907 name: row.get("typname"),
2908 ty: row.get("typtype"),
2909 elem: row.get("typelem"),
2910 array: row.get("typarray"),
2911 input: row.get("typinput"),
2912 receive: row.get("typreceive"),
2913 };
2914 (oid, pg_type)
2915 })
2916 .collect();
2917
2918 let pg_oper: BTreeMap<_, _> = client
2919 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2920 .await
2921 .expect("pg query failed")
2922 .into_iter()
2923 .map(|row| {
2924 let oid: u32 = row.get("oid");
2925 let pg_oper = PgOper {
2926 name: row.get("oprname"),
2927 oprresult: row.get("oprresult"),
2928 };
2929 (oid, pg_oper)
2930 })
2931 .collect();
2932
2933 let conn_catalog = catalog.for_system_session();
2934 let resolve_type_oid = |item: &str| {
2935 conn_catalog
2936 .resolve_type(&PartialItemName {
2937 database: None,
2938 schema: Some(PG_CATALOG_SCHEMA.into()),
2941 item: item.to_string(),
2942 })
2943 .expect("unable to resolve type")
2944 .oid()
2945 };
2946
2947 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2948 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2949 .collect();
2950
2951 let mut all_oids = BTreeSet::new();
2952
2953 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2956 [
2957 (Type::NAME, Type::TEXT),
2959 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2960 (Type::TIME, Type::TIMETZ),
2962 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2963 ]
2964 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2965 );
2966 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2967 1619, ]);
2969 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2970 if ignore_return_types.contains(&fn_oid) {
2971 return true;
2972 }
2973 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2974 return true;
2975 }
2976 a == b
2977 };
2978
2979 let builtins_cfg = BuiltinsConfig {
2980 include_continual_tasks: true,
2981 };
2982 for builtin in BUILTINS::iter(&builtins_cfg) {
2983 match builtin {
2984 Builtin::Type(ty) => {
2985 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
2986
2987 if ty.oid >= FIRST_MATERIALIZE_OID {
2988 continue;
2991 }
2992
2993 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
2996 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
2997 });
2998 assert_eq!(
2999 ty.name, pg_ty.name,
3000 "oid {} has name {} in postgres; expected {}",
3001 ty.oid, pg_ty.name, ty.name,
3002 );
3003
3004 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3005 None => (0, 0),
3006 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3007 };
3008 assert_eq!(
3009 typinput_oid, pg_ty.input,
3010 "type {} has typinput OID {:?} in mz but {:?} in pg",
3011 ty.name, typinput_oid, pg_ty.input,
3012 );
3013 assert_eq!(
3014 typreceive_oid, pg_ty.receive,
3015 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3016 ty.name, typreceive_oid, pg_ty.receive,
3017 );
3018 if typinput_oid != 0 {
3019 assert!(
3020 func_oids.contains(&typinput_oid),
3021 "type {} has typinput OID {} that does not exist in pg_proc",
3022 ty.name,
3023 typinput_oid,
3024 );
3025 }
3026 if typreceive_oid != 0 {
3027 assert!(
3028 func_oids.contains(&typreceive_oid),
3029 "type {} has typreceive OID {} that does not exist in pg_proc",
3030 ty.name,
3031 typreceive_oid,
3032 );
3033 }
3034
3035 match &ty.details.typ {
3037 CatalogType::Array { element_reference } => {
3038 let elem_ty = BUILTINS::iter(&builtins_cfg)
3039 .filter_map(|builtin| match builtin {
3040 Builtin::Type(ty @ BuiltinType { name, .. })
3041 if element_reference == name =>
3042 {
3043 Some(ty)
3044 }
3045 _ => None,
3046 })
3047 .next();
3048 let elem_ty = match elem_ty {
3049 Some(ty) => ty,
3050 None => {
3051 panic!("{} is unexpectedly not a type", element_reference)
3052 }
3053 };
3054 assert_eq!(
3055 pg_ty.elem, elem_ty.oid,
3056 "type {} has mismatched element OIDs",
3057 ty.name
3058 )
3059 }
3060 CatalogType::Pseudo => {
3061 assert_eq!(
3062 pg_ty.ty, "p",
3063 "type {} is not a pseudo type as expected",
3064 ty.name
3065 )
3066 }
3067 CatalogType::Range { .. } => {
3068 assert_eq!(
3069 pg_ty.ty, "r",
3070 "type {} is not a range type as expected",
3071 ty.name
3072 );
3073 }
3074 _ => {
3075 assert_eq!(
3076 pg_ty.ty, "b",
3077 "type {} is not a base type as expected",
3078 ty.name
3079 )
3080 }
3081 }
3082
3083 let schema = catalog
3085 .resolve_schema_in_database(
3086 &ResolvedDatabaseSpecifier::Ambient,
3087 ty.schema,
3088 &SYSTEM_CONN_ID,
3089 )
3090 .expect("unable to resolve schema");
3091 let allocated_type = catalog
3092 .resolve_type(
3093 None,
3094 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3095 &PartialItemName {
3096 database: None,
3097 schema: Some(schema.name().schema.clone()),
3098 item: ty.name.to_string(),
3099 },
3100 &SYSTEM_CONN_ID,
3101 )
3102 .expect("unable to resolve type");
3103 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3104 ty
3105 } else {
3106 panic!("unexpectedly not a type")
3107 };
3108 match ty.details.array_id {
3109 Some(array_id) => {
3110 let array_ty = catalog.get_entry(&array_id);
3111 assert_eq!(
3112 pg_ty.array, array_ty.oid,
3113 "type {} has mismatched array OIDs",
3114 allocated_type.name.item,
3115 );
3116 }
3117 None => assert_eq!(
3118 pg_ty.array, 0,
3119 "type {} does not have an array type in mz but does in pg",
3120 allocated_type.name.item,
3121 ),
3122 }
3123 }
3124 Builtin::Func(func) => {
3125 for imp in func.inner.func_impls() {
3126 assert!(
3127 all_oids.insert(imp.oid),
3128 "{} reused oid {}",
3129 func.name,
3130 imp.oid
3131 );
3132
3133 assert!(
3134 imp.oid < FIRST_USER_OID,
3135 "built-in function {} erroneously has OID in user space ({})",
3136 func.name,
3137 imp.oid,
3138 );
3139
3140 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3143 continue;
3144 } else {
3145 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3146 panic!(
3147 "pg_proc missing function {}: oid {}",
3148 func.name, imp.oid
3149 )
3150 })
3151 };
3152 assert_eq!(
3153 func.name, pg_fn.name,
3154 "funcs with oid {} don't match names: {} in mz, {} in pg",
3155 imp.oid, func.name, pg_fn.name
3156 );
3157
3158 let imp_arg_oids = imp
3161 .arg_typs
3162 .iter()
3163 .map(|item| resolve_type_oid(item))
3164 .collect::<Vec<_>>();
3165
3166 if imp_arg_oids != pg_fn.arg_oids {
3167 println!(
3168 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3169 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3170 );
3171 }
3172
3173 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3174
3175 assert!(
3176 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3177 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3178 imp.oid,
3179 func.name,
3180 imp_return_oid,
3181 pg_fn.ret_oid
3182 );
3183
3184 assert_eq!(
3185 imp.return_is_set, pg_fn.ret_set,
3186 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3187 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3188 );
3189 }
3190 }
3191 _ => (),
3192 }
3193 }
3194
3195 for (op, func) in OP_IMPLS.iter() {
3196 for imp in func.func_impls() {
3197 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3198
3199 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3201 continue;
3202 } else {
3203 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3204 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3205 })
3206 };
3207
3208 assert_eq!(*op, pg_op.name);
3209
3210 let imp_return_oid =
3211 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3212 if imp_return_oid != pg_op.oprresult {
3213 panic!(
3214 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3215 imp.oid, op, imp_return_oid, pg_op.oprresult
3216 );
3217 }
3218 }
3219 }
3220 catalog.expire().await;
3221 }
3222
3223 Catalog::with_debug(inner).await
3224 }
3225
3226 #[mz_ore::test(tokio::test)]
3228 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3230 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3231 let catalog = Arc::new(catalog);
3232 let conn_catalog = catalog.for_system_session();
3233
3234 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3235 let mut handles = Vec::new();
3236
3237 let ignore_names = BTreeSet::from([
3239 "avg",
3240 "avg_internal_v1",
3241 "bool_and",
3242 "bool_or",
3243 "has_table_privilege", "has_type_privilege", "mod",
3246 "mz_panic",
3247 "mz_sleep",
3248 "pow",
3249 "stddev_pop",
3250 "stddev_samp",
3251 "stddev",
3252 "var_pop",
3253 "var_samp",
3254 "variance",
3255 ]);
3256
3257 let fns = BUILTINS::funcs()
3258 .map(|func| (&func.name, func.inner))
3259 .chain(OP_IMPLS.iter());
3260
3261 for (name, func) in fns {
3262 if ignore_names.contains(name) {
3263 continue;
3264 }
3265 let Func::Scalar(impls) = func else {
3266 continue;
3267 };
3268
3269 'outer: for imp in impls {
3270 let details = imp.details();
3271 let mut styps = Vec::new();
3272 for item in details.arg_typs.iter() {
3273 let oid = resolve_type_oid(item);
3274 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3275 continue 'outer;
3276 };
3277 styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3278 }
3279 let datums = styps
3280 .iter()
3281 .map(|styp| {
3282 let mut datums = vec![Datum::Null];
3283 datums.extend(styp.interesting_datums());
3284 datums
3285 })
3286 .collect::<Vec<_>>();
3287 if datums.is_empty() {
3289 continue;
3290 }
3291
3292 let return_oid = details
3293 .return_typ
3294 .map(resolve_type_oid)
3295 .expect("must exist");
3296 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3297 .ok()
3298 .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3299
3300 let mut idxs = vec![0; datums.len()];
3301 while idxs[0] < datums[0].len() {
3302 let mut args = Vec::with_capacity(idxs.len());
3303 for i in 0..(datums.len()) {
3304 args.push(datums[i][idxs[i]]);
3305 }
3306
3307 let op = &imp.op;
3308 let scalars = args
3309 .iter()
3310 .enumerate()
3311 .map(|(i, datum)| {
3312 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3313 datum.clone(),
3314 styps[i].clone(),
3315 ))
3316 })
3317 .collect();
3318
3319 let call_name = format!(
3320 "{name}({}) (oid: {})",
3321 args.iter()
3322 .map(|d| d.to_string())
3323 .collect::<Vec<_>>()
3324 .join(", "),
3325 imp.oid
3326 );
3327 let catalog = Arc::clone(&catalog);
3328 let call_name_fn = call_name.clone();
3329 let return_styp = return_styp.clone();
3330 let handle = task::spawn_blocking(
3331 || call_name,
3332 move || {
3333 smoketest_fn(
3334 name,
3335 call_name_fn,
3336 op,
3337 imp,
3338 args,
3339 catalog,
3340 scalars,
3341 return_styp,
3342 )
3343 },
3344 );
3345 handles.push(handle);
3346
3347 for i in (0..datums.len()).rev() {
3349 idxs[i] += 1;
3350 if idxs[i] >= datums[i].len() {
3351 if i == 0 {
3352 break;
3353 }
3354 idxs[i] = 0;
3355 continue;
3356 } else {
3357 break;
3358 }
3359 }
3360 }
3361 }
3362 }
3363 handles
3364 }
3365
3366 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3367 for handle in handles {
3368 handle.await.expect("must succeed");
3369 }
3370 }
3371
3372 fn smoketest_fn(
3373 name: &&str,
3374 call_name: String,
3375 op: &Operation<HirScalarExpr>,
3376 imp: &FuncImpl<HirScalarExpr>,
3377 args: Vec<Datum<'_>>,
3378 catalog: Arc<Catalog>,
3379 scalars: Vec<CoercibleScalarExpr>,
3380 return_styp: Option<SqlScalarType>,
3381 ) {
3382 let conn_catalog = catalog.for_system_session();
3383 let pcx = PlanContext::zero();
3384 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3385 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3386 let ecx = ExprContext {
3387 qcx: &qcx,
3388 name: "smoketest",
3389 scope: &Scope::empty(),
3390 relation_type: &SqlRelationType::empty(),
3391 allow_aggregates: false,
3392 allow_subqueries: false,
3393 allow_parameters: false,
3394 allow_windows: false,
3395 };
3396 let arena = RowArena::new();
3397 let mut session = Session::<Timestamp>::dummy();
3398 session
3399 .start_transaction(to_datetime(0), None, None)
3400 .expect("must succeed");
3401 let prep_style = ExprPrepStyle::OneShot {
3402 logical_time: EvalTime::Time(Timestamp::MIN),
3403 session: &session,
3404 catalog_state: &catalog.state,
3405 };
3406
3407 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3410 if let Ok(hir) = res {
3411 if let Ok(mut mir) = hir.lower_uncorrelated() {
3412 prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3414
3415 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3416 if let Some(return_styp) = return_styp {
3417 let mir_typ = mir.typ(&[]);
3418 assert_eq!(mir_typ.scalar_type, return_styp);
3421 if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3425 panic!(
3426 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3427 );
3428 }
3429 if let Some((introduces_nulls, propagates_nulls)) =
3432 call_introduces_propagates_nulls(&mir)
3433 {
3434 if introduces_nulls {
3435 assert!(
3439 mir_typ.nullable,
3440 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3441 name, args, mir, mir_typ.nullable
3442 );
3443 } else {
3444 let any_input_null = args.iter().any(|arg| arg.is_null());
3445 if !any_input_null {
3446 assert!(
3447 !mir_typ.nullable,
3448 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3449 name, args, mir, mir_typ.nullable
3450 );
3451 } else {
3452 assert_eq!(
3453 mir_typ.nullable, propagates_nulls,
3454 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3455 name, args, mir, mir_typ.nullable
3456 );
3457 }
3458 }
3459 }
3460 let mut reduced = mir.clone();
3463 reduced.reduce(&[]);
3464 match reduced {
3465 MirScalarExpr::Literal(reduce_result, ctyp) => {
3466 match reduce_result {
3467 Ok(reduce_result_row) => {
3468 let reduce_result_datum = reduce_result_row.unpack_first();
3469 assert_eq!(
3470 reduce_result_datum,
3471 eval_result_datum,
3472 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3473 name,
3474 args,
3475 mir,
3476 eval_result_datum,
3477 mir_typ.scalar_type,
3478 reduce_result_datum,
3479 ctyp.scalar_type
3480 );
3481 assert_eq!(
3487 ctyp.scalar_type,
3488 mir_typ.scalar_type,
3489 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3490 name,
3491 args,
3492 mir,
3493 eval_result_datum,
3494 mir_typ.scalar_type,
3495 reduce_result_datum,
3496 ctyp.scalar_type
3497 );
3498 }
3499 Err(..) => {} }
3501 }
3502 _ => unreachable!(
3503 "all args are literals, so should have reduced to a literal"
3504 ),
3505 }
3506 }
3507 }
3508 }
3509 }
3510 }
3511
3512 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3517 match mir_func_call {
3518 MirScalarExpr::CallUnary { func, expr } => {
3519 if expr.is_literal() {
3520 Some((func.introduces_nulls(), func.propagates_nulls()))
3521 } else {
3522 None
3523 }
3524 }
3525 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3526 if expr1.is_literal() && expr2.is_literal() {
3527 Some((func.introduces_nulls(), func.propagates_nulls()))
3528 } else {
3529 None
3530 }
3531 }
3532 MirScalarExpr::CallVariadic { func, exprs } => {
3533 if exprs.iter().all(|arg| arg.is_literal()) {
3534 Some((func.introduces_nulls(), func.propagates_nulls()))
3535 } else {
3536 None
3537 }
3538 }
3539 _ => None,
3540 }
3541 }
3542
3543 #[mz_ore::test(tokio::test)]
3545 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3547 Catalog::with_debug(|catalog| async move {
3548 let conn_catalog = catalog.for_system_session();
3549
3550 for view in BUILTINS::views().filter(|view| {
3551 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3552 }) {
3553 let item = conn_catalog
3554 .resolve_item(&PartialItemName {
3555 database: None,
3556 schema: Some(view.schema.to_string()),
3557 item: view.name.to_string(),
3558 })
3559 .expect("unable to resolve view")
3560 .at_version(RelationVersionSelector::Latest);
3562 let full_name = conn_catalog.resolve_full_name(item.name());
3563 for col_type in item
3564 .desc(&full_name)
3565 .expect("invalid item type")
3566 .iter_types()
3567 {
3568 match &col_type.scalar_type {
3569 typ @ SqlScalarType::UInt16
3570 | typ @ SqlScalarType::UInt32
3571 | typ @ SqlScalarType::UInt64
3572 | typ @ SqlScalarType::MzTimestamp
3573 | typ @ SqlScalarType::List { .. }
3574 | typ @ SqlScalarType::Map { .. }
3575 | typ @ SqlScalarType::MzAclItem => {
3576 panic!("{typ:?} type found in {full_name}");
3577 }
3578 SqlScalarType::AclItem
3579 | SqlScalarType::Bool
3580 | SqlScalarType::Int16
3581 | SqlScalarType::Int32
3582 | SqlScalarType::Int64
3583 | SqlScalarType::Float32
3584 | SqlScalarType::Float64
3585 | SqlScalarType::Numeric { .. }
3586 | SqlScalarType::Date
3587 | SqlScalarType::Time
3588 | SqlScalarType::Timestamp { .. }
3589 | SqlScalarType::TimestampTz { .. }
3590 | SqlScalarType::Interval
3591 | SqlScalarType::PgLegacyChar
3592 | SqlScalarType::Bytes
3593 | SqlScalarType::String
3594 | SqlScalarType::Char { .. }
3595 | SqlScalarType::VarChar { .. }
3596 | SqlScalarType::Jsonb
3597 | SqlScalarType::Uuid
3598 | SqlScalarType::Array(_)
3599 | SqlScalarType::Record { .. }
3600 | SqlScalarType::Oid
3601 | SqlScalarType::RegProc
3602 | SqlScalarType::RegType
3603 | SqlScalarType::RegClass
3604 | SqlScalarType::Int2Vector
3605 | SqlScalarType::Range { .. }
3606 | SqlScalarType::PgLegacyName => {}
3607 }
3608 }
3609 }
3610 catalog.expire().await;
3611 })
3612 .await
3613 }
3614
3615 #[mz_ore::test(tokio::test)]
3618 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3620 Catalog::with_debug(|catalog| async move {
3621 let conn_catalog = catalog.for_system_session();
3622
3623 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3624 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3625
3626 for entry in catalog.entries() {
3627 let schema_spec = entry.name().qualifiers.schema_spec;
3628 let introspection_deps = catalog.introspection_dependencies(entry.id);
3629 if introspection_deps.is_empty() {
3630 assert!(
3631 schema_spec != introspection_schema_spec,
3632 "entry does not depend on introspection sources but is in \
3633 `mz_introspection`: {}",
3634 conn_catalog.resolve_full_name(entry.name()),
3635 );
3636 } else {
3637 assert!(
3638 schema_spec == introspection_schema_spec,
3639 "entry depends on introspection sources but is not in \
3640 `mz_introspection`: {}",
3641 conn_catalog.resolve_full_name(entry.name()),
3642 );
3643 }
3644 }
3645 })
3646 .await
3647 }
3648
3649 #[mz_ore::test(tokio::test)]
3650 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3652 let persist_client = PersistClient::new_for_tests().await;
3653 let bootstrap_args = test_bootstrap_args();
3654 let organization_id = Uuid::new_v4();
3655 let db_name = "DB";
3656
3657 let mut writer_catalog = Catalog::open_debug_catalog(
3658 persist_client.clone(),
3659 organization_id.clone(),
3660 &bootstrap_args,
3661 )
3662 .await
3663 .expect("open_debug_catalog");
3664 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3665 persist_client.clone(),
3666 organization_id.clone(),
3667 &bootstrap_args,
3668 )
3669 .await
3670 .expect("open_debug_read_only_catalog");
3671 assert_err!(writer_catalog.resolve_database(db_name));
3672 assert_err!(read_only_catalog.resolve_database(db_name));
3673
3674 let commit_ts = writer_catalog.current_upper().await;
3675 writer_catalog
3676 .transact(
3677 None,
3678 commit_ts,
3679 None,
3680 vec![Op::CreateDatabase {
3681 name: db_name.to_string(),
3682 owner_id: MZ_SYSTEM_ROLE_ID,
3683 }],
3684 )
3685 .await
3686 .expect("failed to transact");
3687
3688 let write_db = writer_catalog
3689 .resolve_database(db_name)
3690 .expect("resolve_database");
3691 read_only_catalog
3692 .sync_to_current_updates()
3693 .await
3694 .expect("sync_to_current_updates");
3695 let read_db = read_only_catalog
3696 .resolve_database(db_name)
3697 .expect("resolve_database");
3698
3699 assert_eq!(write_db, read_db);
3700
3701 let writer_catalog_fencer =
3702 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3703 .await
3704 .expect("open_debug_catalog for fencer");
3705 let fencer_db = writer_catalog_fencer
3706 .resolve_database(db_name)
3707 .expect("resolve_database for fencer");
3708 assert_eq!(fencer_db, read_db);
3709
3710 let write_fence_err = writer_catalog
3711 .sync_to_current_updates()
3712 .await
3713 .expect_err("sync_to_current_updates for fencer");
3714 assert!(matches!(
3715 write_fence_err,
3716 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3717 ));
3718 let read_fence_err = read_only_catalog
3719 .sync_to_current_updates()
3720 .await
3721 .expect_err("sync_to_current_updates after fencer");
3722 assert!(matches!(
3723 read_fence_err,
3724 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3725 ));
3726
3727 writer_catalog.expire().await;
3728 read_only_catalog.expire().await;
3729 writer_catalog_fencer.expire().await;
3730 }
3731}