1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet, VecDeque};
16use std::convert;
17use std::sync::Arc;
18
19use futures::future::BoxFuture;
20use futures::{Future, FutureExt};
21use itertools::Itertools;
22use mz_adapter_types::bootstrap_builtin_cluster_config::{
23 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
24 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
25 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
26};
27use mz_adapter_types::connection::ConnectionId;
28use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
29use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
30use mz_catalog::builtin::{
31 BUILTIN_PREFIXES, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable,
32 MZ_CATALOG_SERVER_CLUSTER,
33};
34use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Config, StateConfig};
35#[cfg(test)]
36use mz_catalog::durable::CatalogError;
37use mz_catalog::durable::{
38 BootstrapArgs, DurableCatalogState, TestCatalogStateBuilder, test_bootstrap_args,
39};
40use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
41use mz_catalog::memory::error::{Error, ErrorKind};
42use mz_catalog::memory::objects::{
43 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database,
44 NetworkPolicy, Role, RoleAuth, Schema,
45};
46use mz_compute_types::dataflows::DataflowDescription;
47use mz_controller::clusters::ReplicaLocation;
48use mz_controller_types::{ClusterId, ReplicaId};
49use mz_expr::OptimizedMirRelationExpr;
50use mz_license_keys::ValidatedLicenseKey;
51use mz_ore::collections::HashSet;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
54use mz_ore::result::ResultExt as _;
55use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
56use mz_persist_client::PersistClient;
57use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
58use mz_repr::explain::ExprHumanizer;
59use mz_repr::namespaces::MZ_TEMP_SCHEMA;
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::role_id::RoleId;
62use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, SqlScalarType};
63use mz_secrets::InMemorySecretsController;
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
67 CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
68 DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
72 PUBLIC_ROLE_NAME, PartialItemName, QualifiedItemName, QualifiedSchemaName,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
76use mz_sql::rbac;
77use mz_sql::session::metadata::SessionMetadata;
78use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
79use mz_sql::session::vars::SystemVars;
80use mz_sql_parser::ast::QualifiedReplica;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
83use mz_transform::dataflow::DataflowMetainfo;
84use mz_transform::notice::OptimizerNotice;
85use smallvec::SmallVec;
86use tokio::sync::MutexGuard;
87use tokio::sync::mpsc::UnboundedSender;
88use uuid::Uuid;
89
90pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
92pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
93pub use crate::catalog::state::CatalogState;
94pub use crate::catalog::transact::{
95 DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
96};
97use crate::command::CatalogDump;
98use crate::coord::TargetCluster;
99#[cfg(test)]
100use crate::coord::catalog_implications::parsed_state_updates::ParsedStateUpdate;
101use crate::session::{Portal, PreparedStatement, Session};
102use crate::util::ResultExt;
103use crate::{AdapterError, AdapterNotice, ExecuteResponse};
104
105mod builtin_table_updates;
106pub(crate) mod consistency;
107mod migrate;
108
109mod apply;
110mod open;
111mod state;
112mod timeline;
113mod transact;
114
115#[derive(Debug)]
138pub struct Catalog {
139 state: CatalogState,
140 plans: CatalogPlans,
141 expr_cache_handle: Option<ExpressionCacheHandle>,
142 storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
143 transient_revision: u64,
144}
145
146impl Clone for Catalog {
149 fn clone(&self) -> Self {
150 Self {
151 state: self.state.clone(),
152 plans: self.plans.clone(),
153 expr_cache_handle: self.expr_cache_handle.clone(),
154 storage: Arc::clone(&self.storage),
155 transient_revision: self.transient_revision,
156 }
157 }
158}
159
160#[derive(Default, Debug, Clone)]
161pub struct CatalogPlans {
162 optimized_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
163 physical_plan_by_id: BTreeMap<GlobalId, Arc<DataflowDescription<mz_compute_types::plan::Plan>>>,
164 dataflow_metainfos: BTreeMap<GlobalId, DataflowMetainfo<Arc<OptimizerNotice>>>,
165 notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
166}
167
168impl Catalog {
169 #[mz_ore::instrument(level = "trace")]
171 pub fn set_optimized_plan(
172 &mut self,
173 id: GlobalId,
174 plan: DataflowDescription<OptimizedMirRelationExpr>,
175 ) {
176 self.plans.optimized_plan_by_id.insert(id, plan.into());
177 }
178
179 #[mz_ore::instrument(level = "trace")]
181 pub fn set_physical_plan(
182 &mut self,
183 id: GlobalId,
184 plan: DataflowDescription<mz_compute_types::plan::Plan>,
185 ) {
186 self.plans.physical_plan_by_id.insert(id, plan.into());
187 }
188
189 #[mz_ore::instrument(level = "trace")]
191 pub fn try_get_optimized_plan(
192 &self,
193 id: &GlobalId,
194 ) -> Option<&DataflowDescription<OptimizedMirRelationExpr>> {
195 self.plans.optimized_plan_by_id.get(id).map(AsRef::as_ref)
196 }
197
198 #[mz_ore::instrument(level = "trace")]
200 pub fn try_get_physical_plan(
201 &self,
202 id: &GlobalId,
203 ) -> Option<&DataflowDescription<mz_compute_types::plan::Plan>> {
204 self.plans.physical_plan_by_id.get(id).map(AsRef::as_ref)
205 }
206
207 #[mz_ore::instrument(level = "trace")]
209 pub fn set_dataflow_metainfo(
210 &mut self,
211 id: GlobalId,
212 metainfo: DataflowMetainfo<Arc<OptimizerNotice>>,
213 ) {
214 for notice in metainfo.optimizer_notices.iter() {
216 for dep_id in notice.dependencies.iter() {
217 let entry = self.plans.notices_by_dep_id.entry(*dep_id).or_default();
218 entry.push(Arc::clone(notice))
219 }
220 if let Some(item_id) = notice.item_id {
221 soft_assert_eq_or_log!(
222 item_id,
223 id,
224 "notice.item_id should match the id for whom we are saving the notice"
225 );
226 }
227 }
228 self.plans.dataflow_metainfos.insert(id, metainfo);
230 }
231
232 #[mz_ore::instrument(level = "trace")]
234 pub fn try_get_dataflow_metainfo(
235 &self,
236 id: &GlobalId,
237 ) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
238 self.plans.dataflow_metainfos.get(id)
239 }
240
241 #[mz_ore::instrument(level = "trace")]
250 pub fn drop_plans_and_metainfos(
251 &mut self,
252 drop_ids: &BTreeSet<GlobalId>,
253 ) -> BTreeSet<Arc<OptimizerNotice>> {
254 let mut dropped_notices = BTreeSet::new();
256
257 for id in drop_ids {
259 self.plans.optimized_plan_by_id.remove(id);
260 self.plans.physical_plan_by_id.remove(id);
261 if let Some(mut metainfo) = self.plans.dataflow_metainfos.remove(id) {
262 soft_assert_or_log!(
263 metainfo.optimizer_notices.iter().all_unique(),
264 "should have been pushed there by `push_optimizer_notice_dedup`"
265 );
266 for n in metainfo.optimizer_notices.drain(..) {
267 for dep_id in n.dependencies.iter() {
269 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(dep_id) {
270 soft_assert_or_log!(
271 notices.iter().any(|x| &n == x),
272 "corrupt notices_by_dep_id"
273 );
274 notices.retain(|x| &n != x)
275 }
276 }
277 dropped_notices.insert(n);
278 }
279 }
280 }
281
282 for id in drop_ids {
284 if let Some(mut notices) = self.plans.notices_by_dep_id.remove(id) {
285 for n in notices.drain(..) {
286 if let Some(item_id) = n.item_id.as_ref() {
288 if let Some(metainfo) = self.plans.dataflow_metainfos.get_mut(item_id) {
289 metainfo.optimizer_notices.iter().for_each(|n2| {
290 if let Some(item_id_2) = n2.item_id {
291 soft_assert_eq_or_log!(item_id_2, *item_id, "a notice's item_id should match the id for whom we have saved the notice");
292 }
293 });
294 metainfo.optimizer_notices.retain(|x| &n != x);
295 }
296 }
297 dropped_notices.insert(n);
298 }
299 }
300 }
301
302 let mut todo_dep_ids = BTreeSet::new();
305 for notice in dropped_notices.iter() {
306 for dep_id in notice.dependencies.iter() {
307 if !drop_ids.contains(dep_id) {
308 todo_dep_ids.insert(*dep_id);
309 }
310 }
311 }
312 for id in todo_dep_ids {
315 if let Some(notices) = self.plans.notices_by_dep_id.get_mut(&id) {
316 notices.retain(|n| !dropped_notices.contains(n))
317 }
318 }
319
320 dropped_notices
327 }
328
329 pub(crate) fn invalidate_for_index(
340 &self,
341 ons: impl Iterator<Item = GlobalId>,
342 ) -> BTreeSet<GlobalId> {
343 let mut dependencies = BTreeSet::new();
344 let mut queue = VecDeque::new();
345 let mut seen = HashSet::new();
346 for on in ons {
347 let entry = self.get_entry_by_global_id(&on);
348 dependencies.insert(on);
349 seen.insert(entry.id);
350 let uses = entry.uses();
351 queue.extend(uses.clone());
352 }
353
354 while let Some(cur) = queue.pop_front() {
355 let entry = self.get_entry(&cur);
356 if seen.insert(cur) {
357 let global_ids = entry.global_ids();
358 match entry.item_type() {
359 CatalogItemType::Table
360 | CatalogItemType::Source
361 | CatalogItemType::MaterializedView
362 | CatalogItemType::Sink
363 | CatalogItemType::Index
364 | CatalogItemType::Type
365 | CatalogItemType::Func
366 | CatalogItemType::Secret
367 | CatalogItemType::Connection
368 | CatalogItemType::ContinualTask => {
369 dependencies.extend(global_ids);
370 }
371 CatalogItemType::View => {
372 dependencies.extend(global_ids);
373 queue.extend(entry.uses());
374 }
375 }
376 }
377 }
378 dependencies
379 }
380}
381
382#[derive(Debug)]
383pub struct ConnCatalog<'a> {
384 state: Cow<'a, CatalogState>,
385 unresolvable_ids: BTreeSet<CatalogItemId>,
395 conn_id: ConnectionId,
396 cluster: String,
397 database: Option<DatabaseId>,
398 search_path: Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
399 role_id: RoleId,
400 prepared_statements: Option<&'a BTreeMap<String, PreparedStatement>>,
401 portals: Option<&'a BTreeMap<String, Portal>>,
402 notices_tx: UnboundedSender<AdapterNotice>,
403}
404
405impl ConnCatalog<'_> {
406 pub fn conn_id(&self) -> &ConnectionId {
407 &self.conn_id
408 }
409
410 pub fn state(&self) -> &CatalogState {
411 &*self.state
412 }
413
414 pub fn mark_id_unresolvable_for_replanning(&mut self, id: CatalogItemId) {
424 assert_eq!(
425 self.role_id, MZ_SYSTEM_ROLE_ID,
426 "only the system role can mark IDs unresolvable",
427 );
428 self.unresolvable_ids.insert(id);
429 }
430
431 pub fn effective_search_path(
437 &self,
438 include_temp_schema: bool,
439 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
440 self.state
441 .effective_search_path(&self.search_path, include_temp_schema)
442 }
443}
444
445impl ConnectionResolver for ConnCatalog<'_> {
446 fn resolve_connection(
447 &self,
448 id: CatalogItemId,
449 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
450 self.state().resolve_connection(id)
451 }
452}
453
454impl Catalog {
455 pub fn transient_revision(&self) -> u64 {
459 self.transient_revision
460 }
461
462 pub async fn with_debug<F, Fut, T>(f: F) -> T
474 where
475 F: FnOnce(Catalog) -> Fut,
476 Fut: Future<Output = T>,
477 {
478 let persist_client = PersistClient::new_for_tests().await;
479 let organization_id = Uuid::new_v4();
480 let bootstrap_args = test_bootstrap_args();
481 let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
482 .await
483 .expect("can open debug catalog");
484 f(catalog).await
485 }
486
487 pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
490 where
491 F: FnOnce(Catalog) -> Fut,
492 Fut: Future<Output = T>,
493 {
494 let persist_client = PersistClient::new_for_tests().await;
495 let organization_id = Uuid::new_v4();
496 let bootstrap_args = test_bootstrap_args();
497 let mut catalog =
498 Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
499 .await
500 .expect("can open debug catalog");
501
502 let now = SYSTEM_TIME.clone();
504 let openable_storage = TestCatalogStateBuilder::new(persist_client)
505 .with_organization_id(organization_id)
506 .with_default_deploy_generation()
507 .build()
508 .await
509 .expect("can create durable catalog");
510 let mut storage = openable_storage
511 .open(now().into(), &bootstrap_args)
512 .await
513 .expect("can open durable catalog")
514 .0;
515 let _ = storage
517 .sync_to_current_updates()
518 .await
519 .expect("can sync to current updates");
520 catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));
521
522 f(catalog).await
523 }
524
525 pub async fn open_debug_catalog(
529 persist_client: PersistClient,
530 organization_id: Uuid,
531 bootstrap_args: &BootstrapArgs,
532 ) -> Result<Catalog, anyhow::Error> {
533 let now = SYSTEM_TIME.clone();
534 let environment_id = None;
535 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
536 .with_organization_id(organization_id)
537 .with_default_deploy_generation()
538 .build()
539 .await?;
540 let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
541 let system_parameter_defaults = BTreeMap::default();
542 Self::open_debug_catalog_inner(
543 persist_client,
544 storage,
545 now,
546 environment_id,
547 &DUMMY_BUILD_INFO,
548 system_parameter_defaults,
549 bootstrap_args,
550 None,
551 )
552 .await
553 }
554
555 pub async fn open_debug_read_only_catalog(
560 persist_client: PersistClient,
561 organization_id: Uuid,
562 bootstrap_args: &BootstrapArgs,
563 ) -> Result<Catalog, anyhow::Error> {
564 let now = SYSTEM_TIME.clone();
565 let environment_id = None;
566 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
567 .with_organization_id(organization_id)
568 .build()
569 .await?;
570 let storage = openable_storage
571 .open_read_only(&test_bootstrap_args())
572 .await?;
573 let system_parameter_defaults = BTreeMap::default();
574 Self::open_debug_catalog_inner(
575 persist_client,
576 storage,
577 now,
578 environment_id,
579 &DUMMY_BUILD_INFO,
580 system_parameter_defaults,
581 bootstrap_args,
582 None,
583 )
584 .await
585 }
586
587 pub async fn open_debug_read_only_persist_catalog_config(
592 persist_client: PersistClient,
593 now: NowFn,
594 environment_id: EnvironmentId,
595 system_parameter_defaults: BTreeMap<String, String>,
596 build_info: &'static BuildInfo,
597 bootstrap_args: &BootstrapArgs,
598 enable_expression_cache_override: Option<bool>,
599 ) -> Result<Catalog, anyhow::Error> {
600 let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
601 .with_organization_id(environment_id.organization_id())
602 .with_version(
603 build_info
604 .version
605 .parse()
606 .expect("build version is parseable"),
607 )
608 .build()
609 .await?;
610 let storage = openable_storage.open_read_only(bootstrap_args).await?;
611 Self::open_debug_catalog_inner(
612 persist_client,
613 storage,
614 now,
615 Some(environment_id),
616 build_info,
617 system_parameter_defaults,
618 bootstrap_args,
619 enable_expression_cache_override,
620 )
621 .await
622 }
623
624 async fn open_debug_catalog_inner(
625 persist_client: PersistClient,
626 storage: Box<dyn DurableCatalogState>,
627 now: NowFn,
628 environment_id: Option<EnvironmentId>,
629 build_info: &'static BuildInfo,
630 system_parameter_defaults: BTreeMap<String, String>,
631 bootstrap_args: &BootstrapArgs,
632 enable_expression_cache_override: Option<bool>,
633 ) -> Result<Catalog, anyhow::Error> {
634 let metrics_registry = &MetricsRegistry::new();
635 let secrets_reader = Arc::new(InMemorySecretsController::new());
636 let previous_ts = now().into();
639 let replica_size = &bootstrap_args.default_cluster_replica_size;
640 let read_only = false;
641
642 let OpenCatalogResult {
643 catalog,
644 migrated_storage_collections_0dt: _,
645 new_builtin_collections: _,
646 builtin_table_updates: _,
647 cached_global_exprs: _,
648 uncached_local_exprs: _,
649 } = Catalog::open(Config {
650 storage,
651 metrics_registry,
652 state: StateConfig {
653 unsafe_mode: true,
654 all_features: false,
655 build_info,
656 deploy_generation: 0,
657 environment_id: environment_id.unwrap_or_else(EnvironmentId::for_tests),
658 read_only,
659 now,
660 boot_ts: previous_ts,
661 skip_migrations: true,
662 cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
663 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
664 size: replica_size.clone(),
665 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
666 },
667 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
668 size: replica_size.clone(),
669 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
670 },
671 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
672 size: replica_size.clone(),
673 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
674 },
675 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
676 size: replica_size.clone(),
677 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
678 },
679 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
680 size: replica_size.clone(),
681 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
682 },
683 system_parameter_defaults,
684 remote_system_parameters: None,
685 availability_zones: vec![],
686 egress_addresses: vec![],
687 aws_principal_context: None,
688 aws_privatelink_availability_zones: None,
689 http_host_name: None,
690 connection_context: ConnectionContext::for_tests(secrets_reader),
691 builtin_item_migration_config: BuiltinItemMigrationConfig {
692 persist_client: persist_client.clone(),
693 read_only,
694 force_migration: None,
695 },
696 persist_client,
697 enable_expression_cache_override,
698 helm_chart_version: None,
699 external_login_password_mz_system: None,
700 license_key: ValidatedLicenseKey::for_tests(),
701 },
702 })
703 .await?;
704 Ok(catalog)
705 }
706
707 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
708 self.state.for_session(session)
709 }
710
711 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
712 self.state.for_sessionless_user(role_id)
713 }
714
715 pub fn for_system_session(&self) -> ConnCatalog<'_> {
716 self.state.for_system_session()
717 }
718
719 async fn storage<'a>(
720 &'a self,
721 ) -> MutexGuard<'a, Box<dyn mz_catalog::durable::DurableCatalogState>> {
722 self.storage.lock().await
723 }
724
725 pub async fn current_upper(&self) -> mz_repr::Timestamp {
726 self.storage().await.current_upper().await
727 }
728
729 pub async fn allocate_user_id(
730 &self,
731 commit_ts: mz_repr::Timestamp,
732 ) -> Result<(CatalogItemId, GlobalId), Error> {
733 self.storage()
734 .await
735 .allocate_user_id(commit_ts)
736 .await
737 .maybe_terminate("allocating user ids")
738 .err_into()
739 }
740
741 pub async fn allocate_user_ids(
743 &self,
744 amount: u64,
745 commit_ts: mz_repr::Timestamp,
746 ) -> Result<Vec<(CatalogItemId, GlobalId)>, Error> {
747 self.storage()
748 .await
749 .allocate_user_ids(amount, commit_ts)
750 .await
751 .maybe_terminate("allocating user ids")
752 .err_into()
753 }
754
755 pub async fn allocate_user_id_for_test(&self) -> Result<(CatalogItemId, GlobalId), Error> {
756 let commit_ts = self.storage().await.current_upper().await;
757 self.allocate_user_id(commit_ts).await
758 }
759
760 pub async fn get_next_user_item_id(&self) -> Result<u64, Error> {
762 self.storage()
763 .await
764 .get_next_user_item_id()
765 .await
766 .err_into()
767 }
768
769 #[cfg(test)]
770 pub async fn allocate_system_id(
771 &self,
772 commit_ts: mz_repr::Timestamp,
773 ) -> Result<(CatalogItemId, GlobalId), Error> {
774 use mz_ore::collections::CollectionExt;
775
776 let mut storage = self.storage().await;
777 let mut txn = storage.transaction().await?;
778 let id = txn
779 .allocate_system_item_ids(1)
780 .maybe_terminate("allocating system ids")?
781 .into_element();
782 let _ = txn.get_and_commit_op_updates();
784 txn.commit(commit_ts).await?;
785 Ok(id)
786 }
787
788 pub async fn get_next_system_item_id(&self) -> Result<u64, Error> {
790 self.storage()
791 .await
792 .get_next_system_item_id()
793 .await
794 .err_into()
795 }
796
797 pub async fn allocate_user_cluster_id(
798 &self,
799 commit_ts: mz_repr::Timestamp,
800 ) -> Result<ClusterId, Error> {
801 self.storage()
802 .await
803 .allocate_user_cluster_id(commit_ts)
804 .await
805 .maybe_terminate("allocating user cluster ids")
806 .err_into()
807 }
808
809 pub async fn get_next_system_replica_id(&self) -> Result<u64, Error> {
811 self.storage()
812 .await
813 .get_next_system_replica_id()
814 .await
815 .err_into()
816 }
817
818 pub async fn get_next_user_replica_id(&self) -> Result<u64, Error> {
820 self.storage()
821 .await
822 .get_next_user_replica_id()
823 .await
824 .err_into()
825 }
826
827 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
828 self.state.resolve_database(database_name)
829 }
830
831 pub fn resolve_schema(
832 &self,
833 current_database: Option<&DatabaseId>,
834 database_name: Option<&str>,
835 schema_name: &str,
836 conn_id: &ConnectionId,
837 ) -> Result<&Schema, SqlCatalogError> {
838 self.state
839 .resolve_schema(current_database, database_name, schema_name, conn_id)
840 }
841
842 pub fn resolve_schema_in_database(
843 &self,
844 database_spec: &ResolvedDatabaseSpecifier,
845 schema_name: &str,
846 conn_id: &ConnectionId,
847 ) -> Result<&Schema, SqlCatalogError> {
848 self.state
849 .resolve_schema_in_database(database_spec, schema_name, conn_id)
850 }
851
852 pub fn resolve_replica_in_cluster(
853 &self,
854 cluster_id: &ClusterId,
855 replica_name: &str,
856 ) -> Result<&ClusterReplica, SqlCatalogError> {
857 self.state
858 .resolve_replica_in_cluster(cluster_id, replica_name)
859 }
860
861 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
862 self.state.resolve_system_schema(name)
863 }
864
865 pub fn resolve_search_path(
866 &self,
867 session: &Session,
868 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
869 self.state.resolve_search_path(session)
870 }
871
872 pub fn resolve_entry(
874 &self,
875 current_database: Option<&DatabaseId>,
876 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
877 name: &PartialItemName,
878 conn_id: &ConnectionId,
879 ) -> Result<&CatalogEntry, SqlCatalogError> {
880 self.state
881 .resolve_entry(current_database, search_path, name, conn_id)
882 }
883
884 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
886 self.state.resolve_builtin_table(builtin)
887 }
888
889 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> CatalogItemId {
891 self.state.resolve_builtin_log(builtin).0
892 }
893
894 pub fn resolve_builtin_storage_collection(
896 &self,
897 builtin: &'static BuiltinSource,
898 ) -> CatalogItemId {
899 self.state.resolve_builtin_source(builtin)
900 }
901
902 pub fn resolve_function(
904 &self,
905 current_database: Option<&DatabaseId>,
906 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
907 name: &PartialItemName,
908 conn_id: &ConnectionId,
909 ) -> Result<&CatalogEntry, SqlCatalogError> {
910 self.state
911 .resolve_function(current_database, search_path, name, conn_id)
912 }
913
914 pub fn resolve_type(
916 &self,
917 current_database: Option<&DatabaseId>,
918 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
919 name: &PartialItemName,
920 conn_id: &ConnectionId,
921 ) -> Result<&CatalogEntry, SqlCatalogError> {
922 self.state
923 .resolve_type(current_database, search_path, name, conn_id)
924 }
925
926 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
927 self.state.resolve_cluster(name)
928 }
929
930 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
936 self.state.resolve_builtin_cluster(cluster)
937 }
938
939 pub fn get_mz_catalog_server_cluster_id(&self) -> &ClusterId {
940 &self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER).id
941 }
942
943 pub fn resolve_target_cluster(
945 &self,
946 target_cluster: TargetCluster,
947 session: &Session,
948 ) -> Result<&Cluster, AdapterError> {
949 match target_cluster {
950 TargetCluster::CatalogServer => {
951 Ok(self.resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER))
952 }
953 TargetCluster::Active => self.active_cluster(session),
954 TargetCluster::Transaction(cluster_id) => self
955 .try_get_cluster(cluster_id)
956 .ok_or(AdapterError::ConcurrentClusterDrop),
957 }
958 }
959
960 pub fn active_cluster(&self, session: &Session) -> Result<&Cluster, AdapterError> {
961 if session.user().name != SYSTEM_USER.name
965 && session.user().name != SUPPORT_USER.name
966 && session.vars().cluster() == SYSTEM_USER.name
967 {
968 coord_bail!(
969 "system cluster '{}' cannot execute user queries",
970 SYSTEM_USER.name
971 );
972 }
973 let cluster = self.resolve_cluster(session.vars().cluster())?;
974 Ok(cluster)
975 }
976
977 pub fn state(&self) -> &CatalogState {
978 &self.state
979 }
980
981 pub fn resolve_full_name(
982 &self,
983 name: &QualifiedItemName,
984 conn_id: Option<&ConnectionId>,
985 ) -> FullItemName {
986 self.state.resolve_full_name(name, conn_id)
987 }
988
989 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
990 self.state.try_get_entry(id)
991 }
992
993 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
994 self.state.try_get_entry_by_global_id(id)
995 }
996
997 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
998 self.state.get_entry(id)
999 }
1000
1001 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
1002 self.state.get_entry_by_global_id(id)
1003 }
1004
1005 pub fn get_global_ids<'a>(
1006 &'a self,
1007 id: &CatalogItemId,
1008 ) -> impl Iterator<Item = GlobalId> + use<'a> {
1009 self.get_entry(id).global_ids()
1010 }
1011
1012 pub fn resolve_item_id(&self, id: &GlobalId) -> CatalogItemId {
1013 self.get_entry_by_global_id(id).id()
1014 }
1015
1016 pub fn try_resolve_item_id(&self, id: &GlobalId) -> Option<CatalogItemId> {
1017 let item = self.try_get_entry_by_global_id(id)?;
1018 Some(item.id())
1019 }
1020
1021 pub fn get_schema(
1022 &self,
1023 database_spec: &ResolvedDatabaseSpecifier,
1024 schema_spec: &SchemaSpecifier,
1025 conn_id: &ConnectionId,
1026 ) -> &Schema {
1027 self.state.get_schema(database_spec, schema_spec, conn_id)
1028 }
1029
1030 pub fn try_get_schema(
1031 &self,
1032 database_spec: &ResolvedDatabaseSpecifier,
1033 schema_spec: &SchemaSpecifier,
1034 conn_id: &ConnectionId,
1035 ) -> Option<&Schema> {
1036 self.state
1037 .try_get_schema(database_spec, schema_spec, conn_id)
1038 }
1039
1040 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1041 self.state.get_mz_catalog_schema_id()
1042 }
1043
1044 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1045 self.state.get_pg_catalog_schema_id()
1046 }
1047
1048 pub fn get_information_schema_id(&self) -> SchemaId {
1049 self.state.get_information_schema_id()
1050 }
1051
1052 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1053 self.state.get_mz_internal_schema_id()
1054 }
1055
1056 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1057 self.state.get_mz_introspection_schema_id()
1058 }
1059
1060 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1061 self.state.get_mz_unsafe_schema_id()
1062 }
1063
1064 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1065 self.state.system_schema_ids()
1066 }
1067
1068 pub fn get_database(&self, id: &DatabaseId) -> &Database {
1069 self.state.get_database(id)
1070 }
1071
1072 pub fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
1073 self.state.try_get_role(id)
1074 }
1075
1076 pub fn get_role(&self, id: &RoleId) -> &Role {
1077 self.state.get_role(id)
1078 }
1079
1080 pub fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
1081 self.state.try_get_role_by_name(role_name)
1082 }
1083
1084 pub fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
1085 self.state.try_get_role_auth_by_id(id)
1086 }
1087
1088 pub fn create_temporary_schema(
1091 &mut self,
1092 conn_id: &ConnectionId,
1093 owner_id: RoleId,
1094 ) -> Result<(), Error> {
1095 self.state.create_temporary_schema(conn_id, owner_id)
1096 }
1097
1098 fn item_exists_in_temp_schemas(&self, conn_id: &ConnectionId, item_name: &str) -> bool {
1099 self.state
1101 .temporary_schemas
1102 .get(conn_id)
1103 .map(|schema| schema.items.contains_key(item_name))
1104 .unwrap_or(false)
1105 }
1106
1107 pub fn drop_temporary_schema(&mut self, conn_id: &ConnectionId) -> Result<(), Error> {
1110 let Some(schema) = self.state.temporary_schemas.remove(conn_id) else {
1111 return Ok(());
1112 };
1113 if !schema.items.is_empty() {
1114 return Err(Error::new(ErrorKind::SchemaNotEmpty(MZ_TEMP_SCHEMA.into())));
1115 }
1116 Ok(())
1117 }
1118
1119 pub(crate) fn object_dependents(
1120 &self,
1121 object_ids: &Vec<ObjectId>,
1122 conn_id: &ConnectionId,
1123 ) -> Vec<ObjectId> {
1124 let mut seen = BTreeSet::new();
1125 self.state.object_dependents(object_ids, conn_id, &mut seen)
1126 }
1127
1128 fn full_name_detail(name: &FullItemName) -> FullNameV1 {
1129 FullNameV1 {
1130 database: name.database.to_string(),
1131 schema: name.schema.clone(),
1132 item: name.item.clone(),
1133 }
1134 }
1135
1136 pub fn find_available_cluster_name(&self, name: &str) -> String {
1137 let mut i = 0;
1138 let mut candidate = name.to_string();
1139 while self.state.clusters_by_name.contains_key(&candidate) {
1140 i += 1;
1141 candidate = format!("{}{}", name, i);
1142 }
1143 candidate
1144 }
1145
1146 pub fn get_role_allowed_cluster_sizes(&self, role_id: &Option<RoleId>) -> Vec<String> {
1147 if role_id == &Some(MZ_SYSTEM_ROLE_ID) {
1148 self.cluster_replica_sizes()
1149 .enabled_allocations()
1150 .map(|a| a.0.to_owned())
1151 .collect::<Vec<_>>()
1152 } else {
1153 self.system_config().allowed_cluster_replica_sizes()
1154 }
1155 }
1156
1157 pub fn concretize_replica_location(
1158 &self,
1159 location: mz_catalog::durable::ReplicaLocation,
1160 allowed_sizes: &Vec<String>,
1161 allowed_availability_zones: Option<&[String]>,
1162 ) -> Result<ReplicaLocation, Error> {
1163 self.state
1164 .concretize_replica_location(location, allowed_sizes, allowed_availability_zones)
1165 }
1166
1167 pub(crate) fn ensure_valid_replica_size(
1168 &self,
1169 allowed_sizes: &[String],
1170 size: &String,
1171 ) -> Result<(), Error> {
1172 self.state.ensure_valid_replica_size(allowed_sizes, size)
1173 }
1174
1175 pub fn cluster_replica_sizes(&self) -> &ClusterReplicaSizeMap {
1176 &self.state.cluster_replica_sizes
1177 }
1178
1179 pub fn get_privileges(
1181 &self,
1182 id: &SystemObjectId,
1183 conn_id: &ConnectionId,
1184 ) -> Option<&PrivilegeMap> {
1185 match id {
1186 SystemObjectId::Object(id) => match id {
1187 ObjectId::Cluster(id) => Some(self.get_cluster(*id).privileges()),
1188 ObjectId::Database(id) => Some(self.get_database(id).privileges()),
1189 ObjectId::Schema((database_spec, schema_spec)) => Some(
1190 self.get_schema(database_spec, schema_spec, conn_id)
1191 .privileges(),
1192 ),
1193 ObjectId::Item(id) => Some(self.get_entry(id).privileges()),
1194 ObjectId::ClusterReplica(_) | ObjectId::Role(_) => None,
1195 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(*id).privileges()),
1196 },
1197 SystemObjectId::System => Some(&self.state.system_privileges),
1198 }
1199 }
1200
1201 #[mz_ore::instrument(level = "debug")]
1202 pub async fn confirm_leadership(&self) -> Result<(), AdapterError> {
1203 Ok(self.storage().await.confirm_leadership().await?)
1204 }
1205
1206 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
1208 self.state.introspection_dependencies(id)
1209 }
1210
1211 pub fn dump(&self) -> Result<CatalogDump, Error> {
1217 Ok(CatalogDump::new(self.state.dump(None)?))
1218 }
1219
1220 pub fn check_consistency(&self) -> Result<(), serde_json::Value> {
1224 self.state.check_consistency().map_err(|inconsistencies| {
1225 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1226 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1227 })
1228 })
1229 }
1230
1231 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1232 self.state.config()
1233 }
1234
1235 pub fn entries(&self) -> impl Iterator<Item = &CatalogEntry> {
1236 self.state.entry_by_id.values()
1237 }
1238
1239 pub fn user_connections(&self) -> impl Iterator<Item = &CatalogEntry> {
1240 self.entries()
1241 .filter(|entry| entry.is_connection() && entry.id().is_user())
1242 }
1243
1244 pub fn user_tables(&self) -> impl Iterator<Item = &CatalogEntry> {
1245 self.entries()
1246 .filter(|entry| entry.is_table() && entry.id().is_user())
1247 }
1248
1249 pub fn user_sources(&self) -> impl Iterator<Item = &CatalogEntry> {
1250 self.entries()
1251 .filter(|entry| entry.is_source() && entry.id().is_user())
1252 }
1253
1254 pub fn user_sinks(&self) -> impl Iterator<Item = &CatalogEntry> {
1255 self.entries()
1256 .filter(|entry| entry.is_sink() && entry.id().is_user())
1257 }
1258
1259 pub fn user_materialized_views(&self) -> impl Iterator<Item = &CatalogEntry> {
1260 self.entries()
1261 .filter(|entry| entry.is_materialized_view() && entry.id().is_user())
1262 }
1263
1264 pub fn user_secrets(&self) -> impl Iterator<Item = &CatalogEntry> {
1265 self.entries()
1266 .filter(|entry| entry.is_secret() && entry.id().is_user())
1267 }
1268
1269 pub fn get_network_policy(&self, network_policy_id: NetworkPolicyId) -> &NetworkPolicy {
1270 self.state.get_network_policy(&network_policy_id)
1271 }
1272
1273 pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
1274 self.state.try_get_network_policy_by_name(name)
1275 }
1276
1277 pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
1278 self.state.clusters_by_id.values()
1279 }
1280
1281 pub fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
1282 self.state.get_cluster(cluster_id)
1283 }
1284
1285 pub fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
1286 self.state.try_get_cluster(cluster_id)
1287 }
1288
1289 pub fn user_clusters(&self) -> impl Iterator<Item = &Cluster> {
1290 self.clusters().filter(|cluster| cluster.id.is_user())
1291 }
1292
1293 pub fn get_cluster_replica(
1294 &self,
1295 cluster_id: ClusterId,
1296 replica_id: ReplicaId,
1297 ) -> &ClusterReplica {
1298 self.state.get_cluster_replica(cluster_id, replica_id)
1299 }
1300
1301 pub fn try_get_cluster_replica(
1302 &self,
1303 cluster_id: ClusterId,
1304 replica_id: ReplicaId,
1305 ) -> Option<&ClusterReplica> {
1306 self.state.try_get_cluster_replica(cluster_id, replica_id)
1307 }
1308
1309 pub fn user_cluster_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
1310 self.user_clusters()
1311 .flat_map(|cluster| cluster.user_replicas())
1312 }
1313
1314 pub fn databases(&self) -> impl Iterator<Item = &Database> {
1315 self.state.database_by_id.values()
1316 }
1317
1318 pub fn user_roles(&self) -> impl Iterator<Item = &Role> {
1319 self.state
1320 .roles_by_id
1321 .values()
1322 .filter(|role| role.is_user())
1323 }
1324
1325 pub fn user_continual_tasks(&self) -> impl Iterator<Item = &CatalogEntry> {
1326 self.entries()
1327 .filter(|entry| entry.is_continual_task() && entry.id().is_user())
1328 }
1329
1330 pub fn system_privileges(&self) -> &PrivilegeMap {
1331 &self.state.system_privileges
1332 }
1333
1334 pub fn default_privileges(
1335 &self,
1336 ) -> impl Iterator<
1337 Item = (
1338 &DefaultPrivilegeObject,
1339 impl Iterator<Item = &DefaultPrivilegeAclItem>,
1340 ),
1341 > {
1342 self.state.default_privileges.iter()
1343 }
1344
1345 pub fn pack_item_update(&self, id: CatalogItemId, diff: Diff) -> Vec<BuiltinTableUpdate> {
1346 self.state
1347 .resolve_builtin_table_updates(self.state.pack_item_update(id, diff))
1348 }
1349
1350 pub fn pack_storage_usage_update(
1351 &self,
1352 event: VersionedStorageUsage,
1353 diff: Diff,
1354 ) -> BuiltinTableUpdate {
1355 self.state
1356 .resolve_builtin_table_update(self.state.pack_storage_usage_update(event, diff))
1357 }
1358
1359 pub fn system_config(&self) -> &SystemVars {
1360 self.state.system_config()
1361 }
1362
1363 pub fn system_config_mut(&mut self) -> &mut SystemVars {
1364 self.state.system_config_mut()
1365 }
1366
1367 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
1368 self.state.ensure_not_reserved_role(role_id)
1369 }
1370
1371 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
1372 self.state.ensure_grantable_role(role_id)
1373 }
1374
1375 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
1376 self.state.ensure_not_system_role(role_id)
1377 }
1378
1379 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
1380 self.state.ensure_not_predefined_role(role_id)
1381 }
1382
1383 pub fn ensure_not_reserved_network_policy(
1384 &self,
1385 network_policy_id: &NetworkPolicyId,
1386 ) -> Result<(), Error> {
1387 self.state
1388 .ensure_not_reserved_network_policy(network_policy_id)
1389 }
1390
1391 pub fn ensure_not_reserved_object(
1392 &self,
1393 object_id: &ObjectId,
1394 conn_id: &ConnectionId,
1395 ) -> Result<(), Error> {
1396 match object_id {
1397 ObjectId::Cluster(cluster_id) => {
1398 if cluster_id.is_system() {
1399 let cluster = self.get_cluster(*cluster_id);
1400 Err(Error::new(ErrorKind::ReadOnlyCluster(
1401 cluster.name().to_string(),
1402 )))
1403 } else {
1404 Ok(())
1405 }
1406 }
1407 ObjectId::ClusterReplica((cluster_id, replica_id)) => {
1408 if replica_id.is_system() {
1409 let replica = self.get_cluster_replica(*cluster_id, *replica_id);
1410 Err(Error::new(ErrorKind::ReadOnlyClusterReplica(
1411 replica.name().to_string(),
1412 )))
1413 } else {
1414 Ok(())
1415 }
1416 }
1417 ObjectId::Database(database_id) => {
1418 if database_id.is_system() {
1419 let database = self.get_database(database_id);
1420 Err(Error::new(ErrorKind::ReadOnlyDatabase(
1421 database.name().to_string(),
1422 )))
1423 } else {
1424 Ok(())
1425 }
1426 }
1427 ObjectId::Schema((database_spec, schema_spec)) => {
1428 if schema_spec.is_system() {
1429 let schema = self.get_schema(database_spec, schema_spec, conn_id);
1430 Err(Error::new(ErrorKind::ReadOnlySystemSchema(
1431 schema.name().schema.clone(),
1432 )))
1433 } else {
1434 Ok(())
1435 }
1436 }
1437 ObjectId::Role(role_id) => self.ensure_not_reserved_role(role_id),
1438 ObjectId::Item(item_id) => {
1439 if item_id.is_system() {
1440 let item = self.get_entry(item_id);
1441 let name = self.resolve_full_name(item.name(), Some(conn_id));
1442 Err(Error::new(ErrorKind::ReadOnlyItem(name.to_string())))
1443 } else {
1444 Ok(())
1445 }
1446 }
1447 ObjectId::NetworkPolicy(network_policy_id) => {
1448 self.ensure_not_reserved_network_policy(network_policy_id)
1449 }
1450 }
1451 }
1452
1453 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1455 &mut self,
1456 create_sql: &str,
1457 force_if_exists_skip: bool,
1458 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1459 self.state
1460 .deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
1461 }
1462
1463 pub(crate) fn update_expression_cache<'a, 'b>(
1464 &'a self,
1465 new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1466 new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1467 ) -> BoxFuture<'b, ()> {
1468 if let Some(expr_cache) = &self.expr_cache_handle {
1469 let ons = new_local_expressions
1470 .iter()
1471 .map(|(id, _)| id)
1472 .chain(new_global_expressions.iter().map(|(id, _)| id))
1473 .map(|id| self.get_entry_by_global_id(id))
1474 .filter_map(|entry| entry.index().map(|index| index.on));
1475 let invalidate_ids = self.invalidate_for_index(ons);
1476 expr_cache
1477 .update(
1478 new_local_expressions,
1479 new_global_expressions,
1480 invalidate_ids,
1481 )
1482 .boxed()
1483 } else {
1484 async {}.boxed()
1485 }
1486 }
1487
1488 #[cfg(test)]
1492 async fn sync_to_current_updates(
1493 &mut self,
1494 ) -> Result<
1495 (
1496 Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
1497 Vec<ParsedStateUpdate>,
1498 ),
1499 CatalogError,
1500 > {
1501 let updates = self.storage().await.sync_to_current_updates().await?;
1502 let (builtin_table_updates, catalog_updates) = self
1503 .state
1504 .apply_updates(updates, &mut state::LocalExpressionCache::Closed)
1505 .await;
1506 Ok((builtin_table_updates, catalog_updates))
1507 }
1508}
1509
1510pub fn is_reserved_name(name: &str) -> bool {
1511 BUILTIN_PREFIXES
1512 .iter()
1513 .any(|prefix| name.starts_with(prefix))
1514}
1515
1516pub fn is_reserved_role_name(name: &str) -> bool {
1517 is_reserved_name(name) || is_public_role(name)
1518}
1519
1520pub fn is_public_role(name: &str) -> bool {
1521 name == &*PUBLIC_ROLE_NAME
1522}
1523
1524pub(crate) fn catalog_type_to_audit_object_type(sql_type: SqlCatalogItemType) -> ObjectType {
1525 object_type_to_audit_object_type(sql_type.into())
1526}
1527
1528pub(crate) fn comment_id_to_audit_object_type(id: CommentObjectId) -> ObjectType {
1529 match id {
1530 CommentObjectId::Table(_) => ObjectType::Table,
1531 CommentObjectId::View(_) => ObjectType::View,
1532 CommentObjectId::MaterializedView(_) => ObjectType::MaterializedView,
1533 CommentObjectId::Source(_) => ObjectType::Source,
1534 CommentObjectId::Sink(_) => ObjectType::Sink,
1535 CommentObjectId::Index(_) => ObjectType::Index,
1536 CommentObjectId::Func(_) => ObjectType::Func,
1537 CommentObjectId::Connection(_) => ObjectType::Connection,
1538 CommentObjectId::Type(_) => ObjectType::Type,
1539 CommentObjectId::Secret(_) => ObjectType::Secret,
1540 CommentObjectId::Role(_) => ObjectType::Role,
1541 CommentObjectId::Database(_) => ObjectType::Database,
1542 CommentObjectId::Schema(_) => ObjectType::Schema,
1543 CommentObjectId::Cluster(_) => ObjectType::Cluster,
1544 CommentObjectId::ClusterReplica(_) => ObjectType::ClusterReplica,
1545 CommentObjectId::ContinualTask(_) => ObjectType::ContinualTask,
1546 CommentObjectId::NetworkPolicy(_) => ObjectType::NetworkPolicy,
1547 }
1548}
1549
1550pub(crate) fn object_type_to_audit_object_type(
1551 object_type: mz_sql::catalog::ObjectType,
1552) -> ObjectType {
1553 system_object_type_to_audit_object_type(&SystemObjectType::Object(object_type))
1554}
1555
1556pub(crate) fn system_object_type_to_audit_object_type(
1557 system_type: &SystemObjectType,
1558) -> ObjectType {
1559 match system_type {
1560 SystemObjectType::Object(object_type) => match object_type {
1561 mz_sql::catalog::ObjectType::Table => ObjectType::Table,
1562 mz_sql::catalog::ObjectType::View => ObjectType::View,
1563 mz_sql::catalog::ObjectType::MaterializedView => ObjectType::MaterializedView,
1564 mz_sql::catalog::ObjectType::Source => ObjectType::Source,
1565 mz_sql::catalog::ObjectType::Sink => ObjectType::Sink,
1566 mz_sql::catalog::ObjectType::Index => ObjectType::Index,
1567 mz_sql::catalog::ObjectType::Type => ObjectType::Type,
1568 mz_sql::catalog::ObjectType::Role => ObjectType::Role,
1569 mz_sql::catalog::ObjectType::Cluster => ObjectType::Cluster,
1570 mz_sql::catalog::ObjectType::ClusterReplica => ObjectType::ClusterReplica,
1571 mz_sql::catalog::ObjectType::Secret => ObjectType::Secret,
1572 mz_sql::catalog::ObjectType::Connection => ObjectType::Connection,
1573 mz_sql::catalog::ObjectType::Database => ObjectType::Database,
1574 mz_sql::catalog::ObjectType::Schema => ObjectType::Schema,
1575 mz_sql::catalog::ObjectType::Func => ObjectType::Func,
1576 mz_sql::catalog::ObjectType::ContinualTask => ObjectType::ContinualTask,
1577 mz_sql::catalog::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy,
1578 },
1579 SystemObjectType::System => ObjectType::System,
1580 }
1581}
1582
1583#[derive(Debug, Copy, Clone)]
1584pub enum UpdatePrivilegeVariant {
1585 Grant,
1586 Revoke,
1587}
1588
1589impl From<UpdatePrivilegeVariant> for ExecuteResponse {
1590 fn from(variant: UpdatePrivilegeVariant) -> Self {
1591 match variant {
1592 UpdatePrivilegeVariant::Grant => ExecuteResponse::GrantedPrivilege,
1593 UpdatePrivilegeVariant::Revoke => ExecuteResponse::RevokedPrivilege,
1594 }
1595 }
1596}
1597
1598impl From<UpdatePrivilegeVariant> for EventType {
1599 fn from(variant: UpdatePrivilegeVariant) -> Self {
1600 match variant {
1601 UpdatePrivilegeVariant::Grant => EventType::Grant,
1602 UpdatePrivilegeVariant::Revoke => EventType::Revoke,
1603 }
1604 }
1605}
1606
1607impl ConnCatalog<'_> {
1608 fn resolve_item_name(
1609 &self,
1610 name: &PartialItemName,
1611 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1612 self.resolve_item(name).map(|entry| entry.name())
1613 }
1614
1615 fn resolve_function_name(
1616 &self,
1617 name: &PartialItemName,
1618 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1619 self.resolve_function(name).map(|entry| entry.name())
1620 }
1621
1622 fn resolve_type_name(
1623 &self,
1624 name: &PartialItemName,
1625 ) -> Result<&QualifiedItemName, SqlCatalogError> {
1626 self.resolve_type(name).map(|entry| entry.name())
1627 }
1628}
1629
1630impl ExprHumanizer for ConnCatalog<'_> {
1631 fn humanize_id(&self, id: GlobalId) -> Option<String> {
1632 let entry = self.state.try_get_entry_by_global_id(&id)?;
1633 Some(self.resolve_full_name(entry.name()).to_string())
1634 }
1635
1636 fn humanize_id_unqualified(&self, id: GlobalId) -> Option<String> {
1637 let entry = self.state.try_get_entry_by_global_id(&id)?;
1638 Some(entry.name().item.clone())
1639 }
1640
1641 fn humanize_id_parts(&self, id: GlobalId) -> Option<Vec<String>> {
1642 let entry = self.state.try_get_entry_by_global_id(&id)?;
1643 Some(self.resolve_full_name(entry.name()).into_parts())
1644 }
1645
1646 fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
1647 use SqlScalarType::*;
1648
1649 match typ {
1650 Array(t) => format!("{}[]", self.humanize_scalar_type(t, postgres_compat)),
1651 List {
1652 custom_id: Some(item_id),
1653 ..
1654 }
1655 | Map {
1656 custom_id: Some(item_id),
1657 ..
1658 } => {
1659 let item = self.get_item(item_id);
1660 self.minimal_qualification(item.name()).to_string()
1661 }
1662 List { element_type, .. } => {
1663 format!(
1664 "{} list",
1665 self.humanize_scalar_type(element_type, postgres_compat)
1666 )
1667 }
1668 Map { value_type, .. } => format!(
1669 "map[{}=>{}]",
1670 self.humanize_scalar_type(&SqlScalarType::String, postgres_compat),
1671 self.humanize_scalar_type(value_type, postgres_compat)
1672 ),
1673 Record {
1674 custom_id: Some(item_id),
1675 ..
1676 } => {
1677 let item = self.get_item(item_id);
1678 self.minimal_qualification(item.name()).to_string()
1679 }
1680 Record { fields, .. } => format!(
1681 "record({})",
1682 fields
1683 .iter()
1684 .map(|f| format!(
1685 "{}: {}",
1686 f.0,
1687 self.humanize_column_type(&f.1, postgres_compat)
1688 ))
1689 .join(",")
1690 ),
1691 PgLegacyChar => "\"char\"".into(),
1692 Char { length } if !postgres_compat => match length {
1693 None => "char".into(),
1694 Some(length) => format!("char({})", length.into_u32()),
1695 },
1696 VarChar { max_length } if !postgres_compat => match max_length {
1697 None => "varchar".into(),
1698 Some(length) => format!("varchar({})", length.into_u32()),
1699 },
1700 UInt16 => "uint2".into(),
1701 UInt32 => "uint4".into(),
1702 UInt64 => "uint8".into(),
1703 ty => {
1704 let pgrepr_type = mz_pgrepr::Type::from(ty);
1705 let pg_catalog_schema = SchemaSpecifier::Id(self.state.get_pg_catalog_schema_id());
1706
1707 let res = if self
1708 .effective_search_path(true)
1709 .iter()
1710 .any(|(_, schema)| schema == &pg_catalog_schema)
1711 {
1712 pgrepr_type.name().to_string()
1713 } else {
1714 let name = QualifiedItemName {
1717 qualifiers: ItemQualifiers {
1718 database_spec: ResolvedDatabaseSpecifier::Ambient,
1719 schema_spec: pg_catalog_schema,
1720 },
1721 item: pgrepr_type.name().to_string(),
1722 };
1723 self.resolve_full_name(&name).to_string()
1724 };
1725 res
1726 }
1727 }
1728 }
1729
1730 fn column_names_for_id(&self, id: GlobalId) -> Option<Vec<String>> {
1731 let entry = self.state.try_get_entry_by_global_id(&id)?;
1732
1733 match entry.index() {
1734 Some(index) => {
1735 let on_desc = self.state.try_get_desc_by_global_id(&index.on)?;
1736 let mut on_names = on_desc
1737 .iter_names()
1738 .map(|col_name| col_name.to_string())
1739 .collect::<Vec<_>>();
1740
1741 let (p, _) = mz_expr::permutation_for_arrangement(&index.keys, on_desc.arity());
1742
1743 let ix_arity = p.iter().map(|x| *x + 1).max().unwrap_or(0);
1747 let mut ix_names = vec![String::new(); ix_arity];
1748
1749 for (on_pos, ix_pos) in p.into_iter().enumerate() {
1751 let on_name = on_names.get_mut(on_pos).expect("on_name");
1752 let ix_name = ix_names.get_mut(ix_pos).expect("ix_name");
1753 std::mem::swap(on_name, ix_name);
1754 }
1755
1756 Some(ix_names) }
1758 None => {
1759 let desc = self.state.try_get_desc_by_global_id(&id)?;
1760 let column_names = desc
1761 .iter_names()
1762 .map(|col_name| col_name.to_string())
1763 .collect();
1764
1765 Some(column_names)
1766 }
1767 }
1768 }
1769
1770 fn humanize_column(&self, id: GlobalId, column: usize) -> Option<String> {
1771 let desc = self.state.try_get_desc_by_global_id(&id)?;
1772 Some(desc.get_name(column).to_string())
1773 }
1774
1775 fn id_exists(&self, id: GlobalId) -> bool {
1776 self.state.entry_by_global_id.contains_key(&id)
1777 }
1778}
1779
1780impl SessionCatalog for ConnCatalog<'_> {
1781 fn active_role_id(&self) -> &RoleId {
1782 &self.role_id
1783 }
1784
1785 fn get_prepared_statement_desc(&self, name: &str) -> Option<&StatementDesc> {
1786 self.prepared_statements
1787 .as_ref()
1788 .map(|ps| ps.get(name).map(|ps| ps.desc()))
1789 .flatten()
1790 }
1791
1792 fn get_portal_desc_unverified(&self, portal_name: &str) -> Option<&StatementDesc> {
1793 self.portals
1794 .and_then(|portals| portals.get(portal_name).map(|portal| &portal.desc))
1795 }
1796
1797 fn active_database(&self) -> Option<&DatabaseId> {
1798 self.database.as_ref()
1799 }
1800
1801 fn active_cluster(&self) -> &str {
1802 &self.cluster
1803 }
1804
1805 fn search_path(&self) -> &[(ResolvedDatabaseSpecifier, SchemaSpecifier)] {
1806 &self.search_path
1807 }
1808
1809 fn resolve_database(
1810 &self,
1811 database_name: &str,
1812 ) -> Result<&dyn mz_sql::catalog::CatalogDatabase, SqlCatalogError> {
1813 Ok(self.state.resolve_database(database_name)?)
1814 }
1815
1816 fn get_database(&self, id: &DatabaseId) -> &dyn mz_sql::catalog::CatalogDatabase {
1817 self.state
1818 .database_by_id
1819 .get(id)
1820 .expect("database doesn't exist")
1821 }
1822
1823 #[allow(clippy::as_conversions)]
1825 fn get_databases(&self) -> Vec<&dyn CatalogDatabase> {
1826 self.state
1827 .database_by_id
1828 .values()
1829 .map(|database| database as &dyn CatalogDatabase)
1830 .collect()
1831 }
1832
1833 fn resolve_schema(
1834 &self,
1835 database_name: Option<&str>,
1836 schema_name: &str,
1837 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1838 Ok(self.state.resolve_schema(
1839 self.database.as_ref(),
1840 database_name,
1841 schema_name,
1842 &self.conn_id,
1843 )?)
1844 }
1845
1846 fn resolve_schema_in_database(
1847 &self,
1848 database_spec: &ResolvedDatabaseSpecifier,
1849 schema_name: &str,
1850 ) -> Result<&dyn mz_sql::catalog::CatalogSchema, SqlCatalogError> {
1851 Ok(self
1852 .state
1853 .resolve_schema_in_database(database_spec, schema_name, &self.conn_id)?)
1854 }
1855
1856 fn get_schema(
1857 &self,
1858 database_spec: &ResolvedDatabaseSpecifier,
1859 schema_spec: &SchemaSpecifier,
1860 ) -> &dyn CatalogSchema {
1861 self.state
1862 .get_schema(database_spec, schema_spec, &self.conn_id)
1863 }
1864
1865 #[allow(clippy::as_conversions)]
1867 fn get_schemas(&self) -> Vec<&dyn CatalogSchema> {
1868 self.get_databases()
1869 .into_iter()
1870 .flat_map(|database| database.schemas().into_iter())
1871 .chain(
1872 self.state
1873 .ambient_schemas_by_id
1874 .values()
1875 .chain(self.state.temporary_schemas.values())
1876 .map(|schema| schema as &dyn CatalogSchema),
1877 )
1878 .collect()
1879 }
1880
1881 fn get_mz_internal_schema_id(&self) -> SchemaId {
1882 self.state().get_mz_internal_schema_id()
1883 }
1884
1885 fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1886 self.state().get_mz_unsafe_schema_id()
1887 }
1888
1889 fn is_system_schema_specifier(&self, schema: SchemaSpecifier) -> bool {
1890 self.state.is_system_schema_specifier(schema)
1891 }
1892
1893 fn resolve_role(
1894 &self,
1895 role_name: &str,
1896 ) -> Result<&dyn mz_sql::catalog::CatalogRole, SqlCatalogError> {
1897 match self.state.try_get_role_by_name(role_name) {
1898 Some(role) => Ok(role),
1899 None => Err(SqlCatalogError::UnknownRole(role_name.into())),
1900 }
1901 }
1902
1903 fn resolve_network_policy(
1904 &self,
1905 policy_name: &str,
1906 ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> {
1907 match self.state.try_get_network_policy_by_name(policy_name) {
1908 Some(policy) => Ok(policy),
1909 None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())),
1910 }
1911 }
1912
1913 fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> {
1914 Some(self.state.roles_by_id.get(id)?)
1915 }
1916
1917 fn get_role(&self, id: &RoleId) -> &dyn mz_sql::catalog::CatalogRole {
1918 self.state.get_role(id)
1919 }
1920
1921 fn get_roles(&self) -> Vec<&dyn CatalogRole> {
1922 #[allow(clippy::as_conversions)]
1924 self.state
1925 .roles_by_id
1926 .values()
1927 .map(|role| role as &dyn CatalogRole)
1928 .collect()
1929 }
1930
1931 fn mz_system_role_id(&self) -> RoleId {
1932 MZ_SYSTEM_ROLE_ID
1933 }
1934
1935 fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
1936 self.state.collect_role_membership(id)
1937 }
1938
1939 fn get_network_policy(
1940 &self,
1941 id: &NetworkPolicyId,
1942 ) -> &dyn mz_sql::catalog::CatalogNetworkPolicy {
1943 self.state.get_network_policy(id)
1944 }
1945
1946 fn get_network_policies(&self) -> Vec<&dyn mz_sql::catalog::CatalogNetworkPolicy> {
1947 #[allow(clippy::as_conversions)]
1949 self.state
1950 .network_policies_by_id
1951 .values()
1952 .map(|policy| policy as &dyn CatalogNetworkPolicy)
1953 .collect()
1954 }
1955
1956 fn resolve_cluster(
1957 &self,
1958 cluster_name: Option<&str>,
1959 ) -> Result<&dyn mz_sql::catalog::CatalogCluster<'_>, SqlCatalogError> {
1960 Ok(self
1961 .state
1962 .resolve_cluster(cluster_name.unwrap_or_else(|| self.active_cluster()))?)
1963 }
1964
1965 fn resolve_cluster_replica(
1966 &self,
1967 cluster_replica_name: &QualifiedReplica,
1968 ) -> Result<&dyn CatalogClusterReplica<'_>, SqlCatalogError> {
1969 Ok(self.state.resolve_cluster_replica(cluster_replica_name)?)
1970 }
1971
1972 fn resolve_item(
1973 &self,
1974 name: &PartialItemName,
1975 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1976 let r = self.state.resolve_entry(
1977 self.database.as_ref(),
1978 &self.effective_search_path(true),
1979 name,
1980 &self.conn_id,
1981 )?;
1982 if self.unresolvable_ids.contains(&r.id()) {
1983 Err(SqlCatalogError::UnknownItem(name.to_string()))
1984 } else {
1985 Ok(r)
1986 }
1987 }
1988
1989 fn resolve_function(
1990 &self,
1991 name: &PartialItemName,
1992 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
1993 let r = self.state.resolve_function(
1994 self.database.as_ref(),
1995 &self.effective_search_path(false),
1996 name,
1997 &self.conn_id,
1998 )?;
1999
2000 if self.unresolvable_ids.contains(&r.id()) {
2001 Err(SqlCatalogError::UnknownFunction {
2002 name: name.to_string(),
2003 alternative: None,
2004 })
2005 } else {
2006 Ok(r)
2007 }
2008 }
2009
2010 fn resolve_type(
2011 &self,
2012 name: &PartialItemName,
2013 ) -> Result<&dyn mz_sql::catalog::CatalogItem, SqlCatalogError> {
2014 let r = self.state.resolve_type(
2015 self.database.as_ref(),
2016 &self.effective_search_path(false),
2017 name,
2018 &self.conn_id,
2019 )?;
2020
2021 if self.unresolvable_ids.contains(&r.id()) {
2022 Err(SqlCatalogError::UnknownType {
2023 name: name.to_string(),
2024 })
2025 } else {
2026 Ok(r)
2027 }
2028 }
2029
2030 fn get_system_type(&self, name: &str) -> &dyn mz_sql::catalog::CatalogItem {
2031 self.state.get_system_type(name)
2032 }
2033
2034 fn try_get_item(&self, id: &CatalogItemId) -> Option<&dyn mz_sql::catalog::CatalogItem> {
2035 Some(self.state.try_get_entry(id)?)
2036 }
2037
2038 fn try_get_item_by_global_id(
2039 &self,
2040 id: &GlobalId,
2041 ) -> Option<Box<dyn mz_sql::catalog::CatalogCollectionItem>> {
2042 let entry = self.state.try_get_entry_by_global_id(id)?;
2043 let entry = match &entry.item {
2044 CatalogItem::Table(table) => {
2045 let (version, _gid) = table
2046 .collections
2047 .iter()
2048 .find(|(_version, gid)| *gid == id)
2049 .expect("catalog out of sync, mismatched GlobalId");
2050 entry.at_version(RelationVersionSelector::Specific(*version))
2051 }
2052 _ => entry.at_version(RelationVersionSelector::Latest),
2053 };
2054 Some(entry)
2055 }
2056
2057 fn get_item(&self, id: &CatalogItemId) -> &dyn mz_sql::catalog::CatalogItem {
2058 self.state.get_entry(id)
2059 }
2060
2061 fn get_item_by_global_id(
2062 &self,
2063 id: &GlobalId,
2064 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
2065 let entry = self.state.get_entry_by_global_id(id);
2066 let entry = match &entry.item {
2067 CatalogItem::Table(table) => {
2068 let (version, _gid) = table
2069 .collections
2070 .iter()
2071 .find(|(_version, gid)| *gid == id)
2072 .expect("catalog out of sync, mismatched GlobalId");
2073 entry.at_version(RelationVersionSelector::Specific(*version))
2074 }
2075 _ => entry.at_version(RelationVersionSelector::Latest),
2076 };
2077 entry
2078 }
2079
2080 fn get_items(&self) -> Vec<&dyn mz_sql::catalog::CatalogItem> {
2081 self.get_schemas()
2082 .into_iter()
2083 .flat_map(|schema| schema.item_ids())
2084 .map(|id| self.get_item(&id))
2085 .collect()
2086 }
2087
2088 fn get_item_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2089 self.state
2090 .get_item_by_name(name, &self.conn_id)
2091 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2092 }
2093
2094 fn get_type_by_name(&self, name: &QualifiedItemName) -> Option<&dyn SqlCatalogItem> {
2095 self.state
2096 .get_type_by_name(name, &self.conn_id)
2097 .map(|item| convert::identity::<&dyn SqlCatalogItem>(item))
2098 }
2099
2100 fn get_cluster(&self, id: ClusterId) -> &dyn mz_sql::catalog::CatalogCluster<'_> {
2101 &self.state.clusters_by_id[&id]
2102 }
2103
2104 fn get_clusters(&self) -> Vec<&dyn mz_sql::catalog::CatalogCluster<'_>> {
2105 self.state
2106 .clusters_by_id
2107 .values()
2108 .map(|cluster| convert::identity::<&dyn mz_sql::catalog::CatalogCluster>(cluster))
2109 .collect()
2110 }
2111
2112 fn get_cluster_replica(
2113 &self,
2114 cluster_id: ClusterId,
2115 replica_id: ReplicaId,
2116 ) -> &dyn mz_sql::catalog::CatalogClusterReplica<'_> {
2117 let cluster = self.get_cluster(cluster_id);
2118 cluster.replica(replica_id)
2119 }
2120
2121 fn get_cluster_replicas(&self) -> Vec<&dyn mz_sql::catalog::CatalogClusterReplica<'_>> {
2122 self.get_clusters()
2123 .into_iter()
2124 .flat_map(|cluster| cluster.replicas().into_iter())
2125 .collect()
2126 }
2127
2128 fn get_system_privileges(&self) -> &PrivilegeMap {
2129 &self.state.system_privileges
2130 }
2131
2132 fn get_default_privileges(
2133 &self,
2134 ) -> Vec<(&DefaultPrivilegeObject, Vec<&DefaultPrivilegeAclItem>)> {
2135 self.state
2136 .default_privileges
2137 .iter()
2138 .map(|(object, acl_items)| (object, acl_items.collect()))
2139 .collect()
2140 }
2141
2142 fn find_available_name(&self, name: QualifiedItemName) -> QualifiedItemName {
2143 self.state.find_available_name(name, &self.conn_id)
2144 }
2145
2146 fn resolve_full_name(&self, name: &QualifiedItemName) -> FullItemName {
2147 self.state.resolve_full_name(name, Some(&self.conn_id))
2148 }
2149
2150 fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
2151 self.state.resolve_full_schema_name(name)
2152 }
2153
2154 fn resolve_item_id(&self, global_id: &GlobalId) -> CatalogItemId {
2155 self.state.get_entry_by_global_id(global_id).id()
2156 }
2157
2158 fn resolve_global_id(
2159 &self,
2160 item_id: &CatalogItemId,
2161 version: RelationVersionSelector,
2162 ) -> GlobalId {
2163 self.state
2164 .get_entry(item_id)
2165 .at_version(version)
2166 .global_id()
2167 }
2168
2169 fn config(&self) -> &mz_sql::catalog::CatalogConfig {
2170 self.state.config()
2171 }
2172
2173 fn now(&self) -> EpochMillis {
2174 (self.state.config().now)()
2175 }
2176
2177 fn aws_privatelink_availability_zones(&self) -> Option<BTreeSet<String>> {
2178 self.state.aws_privatelink_availability_zones.clone()
2179 }
2180
2181 fn system_vars(&self) -> &SystemVars {
2182 &self.state.system_configuration
2183 }
2184
2185 fn system_vars_mut(&mut self) -> &mut SystemVars {
2186 &mut self.state.to_mut().system_configuration
2187 }
2188
2189 fn get_owner_id(&self, id: &ObjectId) -> Option<RoleId> {
2190 self.state().get_owner_id(id, self.conn_id())
2191 }
2192
2193 fn get_privileges(&self, id: &SystemObjectId) -> Option<&PrivilegeMap> {
2194 match id {
2195 SystemObjectId::System => Some(&self.state.system_privileges),
2196 SystemObjectId::Object(ObjectId::Cluster(id)) => {
2197 Some(self.get_cluster(*id).privileges())
2198 }
2199 SystemObjectId::Object(ObjectId::Database(id)) => {
2200 Some(self.get_database(id).privileges())
2201 }
2202 SystemObjectId::Object(ObjectId::Schema((database_spec, schema_spec))) => {
2203 self.state
2206 .try_get_schema(database_spec, schema_spec, &self.conn_id)
2207 .map(|schema| schema.privileges())
2208 }
2209 SystemObjectId::Object(ObjectId::Item(id)) => Some(self.get_item(id).privileges()),
2210 SystemObjectId::Object(ObjectId::NetworkPolicy(id)) => {
2211 Some(self.get_network_policy(id).privileges())
2212 }
2213 SystemObjectId::Object(ObjectId::ClusterReplica(_))
2214 | SystemObjectId::Object(ObjectId::Role(_)) => None,
2215 }
2216 }
2217
2218 fn object_dependents(&self, ids: &Vec<ObjectId>) -> Vec<ObjectId> {
2219 let mut seen = BTreeSet::new();
2220 self.state.object_dependents(ids, &self.conn_id, &mut seen)
2221 }
2222
2223 fn item_dependents(&self, id: CatalogItemId) -> Vec<ObjectId> {
2224 let mut seen = BTreeSet::new();
2225 self.state.item_dependents(id, &mut seen)
2226 }
2227
2228 fn all_object_privileges(&self, object_type: mz_sql::catalog::SystemObjectType) -> AclMode {
2229 rbac::all_object_privileges(object_type)
2230 }
2231
2232 fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2233 self.state.get_object_type(object_id)
2234 }
2235
2236 fn get_system_object_type(&self, id: &SystemObjectId) -> mz_sql::catalog::SystemObjectType {
2237 self.state.get_system_object_type(id)
2238 }
2239
2240 fn minimal_qualification(&self, qualified_name: &QualifiedItemName) -> PartialItemName {
2247 if qualified_name.qualifiers.schema_spec.is_temporary() {
2248 return qualified_name.item.clone().into();
2256 }
2257
2258 let database_id = match &qualified_name.qualifiers.database_spec {
2259 ResolvedDatabaseSpecifier::Ambient => None,
2260 ResolvedDatabaseSpecifier::Id(id)
2261 if self.database.is_some() && self.database == Some(*id) =>
2262 {
2263 None
2264 }
2265 ResolvedDatabaseSpecifier::Id(id) => Some(id.clone()),
2266 };
2267
2268 let schema_spec = if database_id.is_none()
2269 && self.resolve_item_name(&PartialItemName {
2270 database: None,
2271 schema: None,
2272 item: qualified_name.item.clone(),
2273 }) == Ok(qualified_name)
2274 || self.resolve_function_name(&PartialItemName {
2275 database: None,
2276 schema: None,
2277 item: qualified_name.item.clone(),
2278 }) == Ok(qualified_name)
2279 || self.resolve_type_name(&PartialItemName {
2280 database: None,
2281 schema: None,
2282 item: qualified_name.item.clone(),
2283 }) == Ok(qualified_name)
2284 {
2285 None
2286 } else {
2287 Some(qualified_name.qualifiers.schema_spec.clone())
2290 };
2291
2292 let res = PartialItemName {
2293 database: database_id.map(|id| self.get_database(&id).name().to_string()),
2294 schema: schema_spec.map(|spec| {
2295 self.get_schema(&qualified_name.qualifiers.database_spec, &spec)
2296 .name()
2297 .schema
2298 .clone()
2299 }),
2300 item: qualified_name.item.clone(),
2301 };
2302 assert!(
2303 self.resolve_item_name(&res) == Ok(qualified_name)
2304 || self.resolve_function_name(&res) == Ok(qualified_name)
2305 || self.resolve_type_name(&res) == Ok(qualified_name)
2306 );
2307 res
2308 }
2309
2310 fn add_notice(&self, notice: PlanNotice) {
2311 let _ = self.notices_tx.send(notice.into());
2312 }
2313
2314 fn get_item_comments(&self, id: &CatalogItemId) -> Option<&BTreeMap<Option<usize>, String>> {
2315 let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
2316 self.state.comments.get_object_comments(comment_id)
2317 }
2318
2319 fn is_cluster_size_cc(&self, size: &str) -> bool {
2320 self.state
2321 .cluster_replica_sizes
2322 .0
2323 .get(size)
2324 .map_or(false, |a| a.is_cc)
2325 }
2326}
2327
2328#[cfg(test)]
2329mod tests {
2330 use std::collections::{BTreeMap, BTreeSet};
2331 use std::sync::Arc;
2332 use std::{env, iter};
2333
2334 use itertools::Itertools;
2335 use mz_catalog::memory::objects::CatalogItem;
2336 use tokio_postgres::NoTls;
2337 use tokio_postgres::types::Type;
2338 use uuid::Uuid;
2339
2340 use mz_catalog::SYSTEM_CONN_ID;
2341 use mz_catalog::builtin::{BUILTINS, Builtin, BuiltinType};
2342 use mz_catalog::durable::{CatalogError, DurableCatalogError, FenceError, test_bootstrap_args};
2343 use mz_controller_types::{ClusterId, ReplicaId};
2344 use mz_expr::MirScalarExpr;
2345 use mz_ore::now::to_datetime;
2346 use mz_ore::{assert_err, assert_ok, soft_assert_eq_or_log, task};
2347 use mz_persist_client::PersistClient;
2348 use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_UNPINNED_OID, FIRST_USER_OID};
2349 use mz_repr::namespaces::{INFORMATION_SCHEMA, PG_CATALOG_SCHEMA};
2350 use mz_repr::role_id::RoleId;
2351 use mz_repr::{
2352 CatalogItemId, Datum, GlobalId, RelationVersionSelector, RowArena, SqlRelationType,
2353 SqlScalarType, Timestamp,
2354 };
2355 use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
2356 use mz_sql::func::{Func, FuncImpl, OP_IMPLS, Operation};
2357 use mz_sql::names::{
2358 self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
2359 ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, SystemObjectId,
2360 };
2361 use mz_sql::plan::{
2362 CoercibleScalarExpr, ExprContext, HirScalarExpr, PlanContext, QueryContext, QueryLifetime,
2363 Scope, StatementContext,
2364 };
2365 use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
2366 use mz_sql::session::vars::{SystemVars, VarInput};
2367
2368 use crate::catalog::state::LocalExpressionCache;
2369 use crate::catalog::{Catalog, Op};
2370 use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
2371 use crate::session::Session;
2372
2373 #[mz_ore::test(tokio::test)]
2380 #[cfg_attr(miri, ignore)] async fn test_minimal_qualification() {
2382 Catalog::with_debug(|catalog| async move {
2383 struct TestCase {
2384 input: QualifiedItemName,
2385 system_output: PartialItemName,
2386 normal_output: PartialItemName,
2387 }
2388
2389 let test_cases = vec![
2390 TestCase {
2391 input: QualifiedItemName {
2392 qualifiers: ItemQualifiers {
2393 database_spec: ResolvedDatabaseSpecifier::Ambient,
2394 schema_spec: SchemaSpecifier::Id(catalog.get_pg_catalog_schema_id()),
2395 },
2396 item: "numeric".to_string(),
2397 },
2398 system_output: PartialItemName {
2399 database: None,
2400 schema: None,
2401 item: "numeric".to_string(),
2402 },
2403 normal_output: PartialItemName {
2404 database: None,
2405 schema: None,
2406 item: "numeric".to_string(),
2407 },
2408 },
2409 TestCase {
2410 input: QualifiedItemName {
2411 qualifiers: ItemQualifiers {
2412 database_spec: ResolvedDatabaseSpecifier::Ambient,
2413 schema_spec: SchemaSpecifier::Id(catalog.get_mz_catalog_schema_id()),
2414 },
2415 item: "mz_array_types".to_string(),
2416 },
2417 system_output: PartialItemName {
2418 database: None,
2419 schema: None,
2420 item: "mz_array_types".to_string(),
2421 },
2422 normal_output: PartialItemName {
2423 database: None,
2424 schema: None,
2425 item: "mz_array_types".to_string(),
2426 },
2427 },
2428 ];
2429
2430 for tc in test_cases {
2431 assert_eq!(
2432 catalog
2433 .for_system_session()
2434 .minimal_qualification(&tc.input),
2435 tc.system_output
2436 );
2437 assert_eq!(
2438 catalog
2439 .for_session(&Session::dummy())
2440 .minimal_qualification(&tc.input),
2441 tc.normal_output
2442 );
2443 }
2444 catalog.expire().await;
2445 })
2446 .await
2447 }
2448
2449 #[mz_ore::test(tokio::test)]
2450 #[cfg_attr(miri, ignore)] async fn test_catalog_revision() {
2452 let persist_client = PersistClient::new_for_tests().await;
2453 let organization_id = Uuid::new_v4();
2454 let bootstrap_args = test_bootstrap_args();
2455 {
2456 let mut catalog = Catalog::open_debug_catalog(
2457 persist_client.clone(),
2458 organization_id.clone(),
2459 &bootstrap_args,
2460 )
2461 .await
2462 .expect("unable to open debug catalog");
2463 assert_eq!(catalog.transient_revision(), 1);
2464 let commit_ts = catalog.current_upper().await;
2465 catalog
2466 .transact(
2467 None,
2468 commit_ts,
2469 None,
2470 vec![Op::CreateDatabase {
2471 name: "test".to_string(),
2472 owner_id: MZ_SYSTEM_ROLE_ID,
2473 }],
2474 )
2475 .await
2476 .expect("failed to transact");
2477 assert_eq!(catalog.transient_revision(), 2);
2478 catalog.expire().await;
2479 }
2480 {
2481 let catalog =
2482 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2483 .await
2484 .expect("unable to open debug catalog");
2485 assert_eq!(catalog.transient_revision(), 1);
2487 catalog.expire().await;
2488 }
2489 }
2490
2491 #[mz_ore::test(tokio::test)]
2492 #[cfg_attr(miri, ignore)] async fn test_effective_search_path() {
2494 Catalog::with_debug(|catalog| async move {
2495 let mz_catalog_schema = (
2496 ResolvedDatabaseSpecifier::Ambient,
2497 SchemaSpecifier::Id(catalog.state().get_mz_catalog_schema_id()),
2498 );
2499 let pg_catalog_schema = (
2500 ResolvedDatabaseSpecifier::Ambient,
2501 SchemaSpecifier::Id(catalog.state().get_pg_catalog_schema_id()),
2502 );
2503 let mz_temp_schema = (
2504 ResolvedDatabaseSpecifier::Ambient,
2505 SchemaSpecifier::Temporary,
2506 );
2507
2508 let session = Session::dummy();
2510 let conn_catalog = catalog.for_session(&session);
2511 assert_ne!(
2512 conn_catalog.effective_search_path(false),
2513 conn_catalog.search_path
2514 );
2515 assert_ne!(
2516 conn_catalog.effective_search_path(true),
2517 conn_catalog.search_path
2518 );
2519 assert_eq!(
2520 conn_catalog.effective_search_path(false),
2521 vec![
2522 mz_catalog_schema.clone(),
2523 pg_catalog_schema.clone(),
2524 conn_catalog.search_path[0].clone()
2525 ]
2526 );
2527 assert_eq!(
2528 conn_catalog.effective_search_path(true),
2529 vec![
2530 mz_temp_schema.clone(),
2531 mz_catalog_schema.clone(),
2532 pg_catalog_schema.clone(),
2533 conn_catalog.search_path[0].clone()
2534 ]
2535 );
2536
2537 let mut session = Session::dummy();
2539 session
2540 .vars_mut()
2541 .set(
2542 &SystemVars::new(),
2543 "search_path",
2544 VarInput::Flat(mz_repr::namespaces::PG_CATALOG_SCHEMA),
2545 false,
2546 )
2547 .expect("failed to set search_path");
2548 let conn_catalog = catalog.for_session(&session);
2549 assert_ne!(
2550 conn_catalog.effective_search_path(false),
2551 conn_catalog.search_path
2552 );
2553 assert_ne!(
2554 conn_catalog.effective_search_path(true),
2555 conn_catalog.search_path
2556 );
2557 assert_eq!(
2558 conn_catalog.effective_search_path(false),
2559 vec![mz_catalog_schema.clone(), pg_catalog_schema.clone()]
2560 );
2561 assert_eq!(
2562 conn_catalog.effective_search_path(true),
2563 vec![
2564 mz_temp_schema.clone(),
2565 mz_catalog_schema.clone(),
2566 pg_catalog_schema.clone()
2567 ]
2568 );
2569
2570 let mut session = Session::dummy();
2571 session
2572 .vars_mut()
2573 .set(
2574 &SystemVars::new(),
2575 "search_path",
2576 VarInput::Flat(mz_repr::namespaces::MZ_CATALOG_SCHEMA),
2577 false,
2578 )
2579 .expect("failed to set search_path");
2580 let conn_catalog = catalog.for_session(&session);
2581 assert_ne!(
2582 conn_catalog.effective_search_path(false),
2583 conn_catalog.search_path
2584 );
2585 assert_ne!(
2586 conn_catalog.effective_search_path(true),
2587 conn_catalog.search_path
2588 );
2589 assert_eq!(
2590 conn_catalog.effective_search_path(false),
2591 vec![pg_catalog_schema.clone(), mz_catalog_schema.clone()]
2592 );
2593 assert_eq!(
2594 conn_catalog.effective_search_path(true),
2595 vec![
2596 mz_temp_schema.clone(),
2597 pg_catalog_schema.clone(),
2598 mz_catalog_schema.clone()
2599 ]
2600 );
2601
2602 let mut session = Session::dummy();
2603 session
2604 .vars_mut()
2605 .set(
2606 &SystemVars::new(),
2607 "search_path",
2608 VarInput::Flat(mz_repr::namespaces::MZ_TEMP_SCHEMA),
2609 false,
2610 )
2611 .expect("failed to set search_path");
2612 let conn_catalog = catalog.for_session(&session);
2613 assert_ne!(
2614 conn_catalog.effective_search_path(false),
2615 conn_catalog.search_path
2616 );
2617 assert_ne!(
2618 conn_catalog.effective_search_path(true),
2619 conn_catalog.search_path
2620 );
2621 assert_eq!(
2622 conn_catalog.effective_search_path(false),
2623 vec![
2624 mz_catalog_schema.clone(),
2625 pg_catalog_schema.clone(),
2626 mz_temp_schema.clone()
2627 ]
2628 );
2629 assert_eq!(
2630 conn_catalog.effective_search_path(true),
2631 vec![mz_catalog_schema, pg_catalog_schema, mz_temp_schema]
2632 );
2633 catalog.expire().await;
2634 })
2635 .await
2636 }
2637
2638 #[mz_ore::test(tokio::test)]
2639 #[cfg_attr(miri, ignore)] async fn test_normalized_create() {
2641 use mz_ore::collections::CollectionExt;
2642 Catalog::with_debug(|catalog| async move {
2643 let conn_catalog = catalog.for_system_session();
2644 let scx = &mut StatementContext::new(None, &conn_catalog);
2645
2646 let parsed = mz_sql_parser::parser::parse_statements(
2647 "create view public.foo as select 1 as bar",
2648 )
2649 .expect("")
2650 .into_element()
2651 .ast;
2652
2653 let (stmt, _) = names::resolve(scx.catalog, parsed).expect("");
2654
2655 assert_eq!(
2657 r#"CREATE VIEW "materialize"."public"."foo" AS SELECT 1 AS "bar""#,
2658 mz_sql::normalize::create_statement(scx, stmt).expect(""),
2659 );
2660 catalog.expire().await;
2661 })
2662 .await;
2663 }
2664
2665 #[mz_ore::test(tokio::test)]
2667 #[cfg_attr(miri, ignore)] async fn test_large_catalog_item() {
2669 let view_def = "CREATE VIEW \"materialize\".\"public\".\"v\" AS SELECT 1 FROM (SELECT 1";
2670 let column = ", 1";
2671 let view_def_size = view_def.bytes().count();
2672 let column_size = column.bytes().count();
2673 let column_count =
2674 (mz_sql_parser::parser::MAX_STATEMENT_BATCH_SIZE - view_def_size) / column_size + 1;
2675 let columns = iter::repeat(column).take(column_count).join("");
2676 let create_sql = format!("{view_def}{columns})");
2677 let create_sql_check = create_sql.clone();
2678 assert_ok!(mz_sql_parser::parser::parse_statements(&create_sql));
2679 assert_err!(mz_sql_parser::parser::parse_statements_with_limit(
2680 &create_sql
2681 ));
2682
2683 let persist_client = PersistClient::new_for_tests().await;
2684 let organization_id = Uuid::new_v4();
2685 let id = CatalogItemId::User(1);
2686 let gid = GlobalId::User(1);
2687 let bootstrap_args = test_bootstrap_args();
2688 {
2689 let mut catalog = Catalog::open_debug_catalog(
2690 persist_client.clone(),
2691 organization_id.clone(),
2692 &bootstrap_args,
2693 )
2694 .await
2695 .expect("unable to open debug catalog");
2696 let item = catalog
2697 .state()
2698 .deserialize_item(
2699 gid,
2700 &create_sql,
2701 &BTreeMap::new(),
2702 &mut LocalExpressionCache::Closed,
2703 None,
2704 )
2705 .expect("unable to parse view");
2706 let commit_ts = catalog.current_upper().await;
2707 catalog
2708 .transact(
2709 None,
2710 commit_ts,
2711 None,
2712 vec![Op::CreateItem {
2713 item,
2714 name: QualifiedItemName {
2715 qualifiers: ItemQualifiers {
2716 database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)),
2717 schema_spec: SchemaSpecifier::Id(SchemaId::User(3)),
2718 },
2719 item: "v".to_string(),
2720 },
2721 id,
2722 owner_id: MZ_SYSTEM_ROLE_ID,
2723 }],
2724 )
2725 .await
2726 .expect("failed to transact");
2727 catalog.expire().await;
2728 }
2729 {
2730 let catalog =
2731 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
2732 .await
2733 .expect("unable to open debug catalog");
2734 let view = catalog.get_entry(&id);
2735 assert_eq!("v", view.name.item);
2736 match &view.item {
2737 CatalogItem::View(view) => assert_eq!(create_sql_check, view.create_sql),
2738 item => panic!("expected view, got {}", item.typ()),
2739 }
2740 catalog.expire().await;
2741 }
2742 }
2743
2744 #[mz_ore::test(tokio::test)]
2745 #[cfg_attr(miri, ignore)] async fn test_object_type() {
2747 Catalog::with_debug(|catalog| async move {
2748 let conn_catalog = catalog.for_system_session();
2749
2750 assert_eq!(
2751 mz_sql::catalog::ObjectType::ClusterReplica,
2752 conn_catalog.get_object_type(&ObjectId::ClusterReplica((
2753 ClusterId::user(1).expect("1 is a valid ID"),
2754 ReplicaId::User(1)
2755 )))
2756 );
2757 assert_eq!(
2758 mz_sql::catalog::ObjectType::Role,
2759 conn_catalog.get_object_type(&ObjectId::Role(RoleId::User(1)))
2760 );
2761 catalog.expire().await;
2762 })
2763 .await;
2764 }
2765
2766 #[mz_ore::test(tokio::test)]
2767 #[cfg_attr(miri, ignore)] async fn test_get_privileges() {
2769 Catalog::with_debug(|catalog| async move {
2770 let conn_catalog = catalog.for_system_session();
2771
2772 assert_eq!(
2773 None,
2774 conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
2775 ClusterId::user(1).expect("1 is a valid ID"),
2776 ReplicaId::User(1),
2777 ))))
2778 );
2779 assert_eq!(
2780 None,
2781 conn_catalog
2782 .get_privileges(&SystemObjectId::Object(ObjectId::Role(RoleId::User(1))))
2783 );
2784 catalog.expire().await;
2785 })
2786 .await;
2787 }
2788
2789 #[mz_ore::test(tokio::test)]
2790 #[cfg_attr(miri, ignore)] async fn verify_builtin_descs() {
2792 Catalog::with_debug(|catalog| async move {
2793 let conn_catalog = catalog.for_system_session();
2794
2795 let builtins_cfg = BuiltinsConfig {
2796 include_continual_tasks: true,
2797 };
2798 for builtin in BUILTINS::iter(&builtins_cfg) {
2799 let (schema, name, expected_desc) = match builtin {
2800 Builtin::Table(t) => (&t.schema, &t.name, &t.desc),
2801 Builtin::View(v) => (&v.schema, &v.name, &v.desc),
2802 Builtin::Source(s) => (&s.schema, &s.name, &s.desc),
2803 Builtin::Log(_)
2804 | Builtin::Type(_)
2805 | Builtin::Func(_)
2806 | Builtin::ContinualTask(_)
2807 | Builtin::Index(_)
2808 | Builtin::Connection(_) => continue,
2809 };
2810 let item = conn_catalog
2811 .resolve_item(&PartialItemName {
2812 database: None,
2813 schema: Some(schema.to_string()),
2814 item: name.to_string(),
2815 })
2816 .expect("unable to resolve item")
2817 .at_version(RelationVersionSelector::Latest);
2818
2819 let actual_desc = item.relation_desc().expect("invalid item type");
2820 for (index, ((actual_name, actual_typ), (expected_name, expected_typ))) in
2821 actual_desc.iter().zip_eq(expected_desc.iter()).enumerate()
2822 {
2823 assert_eq!(
2824 actual_name, expected_name,
2825 "item {schema}.{name} column {index} name did not match its expected name"
2826 );
2827 assert_eq!(
2828 actual_typ, expected_typ,
2829 "item {schema}.{name} column {index} ('{actual_name}') type did not match its expected type"
2830 );
2831 }
2832 assert_eq!(
2833 &*actual_desc, expected_desc,
2834 "item {schema}.{name} did not match its expected RelationDesc"
2835 );
2836 }
2837 catalog.expire().await;
2838 })
2839 .await
2840 }
2841
2842 #[mz_ore::test(tokio::test)]
2845 #[cfg_attr(miri, ignore)] async fn test_compare_builtins_postgres() {
2847 async fn inner(catalog: Catalog) {
2848 let (client, connection) = tokio_postgres::connect(
2852 &env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
2853 NoTls,
2854 )
2855 .await
2856 .expect("failed to connect to Postgres");
2857
2858 task::spawn(|| "compare_builtin_postgres", async move {
2859 if let Err(e) = connection.await {
2860 panic!("connection error: {}", e);
2861 }
2862 });
2863
2864 struct PgProc {
2865 name: String,
2866 arg_oids: Vec<u32>,
2867 ret_oid: Option<u32>,
2868 ret_set: bool,
2869 }
2870
2871 struct PgType {
2872 name: String,
2873 ty: String,
2874 elem: u32,
2875 array: u32,
2876 input: u32,
2877 receive: u32,
2878 }
2879
2880 struct PgOper {
2881 oprresult: u32,
2882 name: String,
2883 }
2884
2885 let pg_proc: BTreeMap<_, _> = client
2886 .query(
2887 "SELECT
2888 p.oid,
2889 proname,
2890 proargtypes,
2891 prorettype,
2892 proretset
2893 FROM pg_proc p
2894 JOIN pg_namespace n ON p.pronamespace = n.oid",
2895 &[],
2896 )
2897 .await
2898 .expect("pg query failed")
2899 .into_iter()
2900 .map(|row| {
2901 let oid: u32 = row.get("oid");
2902 let pg_proc = PgProc {
2903 name: row.get("proname"),
2904 arg_oids: row.get("proargtypes"),
2905 ret_oid: row.get("prorettype"),
2906 ret_set: row.get("proretset"),
2907 };
2908 (oid, pg_proc)
2909 })
2910 .collect();
2911
2912 let pg_type: BTreeMap<_, _> = client
2913 .query(
2914 "SELECT oid, typname, typtype::text, typelem, typarray, typinput::oid, typreceive::oid as typreceive FROM pg_type",
2915 &[],
2916 )
2917 .await
2918 .expect("pg query failed")
2919 .into_iter()
2920 .map(|row| {
2921 let oid: u32 = row.get("oid");
2922 let pg_type = PgType {
2923 name: row.get("typname"),
2924 ty: row.get("typtype"),
2925 elem: row.get("typelem"),
2926 array: row.get("typarray"),
2927 input: row.get("typinput"),
2928 receive: row.get("typreceive"),
2929 };
2930 (oid, pg_type)
2931 })
2932 .collect();
2933
2934 let pg_oper: BTreeMap<_, _> = client
2935 .query("SELECT oid, oprname, oprresult FROM pg_operator", &[])
2936 .await
2937 .expect("pg query failed")
2938 .into_iter()
2939 .map(|row| {
2940 let oid: u32 = row.get("oid");
2941 let pg_oper = PgOper {
2942 name: row.get("oprname"),
2943 oprresult: row.get("oprresult"),
2944 };
2945 (oid, pg_oper)
2946 })
2947 .collect();
2948
2949 let conn_catalog = catalog.for_system_session();
2950 let resolve_type_oid = |item: &str| {
2951 conn_catalog
2952 .resolve_type(&PartialItemName {
2953 database: None,
2954 schema: Some(PG_CATALOG_SCHEMA.into()),
2957 item: item.to_string(),
2958 })
2959 .expect("unable to resolve type")
2960 .oid()
2961 };
2962
2963 let func_oids: BTreeSet<_> = BUILTINS::funcs()
2964 .flat_map(|f| f.inner.func_impls().into_iter().map(|f| f.oid))
2965 .collect();
2966
2967 let mut all_oids = BTreeSet::new();
2968
2969 let equivalent_types: BTreeSet<(Option<u32>, Option<u32>)> = BTreeSet::from_iter(
2972 [
2973 (Type::NAME, Type::TEXT),
2975 (Type::NAME_ARRAY, Type::TEXT_ARRAY),
2976 (Type::TIME, Type::TIMETZ),
2978 (Type::TIME_ARRAY, Type::TIMETZ_ARRAY),
2979 ]
2980 .map(|(a, b)| (Some(a.oid()), Some(b.oid()))),
2981 );
2982 let ignore_return_types: BTreeSet<u32> = BTreeSet::from([
2983 1619, ]);
2985 let is_same_type = |fn_oid: u32, a: Option<u32>, b: Option<u32>| -> bool {
2986 if ignore_return_types.contains(&fn_oid) {
2987 return true;
2988 }
2989 if equivalent_types.contains(&(a, b)) || equivalent_types.contains(&(b, a)) {
2990 return true;
2991 }
2992 a == b
2993 };
2994
2995 let builtins_cfg = BuiltinsConfig {
2996 include_continual_tasks: true,
2997 };
2998 for builtin in BUILTINS::iter(&builtins_cfg) {
2999 match builtin {
3000 Builtin::Type(ty) => {
3001 assert!(all_oids.insert(ty.oid), "{} reused oid {}", ty.name, ty.oid);
3002
3003 if ty.oid >= FIRST_MATERIALIZE_OID {
3004 continue;
3007 }
3008
3009 let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
3012 panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
3013 });
3014 assert_eq!(
3015 ty.name, pg_ty.name,
3016 "oid {} has name {} in postgres; expected {}",
3017 ty.oid, pg_ty.name, ty.name,
3018 );
3019
3020 let (typinput_oid, typreceive_oid) = match &ty.details.pg_metadata {
3021 None => (0, 0),
3022 Some(pgmeta) => (pgmeta.typinput_oid, pgmeta.typreceive_oid),
3023 };
3024 assert_eq!(
3025 typinput_oid, pg_ty.input,
3026 "type {} has typinput OID {:?} in mz but {:?} in pg",
3027 ty.name, typinput_oid, pg_ty.input,
3028 );
3029 assert_eq!(
3030 typreceive_oid, pg_ty.receive,
3031 "type {} has typreceive OID {:?} in mz but {:?} in pg",
3032 ty.name, typreceive_oid, pg_ty.receive,
3033 );
3034 if typinput_oid != 0 {
3035 assert!(
3036 func_oids.contains(&typinput_oid),
3037 "type {} has typinput OID {} that does not exist in pg_proc",
3038 ty.name,
3039 typinput_oid,
3040 );
3041 }
3042 if typreceive_oid != 0 {
3043 assert!(
3044 func_oids.contains(&typreceive_oid),
3045 "type {} has typreceive OID {} that does not exist in pg_proc",
3046 ty.name,
3047 typreceive_oid,
3048 );
3049 }
3050
3051 match &ty.details.typ {
3053 CatalogType::Array { element_reference } => {
3054 let elem_ty = BUILTINS::iter(&builtins_cfg)
3055 .filter_map(|builtin| match builtin {
3056 Builtin::Type(ty @ BuiltinType { name, .. })
3057 if element_reference == name =>
3058 {
3059 Some(ty)
3060 }
3061 _ => None,
3062 })
3063 .next();
3064 let elem_ty = match elem_ty {
3065 Some(ty) => ty,
3066 None => {
3067 panic!("{} is unexpectedly not a type", element_reference)
3068 }
3069 };
3070 assert_eq!(
3071 pg_ty.elem, elem_ty.oid,
3072 "type {} has mismatched element OIDs",
3073 ty.name
3074 )
3075 }
3076 CatalogType::Pseudo => {
3077 assert_eq!(
3078 pg_ty.ty, "p",
3079 "type {} is not a pseudo type as expected",
3080 ty.name
3081 )
3082 }
3083 CatalogType::Range { .. } => {
3084 assert_eq!(
3085 pg_ty.ty, "r",
3086 "type {} is not a range type as expected",
3087 ty.name
3088 );
3089 }
3090 _ => {
3091 assert_eq!(
3092 pg_ty.ty, "b",
3093 "type {} is not a base type as expected",
3094 ty.name
3095 )
3096 }
3097 }
3098
3099 let schema = catalog
3101 .resolve_schema_in_database(
3102 &ResolvedDatabaseSpecifier::Ambient,
3103 ty.schema,
3104 &SYSTEM_CONN_ID,
3105 )
3106 .expect("unable to resolve schema");
3107 let allocated_type = catalog
3108 .resolve_type(
3109 None,
3110 &vec![(ResolvedDatabaseSpecifier::Ambient, schema.id().clone())],
3111 &PartialItemName {
3112 database: None,
3113 schema: Some(schema.name().schema.clone()),
3114 item: ty.name.to_string(),
3115 },
3116 &SYSTEM_CONN_ID,
3117 )
3118 .expect("unable to resolve type");
3119 let ty = if let CatalogItem::Type(ty) = &allocated_type.item {
3120 ty
3121 } else {
3122 panic!("unexpectedly not a type")
3123 };
3124 match ty.details.array_id {
3125 Some(array_id) => {
3126 let array_ty = catalog.get_entry(&array_id);
3127 assert_eq!(
3128 pg_ty.array, array_ty.oid,
3129 "type {} has mismatched array OIDs",
3130 allocated_type.name.item,
3131 );
3132 }
3133 None => assert_eq!(
3134 pg_ty.array, 0,
3135 "type {} does not have an array type in mz but does in pg",
3136 allocated_type.name.item,
3137 ),
3138 }
3139 }
3140 Builtin::Func(func) => {
3141 for imp in func.inner.func_impls() {
3142 assert!(
3143 all_oids.insert(imp.oid),
3144 "{} reused oid {}",
3145 func.name,
3146 imp.oid
3147 );
3148
3149 assert!(
3150 imp.oid < FIRST_USER_OID,
3151 "built-in function {} erroneously has OID in user space ({})",
3152 func.name,
3153 imp.oid,
3154 );
3155
3156 let pg_fn = if imp.oid >= FIRST_UNPINNED_OID {
3159 continue;
3160 } else {
3161 pg_proc.get(&imp.oid).unwrap_or_else(|| {
3162 panic!(
3163 "pg_proc missing function {}: oid {}",
3164 func.name, imp.oid
3165 )
3166 })
3167 };
3168 assert_eq!(
3169 func.name, pg_fn.name,
3170 "funcs with oid {} don't match names: {} in mz, {} in pg",
3171 imp.oid, func.name, pg_fn.name
3172 );
3173
3174 let imp_arg_oids = imp
3177 .arg_typs
3178 .iter()
3179 .map(|item| resolve_type_oid(item))
3180 .collect::<Vec<_>>();
3181
3182 if imp_arg_oids != pg_fn.arg_oids {
3183 println!(
3184 "funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
3185 imp.oid, func.name, imp_arg_oids, pg_fn.arg_oids
3186 );
3187 }
3188
3189 let imp_return_oid = imp.return_typ.map(resolve_type_oid);
3190
3191 assert!(
3192 is_same_type(imp.oid, imp_return_oid, pg_fn.ret_oid),
3193 "funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
3194 imp.oid,
3195 func.name,
3196 imp_return_oid,
3197 pg_fn.ret_oid
3198 );
3199
3200 assert_eq!(
3201 imp.return_is_set, pg_fn.ret_set,
3202 "funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
3203 imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
3204 );
3205 }
3206 }
3207 _ => (),
3208 }
3209 }
3210
3211 for (op, func) in OP_IMPLS.iter() {
3212 for imp in func.func_impls() {
3213 assert!(all_oids.insert(imp.oid), "{} reused oid {}", op, imp.oid);
3214
3215 let pg_op = if imp.oid >= FIRST_UNPINNED_OID {
3217 continue;
3218 } else {
3219 pg_oper.get(&imp.oid).unwrap_or_else(|| {
3220 panic!("pg_operator missing operator {}: oid {}", op, imp.oid)
3221 })
3222 };
3223
3224 assert_eq!(*op, pg_op.name);
3225
3226 let imp_return_oid =
3227 imp.return_typ.map(resolve_type_oid).expect("must have oid");
3228 if imp_return_oid != pg_op.oprresult {
3229 panic!(
3230 "operators with oid {} ({}) don't match return typs: {} in mz, {} in pg",
3231 imp.oid, op, imp_return_oid, pg_op.oprresult
3232 );
3233 }
3234 }
3235 }
3236 catalog.expire().await;
3237 }
3238
3239 Catalog::with_debug(inner).await
3240 }
3241
3242 #[mz_ore::test(tokio::test)]
3244 #[cfg_attr(miri, ignore)] async fn test_smoketest_all_builtins() {
3246 fn inner(catalog: Catalog) -> Vec<mz_ore::task::JoinHandle<()>> {
3247 let catalog = Arc::new(catalog);
3248 let conn_catalog = catalog.for_system_session();
3249
3250 let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
3251 let mut handles = Vec::new();
3252
3253 let ignore_names = BTreeSet::from([
3255 "avg",
3256 "avg_internal_v1",
3257 "bool_and",
3258 "bool_or",
3259 "has_table_privilege", "has_type_privilege", "mod",
3262 "mz_panic",
3263 "mz_sleep",
3264 "pow",
3265 "stddev_pop",
3266 "stddev_samp",
3267 "stddev",
3268 "var_pop",
3269 "var_samp",
3270 "variance",
3271 ]);
3272
3273 let fns = BUILTINS::funcs()
3274 .map(|func| (&func.name, func.inner))
3275 .chain(OP_IMPLS.iter());
3276
3277 for (name, func) in fns {
3278 if ignore_names.contains(name) {
3279 continue;
3280 }
3281 let Func::Scalar(impls) = func else {
3282 continue;
3283 };
3284
3285 'outer: for imp in impls {
3286 let details = imp.details();
3287 let mut styps = Vec::new();
3288 for item in details.arg_typs.iter() {
3289 let oid = resolve_type_oid(item);
3290 let Ok(pgtyp) = mz_pgrepr::Type::from_oid(oid) else {
3291 continue 'outer;
3292 };
3293 styps.push(SqlScalarType::try_from(&pgtyp).expect("must exist"));
3294 }
3295 let datums = styps
3296 .iter()
3297 .map(|styp| {
3298 let mut datums = vec![Datum::Null];
3299 datums.extend(styp.interesting_datums());
3300 datums
3301 })
3302 .collect::<Vec<_>>();
3303 if datums.is_empty() {
3305 continue;
3306 }
3307
3308 let return_oid = details
3309 .return_typ
3310 .map(resolve_type_oid)
3311 .expect("must exist");
3312 let return_styp = mz_pgrepr::Type::from_oid(return_oid)
3313 .ok()
3314 .map(|typ| SqlScalarType::try_from(&typ).expect("must exist"));
3315
3316 let mut idxs = vec![0; datums.len()];
3317 while idxs[0] < datums[0].len() {
3318 let mut args = Vec::with_capacity(idxs.len());
3319 for i in 0..(datums.len()) {
3320 args.push(datums[i][idxs[i]]);
3321 }
3322
3323 let op = &imp.op;
3324 let scalars = args
3325 .iter()
3326 .enumerate()
3327 .map(|(i, datum)| {
3328 CoercibleScalarExpr::Coerced(HirScalarExpr::literal(
3329 datum.clone(),
3330 styps[i].clone(),
3331 ))
3332 })
3333 .collect();
3334
3335 let call_name = format!(
3336 "{name}({}) (oid: {})",
3337 args.iter()
3338 .map(|d| d.to_string())
3339 .collect::<Vec<_>>()
3340 .join(", "),
3341 imp.oid
3342 );
3343 let catalog = Arc::clone(&catalog);
3344 let call_name_fn = call_name.clone();
3345 let return_styp = return_styp.clone();
3346 let handle = task::spawn_blocking(
3347 || call_name,
3348 move || {
3349 smoketest_fn(
3350 name,
3351 call_name_fn,
3352 op,
3353 imp,
3354 args,
3355 catalog,
3356 scalars,
3357 return_styp,
3358 )
3359 },
3360 );
3361 handles.push(handle);
3362
3363 for i in (0..datums.len()).rev() {
3365 idxs[i] += 1;
3366 if idxs[i] >= datums[i].len() {
3367 if i == 0 {
3368 break;
3369 }
3370 idxs[i] = 0;
3371 continue;
3372 } else {
3373 break;
3374 }
3375 }
3376 }
3377 }
3378 }
3379 handles
3380 }
3381
3382 let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
3383 for handle in handles {
3384 handle.await;
3385 }
3386 }
3387
3388 fn smoketest_fn(
3389 name: &&str,
3390 call_name: String,
3391 op: &Operation<HirScalarExpr>,
3392 imp: &FuncImpl<HirScalarExpr>,
3393 args: Vec<Datum<'_>>,
3394 catalog: Arc<Catalog>,
3395 scalars: Vec<CoercibleScalarExpr>,
3396 return_styp: Option<SqlScalarType>,
3397 ) {
3398 let conn_catalog = catalog.for_system_session();
3399 let pcx = PlanContext::zero();
3400 let scx = StatementContext::new(Some(&pcx), &conn_catalog);
3401 let qcx = QueryContext::root(&scx, QueryLifetime::OneShot);
3402 let ecx = ExprContext {
3403 qcx: &qcx,
3404 name: "smoketest",
3405 scope: &Scope::empty(),
3406 relation_type: &SqlRelationType::empty(),
3407 allow_aggregates: false,
3408 allow_subqueries: false,
3409 allow_parameters: false,
3410 allow_windows: false,
3411 };
3412 let arena = RowArena::new();
3413 let mut session = Session::<Timestamp>::dummy();
3414 session
3415 .start_transaction(to_datetime(0), None, None)
3416 .expect("must succeed");
3417 let prep_style = ExprPrepOneShot {
3418 logical_time: EvalTime::Time(Timestamp::MIN),
3419 session: &session,
3420 catalog_state: &catalog.state,
3421 };
3422
3423 let res = (op.0)(&ecx, scalars, &imp.params, vec![]);
3426 if let Ok(hir) = res {
3427 if let Ok(mut mir) = hir.lower_uncorrelated(catalog.system_config()) {
3428 prep_style.prep_scalar_expr(&mut mir).expect("must succeed");
3430
3431 if let Ok(eval_result_datum) = mir.eval(&[], &arena) {
3432 if let Some(return_styp) = return_styp {
3433 let mir_typ = mir.typ(&[]);
3434 soft_assert_eq_or_log!(
3437 mir_typ.scalar_type,
3438 return_styp,
3439 "MIR type did not match the catalog type (cast elimination/repr type error)"
3440 );
3441 if !eval_result_datum.is_instance_of_sql(&mir_typ) {
3445 panic!(
3446 "{call_name}: expected return type of {return_styp:?}, got {eval_result_datum}"
3447 );
3448 }
3449 if let Some((introduces_nulls, propagates_nulls)) =
3452 call_introduces_propagates_nulls(&mir)
3453 {
3454 if introduces_nulls {
3455 assert!(
3459 mir_typ.nullable,
3460 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3461 name, args, mir, mir_typ.nullable
3462 );
3463 } else {
3464 let any_input_null = args.iter().any(|arg| arg.is_null());
3465 if !any_input_null {
3466 assert!(
3467 !mir_typ.nullable,
3468 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3469 name, args, mir, mir_typ.nullable
3470 );
3471 } else {
3472 assert_eq!(
3473 mir_typ.nullable, propagates_nulls,
3474 "fn named `{}` called on args `{:?}` (lowered to `{}`) yielded mir_typ.nullable: {}",
3475 name, args, mir, mir_typ.nullable
3476 );
3477 }
3478 }
3479 }
3480 let mut reduced = mir.clone();
3483 reduced.reduce(&[]);
3484 match reduced {
3485 MirScalarExpr::Literal(reduce_result, ctyp) => {
3486 match reduce_result {
3487 Ok(reduce_result_row) => {
3488 let reduce_result_datum = reduce_result_row.unpack_first();
3489 assert_eq!(
3490 reduce_result_datum,
3491 eval_result_datum,
3492 "eval/reduce datum mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3493 name,
3494 args,
3495 mir,
3496 eval_result_datum,
3497 mir_typ.scalar_type,
3498 reduce_result_datum,
3499 ctyp.scalar_type
3500 );
3501 assert_eq!(
3507 ctyp.scalar_type,
3508 mir_typ.scalar_type,
3509 "eval/reduce type mismatch: fn named `{}` called on args `{:?}` (lowered to `{}`) evaluated to `{}` with typ `{:?}`, but reduced to `{}` with typ `{:?}`",
3510 name,
3511 args,
3512 mir,
3513 eval_result_datum,
3514 mir_typ.scalar_type,
3515 reduce_result_datum,
3516 ctyp.scalar_type
3517 );
3518 }
3519 Err(..) => {} }
3521 }
3522 _ => unreachable!(
3523 "all args are literals, so should have reduced to a literal"
3524 ),
3525 }
3526 }
3527 }
3528 }
3529 }
3530 }
3531
3532 fn call_introduces_propagates_nulls(mir_func_call: &MirScalarExpr) -> Option<(bool, bool)> {
3537 match mir_func_call {
3538 MirScalarExpr::CallUnary { func, expr } => {
3539 if expr.is_literal() {
3540 Some((func.introduces_nulls(), func.propagates_nulls()))
3541 } else {
3542 None
3543 }
3544 }
3545 MirScalarExpr::CallBinary { func, expr1, expr2 } => {
3546 if expr1.is_literal() && expr2.is_literal() {
3547 Some((func.introduces_nulls(), func.propagates_nulls()))
3548 } else {
3549 None
3550 }
3551 }
3552 MirScalarExpr::CallVariadic { func, exprs } => {
3553 if exprs.iter().all(|arg| arg.is_literal()) {
3554 Some((func.introduces_nulls(), func.propagates_nulls()))
3555 } else {
3556 None
3557 }
3558 }
3559 _ => None,
3560 }
3561 }
3562
3563 #[mz_ore::test(tokio::test)]
3565 #[cfg_attr(miri, ignore)] async fn test_pg_views_forbidden_types() {
3567 Catalog::with_debug(|catalog| async move {
3568 let conn_catalog = catalog.for_system_session();
3569
3570 for view in BUILTINS::views().filter(|view| {
3571 view.schema == PG_CATALOG_SCHEMA || view.schema == INFORMATION_SCHEMA
3572 }) {
3573 let item = conn_catalog
3574 .resolve_item(&PartialItemName {
3575 database: None,
3576 schema: Some(view.schema.to_string()),
3577 item: view.name.to_string(),
3578 })
3579 .expect("unable to resolve view")
3580 .at_version(RelationVersionSelector::Latest);
3582 let full_name = conn_catalog.resolve_full_name(item.name());
3583 let desc = item.relation_desc().expect("invalid item type");
3584 for col_type in desc.iter_types() {
3585 match &col_type.scalar_type {
3586 typ @ SqlScalarType::UInt16
3587 | typ @ SqlScalarType::UInt32
3588 | typ @ SqlScalarType::UInt64
3589 | typ @ SqlScalarType::MzTimestamp
3590 | typ @ SqlScalarType::List { .. }
3591 | typ @ SqlScalarType::Map { .. }
3592 | typ @ SqlScalarType::MzAclItem => {
3593 panic!("{typ:?} type found in {full_name}");
3594 }
3595 SqlScalarType::AclItem
3596 | SqlScalarType::Bool
3597 | SqlScalarType::Int16
3598 | SqlScalarType::Int32
3599 | SqlScalarType::Int64
3600 | SqlScalarType::Float32
3601 | SqlScalarType::Float64
3602 | SqlScalarType::Numeric { .. }
3603 | SqlScalarType::Date
3604 | SqlScalarType::Time
3605 | SqlScalarType::Timestamp { .. }
3606 | SqlScalarType::TimestampTz { .. }
3607 | SqlScalarType::Interval
3608 | SqlScalarType::PgLegacyChar
3609 | SqlScalarType::Bytes
3610 | SqlScalarType::String
3611 | SqlScalarType::Char { .. }
3612 | SqlScalarType::VarChar { .. }
3613 | SqlScalarType::Jsonb
3614 | SqlScalarType::Uuid
3615 | SqlScalarType::Array(_)
3616 | SqlScalarType::Record { .. }
3617 | SqlScalarType::Oid
3618 | SqlScalarType::RegProc
3619 | SqlScalarType::RegType
3620 | SqlScalarType::RegClass
3621 | SqlScalarType::Int2Vector
3622 | SqlScalarType::Range { .. }
3623 | SqlScalarType::PgLegacyName => {}
3624 }
3625 }
3626 }
3627 catalog.expire().await;
3628 })
3629 .await
3630 }
3631
3632 #[mz_ore::test(tokio::test)]
3635 #[cfg_attr(miri, ignore)] async fn test_mz_introspection_builtins() {
3637 Catalog::with_debug(|catalog| async move {
3638 let conn_catalog = catalog.for_system_session();
3639
3640 let introspection_schema_id = catalog.get_mz_introspection_schema_id();
3641 let introspection_schema_spec = SchemaSpecifier::Id(introspection_schema_id);
3642
3643 for entry in catalog.entries() {
3644 let schema_spec = entry.name().qualifiers.schema_spec;
3645 let introspection_deps = catalog.introspection_dependencies(entry.id);
3646 if introspection_deps.is_empty() {
3647 assert!(
3648 schema_spec != introspection_schema_spec,
3649 "entry does not depend on introspection sources but is in \
3650 `mz_introspection`: {}",
3651 conn_catalog.resolve_full_name(entry.name()),
3652 );
3653 } else {
3654 assert!(
3655 schema_spec == introspection_schema_spec,
3656 "entry depends on introspection sources but is not in \
3657 `mz_introspection`: {}",
3658 conn_catalog.resolve_full_name(entry.name()),
3659 );
3660 }
3661 }
3662 })
3663 .await
3664 }
3665
3666 #[mz_ore::test(tokio::test)]
3667 #[cfg_attr(miri, ignore)] async fn test_multi_subscriber_catalog() {
3669 let persist_client = PersistClient::new_for_tests().await;
3670 let bootstrap_args = test_bootstrap_args();
3671 let organization_id = Uuid::new_v4();
3672 let db_name = "DB";
3673
3674 let mut writer_catalog = Catalog::open_debug_catalog(
3675 persist_client.clone(),
3676 organization_id.clone(),
3677 &bootstrap_args,
3678 )
3679 .await
3680 .expect("open_debug_catalog");
3681 let mut read_only_catalog = Catalog::open_debug_read_only_catalog(
3682 persist_client.clone(),
3683 organization_id.clone(),
3684 &bootstrap_args,
3685 )
3686 .await
3687 .expect("open_debug_read_only_catalog");
3688 assert_err!(writer_catalog.resolve_database(db_name));
3689 assert_err!(read_only_catalog.resolve_database(db_name));
3690
3691 let commit_ts = writer_catalog.current_upper().await;
3692 writer_catalog
3693 .transact(
3694 None,
3695 commit_ts,
3696 None,
3697 vec![Op::CreateDatabase {
3698 name: db_name.to_string(),
3699 owner_id: MZ_SYSTEM_ROLE_ID,
3700 }],
3701 )
3702 .await
3703 .expect("failed to transact");
3704
3705 let write_db = writer_catalog
3706 .resolve_database(db_name)
3707 .expect("resolve_database");
3708 read_only_catalog
3709 .sync_to_current_updates()
3710 .await
3711 .expect("sync_to_current_updates");
3712 let read_db = read_only_catalog
3713 .resolve_database(db_name)
3714 .expect("resolve_database");
3715
3716 assert_eq!(write_db, read_db);
3717
3718 let writer_catalog_fencer =
3719 Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
3720 .await
3721 .expect("open_debug_catalog for fencer");
3722 let fencer_db = writer_catalog_fencer
3723 .resolve_database(db_name)
3724 .expect("resolve_database for fencer");
3725 assert_eq!(fencer_db, read_db);
3726
3727 let write_fence_err = writer_catalog
3728 .sync_to_current_updates()
3729 .await
3730 .expect_err("sync_to_current_updates for fencer");
3731 assert!(matches!(
3732 write_fence_err,
3733 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3734 ));
3735 let read_fence_err = read_only_catalog
3736 .sync_to_current_updates()
3737 .await
3738 .expect_err("sync_to_current_updates after fencer");
3739 assert!(matches!(
3740 read_fence_err,
3741 CatalogError::Durable(DurableCatalogError::Fence(FenceError::Epoch { .. }))
3742 ));
3743
3744 writer_catalog.expire().await;
3745 read_only_catalog.expire().await;
3746 writer_catalog_fencer.expire().await;
3747 }
3748}