1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67 CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68 DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use tracing::error;
89use uuid::Uuid;
90
91pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
93pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
94pub use crate::catalog::state::CatalogState;
95pub use crate::catalog::transact::{
96 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
97};
98use crate::command::CatalogDump;
99use crate::coord::TargetCluster;
100#[cfg(test)]
101use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
102use crate::session::{Portal, PreparedStatement, Session};
103use crate::util::ResultExt;
104use crate::{AdapterError, AdapterNotice, ExecuteResponse};
105
106mod builtin_table_updates;
107pub(crate) mod consistency;
108mod migrate;
109
110mod apply;
111mod open;
112mod state;
113mod timeline;
114mod transact;
115
116#[derive(Debug)]
139pub struct Catalog {
140 state: CatalogState,
141 plans: CatalogPlans,
142 expr_cache_handle: Option<ExpressionCacheHandle>,
143 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
144 transient_revision: u64,
145}
146
147impl Clone for Catalog {
150 fn clone(&self) -> Self {
151 Self {
152 state: self.state.clone(),
153 plans: self.plans.clone(),
154 expr_cache_handle: self.expr_cache_handle.clone(),
155 storage: Arc::clone(&self.storage),
156 transient_revision: self.transient_revision,
157 }
158 }
159}
160
161#[derive(Default, Debug, Clone)]
162pub struct CatalogPlans {
163 optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
164 physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
165 dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
166 notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
167}
168
169impl Catalog {
170 #[mz_ore::instrument(level = "trace")]
172 pub fn set_optimized_plan(
173 &mut self,
174 id: GlobalId,
175 plan: DataflowDescription<OptimizedMirRelationExpr>,
176 ) {
177 self.plans.optimized_plan_by_id.insert(id, plan.into());
178 }
179
180 #[mz_ore::instrument(level = "trace")]
182 pub fn set_physical_plan(
183 &mut self,
184 id: GlobalId,
185 plan: DataflowDescription<mz_compute_types::plan::Plan>,
186 ) {
187 self.plans.physical_plan_by_id.insert(id, plan.into());
188 }
189
190 #[mz_ore::instrument(level = "trace")]
192 pub fn try_get_optimized_plan(
193 &self,
194 id: &GlobalId,
195 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
196 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
197 }
198
199 #[mz_ore::instrument(level = "trace")]
201 pub fn try_get_physical_plan(
202 &self,
203 id: &GlobalId,
204 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
205 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
206 }
207
208 #[mz_ore::instrument(level = "trace")]
210 pub fn set_dataflow_metainfo(
211 &mut self,
212 id: GlobalId,
213 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
214 ) {
215 for notice in metainfo.optimizer_notices.iter() {
217 for dep_id in notice.dependencies.iter() {
218 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
219 entry.push(Arc::clone(notice))
220 }
221 if let Some(item_id) = notice.item_id {
222 soft_assert_eq_or_log!(
223 item_id,
224 id,
225 "notice.item_id should match the id for whom we are saving the notice"
226 );
227 }
228 }
229 self.plans.dataflow_metainfos.insert(id, metainfo);
231 }
232
233 #[mz_ore::instrument(level = "trace")]
235 pub fn try_get_dataflow_metainfo(
236 &self,
237 id: &GlobalId,
238 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
239 self.plans.dataflow_metainfos.get(id)
240 }
241
242 #[mz_ore::instrument(level = "trace")]
251 pub fn drop_plans_and_metainfos(
252 &mut self,
253 drop_ids: &BTreeSet<GlobalId>,
254 ) -> BTreeSet<Arc<OptimizerNotice>> {
255 let mut dropped_notices = BTreeSet::new();
257
258 for id in drop_ids {
260 self.plans.optimized_plan_by_id.remove(id);
261 self.plans.physical_plan_by_id.remove(id);
262 if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
263 soft_assert_or_log!(
264 metainfo.optimizer_notices.iter().all_unique(),
265 "should have been pushed there by `push_optimizer_notice_dedup`"
266 );
267 for n in metainfo.optimizer_notices.drain(..) {
268 for dep_id in n.dependencies.iter() {
270 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
271 soft_assert_or_log!(
272 notices.iter().any(|x| &n == x),
273 "corrupt notices_by_dep_id"
274 );
275 notices.retain(|x| &n != x)
276 }
277 }
278 dropped_notices.insert(n);
279 }
280 }
281 }
282
283 for id in drop_ids {
285 if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
286 for n in notices.drain(..) {
287 if let Some(item_id) = n.item_id.as_ref() {
289 if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
290 metainfo.optimizer_notices.iter().for_each(|n2| {
291 if let Some(item_id_2) = n2.item_id {
292 soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
293 }
294 });
295 metainfo.optimizer_notices.retain(|x| &n != x);
296 }
297 }
298 dropped_notices.insert(n);
299 }
300 }
301 }
302
303 let mut todo_dep_ids = BTreeSet::new();
306 for notice in dropped_notices.iter() {
307 for dep_id in notice.dependencies.iter() {
308 if !drop_ids.contains(dep_id) {
309 todo_dep_ids.insert(*dep_id);
310 }
311 }
312 }
313 for id in todo_dep_ids {
316 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
317 notices.retain(|n| !dropped_notices.contains(n))
318 }
319 }
320
321 if dropped_notices.iter().any(|n| Arc::strong_count(n) != 1) {
322 use mz_ore::str::{bracketed, separated};
323 let bad_notices = dropped_notices.iter().filter(|n| Arc::strong_count(n) != 1);
324 let bad_notices = bad_notices.map(|n| {
325 let mut dataflow_metainfo_occurrences = Vec::new();
328 for (id, meta_info) in self.plans.dataflow_metainfos.iter() {
329 if meta_info.optimizer_notices.contains(n) {
330 dataflow_metainfo_occurrences.push(id);
331 }
332 }
333 let mut notices_by_dep_id_occurrences = Vec::new();
335 for (id, notices) in self.plans.notices_by_dep_id.iter() {
336 if notices.iter().contains(n) {
337 notices_by_dep_id_occurrences.push(id);
338 }
339 }
340 format!(
341 "(id = {}, kind = {:?}, deps = {:?}, strong_count = {}, \
342 dataflow_metainfo_occurrences = {:?}, notices_by_dep_id_occurrences = {:?})",
343 n.id,
344 n.kind,
345 n.dependencies,
346 Arc::strong_count(n),
347 dataflow_metainfo_occurrences,
348 notices_by_dep_id_occurrences
349 )
350 });
351 let bad_notices = bracketed("{", "}", separated(", ", bad_notices));
352 error!(
353 "all dropped_notices entries should have `Arc::strong_count(_) == 1`; \
354 bad_notices = {bad_notices}; \
355 drop_ids = {drop_ids:?}"
356 );
357 }
358
359 dropped_notices
360 }
361
362 pub(crate) fn invalidate_for_index(
373 &self,
374 ons: impl Iterator<Item = GlobalId>,
375 ) -> BTreeSet<GlobalId> {
376 let mut dependencies = BTreeSet::new();
377 let mut queue = VecDeque::new();
378 let mut seen = HashSet::new();
379 for on in ons {
380 let entry = self.get_entry_by_global_id(&on);
381 dependencies.insert(on);
382 seen.insert(entry.id);
383 let uses = entry.uses();
384 queue.extend(uses.clone());
385 }
386
387 while let Some(cur) = queue.pop_front() {
388 let entry = self.get_entry(&cur);
389 if seen.insert(cur) {
390 let global_ids = entry.global_ids();
391 match entry.item_type() {
392 CatalogItemType::Table
393 | CatalogItemType::Source
394 | CatalogItemType::MaterializedView
395 | CatalogItemType::Sink
396 | CatalogItemType::Index
397 | CatalogItemType::Type
398 | CatalogItemType::Func
399 | CatalogItemType::Secret
400 | CatalogItemType::Connection
401 | CatalogItemType::ContinualTask => {
402 dependencies.extend(global_ids);
403 }
404 CatalogItemType::View => {
405 dependencies.extend(global_ids);
406 queue.extend(entry.uses());
407 }
408 }
409 }
410 }
411 dependencies
412 }
413}
414
415#[derive(Debug)]
416pub struct ConnCatalog<'a> {
417 state: Cow<'a, CatalogState>,
418 unresolvable_ids: BTreeSet<CatalogItemId>,
428 conn_id: ConnectionId,
429 cluster: String,
430 database: Option<DatabaseId>,
431 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
432 role_id: RoleId,
433 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
434 portals: Option<&'a BTreeMap<String, Portal>>,
435 notices_tx: UnboundedSender<AdapterNotice>,
436}
437
438impl ConnCatalog<'_> {
439 pub fn conn_id(&self) -> &ConnectionId {
440 &self.conn_id
441 }
442
443 pub fn state(&self) -> &CatalogState {
444 &*self.state
445 }
446
447 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
457 assert_eq!(
458 self.role_id, MZ_SYSTEM_ROLE_ID,
459 "only the system role can mark IDs unresolvable",
460 );
461 self.unresolvable_ids.insert(id);
462 }
463
464 pub fn effective_search_path(
470 &self,
471 include_temp_schema: bool,
472 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
473 self.state
474 .effective_search_path(&self.search_path, include_temp_schema)
475 }
476}
477
478impl ConnectionResolver for ConnCatalog<'_> {
479 fn resolve_connection(
480 &self,
481 id: CatalogItemId,
482 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
483 self.state().resolve_connection(id)
484 }
485}
486
487impl Catalog {
488 pub fn transient_revision(&self) -> u64 {
492 self.transient_revision
493 }
494
495 pub async fn with_debug<F, Fut, T>(f: F) -> T
507 where
508 F: FnOnce(Catalog) -> Fut,
509 Fut: Future<Output = T>,
510 {
511 let persist_client = PersistClient::new_for_tests().await;
512 let organization_id = Uuid::new_v4();
513 let bootstrap_args = test_bootstrap_args();
514 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
515 .await
516 .expect("can open debug catalog");
517 f(catalog).await
518 }
519
520 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
523 where
524 F: FnOnce(Catalog) -> Fut,
525 Fut: Future<Output = T>,
526 {
527 let persist_client = PersistClient::new_for_tests().await;
528 let organization_id = Uuid::new_v4();
529 let bootstrap_args = test_bootstrap_args();
530 let mut catalog =
531 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
532 .await
533 .expect("can open debug catalog");
534
535 let now = SYSTEM_TIME.clone();
537 let openable_storage = TestCatalogStateBuilder::new(persist_client)
538 .with_organization_id(organization_id)
539 .with_default_deploy_generation()
540 .build()
541 .await
542 .expect("can create durable catalog");
543 let mut storage = openable_storage
544 .open(now().into(), &bootstrap_args)
545 .await
546 .expect("can open durable catalog")
547 .0;
548 let _ = storage
550 .sync_to_current_updates()
551 .await
552 .expect("can sync to current updates");
553 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
554
555 f(catalog).await
556 }
557
558 pub async fn open_debug_catalog(
562 persist_client: PersistClient,
563 organization_id: Uuid,
564 bootstrap_args: &BootstrapArgs,
565 ) -> Result<Catalog, anyhow::Error> {
566 let now = SYSTEM_TIME.clone();
567 let environment_id = None;
568 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
569 .with_organization_id(organization_id)
570 .with_default_deploy_generation()
571 .build()
572 .await?;
573 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
574 let system_parameter_defaults = BTreeMap::default();
575 Self::open_debug_catalog_inner(
576 persist_client,
577 storage,
578 now,
579 environment_id,
580 &DUMMY_BUILD_INFO,
581 system_parameter_defaults,
582 bootstrap_args,
583 None,
584 )
585 .await
586 }
587
588 pub async fn open_debug_read_only_catalog(
593 persist_client: PersistClient,
594 organization_id: Uuid,
595 bootstrap_args: &BootstrapArgs,
596 ) -> Result<Catalog, anyhow::Error> {
597 let now = SYSTEM_TIME.clone();
598 let environment_id = None;
599 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
600 .with_organization_id(organization_id)
601 .build()
602 .await?;
603 let storage = openable_storage
604 .open_read_only(&test_bootstrap_args())
605 .await?;
606 let system_parameter_defaults = BTreeMap::default();
607 Self::open_debug_catalog_inner(
608 persist_client,
609 storage,
610 now,
611 environment_id,
612 &DUMMY_BUILD_INFO,
613 system_parameter_defaults,
614 bootstrap_args,
615 None,
616 )
617 .await
618 }
619
620 pub async fn open_debug_read_only_persist_catalog_config(
625 persist_client: PersistClient,
626 now: NowFn,
627 environment_id: EnvironmentId,
628 system_parameter_defaults: BTreeMap<String, String>,
629 build_info: &'static BuildInfo,
630 bootstrap_args: &BootstrapArgs,
631 enable_expression_cache_override: Option<bool>,
632 ) -> Result<Catalog, anyhow::Error> {
633 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
634 .with_organization_id(environment_id.organization_id())
635 .with_version(
636 build_info
637 .version
638 .parse()
639 .expect("build version is parseable"),
640 )
641 .build()
642 .await?;
643 let storage = openable_storage.open_read_only(bootstrap_args).await?;
644 Self::open_debug_catalog_inner(
645 persist_client,
646 storage,
647 now,
648 Some(environment_id),
649 build_info,
650 system_parameter_defaults,
651 bootstrap_args,
652 enable_expression_cache_override,
653 )
654 .await
655 }
656
657 async fn open_debug_catalog_inner(
658 persist_client: PersistClient,
659 storage: Box<dyn DurableCatalogState>,
660 now: NowFn,
661 environment_id: Option<EnvironmentId>,
662 build_info: &'static BuildInfo,
663 system_parameter_defaults: BTreeMap<String, String>,
664 bootstrap_args: &BootstrapArgs,
665 enable_expression_cache_override: Option<bool>,
666 ) -> Result<Catalog, anyhow::Error> {
667 let metrics_registry = &MetricsRegistry::new();
668 let secrets_reader = Arc::new(InMemorySecretsController::new());
669 let previous_ts = now().into();
672 let replica_size = &bootstrap_args.default_cluster_replica_size;
673 let read_only = false;
674
675 let OpenCatalogResult {
676 catalog,
677 migrated_storage_collections_0dt: _,
678 new_builtin_collections: _,
679 builtin_table_updates: _,
680 cached_global_exprs: _,
681 uncached_local_exprs: _,
682 } = Catalog::open(Config {
683 storage,
684 metrics_registry,
685 state: StateConfig {
686 unsafe_mode: true,
687 all_features: false,
688 build_info,
689 deploy_generation: 0,
690 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
691 read_only,
692 now,
693 boot_ts: previous_ts,
694 skip_migrations: true,
695 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
696 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
697 size: replica_size.clone(),
698 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
699 },
700 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
701 size: replica_size.clone(),
702 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
703 },
704 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
705 size: replica_size.clone(),
706 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
707 },
708 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
709 size: replica_size.clone(),
710 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
711 },
712 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
713 size: replica_size.clone(),
714 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
715 },
716 system_parameter_defaults,
717 remote_system_parameters: None,
718 availability_zones: vec![],
719 egress_addresses: vec![],
720 aws_principal_context: None,
721 aws_privatelink_availability_zones: None,
722 http_host_name: None,
723 connection_context: ConnectionContext::for_tests(secrets_reader),
724 builtin_item_migration_config: BuiltinItemMigrationConfig {
725 persist_client: persist_client.clone(),
726 read_only,
727 force_migration: None,
728 },
729 persist_client,
730 enable_expression_cache_override,
731 helm_chart_version: None,
732 external_login_password_mz_system: None,
733 license_key: ValidatedLicenseKey::for_tests(),
734 },
735 })
736 .await?;
737 Ok(catalog)
738 }
739
740 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
741 self.state.for_session(session)
742 }
743
744 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
745 self.state.for_sessionless_user(role_id)
746 }
747
748 pub fn for_system_session(&self) -> ConnCatalog<'_> {
749 self.state.for_system_session()
750 }
751
752 async fn storage<'a>(
753 &'a self,
754 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
755 self.storage.lock().await
756 }
757
758 pub async fn current_upper(&self) -> mz_repr::Timestamp {
759 self.storage().await.current_upper().await
760 }
761
762 pub async fn allocate_user_id(
763 &self,
764 commit_ts: mz_repr::Timestamp,
765 ) -> Result<(CatalogItemId, GlobalId), Error> {
766 self.storage()
767 .await
768 .allocate_user_id(commit_ts)
769 .await
770 .maybe_terminate("allocating user ids")
771 .err_into()
772 }
773
774 pub async fn allocate_user_ids(
776 &self,
777 amount: u64,
778 commit_ts: mz_repr::Timestamp,
779 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
780 self.storage()
781 .await
782 .allocate_user_ids(amount, commit_ts)
783 .await
784 .maybe_terminate("allocating user ids")
785 .err_into()
786 }
787
788 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
789 let commit_ts = self.storage().await.current_upper().await;
790 self.allocate_user_id(commit_ts).await
791 }
792
793 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
795 self.storage()
796 .await
797 .get_next_user_item_id()
798 .await
799 .err_into()
800 }
801
802 #[cfg(test)]
803 pub async fn allocate_system_id(
804 &self,
805 commit_ts: mz_repr::Timestamp,
806 ) -> Result<(CatalogItemId, GlobalId), Error> {
807 use mz_ore::collections::CollectionExt;
808
809 let mut storage = self.storage().await;
810 let mut txn = storage.transaction().await?;
811 let id = txn
812 .allocate_system_item_ids(1)
813 .maybe_terminate("allocating system ids")?
814 .into_element();
815 let _ = txn.get_and_commit_op_updates();
817 txn.commit(commit_ts).await?;
818 Ok(id)
819 }
820
821 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
823 self.storage()
824 .await
825 .get_next_system_item_id()
826 .await
827 .err_into()
828 }
829
830 pub async fn allocate_user_cluster_id(
831 &self,
832 commit_ts: mz_repr::Timestamp,
833 ) -> Result<ClusterId, Error> {
834 self.storage()
835 .await
836 .allocate_user_cluster_id(commit_ts)
837 .await
838 .maybe_terminate("allocating user cluster ids")
839 .err_into()
840 }
841
842 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
844 self.storage()
845 .await
846 .get_next_system_replica_id()
847 .await
848 .err_into()
849 }
850
851 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
853 self.storage()
854 .await
855 .get_next_user_replica_id()
856 .await
857 .err_into()
858 }
859
860 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
861 self.state.resolve_database(database_name)
862 }
863
864 pub fn resolve_schema(
865 &self,
866 current_database: Option<&DatabaseId>,
867 database_name: Option<&str>,
868 schema_name: &str,
869 conn_id: &ConnectionId,
870 ) -> Result<&Schema, SqlCatalogError> {
871 self.state
872 .resolve_schema(current_database, database_name, schema_name, conn_id)
873 }
874
875 pub fn resolve_schema_in_database(
876 &self,
877 database_spec: &ResolvedDatabaseSpecifier,
878 schema_name: &str,
879 conn_id: &ConnectionId,
880 ) -> Result<&Schema, SqlCatalogError> {
881 self.state
882 .resolve_schema_in_database(database_spec, schema_name, conn_id)
883 }
884
885 pub fn resolve_replica_in_cluster(
886 &self,
887 cluster_id: &ClusterId,
888 replica_name: &str,
889 ) -> Result<&ClusterReplica, SqlCatalogError> {
890 self.state
891 .resolve_replica_in_cluster(cluster_id, replica_name)
892 }
893
894 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
895 self.state.resolve_system_schema(name)
896 }
897
898 pub fn resolve_search_path(
899 &self,
900 session: &Session,
901 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
902 self.state.resolve_search_path(session)
903 }
904
905 pub fn resolve_entry(
907 &self,
908 current_database: Option<&DatabaseId>,
909 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
910 name: &PartialItemName,
911 conn_id: &ConnectionId,
912 ) -> Result<&CatalogEntry, SqlCatalogError> {
913 self.state
914 .resolve_entry(current_database, search_path, name, conn_id)
915 }
916
917 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
919 self.state.resolve_builtin_table(builtin)
920 }
921
922 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
924 self.state.resolve_builtin_log(builtin).0
925 }
926
927 pub fn resolve_builtin_storage_collection(
929 &self,
930 builtin: &'static BuiltinSource,
931 ) -> CatalogItemId {
932 self.state.resolve_builtin_source(builtin)
933 }
934
935 pub fn resolve_function(
937 &self,
938 current_database: Option<&DatabaseId>,
939 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
940 name: &PartialItemName,
941 conn_id: &ConnectionId,
942 ) -> Result<&CatalogEntry, SqlCatalogError> {
943 self.state
944 .resolve_function(current_database, search_path, name, conn_id)
945 }
946
947 pub fn resolve_type(
949 &self,
950 current_database: Option<&DatabaseId>,
951 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
952 name: &PartialItemName,
953 conn_id: &ConnectionId,
954 ) -> Result<&CatalogEntry, SqlCatalogError> {
955 self.state
956 .resolve_type(current_database, search_path, name, conn_id)
957 }
958
959 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
960 self.state.resolve_cluster(name)
961 }
962
963 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
969 self.state.resolve_builtin_cluster(cluster)
970 }
971
972 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
973 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
974 }
975
976 pub fn resolve_target_cluster(
978 &self,
979 target_cluster: TargetCluster,
980 session: &Session,
981 ) -> Result<&Cluster, AdapterError> {
982 match target_cluster {
983 TargetCluster::CatalogServer => {
984 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
985 }
986 TargetCluster::Active => self.active_cluster(session),
987 TargetCluster::Transaction(cluster_id) => self
988 .try_get_cluster(cluster_id)
989 .ok_or(AdapterError::ConcurrentClusterDrop),
990 }
991 }
992
993 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
994 if session.user().name != SYSTEM_USER.name
998 && session.user().name != SUPPORT_USER.name
999 && session.vars().cluster() == SYSTEM_USER.name
1000 {
1001 coord_bail!(
1002 "system cluster '{}' cannot execute user queries",
1003 SYSTEM_USER.name
1004 );
1005 }
1006 let cluster = self.resolve_cluster(session.vars().cluster())?;
1007 Ok(cluster)
1008 }
1009
1010 pub fn state(&self) -> &CatalogState {
1011 &self.state
1012 }
1013
1014 pub fn resolve_full_name(
1015 &self,
1016 name: &QualifiedItemName,
1017 conn_id: Option<&ConnectionId>,
1018 ) -> FullItemName {
1019 self.state.resolve_full_name(name, conn_id)
1020 }
1021
1022 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
1023 self.state.try_get_entry(id)
1024 }
1025
1026 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
1027 self.state.try_get_entry_by_global_id(id)
1028 }
1029
1030 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
1031 self.state.get_entry(id)
1032 }
1033
1034 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1035 self.state.get_entry_by_global_id(id)
1036 }
1037
1038 pub fn get_global_ids<'a>(
1039 &'a self,
1040 id: &CatalogItemId,
1041 ) -> impl Iterator<Item = GlobalId> + use<'a> {
1042 self.get_entry(id).global_ids()
1043 }
1044
1045 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1046 self.get_entry_by_global_id(id).id()
1047 }
1048
1049 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1050 let item = self.try_get_entry_by_global_id(id)?;
1051 Some(item.id())
1052 }
1053
1054 pub fn get_schema(
1055 &self,
1056 database_spec: &ResolvedDatabaseSpecifier,
1057 schema_spec: &SchemaSpecifier,
1058 conn_id: &ConnectionId,
1059 ) -> &Schema {
1060 self.state.get_schema(database_spec, schema_spec, conn_id)
1061 }
1062
1063 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1064 self.state.get_mz_catalog_schema_id()
1065 }
1066
1067 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1068 self.state.get_pg_catalog_schema_id()
1069 }
1070
1071 pub fn get_information_schema_id(&self) -> SchemaId {
1072 self.state.get_information_schema_id()
1073 }
1074
1075 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1076 self.state.get_mz_internal_schema_id()
1077 }
1078
1079 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1080 self.state.get_mz_introspection_schema_id()
1081 }
1082
1083 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1084 self.state.get_mz_unsafe_schema_id()
1085 }
1086
1087 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1088 self.state.system_schema_ids()
1089 }
1090
1091 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1092 self.state.get_database(id)
1093 }
1094
1095 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1096 self.state.try_get_role(id)
1097 }
1098
1099 pub fn get_role(&self, id: &RoleId) -> &Role {
1100 self.state.get_role(id)
1101 }
1102
1103 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1104 self.state.try_get_role_by_name(role_name)
1105 }
1106
1107 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1108 self.state.try_get_role_auth_by_id(id)
1109 }
1110
1111 pub fn create_temporary_schema(
1114 &mut self,
1115 conn_id: &ConnectionId,
1116 owner_id: RoleId,
1117 ) -> Result<(), Error> {
1118 self.state.create_temporary_schema(conn_id, owner_id)
1119 }
1120
1121 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1122 self.state.temporary_schemas[conn_id]
1123 .items
1124 .contains_key(item_name)
1125 }
1126
1127 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1130 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1131 return Ok(());
1132 };
1133 if !schema.items.is_empty() {
1134 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1135 }
1136 Ok(())
1137 }
1138
1139 pub(crate) fn object_dependents(
1140 &self,
1141 object_ids: &Vec<ObjectId>,
1142 conn_id: &ConnectionId,
1143 ) -> Vec<ObjectId> {
1144 let mut seen = BTreeSet::new();
1145 self.state.object_dependents(object_ids, conn_id, &mut seen)
1146 }
1147
1148 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1149 FullNameV1 {
1150 database: name.database.to_string(),
1151 schema: name.schema.clone(),
1152 item: name.item.clone(),
1153 }
1154 }
1155
1156 pub fn find_available_cluster_name(&self, name: &str) -> String {
1157 let mut i = 0;
1158 let mut candidate = name.to_string();
1159 while self.state.clusters_by_name.contains_key(&candidate) {
1160 i += 1;
1161 candidate = format!("{}{}", name, i);
1162 }
1163 candidate
1164 }
1165
1166 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1167 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1168 self.cluster_replica_sizes()
1169 .enabled_allocations()
1170 .map(|a| a.0.to_owned())
1171 .collect::<Vec<_>>()
1172 } else {
1173 self.system_config().allowed_cluster_replica_sizes()
1174 }
1175 }
1176
1177 pub fn concretize_replica_location(
1178 &self,
1179 location: mz_catalog::durable::ReplicaLocation,
1180 allowed_sizes: &Vec<String>,
1181 allowed_availability_zones: Option<&[String]>,
1182 ) -> Result<ReplicaLocation, Error> {
1183 self.state
1184 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1185 }
1186
1187 pub(crate) fn ensure_valid_replica_size(
1188 &self,
1189 allowed_sizes: &[String],
1190 size: &String,
1191 ) -> Result<(), Error> {
1192 self.state.ensure_valid_replica_size(allowed_sizes, size)
1193 }
1194
1195 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1196 &self.state.cluster_replica_sizes
1197 }
1198
1199 pub fn get_privileges(
1201 &self,
1202 id: &SystemObjectId,
1203 conn_id: &ConnectionId,
1204 ) -> Option<&PrivilegeMap> {
1205 match id {
1206 SystemObjectId::Object(id) => match id {
1207 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1208 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1209 ObjectId::Schema((database_spec, schema_spec)) => Some(
1210 self.get_schema(database_spec, schema_spec, conn_id)
1211 .privileges(),
1212 ),
1213 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1214 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1215 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1216 },
1217 SystemObjectId::System => Some(&self.state.system_privileges),
1218 }
1219 }
1220
1221 #[mz_ore::instrument(level = "debug")]
1222 pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1223 Ok(self.storage().await.confirm_leadership().await?)
1224 }
1225
1226 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1228 self.state.introspection_dependencies(id)
1229 }
1230
1231 pub fn dump(&self) -> Result<CatalogDump, Error> {
1237 Ok(CatalogDump::new(self.state.dump(None)?))
1238 }
1239
1240 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1244 self.state.check_consistency().map_err(|inconsistencies| {
1245 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1246 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1247 })
1248 })
1249 }
1250
1251 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1252 self.state.config()
1253 }
1254
1255 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1256 self.state.entry_by_id.values()
1257 }
1258
1259 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1260 self.entries()
1261 .filter(|entry| entry.is_connection() && entry.id().is_user())
1262 }
1263
1264 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1265 self.entries()
1266 .filter(|entry| entry.is_table() && entry.id().is_user())
1267 }
1268
1269 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1270 self.entries()
1271 .filter(|entry| entry.is_source() && entry.id().is_user())
1272 }
1273
1274 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1275 self.entries()
1276 .filter(|entry| entry.is_sink() && entry.id().is_user())
1277 }
1278
1279 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1280 self.entries()
1281 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1282 }
1283
1284 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1285 self.entries()
1286 .filter(|entry| entry.is_secret() && entry.id().is_user())
1287 }
1288
1289 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1290 self.state.get_network_policy(&network_policy_id)
1291 }
1292
1293 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1294 self.state.try_get_network_policy_by_name(name)
1295 }
1296
1297 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1298 self.state.clusters_by_id.values()
1299 }
1300
1301 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1302 self.state.get_cluster(cluster_id)
1303 }
1304
1305 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1306 self.state.try_get_cluster(cluster_id)
1307 }
1308
1309 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1310 self.clusters().filter(|cluster| cluster.id.is_user())
1311 }
1312
1313 pub fn get_cluster_replica(
1314 &self,
1315 cluster_id: ClusterId,
1316 replica_id: ReplicaId,
1317 ) -> &ClusterReplica {
1318 self.state.get_cluster_replica(cluster_id, replica_id)
1319 }
1320
1321 pub fn try_get_cluster_replica(
1322 &self,
1323 cluster_id: ClusterId,
1324 replica_id: ReplicaId,
1325 ) -> Option<&ClusterReplica> {
1326 self.state.try_get_cluster_replica(cluster_id, replica_id)
1327 }
1328
1329 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1330 self.user_clusters()
1331 .flat_map(|cluster| cluster.user_replicas())
1332 }
1333
1334 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1335 self.state.database_by_id.values()
1336 }
1337
1338 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1339 self.state
1340 .roles_by_id
1341 .values()
1342 .filter(|role| role.is_user())
1343 }
1344
1345 pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1346 self.entries()
1347 .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1348 }
1349
1350 pub fn system_privileges(&self) -> &PrivilegeMap {
1351 &self.state.system_privileges
1352 }
1353
1354 pub fn default_privileges(
1355 &self,
1356 ) -> impl Iterator<
1357 Item = (
1358 &DefaultPrivilegeObject,
1359 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1360 ),
1361 > {
1362 self.state.default_privileges.iter()
1363 }
1364
1365 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1366 self.state
1367 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1368 }
1369
1370 pub fn pack_storage_usage_update(
1371 &self,
1372 event: VersionedStorageUsage,
1373 diff: Diff,
1374 ) -> BuiltinTableUpdate {
1375 self.state
1376 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1377 }
1378
1379 pub fn system_config(&self) -> &SystemVars {
1380 self.state.system_config()
1381 }
1382
1383 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1384 self.state.system_config_mut()
1385 }
1386
1387 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1388 self.state.ensure_not_reserved_role(role_id)
1389 }
1390
1391 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1392 self.state.ensure_grantable_role(role_id)
1393 }
1394
1395 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1396 self.state.ensure_not_system_role(role_id)
1397 }
1398
1399 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1400 self.state.ensure_not_predefined_role(role_id)
1401 }
1402
1403 pub fn ensure_not_reserved_network_policy(
1404 &self,
1405 network_policy_id: &NetworkPolicyId,
1406 ) -> Result<(), Error> {
1407 self.state
1408 .ensure_not_reserved_network_policy(network_policy_id)
1409 }
1410
1411 pub fn ensure_not_reserved_object(
1412 &self,
1413 object_id: &ObjectId,
1414 conn_id: &ConnectionId,
1415 ) -> Result<(), Error> {
1416 match object_id {
1417 ObjectId::Cluster(cluster_id) => {
1418 if cluster_id.is_system() {
1419 let cluster = self.get_cluster(*cluster_id);
1420 Err(Error::new(ErrorKind::ReadOnlyCluster(
1421 cluster.name().to_string(),
1422 )))
1423 } else {
1424 Ok(())
1425 }
1426 }
1427 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1428 if replica_id.is_system() {
1429 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1430 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1431 replica.name().to_string(),
1432 )))
1433 } else {
1434 Ok(())
1435 }
1436 }
1437 ObjectId::Database(database_id) => {
1438 if database_id.is_system() {
1439 let database = self.get_database(database_id);
1440 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1441 database.name().to_string(),
1442 )))
1443 } else {
1444 Ok(())
1445 }
1446 }
1447 ObjectId::Schema((database_spec, schema_spec)) => {
1448 if schema_spec.is_system() {
1449 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1450 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1451 schema.name().schema.clone(),
1452 )))
1453 } else {
1454 Ok(())
1455 }
1456 }
1457 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1458 ObjectId::Item(item_id) => {
1459 if item_id.is_system() {
1460 let item = self.get_entry(item_id);
1461 let name = self.resolve_full_name(item.name(), Some(conn_id));
1462 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1463 } else {
1464 Ok(())
1465 }
1466 }
1467 ObjectId::NetworkPolicy(network_policy_id) => {
1468 self.ensure_not_reserved_network_policy(network_policy_id)
1469 }
1470 }
1471 }
1472
1473 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1475 &mut self,
1476 create_sql: &str,
1477 force_if_exists_skip: bool,
1478 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1479 self.state
1480 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1481 }
1482
1483 pub(crate) fn update_expression_cache<'a, 'b>(
1484 &'a self,
1485 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1486 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1487 ) -> BoxFuture<'b, ()> {
1488 if let Some(expr_cache) = &self.expr_cache_handle {
1489 let ons = new_local_expressions
1490 .iter()
1491 .map(|(id, _)| id)
1492 .chain(new_global_expressions.iter().map(|(id, _)| id))
1493 .map(|id| self.get_entry_by_global_id(id))
1494 .filter_map(|entry| entry.index().map(|index| index.on));
1495 let invalidate_ids = self.invalidate_for_index(ons);
1496 expr_cache
1497 .update(
1498 new_local_expressions,
1499 new_global_expressions,
1500 invalidate_ids,
1501 )
1502 .boxed()
1503 } else {
1504 async {}.boxed()
1505 }
1506 }
1507
1508 #[cfg(test)]
1512 async fn sync_to_current_updates(
1513 &mut self,
1514 ) -> Result<
1515 (
1516 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1517 Vec<ParsedStateUpdate>,
1518 ),
1519 CatalogError,
1520 > {
1521 let updates = self.storage().await.sync_to_current_updates().await?;
1522 let (builtin_table_updates, catalog_updates) = self.state.apply_updates(updates)?;
1523 Ok((builtin_table_updates, catalog_updates))
1524 }
1525}
1526
1527pub fn is_reserved_name(name: &str) -> bool {
1528 BUILTIN_PREFIXES
1529 .iter()
1530 .any(|prefix| name.starts_with(prefix))
1531}
1532
1533pub fn is_reserved_role_name(name: &str) -> bool {
1534 is_reserved_name(name) || is_public_role(name)
1535}
1536
1537pub fn is_public_role(name: &str) -> bool {
1538 name == &*PUBLIC_ROLE_NAME
1539}
1540
1541pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1542 object_type_to_audit_object_type(sql_type.into())
1543}
1544
1545pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1546 match id {
1547 CommentObjectId::Table(_) => ObjectType::Table,
1548 CommentObjectId::View(_) => ObjectType::View,
1549 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1550 CommentObjectId::Source(_) => ObjectType::Source,
1551 CommentObjectId::Sink(_) => ObjectType::Sink,
1552 CommentObjectId::Index(_) => ObjectType::Index,
1553 CommentObjectId::Func(_) => ObjectType::Func,
1554 CommentObjectId::Connection(_) => ObjectType::Connection,
1555 CommentObjectId::Type(_) => ObjectType::Type,
1556 CommentObjectId::Secret(_) => ObjectType::Secret,
1557 CommentObjectId::Role(_) => ObjectType::Role,
1558 CommentObjectId::Database(_) => ObjectType::Database,
1559 CommentObjectId::Schema(_) => ObjectType::Schema,
1560 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1561 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1562 CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1563 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1564 }
1565}
1566
1567pub(crate) fn object_type_to_audit_object_type(
1568 object_type: mz_sql::catalog::ObjectType,
1569) -> ObjectType {
1570 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1571}
1572
1573pub(crate) fn system_object_type_to_audit_object_type(
1574 system_type: &SystemObjectType,
1575) -> ObjectType {
1576 match system_type {
1577 SystemObjectType::Object(object_type) => match object_type {
1578 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1579 mz_sql::catalog::ObjectType::View => ObjectType::View,
1580 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1581 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1582 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1583 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1584 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1585 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1586 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1587 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1588 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1589 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1590 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1591 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1592 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1593 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1594 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1595 },
1596 SystemObjectType::System => ObjectType::System,
1597 }
1598}
1599
1600#[derive(Debug, Copy, Clone)]
1601pub enum UpdatePrivilegeVariant {
1602 Grant,
1603 Revoke,
1604}
1605
1606impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1607 fn from(variant: UpdatePrivilegeVariant) -> Self {
1608 match variant {
1609 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1610 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1611 }
1612 }
1613}
1614
1615impl From<UpdatePrivilegeVariant> for EventType {
1616 fn from(variant: UpdatePrivilegeVariant) -> Self {
1617 match variant {
1618 UpdatePrivilegeVariant::Grant => EventType::Grant,
1619 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1620 }
1621 }
1622}
1623
1624impl ConnCatalog<'_> {
1625 fn resolve_item_name(
1626 &self,
1627 name: &PartialItemName,
1628 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1629 self.resolve_item(name).map(|entry| entry.name())
1630 }
1631
1632 fn resolve_function_name(
1633 &self,
1634 name: &PartialItemName,
1635 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1636 self.resolve_function(name).map(|entry| entry.name())
1637 }
1638
1639 fn resolve_type_name(
1640 &self,
1641 name: &PartialItemName,
1642 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1643 self.resolve_type(name).map(|entry| entry.name())
1644 }
1645}
1646
1647impl ExprHumanizer for ConnCatalog<'_> {
1648 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1649 let entry = self.state.try_get_entry_by_global_id(&id)?;
1650 Some(self.resolve_full_name(entry.name()).to_string())
1651 }
1652
1653 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1654 let entry = self.state.try_get_entry_by_global_id(&id)?;
1655 Some(entry.name().item.clone())
1656 }
1657
1658 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1659 let entry = self.state.try_get_entry_by_global_id(&id)?;
1660 Some(self.resolve_full_name(entry.name()).into_parts())
1661 }
1662
1663 fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1664 use SqlScalarType::*;
1665
1666 match typ {
1667 Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1668 List {
1669 custom_id: Some(item_id),
1670 ..
1671 }
1672 | Map {
1673 custom_id: Some(item_id),
1674 ..
1675 } => {
1676 let item = self.get_item(item_id);
1677 self.minimal_qualification(item.name()).to_string()
1678 }
1679 List { element_type, .. } => {
1680 format!(
1681 "{} list",
1682 self.humanize_scalar_type(element_type, postgres_compat)
1683 )
1684 }
1685 Map { value_type, .. } => format!(
1686 "map[{}=>{}]",
1687 self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1688 self.humanize_scalar_type(value_type, postgres_compat)
1689 ),
1690 Record {
1691 custom_id: Some(item_id),
1692 ..
1693 } => {
1694 let item = self.get_item(item_id);
1695 self.minimal_qualification(item.name()).to_string()
1696 }
1697 Record { fields, .. } => format!(
1698 "record({})",
1699 fields
1700 .iter()
1701 .map(|f| format!(
1702 "{}: {}",
1703 f.0,
1704 self.humanize_column_type(&f.1, postgres_compat)
1705 ))
1706 .join(",")
1707 ),
1708 PgLegacyChar => "\"char\"".into(),
1709 Char { length } if !postgres_compat => match length {
1710 None => "char".into(),
1711 Some(length) => format!("char({})", length.into_u32()),
1712 },
1713 VarChar { max_length } if !postgres_compat => match max_length {
1714 None => "varchar".into(),
1715 Some(length) => format!("varchar({})", length.into_u32()),
1716 },
1717 UInt16 => "uint2".into(),
1718 UInt32 => "uint4".into(),
1719 UInt64 => "uint8".into(),
1720 ty => {
1721 let pgrepr_type = mz_pgrepr::Type::from(ty);
1722 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1723
1724 let res = if self
1725 .effective_search_path(true)
1726 .iter()
1727 .any(|(_, schema)| schema == &pg_catalog_schema)
1728 {
1729 pgrepr_type.name().to_string()
1730 } else {
1731 let name = QualifiedItemName {
1734 qualifiers: ItemQualifiers {
1735 database_spec: ResolvedDatabaseSpecifier::Ambient,
1736 schema_spec: pg_catalog_schema,
1737 },
1738 item: pgrepr_type.name().to_string(),
1739 };
1740 self.resolve_full_name(&name).to_string()
1741 };
1742 res
1743 }
1744 }
1745 }
1746
1747 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1748 let entry = self.state.try_get_entry_by_global_id(&id)?;
1749
1750 match entry.index() {
1751 Some(index) => {
1752 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1753 let mut on_names = on_desc
1754 .iter_names()
1755 .map(|col_name| col_name.to_string())
1756 .collect::<Vec<_>>();
1757
1758 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1759
1760 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1764 let mut ix_names = vec![String::new(); ix_arity];
1765
1766 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1768 let on_name = on_names.get_mut(on_pos).expect("on_name");
1769 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1770 std::mem::swap(on_name, ix_name);
1771 }
1772
1773 Some(ix_names) }
1775 None => {
1776 let desc = self.state.try_get_desc_by_global_id(&id)?;
1777 let column_names = desc
1778 .iter_names()
1779 .map(|col_name| col_name.to_string())
1780 .collect();
1781
1782 Some(column_names)
1783 }
1784 }
1785 }
1786
1787 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1788 let desc = self.state.try_get_desc_by_global_id(&id)?;
1789 Some(desc.get_name(column).to_string())
1790 }
1791
1792 fn id_exists(&self, id: GlobalId) -> bool {
1793 self.state.entry_by_global_id.contains_key(&id)
1794 }
1795}
1796
1797impl SessionCatalog for ConnCatalog<'_> {
1798 fn active_role_id(&self) -> &RoleId {
1799 &self.role_id
1800 }
1801
1802 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1803 self.prepared_statements
1804 .as_ref()
1805 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1806 .flatten()
1807 }
1808
1809 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1810 self.portals
1811 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1812 }
1813
1814 fn active_database(&self) -> Option<&DatabaseId> {
1815 self.database.as_ref()
1816 }
1817
1818 fn active_cluster(&self) -> &str {
1819 &self.cluster
1820 }
1821
1822 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1823 &self.search_path
1824 }
1825
1826 fn resolve_database(
1827 &self,
1828 database_name: &str,
1829 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1830 Ok(self.state.resolve_database(database_name)?)
1831 }
1832
1833 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1834 self.state
1835 .database_by_id
1836 .get(id)
1837 .expect("database doesn't exist")
1838 }
1839
1840 #[allow(clippy::as_conversions)]
1842 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1843 self.state
1844 .database_by_id
1845 .values()
1846 .map(|database| database as &dyn CatalogDatabase)
1847 .collect()
1848 }
1849
1850 fn resolve_schema(
1851 &self,
1852 database_name: Option<&str>,
1853 schema_name: &str,
1854 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1855 Ok(self.state.resolve_schema(
1856 self.database.as_ref(),
1857 database_name,
1858 schema_name,
1859 &self.conn_id,
1860 )?)
1861 }
1862
1863 fn resolve_schema_in_database(
1864 &self,
1865 database_spec: &ResolvedDatabaseSpecifier,
1866 schema_name: &str,
1867 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1868 Ok(self
1869 .state
1870 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1871 }
1872
1873 fn get_schema(
1874 &self,
1875 database_spec: &ResolvedDatabaseSpecifier,
1876 schema_spec: &SchemaSpecifier,
1877 ) -> &dyn CatalogSchema {
1878 self.state
1879 .get_schema(database_spec, schema_spec, &self.conn_id)
1880 }
1881
1882 #[allow(clippy::as_conversions)]
1884 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1885 self.get_databases()
1886 .into_iter()
1887 .flat_map(|database| database.schemas().into_iter())
1888 .chain(
1889 self.state
1890 .ambient_schemas_by_id
1891 .values()
1892 .chain(self.state.temporary_schemas.values())
1893 .map(|schema| schema as &dyn CatalogSchema),
1894 )
1895 .collect()
1896 }
1897
1898 fn get_mz_internal_schema_id(&self) -> SchemaId {
1899 self.state().get_mz_internal_schema_id()
1900 }
1901
1902 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1903 self.state().get_mz_unsafe_schema_id()
1904 }
1905
1906 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1907 self.state.is_system_schema_specifier(schema)
1908 }
1909
1910 fn resolve_role(
1911 &self,
1912 role_name: &str,
1913 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1914 match self.state.try_get_role_by_name(role_name) {
1915 Some(role) => Ok(role),
1916 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1917 }
1918 }
1919
1920 fn resolve_network_policy(
1921 &self,
1922 policy_name: &str,
1923 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1924 match self.state.try_get_network_policy_by_name(policy_name) {
1925 Some(policy) => Ok(policy),
1926 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1927 }
1928 }
1929
1930 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1931 Some(self.state.roles_by_id.get(id)?)
1932 }
1933
1934 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1935 self.state.get_role(id)
1936 }
1937
1938 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1939 #[allow(clippy::as_conversions)]
1941 self.state
1942 .roles_by_id
1943 .values()
1944 .map(|role| role as &dyn CatalogRole)
1945 .collect()
1946 }
1947
1948 fn mz_system_role_id(&self) -> RoleId {
1949 MZ_SYSTEM_ROLE_ID
1950 }
1951
1952 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1953 self.state.collect_role_membership(id)
1954 }
1955
1956 fn get_network_policy(
1957 &self,
1958 id: &NetworkPolicyId,
1959 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1960 self.state.get_network_policy(id)
1961 }
1962
1963 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1964 #[allow(clippy::as_conversions)]
1966 self.state
1967 .network_policies_by_id
1968 .values()
1969 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1970 .collect()
1971 }
1972
1973 fn resolve_cluster(
1974 &self,
1975 cluster_name: Option<&str>,
1976 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1977 Ok(self
1978 .state
1979 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1980 }
1981
1982 fn resolve_cluster_replica(
1983 &self,
1984 cluster_replica_name: &QualifiedReplica,
1985 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1986 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1987 }
1988
1989 fn resolve_item(
1990 &self,
1991 name: &PartialItemName,
1992 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1993 let r = self.state.resolve_entry(
1994 self.database.as_ref(),
1995 &self.effective_search_path(true),
1996 name,
1997 &self.conn_id,
1998 )?;
1999 if self.unresolvable_ids.contains(&r.id()) {
2000 Err(SqlCatalogError::UnknownItem(name.to_string()))
2001 } else {
2002 Ok(r)
2003 }
2004 }
2005
2006 fn resolve_function(
2007 &self,
2008 name: &PartialItemName,
2009 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2010 let r = self.state.resolve_function(
2011 self.database.as_ref(),
2012 &self.effective_search_path(false),
2013 name,
2014 &self.conn_id,
2015 )?;
2016
2017 if self.unresolvable_ids.contains(&r.id()) {
2018 Err(SqlCatalogError::UnknownFunction {
2019 name: name.to_string(),
2020 alternative: None,
2021 })
2022 } else {
2023 Ok(r)
2024 }
2025 }
2026
2027 fn resolve_type(
2028 &self,
2029 name: &PartialItemName,
2030 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2031 let r = self.state.resolve_type(
2032 self.database.as_ref(),
2033 &self.effective_search_path(false),
2034 name,
2035 &self.conn_id,
2036 )?;
2037
2038 if self.unresolvable_ids.contains(&r.id()) {
2039 Err(SqlCatalogError::UnknownType {
2040 name: name.to_string(),
2041 })
2042 } else {
2043 Ok(r)
2044 }
2045 }
2046
2047 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2048 self.state.get_system_type(name)
2049 }
2050
2051 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2052 Some(self.state.try_get_entry(id)?)
2053 }
2054
2055 fn try_get_item_by_global_id(
2056 &self,
2057 id: &GlobalId,
2058 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2059 let entry = self.state.try_get_entry_by_global_id(id)?;
2060 let entry = match &entry.item {
2061 CatalogItem::Table(table) => {
2062 let (version, _gid) = table
2063 .collections
2064 .iter()
2065 .find(|(_version, gid)| *gid == id)
2066 .expect("catalog out of sync, mismatched GlobalId");
2067 entry.at_version(RelationVersionSelector::Specific(*version))
2068 }
2069 _ => entry.at_version(RelationVersionSelector::Latest),
2070 };
2071 Some(entry)
2072 }
2073
2074 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2075 self.state.get_entry(id)
2076 }
2077
2078 fn get_item_by_global_id(
2079 &self,
2080 id: &GlobalId,
2081 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2082 let entry = self.state.get_entry_by_global_id(id);
2083 let entry = match &entry.item {
2084 CatalogItem::Table(table) => {
2085 let (version, _gid) = table
2086 .collections
2087 .iter()
2088 .find(|(_version, gid)| *gid == id)
2089 .expect("catalog out of sync, mismatched GlobalId");
2090 entry.at_version(RelationVersionSelector::Specific(*version))
2091 }
2092 _ => entry.at_version(RelationVersionSelector::Latest),
2093 };
2094 entry
2095 }
2096
2097 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2098 self.get_schemas()
2099 .into_iter()
2100 .flat_map(|schema| schema.item_ids())
2101 .map(|id| self.get_item(&id))
2102 .collect()
2103 }
2104
2105 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2106 self.state
2107 .get_item_by_name(name, &self.conn_id)
2108 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2109 }
2110
2111 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2112 self.state
2113 .get_type_by_name(name, &self.conn_id)
2114 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2115 }
2116
2117 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2118 &self.state.clusters_by_id[&id]
2119 }
2120
2121 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2122 self.state
2123 .clusters_by_id
2124 .values()
2125 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2126 .collect()
2127 }
2128
2129 fn get_cluster_replica(
2130 &self,
2131 cluster_id: ClusterId,
2132 replica_id: ReplicaId,
2133 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2134 let cluster = self.get_cluster(cluster_id);
2135 cluster.replica(replica_id)
2136 }
2137
2138 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2139 self.get_clusters()
2140 .into_iter()
2141 .flat_map(|cluster| cluster.replicas().into_iter())
2142 .collect()
2143 }
2144
2145 fn get_system_privileges(&self) -> &PrivilegeMap {
2146 &self.state.system_privileges
2147 }
2148
2149 fn get_default_privileges(
2150 &self,
2151 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2152 self.state
2153 .default_privileges
2154 .iter()
2155 .map(|(object, acl_items)| (object, acl_items.collect()))
2156 .collect()
2157 }
2158
2159 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2160 self.state.find_available_name(name, &self.conn_id)
2161 }
2162
2163 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2164 self.state.resolve_full_name(name, Some(&self.conn_id))
2165 }
2166
2167 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2168 self.state.resolve_full_schema_name(name)
2169 }
2170
2171 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2172 self.state.get_entry_by_global_id(global_id).id()
2173 }
2174
2175 fn resolve_global_id(
2176 &self,
2177 item_id: &CatalogItemId,
2178 version: RelationVersionSelector,
2179 ) -> GlobalId {
2180 self.state
2181 .get_entry(item_id)
2182 .at_version(version)
2183 .global_id()
2184 }
2185
2186 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2187 self.state.config()
2188 }
2189
2190 fn now(&self) -> EpochMillis {
2191 (self.state.config().now)()
2192 }
2193
2194 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2195 self.state.aws_privatelink_availability_zones.clone()
2196 }
2197
2198 fn system_vars(&self) -> &SystemVars {
2199 &self.state.system_configuration
2200 }
2201
2202 fn system_vars_mut(&mut self) -> &mut SystemVars {
2203 &mut self.state.to_mut().system_configuration
2204 }
2205
2206 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2207 self.state().get_owner_id(id, self.conn_id())
2208 }
2209
2210 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2211 match id {
2212 SystemObjectId::System => Some(&self.state.system_privileges),
2213 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2214 Some(self.get_cluster(*id).privileges())
2215 }
2216 SystemObjectId::Object(ObjectId::Database(id)) => {
2217 Some(self.get_database(id).privileges())
2218 }
2219 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2220 Some(self.get_schema(database_spec, schema_spec).privileges())
2221 }
2222 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2223 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2224 Some(self.get_network_policy(id).privileges())
2225 }
2226 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2227 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2228 }
2229 }
2230
2231 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2232 let mut seen = BTreeSet::new();
2233 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2234 }
2235
2236 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2237 let mut seen = BTreeSet::new();
2238 self.state.item_dependents(id, &mut seen)
2239 }
2240
2241 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2242 rbac::all_object_privileges(object_type)
2243 }
2244
2245 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2246 self.state.get_object_type(object_id)
2247 }
2248
2249 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2250 self.state.get_system_object_type(id)
2251 }
2252
2253 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2256 let database_id = match &qualified_name.qualifiers.database_spec {
2257 ResolvedDatabaseSpecifier::Ambient => None,
2258 ResolvedDatabaseSpecifier::Id(id)
2259 if self.database.is_some() && self.database == Some(*id) =>
2260 {
2261 None
2262 }
2263 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2264 };
2265
2266 let schema_spec = if database_id.is_none()
2267 && self.resolve_item_name(&PartialItemName {
2268 database: None,
2269 schema: None,
2270 item: qualified_name.item.clone(),
2271 }) == Ok(qualified_name)
2272 || self.resolve_function_name(&PartialItemName {
2273 database: None,
2274 schema: None,
2275 item: qualified_name.item.clone(),
2276 }) == Ok(qualified_name)
2277 || self.resolve_type_name(&PartialItemName {
2278 database: None,
2279 schema: None,
2280 item: qualified_name.item.clone(),
2281 }) == Ok(qualified_name)
2282 {
2283 None
2284 } else {
2285 Some(qualified_name.qualifiers.schema_spec.clone())
2288 };
2289
2290 let res = PartialItemName {
2291 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2292 schema: schema_spec.map(|spec| {
2293 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2294 .name()
2295 .schema
2296 .clone()
2297 }),
2298 item: qualified_name.item.clone(),
2299 };
2300 assert!(
2301 self.resolve_item_name(&res) == Ok(qualified_name)
2302 || self.resolve_function_name(&res) == Ok(qualified_name)
2303 || self.resolve_type_name(&res) == Ok(qualified_name)
2304 );
2305 res
2306 }
2307
2308 fn add_notice(&self, notice: PlanNotice) {
2309 let _ = self.notices_tx.send(notice.into());
2310 }
2311
2312 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2313 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2314 self.state.comments.get_object_comments(comment_id)
2315 }
2316
2317 fn is_cluster_size_cc(&self, size: &str) -> bool {
2318 self.state
2319 .cluster_replica_sizes
2320 .0
2321 .get(size)
2322 .map_or(false, |a| a.is_cc)
2323 }
2324}
2325
2326#[cfg(test)]
2327mod tests {
2328 use std::collections::{BTreeMap, BTreeSet};
2329 use std::sync::Arc;
2330 use std::{env, iter};
2331
2332 use itertools::Itertools;
2333 use mz_catalog::memory::objects::CatalogItem;
2334 use tokio_postgres::NoTls;
2335 use tokio_postgres::types::Type;
2336 use uuid::Uuid;
2337
2338 use mz_catalog::SYSTEM_CONN_ID;
2339 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2340 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2341 use mz_controller_types::{ClusterId, ReplicaId};
2342 use mz_expr::MirScalarExpr;
2343 use mz_ore::now::to_datetime;
2344 use mz_ore::{assert_err, assert_ok, task};
2345 use mz_persist_client::PersistClient;
2346 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2347 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2348 use mz_repr::role_id::RoleId;
2349 use mz_repr::{
2350 CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2351 SqlScalarType, Timestamp,
2352 };
2353 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2354 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2355 use mz_sql::names::{
2356 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2357 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2358 };
2359 use mz_sql::plan::{
2360 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2361 Scope, StatementContext,
2362 };
2363 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2364 use mz_sql::session::vars::{SystemVars, VarInput};
2365
2366 use crate::catalog::state::LocalExpressionCache;
2367 use crate::catalog::{Catalog, Op};
2368 use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
2369 use crate::session::Session;
2370
2371 #[mz_ore::test(tokio::test)]
2378 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2380 Catalog::with_debug(|catalog| async move {
2381 struct TestCase {
2382 input: QualifiedItemName,
2383 system_output: PartialItemName,
2384 normal_output: PartialItemName,
2385 }
2386
2387 let test_cases = vec![
2388 TestCase {
2389 input: QualifiedItemName {
2390 qualifiers: ItemQualifiers {
2391 database_spec: ResolvedDatabaseSpecifier::Ambient,
2392 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2393 },
2394 item: "numeric".to_string(),
2395 },
2396 system_output: PartialItemName {
2397 database: None,
2398 schema: None,
2399 item: "numeric".to_string(),
2400 },
2401 normal_output: PartialItemName {
2402 database: None,
2403 schema: None,
2404 item: "numeric".to_string(),
2405 },
2406 },
2407 TestCase {
2408 input: QualifiedItemName {
2409 qualifiers: ItemQualifiers {
2410 database_spec: ResolvedDatabaseSpecifier::Ambient,
2411 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2412 },
2413 item: "mz_array_types".to_string(),
2414 },
2415 system_output: PartialItemName {
2416 database: None,
2417 schema: None,
2418 item: "mz_array_types".to_string(),
2419 },
2420 normal_output: PartialItemName {
2421 database: None,
2422 schema: None,
2423 item: "mz_array_types".to_string(),
2424 },
2425 },
2426 ];
2427
2428 for tc in test_cases {
2429 assert_eq!(
2430 catalog
2431 .for_system_session()
2432 .minimal_qualification(&tc.input),
2433 tc.system_output
2434 );
2435 assert_eq!(
2436 catalog
2437 .for_session(&Session::dummy())
2438 .minimal_qualification(&tc.input),
2439 tc.normal_output
2440 );
2441 }
2442 catalog.expire().await;
2443 })
2444 .await
2445 }
2446
2447 #[mz_ore::test(tokio::test)]
2448 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2450 let persist_client = PersistClient::new_for_tests().await;
2451 let organization_id = Uuid::new_v4();
2452 let bootstrap_args = test_bootstrap_args();
2453 {
2454 let mut catalog = Catalog::open_debug_catalog(
2455 persist_client.clone(),
2456 organization_id.clone(),
2457 &bootstrap_args,
2458 )
2459 .await
2460 .expect("unable to open debug catalog");
2461 assert_eq!(catalog.transient_revision(), 1);
2462 let commit_ts = catalog.current_upper().await;
2463 catalog
2464 .transact(
2465 None,
2466 commit_ts,
2467 None,
2468 vec![Op::CreateDatabase {
2469 name: "test".to_string(),
2470 owner_id: MZ_SYSTEM_ROLE_ID,
2471 }],
2472 )
2473 .await
2474 .expect("failed to transact");
2475 assert_eq!(catalog.transient_revision(), 2);
2476 catalog.expire().await;
2477 }
2478 {
2479 let catalog =
2480 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2481 .await
2482 .expect("unable to open debug catalog");
2483 assert_eq!(catalog.transient_revision(), 1);
2485 catalog.expire().await;
2486 }
2487 }
2488
2489 #[mz_ore::test(tokio::test)]
2490 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2492 Catalog::with_debug(|catalog| async move {
2493 let mz_catalog_schema = (
2494 ResolvedDatabaseSpecifier::Ambient,
2495 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2496 );
2497 let pg_catalog_schema = (
2498 ResolvedDatabaseSpecifier::Ambient,
2499 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2500 );
2501 let mz_temp_schema = (
2502 ResolvedDatabaseSpecifier::Ambient,
2503 SchemaSpecifier::Temporary,
2504 );
2505
2506 let session = Session::dummy();
2508 let conn_catalog = catalog.for_session(&session);
2509 assert_ne!(
2510 conn_catalog.effective_search_path(false),
2511 conn_catalog.search_path
2512 );
2513 assert_ne!(
2514 conn_catalog.effective_search_path(true),
2515 conn_catalog.search_path
2516 );
2517 assert_eq!(
2518 conn_catalog.effective_search_path(false),
2519 vec![
2520 mz_catalog_schema.clone(),
2521 pg_catalog_schema.clone(),
2522 conn_catalog.search_path[0].clone()
2523 ]
2524 );
2525 assert_eq!(
2526 conn_catalog.effective_search_path(true),
2527 vec![
2528 mz_temp_schema.clone(),
2529 mz_catalog_schema.clone(),
2530 pg_catalog_schema.clone(),
2531 conn_catalog.search_path[0].clone()
2532 ]
2533 );
2534
2535 let mut session = Session::dummy();
2537 session
2538 .vars_mut()
2539 .set(
2540 &SystemVars::new(),
2541 "search_path",
2542 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2543 false,
2544 )
2545 .expect("failed to set search_path");
2546 let conn_catalog = catalog.for_session(&session);
2547 assert_ne!(
2548 conn_catalog.effective_search_path(false),
2549 conn_catalog.search_path
2550 );
2551 assert_ne!(
2552 conn_catalog.effective_search_path(true),
2553 conn_catalog.search_path
2554 );
2555 assert_eq!(
2556 conn_catalog.effective_search_path(false),
2557 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2558 );
2559 assert_eq!(
2560 conn_catalog.effective_search_path(true),
2561 vec![
2562 mz_temp_schema.clone(),
2563 mz_catalog_schema.clone(),
2564 pg_catalog_schema.clone()
2565 ]
2566 );
2567
2568 let mut session = Session::dummy();
2569 session
2570 .vars_mut()
2571 .set(
2572 &SystemVars::new(),
2573 "search_path",
2574 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2575 false,
2576 )
2577 .expect("failed to set search_path");
2578 let conn_catalog = catalog.for_session(&session);
2579 assert_ne!(
2580 conn_catalog.effective_search_path(false),
2581 conn_catalog.search_path
2582 );
2583 assert_ne!(
2584 conn_catalog.effective_search_path(true),
2585 conn_catalog.search_path
2586 );
2587 assert_eq!(
2588 conn_catalog.effective_search_path(false),
2589 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2590 );
2591 assert_eq!(
2592 conn_catalog.effective_search_path(true),
2593 vec![
2594 mz_temp_schema.clone(),
2595 pg_catalog_schema.clone(),
2596 mz_catalog_schema.clone()
2597 ]
2598 );
2599
2600 let mut session = Session::dummy();
2601 session
2602 .vars_mut()
2603 .set(
2604 &SystemVars::new(),
2605 "search_path",
2606 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2607 false,
2608 )
2609 .expect("failed to set search_path");
2610 let conn_catalog = catalog.for_session(&session);
2611 assert_ne!(
2612 conn_catalog.effective_search_path(false),
2613 conn_catalog.search_path
2614 );
2615 assert_ne!(
2616 conn_catalog.effective_search_path(true),
2617 conn_catalog.search_path
2618 );
2619 assert_eq!(
2620 conn_catalog.effective_search_path(false),
2621 vec![
2622 mz_catalog_schema.clone(),
2623 pg_catalog_schema.clone(),
2624 mz_temp_schema.clone()
2625 ]
2626 );
2627 assert_eq!(
2628 conn_catalog.effective_search_path(true),
2629 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2630 );
2631 catalog.expire().await;
2632 })
2633 .await
2634 }
2635
2636 #[mz_ore::test(tokio::test)]
2637 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2639 use mz_ore::collections::CollectionExt;
2640 Catalog::with_debug(|catalog| async move {
2641 let conn_catalog = catalog.for_system_session();
2642 let scx = &mut StatementContext::new(None, &conn_catalog);
2643
2644 let parsed = mz_sql_parser::parser::parse_statements(
2645 "create view public.foo as select 1 as bar",
2646 )
2647 .expect("")
2648 .into_element()
2649 .ast;
2650
2651 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2652
2653 assert_eq!(
2655 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2656 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2657 );
2658 catalog.expire().await;
2659 })
2660 .await;
2661 }
2662
2663 #[mz_ore::test(tokio::test)]
2665 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2667 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2668 let column = ", 1";
2669 let view_def_size = view_def.bytes().count();
2670 let column_size = column.bytes().count();
2671 let column_count =
2672 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2673 let columns = iter::repeat(column).take(column_count).join("");
2674 let create_sql = format!("{view_def}{columns})");
2675 let create_sql_check = create_sql.clone();
2676 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2677 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2678 &create_sql
2679 ));
2680
2681 let persist_client = PersistClient::new_for_tests().await;
2682 let organization_id = Uuid::new_v4();
2683 let id = CatalogItemId::User(1);
2684 let gid = GlobalId::User(1);
2685 let bootstrap_args = test_bootstrap_args();
2686 {
2687 let mut catalog = Catalog::open_debug_catalog(
2688 persist_client.clone(),
2689 organization_id.clone(),
2690 &bootstrap_args,
2691 )
2692 .await
2693 .expect("unable to open debug catalog");
2694 let item = catalog
2695 .state()
2696 .deserialize_item(
2697 gid,
2698 &create_sql,
2699 &BTreeMap::new(),
2700 &mut LocalExpressionCache::Closed,
2701 None,
2702 )
2703 .expect("unable to parse view");
2704 let commit_ts = catalog.current_upper().await;
2705 catalog
2706 .transact(
2707 None,
2708 commit_ts,
2709 None,
2710 vec![Op::CreateItem {
2711 item,
2712 name: QualifiedItemName {
2713 qualifiers: ItemQualifiers {
2714 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2715 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2716 },
2717 item: "v".to_string(),
2718 },
2719 id,
2720 owner_id: MZ_SYSTEM_ROLE_ID,
2721 }],
2722 )
2723 .await
2724 .expect("failed to transact");
2725 catalog.expire().await;
2726 }
2727 {
2728 let catalog =
2729 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2730 .await
2731 .expect("unable to open debug catalog");
2732 let view = catalog.get_entry(&id);
2733 assert_eq!("v", view.name.item);
2734 match &view.item {
2735 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2736 item => panic!("expected view, got {}", item.typ()),
2737 }
2738 catalog.expire().await;
2739 }
2740 }
2741
2742 #[mz_ore::test(tokio::test)]
2743 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2745 Catalog::with_debug(|catalog| async move {
2746 let conn_catalog = catalog.for_system_session();
2747
2748 assert_eq!(
2749 mz_sql::catalog::ObjectType::ClusterReplica,
2750 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2751 ClusterId::user(1).expect("1 is a valid ID"),
2752 ReplicaId::User(1)
2753 )))
2754 );
2755 assert_eq!(
2756 mz_sql::catalog::ObjectType::Role,
2757 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2758 );
2759 catalog.expire().await;
2760 })
2761 .await;
2762 }
2763
2764 #[mz_ore::test(tokio::test)]
2765 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2767 Catalog::with_debug(|catalog| async move {
2768 let conn_catalog = catalog.for_system_session();
2769
2770 assert_eq!(
2771 None,
2772 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2773 ClusterId::user(1).expect("1 is a valid ID"),
2774 ReplicaId::User(1),
2775 ))))
2776 );
2777 assert_eq!(
2778 None,
2779 conn_catalog
2780 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2781 );
2782 catalog.expire().await;
2783 })
2784 .await;
2785 }
2786
2787 #[mz_ore::test(tokio::test)]
2788 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2790 Catalog::with_debug(|catalog| async move {
2791 let conn_catalog = catalog.for_system_session();
2792
2793 let builtins_cfg = BuiltinsConfig {
2794 include_continual_tasks: true,
2795 };
2796 for builtin in BUILTINS::iter(&builtins_cfg) {
2797 let (schema, name, expected_desc) = match builtin {
2798 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2799 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2800 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2801 Builtin::Log(_)
2802 | Builtin::Type(_)
2803 | Builtin::Func(_)
2804 | Builtin::ContinualTask(_)
2805 | Builtin::Index(_)
2806 | Builtin::Connection(_) => continue,
2807 };
2808 let item = conn_catalog
2809 .resolve_item(&PartialItemName {
2810 database: None,
2811 schema: Some(schema.to_string()),
2812 item: name.to_string(),
2813 })
2814 .expect("unable to resolve item")
2815 .at_version(RelationVersionSelector::Latest);
2816
2817 let full_name = conn_catalog.resolve_full_name(item.name());
2818 let actual_desc = item.desc(&full_name).expect("invalid item type");
2819 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2820 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2821 {
2822 assert_eq!(
2823 actual_name, expected_name,
2824 "item {schema}.{name} column {index} name did not match its expected name"
2825 );
2826 assert_eq!(
2827 actual_typ, expected_typ,
2828 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2829 );
2830 }
2831 assert_eq!(
2832 &*actual_desc, expected_desc,
2833 "item {schema}.{name} did not match its expected RelationDesc"
2834 );
2835 }
2836 catalog.expire().await;
2837 })
2838 .await
2839 }
2840
2841 #[mz_ore::test(tokio::test)]
2844 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2846 async fn inner(catalog: Catalog) {
2847 let (client, connection) = tokio_postgres::connect(
2851 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2852 NoTls,
2853 )
2854 .await
2855 .expect("failed to connect to Postgres");
2856
2857 task::spawn(|| "compare_builtin_postgres", async move {
2858 if let Err(e) = connection.await {
2859 panic!("connection error: {}", e);
2860 }
2861 });
2862
2863 struct PgProc {
2864 name: String,
2865 arg_oids: Vec<u32>,
2866 ret_oid: Option<u32>,
2867 ret_set: bool,
2868 }
2869
2870 struct PgType {
2871 name: String,
2872 ty: String,
2873 elem: u32,
2874 array: u32,
2875 input: u32,
2876 receive: u32,
2877 }
2878
2879 struct PgOper {
2880 oprresult: u32,
2881 name: String,
2882 }
2883
2884 let pg_proc: BTreeMap<_, _> = client
2885 .query(
2886 "SELECT
2887 p.oid,
2888 proname,
2889 proargtypes,
2890 prorettype,
2891 proretset
2892 FROM pg_proc p
2893 JOIN pg_namespace n ON p.pronamespace = n.oid",
2894 &[],
2895 )
2896 .await
2897 .expect("pg query failed")
2898 .into_iter()
2899 .map(|row| {
2900 let oid: u32 = row.get("oid");
2901 let pg_proc = PgProc {
2902 name: row.get("proname"),
2903 arg_oids: row.get("proargtypes"),
2904 ret_oid: row.get("prorettype"),
2905 ret_set: row.get("proretset"),
2906 };
2907 (oid, pg_proc)
2908 })
2909 .collect();
2910
2911 let pg_type: BTreeMap<_, _> = client
2912 .query(
2913 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2914 &[],
2915 )
2916 .await
2917 .expect("pg query failed")
2918 .into_iter()
2919 .map(|row| {
2920 let oid: u32 = row.get("oid");
2921 let pg_type = PgType {
2922 name: row.get("typname"),
2923 ty: row.get("typtype"),
2924 elem: row.get("typelem"),
2925 array: row.get("typarray"),
2926 input: row.get("typinput"),
2927 receive: row.get("typreceive"),
2928 };
2929 (oid, pg_type)
2930 })
2931 .collect();
2932
2933 let pg_oper: BTreeMap<_, _> = client
2934 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2935 .await
2936 .expect("pg query failed")
2937 .into_iter()
2938 .map(|row| {
2939 let oid: u32 = row.get("oid");
2940 let pg_oper = PgOper {
2941 name: row.get("oprname"),
2942 oprresult: row.get("oprresult"),
2943 };
2944 (oid, pg_oper)
2945 })
2946 .collect();
2947
2948 let conn_catalog = catalog.for_system_session();
2949 let resolve_type_oid = |item: &str| {
2950 conn_catalog
2951 .resolve_type(&PartialItemName {
2952 database: None,
2953 schema: Some(PG_CATALOG_SCHEMA.into()),
2956 item: item.to_string(),
2957 })
2958 .expect("unable to resolve type")
2959 .oid()
2960 };
2961
2962 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2963 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2964 .collect();
2965
2966 let mut all_oids = BTreeSet::new();
2967
2968 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2971 [
2972 (Type::NAME, Type::TEXT),
2974 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2975 (Type::TIME, Type::TIMETZ),
2977 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2978 ]
2979 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2980 );
2981 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2982 1619, ]);
2984 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2985 if ignore_return_types.contains(&fn_oid) {
2986 return true;
2987 }
2988 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2989 return true;
2990 }
2991 a == b
2992 };
2993
2994 let builtins_cfg = BuiltinsConfig {
2995 include_continual_tasks: true,
2996 };
2997 for builtin in BUILTINS::iter(&builtins_cfg) {
2998 match builtin {
2999 Builtin::Type(ty) => {
3000 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3001
3002 if ty.oid >= FIRST_MATERIALIZE_OID {
3003 continue;
3006 }
3007
3008 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3011 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3012 });
3013 assert_eq!(
3014 ty.name, pg_ty.name,
3015 "oid {} has name {} in postgres; expected {}",
3016 ty.oid, pg_ty.name, ty.name,
3017 );
3018
3019 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3020 None => (0, 0),
3021 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3022 };
3023 assert_eq!(
3024 typinput_oid, pg_ty.input,
3025 "type {} has typinput OID {:?} in mz but {:?} in pg",
3026 ty.name, typinput_oid, pg_ty.input,
3027 );
3028 assert_eq!(
3029 typreceive_oid, pg_ty.receive,
3030 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3031 ty.name, typreceive_oid, pg_ty.receive,
3032 );
3033 if typinput_oid != 0 {
3034 assert!(
3035 func_oids.contains(&typinput_oid),
3036 "type {} has typinput OID {} that does not exist in pg_proc",
3037 ty.name,
3038 typinput_oid,
3039 );
3040 }
3041 if typreceive_oid != 0 {
3042 assert!(
3043 func_oids.contains(&typreceive_oid),
3044 "type {} has typreceive OID {} that does not exist in pg_proc",
3045 ty.name,
3046 typreceive_oid,
3047 );
3048 }
3049
3050 match &ty.details.typ {
3052 CatalogType::Array { element_reference } => {
3053 let elem_ty = BUILTINS::iter(&builtins_cfg)
3054 .filter_map(|builtin| match builtin {
3055 Builtin::Type(ty @ BuiltinType { name, .. })
3056 if element_reference == name =>
3057 {
3058 Some(ty)
3059 }
3060 _ => None,
3061 })
3062 .next();
3063 let elem_ty = match elem_ty {
3064 Some(ty) => ty,
3065 None => {
3066 panic!("{} is unexpectedly not a type", element_reference)
3067 }
3068 };
3069 assert_eq!(
3070 pg_ty.elem, elem_ty.oid,
3071 "type {} has mismatched element OIDs",
3072 ty.name
3073 )
3074 }
3075 CatalogType::Pseudo => {
3076 assert_eq!(
3077 pg_ty.ty, "p",
3078 "type {} is not a pseudo type as expected",
3079 ty.name
3080 )
3081 }
3082 CatalogType::Range { .. } => {
3083 assert_eq!(
3084 pg_ty.ty, "r",
3085 "type {} is not a range type as expected",
3086 ty.name
3087 );
3088 }
3089 _ => {
3090 assert_eq!(
3091 pg_ty.ty, "b",
3092 "type {} is not a base type as expected",
3093 ty.name
3094 )
3095 }
3096 }
3097
3098 let schema = catalog
3100 .resolve_schema_in_database(
3101 &ResolvedDatabaseSpecifier::Ambient,
3102 ty.schema,
3103 &SYSTEM_CONN_ID,
3104 )
3105 .expect("unable to resolve schema");
3106 let allocated_type = catalog
3107 .resolve_type(
3108 None,
3109 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3110 &PartialItemName {
3111 database: None,
3112 schema: Some(schema.name().schema.clone()),
3113 item: ty.name.to_string(),
3114 },
3115 &SYSTEM_CONN_ID,
3116 )
3117 .expect("unable to resolve type");
3118 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3119 ty
3120 } else {
3121 panic!("unexpectedly not a type")
3122 };
3123 match ty.details.array_id {
3124 Some(array_id) => {
3125 let array_ty = catalog.get_entry(&array_id);
3126 assert_eq!(
3127 pg_ty.array, array_ty.oid,
3128 "type {} has mismatched array OIDs",
3129 allocated_type.name.item,
3130 );
3131 }
3132 None => assert_eq!(
3133 pg_ty.array, 0,
3134 "type {} does not have an array type in mz but does in pg",
3135 allocated_type.name.item,
3136 ),
3137 }
3138 }
3139 Builtin::Func(func) => {
3140 for imp in func.inner.func_impls() {
3141 assert!(
3142 all_oids.insert(imp.oid),
3143 "{} reused oid {}",
3144 func.name,
3145 imp.oid
3146 );
3147
3148 assert!(
3149 imp.oid < FIRST_USER_OID,
3150 "built-in function {} erroneously has OID in user space ({})",
3151 func.name,
3152 imp.oid,
3153 );
3154
3155 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3158 continue;
3159 } else {
3160 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3161 panic!(
3162 "pg_proc missing function {}: oid {}",
3163 func.name, imp.oid
3164 )
3165 })
3166 };
3167 assert_eq!(
3168 func.name, pg_fn.name,
3169 "funcs with oid {} don't match names: {} in mz, {} in pg",
3170 imp.oid, func.name, pg_fn.name
3171 );
3172
3173 let imp_arg_oids = imp
3176 .arg_typs
3177 .iter()
3178 .map(|item| resolve_type_oid(item))
3179 .collect::<Vec<_>>();
3180
3181 if imp_arg_oids != pg_fn.arg_oids {
3182 println!(
3183 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3184 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3185 );
3186 }
3187
3188 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3189
3190 assert!(
3191 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3192 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3193 imp.oid,
3194 func.name,
3195 imp_return_oid,
3196 pg_fn.ret_oid
3197 );
3198
3199 assert_eq!(
3200 imp.return_is_set, pg_fn.ret_set,
3201 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3202 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3203 );
3204 }
3205 }
3206 _ => (),
3207 }
3208 }
3209
3210 for (op, func) in OP_IMPLS.iter() {
3211 for imp in func.func_impls() {
3212 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3213
3214 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3216 continue;
3217 } else {
3218 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3219 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3220 })
3221 };
3222
3223 assert_eq!(*op, pg_op.name);
3224
3225 let imp_return_oid =
3226 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3227 if imp_return_oid != pg_op.oprresult {
3228 panic!(
3229 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3230 imp.oid, op, imp_return_oid, pg_op.oprresult
3231 );
3232 }
3233 }
3234 }
3235 catalog.expire().await;
3236 }
3237
3238 Catalog::with_debug(inner).await
3239 }
3240
3241 #[mz_ore::test(tokio::test)]
3243 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3245 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3246 let catalog = Arc::new(catalog);
3247 let conn_catalog = catalog.for_system_session();
3248
3249 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3250 let mut handles = Vec::new();
3251
3252 let ignore_names = BTreeSet::from([
3254 "avg",
3255 "avg_internal_v1",
3256 "bool_and",
3257 "bool_or",
3258 "has_table_privilege", "has_type_privilege", "mod",
3261 "mz_panic",
3262 "mz_sleep",
3263 "pow",
3264 "stddev_pop",
3265 "stddev_samp",
3266 "stddev",
3267 "var_pop",
3268 "var_samp",
3269 "variance",
3270 ]);
3271
3272 let fns = BUILTINS::funcs()
3273 .map(|func| (&func.name, func.inner))
3274 .chain(OP_IMPLS.iter());
3275
3276 for (name, func) in fns {
3277 if ignore_names.contains(name) {
3278 continue;
3279 }
3280 let Func::Scalar(impls) = func else {
3281 continue;
3282 };
3283
3284 'outer: for imp in impls {
3285 let details = imp.details();
3286 let mut styps = Vec::new();
3287 for item in details.arg_typs.iter() {
3288 let oid = resolve_type_oid(item);
3289 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3290 continue 'outer;
3291 };
3292 styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3293 }
3294 let datums = styps
3295 .iter()
3296 .map(|styp| {
3297 let mut datums = vec![Datum::Null];
3298 datums.extend(styp.interesting_datums());
3299 datums
3300 })
3301 .collect::<Vec<_>>();
3302 if datums.is_empty() {
3304 continue;
3305 }
3306
3307 let return_oid = details
3308 .return_typ
3309 .map(resolve_type_oid)
3310 .expect("must exist");
3311 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3312 .ok()
3313 .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3314
3315 let mut idxs = vec![0; datums.len()];
3316 while idxs[0] < datums[0].len() {
3317 let mut args = Vec::with_capacity(idxs.len());
3318 for i in 0..(datums.len()) {
3319 args.push(datums[i][idxs[i]]);
3320 }
3321
3322 let op = &imp.op;
3323 let scalars = args
3324 .iter()
3325 .enumerate()
3326 .map(|(i, datum)| {
3327 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3328 datum.clone(),
3329 styps[i].clone(),
3330 ))
3331 })
3332 .collect();
3333
3334 let call_name = format!(
3335 "{name}({}) (oid: {})",
3336 args.iter()
3337 .map(|d| d.to_string())
3338 .collect::<Vec<_>>()
3339 .join(", "),
3340 imp.oid
3341 );
3342 let catalog = Arc::clone(&catalog);
3343 let call_name_fn = call_name.clone();
3344 let return_styp = return_styp.clone();
3345 let handle = task::spawn_blocking(
3346 || call_name,
3347 move || {
3348 smoketest_fn(
3349 name,
3350 call_name_fn,
3351 op,
3352 imp,
3353 args,
3354 catalog,
3355 scalars,
3356 return_styp,
3357 )
3358 },
3359 );
3360 handles.push(handle);
3361
3362 for i in (0..datums.len()).rev() {
3364 idxs[i] += 1;
3365 if idxs[i] >= datums[i].len() {
3366 if i == 0 {
3367 break;
3368 }
3369 idxs[i] = 0;
3370 continue;
3371 } else {
3372 break;
3373 }
3374 }
3375 }
3376 }
3377 }
3378 handles
3379 }
3380
3381 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3382 for handle in handles {
3383 handle.await;
3384 }
3385 }
3386
3387 fn smoketest_fn(
3388 name: &&str,
3389 call_name: String,
3390 op: &Operation<HirScalarExpr>,
3391 imp: &FuncImpl<HirScalarExpr>,
3392 args: Vec<Datum<'_>>,
3393 catalog: Arc<Catalog>,
3394 scalars: Vec<CoercibleScalarExpr>,
3395 return_styp: Option<SqlScalarType>,
3396 ) {
3397 let conn_catalog = catalog.for_system_session();
3398 let pcx = PlanContext::zero();
3399 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3400 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3401 let ecx = ExprContext {
3402 qcx: &qcx,
3403 name: "smoketest",
3404 scope: &Scope::empty(),
3405 relation_type: &SqlRelationType::empty(),
3406 allow_aggregates: false,
3407 allow_subqueries: false,
3408 allow_parameters: false,
3409 allow_windows: false,
3410 };
3411 let arena = RowArena::new();
3412 let mut session = Session::<Timestamp>::dummy();
3413 session
3414 .start_transaction(to_datetime(0), None, None)
3415 .expect("must succeed");
3416 let prep_style = ExprPrepStyle::OneShot {
3417 logical_time: EvalTime::Time(Timestamp::MIN),
3418 session: &session,
3419 catalog_state: &catalog.state,
3420 };
3421
3422 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3425 if let Ok(hir) = res {
3426 if let Ok(mut mir) = hir.lower_uncorrelated() {
3427 prep_scalar_expr(&mut mir, prep_style.clone()).expect("must succeed");
3429
3430 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3431 if let Some(return_styp) = return_styp {
3432 let mir_typ = mir.typ(&[]);
3433 assert_eq!(mir_typ.scalar_type, return_styp);
3436 if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3440 panic!(
3441 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3442 );
3443 }
3444 if let Some((introduces_nulls, propagates_nulls)) =
3447 call_introduces_propagates_nulls(&mir)
3448 {
3449 if introduces_nulls {
3450 assert!(
3454 mir_typ.nullable,
3455 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3456 name, args, mir, mir_typ.nullable
3457 );
3458 } else {
3459 let any_input_null = args.iter().any(|arg| arg.is_null());
3460 if !any_input_null {
3461 assert!(
3462 !mir_typ.nullable,
3463 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3464 name, args, mir, mir_typ.nullable
3465 );
3466 } else {
3467 assert_eq!(
3468 mir_typ.nullable, propagates_nulls,
3469 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3470 name, args, mir, mir_typ.nullable
3471 );
3472 }
3473 }
3474 }
3475 let mut reduced = mir.clone();
3478 reduced.reduce(&[]);
3479 match reduced {
3480 MirScalarExpr::Literal(reduce_result, ctyp) => {
3481 match reduce_result {
3482 Ok(reduce_result_row) => {
3483 let reduce_result_datum = reduce_result_row.unpack_first();
3484 assert_eq!(
3485 reduce_result_datum,
3486 eval_result_datum,
3487 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3488 name,
3489 args,
3490 mir,
3491 eval_result_datum,
3492 mir_typ.scalar_type,
3493 reduce_result_datum,
3494 ctyp.scalar_type
3495 );
3496 assert_eq!(
3502 ctyp.scalar_type,
3503 mir_typ.scalar_type,
3504 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3505 name,
3506 args,
3507 mir,
3508 eval_result_datum,
3509 mir_typ.scalar_type,
3510 reduce_result_datum,
3511 ctyp.scalar_type
3512 );
3513 }
3514 Err(..) => {} }
3516 }
3517 _ => unreachable!(
3518 "all args are literals, so should have reduced to a literal"
3519 ),
3520 }
3521 }
3522 }
3523 }
3524 }
3525 }
3526
3527 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3532 match mir_func_call {
3533 MirScalarExpr::CallUnary { func, expr } => {
3534 if expr.is_literal() {
3535 Some((func.introduces_nulls(), func.propagates_nulls()))
3536 } else {
3537 None
3538 }
3539 }
3540 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3541 if expr1.is_literal() && expr2.is_literal() {
3542 Some((func.introduces_nulls(), func.propagates_nulls()))
3543 } else {
3544 None
3545 }
3546 }
3547 MirScalarExpr::CallVariadic { func, exprs } => {
3548 if exprs.iter().all(|arg| arg.is_literal()) {
3549 Some((func.introduces_nulls(), func.propagates_nulls()))
3550 } else {
3551 None
3552 }
3553 }
3554 _ => None,
3555 }
3556 }
3557
3558 #[mz_ore::test(tokio::test)]
3560 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3562 Catalog::with_debug(|catalog| async move {
3563 let conn_catalog = catalog.for_system_session();
3564
3565 for view in BUILTINS::views().filter(|view| {
3566 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3567 }) {
3568 let item = conn_catalog
3569 .resolve_item(&PartialItemName {
3570 database: None,
3571 schema: Some(view.schema.to_string()),
3572 item: view.name.to_string(),
3573 })
3574 .expect("unable to resolve view")
3575 .at_version(RelationVersionSelector::Latest);
3577 let full_name = conn_catalog.resolve_full_name(item.name());
3578 for col_type in item
3579 .desc(&full_name)
3580 .expect("invalid item type")
3581 .iter_types()
3582 {
3583 match &col_type.scalar_type {
3584 typ @ SqlScalarType::UInt16
3585 | typ @ SqlScalarType::UInt32
3586 | typ @ SqlScalarType::UInt64
3587 | typ @ SqlScalarType::MzTimestamp
3588 | typ @ SqlScalarType::List { .. }
3589 | typ @ SqlScalarType::Map { .. }
3590 | typ @ SqlScalarType::MzAclItem => {
3591 panic!("{typ:?} type found in {full_name}");
3592 }
3593 SqlScalarType::AclItem
3594 | SqlScalarType::Bool
3595 | SqlScalarType::Int16
3596 | SqlScalarType::Int32
3597 | SqlScalarType::Int64
3598 | SqlScalarType::Float32
3599 | SqlScalarType::Float64
3600 | SqlScalarType::Numeric { .. }
3601 | SqlScalarType::Date
3602 | SqlScalarType::Time
3603 | SqlScalarType::Timestamp { .. }
3604 | SqlScalarType::TimestampTz { .. }
3605 | SqlScalarType::Interval
3606 | SqlScalarType::PgLegacyChar
3607 | SqlScalarType::Bytes
3608 | SqlScalarType::String
3609 | SqlScalarType::Char { .. }
3610 | SqlScalarType::VarChar { .. }
3611 | SqlScalarType::Jsonb
3612 | SqlScalarType::Uuid
3613 | SqlScalarType::Array(_)
3614 | SqlScalarType::Record { .. }
3615 | SqlScalarType::Oid
3616 | SqlScalarType::RegProc
3617 | SqlScalarType::RegType
3618 | SqlScalarType::RegClass
3619 | SqlScalarType::Int2Vector
3620 | SqlScalarType::Range { .. }
3621 | SqlScalarType::PgLegacyName => {}
3622 }
3623 }
3624 }
3625 catalog.expire().await;
3626 })
3627 .await
3628 }
3629
3630 #[mz_ore::test(tokio::test)]
3633 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3635 Catalog::with_debug(|catalog| async move {
3636 let conn_catalog = catalog.for_system_session();
3637
3638 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3639 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3640
3641 for entry in catalog.entries() {
3642 let schema_spec = entry.name().qualifiers.schema_spec;
3643 let introspection_deps = catalog.introspection_dependencies(entry.id);
3644 if introspection_deps.is_empty() {
3645 assert!(
3646 schema_spec != introspection_schema_spec,
3647 "entry does not depend on introspection sources but is in \
3648 `mz_introspection`: {}",
3649 conn_catalog.resolve_full_name(entry.name()),
3650 );
3651 } else {
3652 assert!(
3653 schema_spec == introspection_schema_spec,
3654 "entry depends on introspection sources but is not in \
3655 `mz_introspection`: {}",
3656 conn_catalog.resolve_full_name(entry.name()),
3657 );
3658 }
3659 }
3660 })
3661 .await
3662 }
3663
3664 #[mz_ore::test(tokio::test)]
3665 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3667 let persist_client = PersistClient::new_for_tests().await;
3668 let bootstrap_args = test_bootstrap_args();
3669 let organization_id = Uuid::new_v4();
3670 let db_name = "DB";
3671
3672 let mut writer_catalog = Catalog::open_debug_catalog(
3673 persist_client.clone(),
3674 organization_id.clone(),
3675 &bootstrap_args,
3676 )
3677 .await
3678 .expect("open_debug_catalog");
3679 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3680 persist_client.clone(),
3681 organization_id.clone(),
3682 &bootstrap_args,
3683 )
3684 .await
3685 .expect("open_debug_read_only_catalog");
3686 assert_err!(writer_catalog.resolve_database(db_name));
3687 assert_err!(read_only_catalog.resolve_database(db_name));
3688
3689 let commit_ts = writer_catalog.current_upper().await;
3690 writer_catalog
3691 .transact(
3692 None,
3693 commit_ts,
3694 None,
3695 vec![Op::CreateDatabase {
3696 name: db_name.to_string(),
3697 owner_id: MZ_SYSTEM_ROLE_ID,
3698 }],
3699 )
3700 .await
3701 .expect("failed to transact");
3702
3703 let write_db = writer_catalog
3704 .resolve_database(db_name)
3705 .expect("resolve_database");
3706 read_only_catalog
3707 .sync_to_current_updates()
3708 .await
3709 .expect("sync_to_current_updates");
3710 let read_db = read_only_catalog
3711 .resolve_database(db_name)
3712 .expect("resolve_database");
3713
3714 assert_eq!(write_db, read_db);
3715
3716 let writer_catalog_fencer =
3717 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3718 .await
3719 .expect("open_debug_catalog for fencer");
3720 let fencer_db = writer_catalog_fencer
3721 .resolve_database(db_name)
3722 .expect("resolve_database for fencer");
3723 assert_eq!(fencer_db, read_db);
3724
3725 let write_fence_err = writer_catalog
3726 .sync_to_current_updates()
3727 .await
3728 .expect_err("sync_to_current_updates for fencer");
3729 assert!(matches!(
3730 write_fence_err,
3731 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3732 ));
3733 let read_fence_err = read_only_catalog
3734 .sync_to_current_updates()
3735 .await
3736 .expect_err("sync_to_current_updates after fencer");
3737 assert!(matches!(
3738 read_fence_err,
3739 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3740 ));
3741
3742 writer_catalog.expire().await;
3743 read_only_catalog.expire().await;
3744 writer_catalog_fencer.expire().await;
3745 }
3746}