1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55 UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::OptimizerFeatures;
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61 CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62 VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
67use mz_sql::catalog::{
68 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
69 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
70 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
71 TypeReference,
72};
73use mz_sql::names::{
74 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81 Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91 ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use serde::Serialize;
94use timely::progress::Antichain;
95use tokio::sync::mpsc;
96use tracing::{debug, warn};
97
98use crate::AdapterError;
100use crate::catalog::{Catalog, ConnCatalog};
101use crate::coord::ConnMeta;
102use crate::optimize::{self, Optimize, OptimizerCatalog};
103use crate::session::Session;
104
105#[derive(Debug, Clone, Serialize)]
112pub struct CatalogState {
113 pub(super) database_by_name: BTreeMap<String, DatabaseId>,
119 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
120 pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
121 #[serde(serialize_with = "skip_temp_items")]
122 pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
123 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
124 pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
125 pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
126 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
127 pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
128 pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
129 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
130 pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
131 pub(super) roles_by_name: BTreeMap<String, RoleId>,
132 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
133 pub(super) roles_by_id: BTreeMap<RoleId, Role>,
134 pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
135 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
136 pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
137 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
138 pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,
139
140 #[serde(skip)]
141 pub(super) system_configuration: SystemVars,
142 pub(super) default_privileges: DefaultPrivileges,
143 pub(super) system_privileges: PrivilegeMap,
144 pub(super) comments: CommentsMap,
145 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
146 pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
147 pub(super) storage_metadata: StorageMetadata,
148 pub(super) mock_authentication_nonce: Option<String>,
149
150 #[serde(skip)]
152 pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,
153
154 #[serde(skip)]
156 pub(super) config: mz_sql::catalog::CatalogConfig,
157 pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
158 #[serde(skip)]
159 pub(crate) availability_zones: Vec<String>,
160
161 #[serde(skip)]
163 pub(super) egress_addresses: Vec<IpNet>,
164 pub(super) aws_principal_context: Option<AwsPrincipalContext>,
165 pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
166 pub(super) http_host_name: Option<String>,
167
168 #[serde(skip)]
170 pub(super) license_key: ValidatedLicenseKey,
171}
172
173#[derive(Debug, Clone, Serialize)]
175pub(crate) enum LocalExpressionCache {
176 Open {
178 cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
180 uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
182 },
183 Closed,
185}
186
187impl LocalExpressionCache {
188 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
189 Self::Open {
190 cached_exprs,
191 uncached_exprs: BTreeMap::new(),
192 }
193 }
194
195 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
196 match self {
197 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
198 LocalExpressionCache::Closed => None,
199 }
200 }
201
202 pub(super) fn insert_cached_expression(
205 &mut self,
206 id: GlobalId,
207 local_expressions: LocalExpressions,
208 ) {
209 match self {
210 LocalExpressionCache::Open { cached_exprs, .. } => {
211 cached_exprs.insert(id, local_expressions);
212 }
213 LocalExpressionCache::Closed => {}
214 }
215 }
216
217 pub(super) fn insert_uncached_expression(
220 &mut self,
221 id: GlobalId,
222 local_mir: OptimizedMirRelationExpr,
223 optimizer_features: OptimizerFeatures,
224 ) {
225 match self {
226 LocalExpressionCache::Open { uncached_exprs, .. } => {
227 let local_expr = LocalExpressions {
228 local_mir,
229 optimizer_features,
230 };
231 let prev = uncached_exprs.remove(&id);
238 match prev {
239 Some(prev) if prev == local_expr => {
240 uncached_exprs.insert(id, local_expr);
241 }
242 None => {
243 uncached_exprs.insert(id, local_expr);
244 }
245 Some(_) => {}
246 }
247 }
248 LocalExpressionCache::Closed => {}
249 }
250 }
251
252 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
253 match self {
254 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
255 LocalExpressionCache::Closed => BTreeMap::new(),
256 }
257 }
258}
259
260fn skip_temp_items<S>(
261 entries: &BTreeMap<CatalogItemId, CatalogEntry>,
262 serializer: S,
263) -> Result<S::Ok, S::Error>
264where
265 S: serde::Serializer,
266{
267 mz_ore::serde::map_key_to_string(
268 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
269 serializer,
270 )
271}
272
273impl CatalogState {
274 pub fn empty_test() -> Self {
278 CatalogState {
279 database_by_name: Default::default(),
280 database_by_id: Default::default(),
281 entry_by_id: Default::default(),
282 entry_by_global_id: Default::default(),
283 ambient_schemas_by_name: Default::default(),
284 ambient_schemas_by_id: Default::default(),
285 temporary_schemas: Default::default(),
286 clusters_by_id: Default::default(),
287 clusters_by_name: Default::default(),
288 network_policies_by_name: Default::default(),
289 roles_by_name: Default::default(),
290 roles_by_id: Default::default(),
291 network_policies_by_id: Default::default(),
292 role_auth_by_id: Default::default(),
293 config: CatalogConfig {
294 start_time: Default::default(),
295 start_instant: Instant::now(),
296 nonce: Default::default(),
297 environment_id: EnvironmentId::for_tests(),
298 session_id: Default::default(),
299 build_info: &DUMMY_BUILD_INFO,
300 timestamp_interval: Default::default(),
301 now: NOW_ZERO.clone(),
302 connection_context: ConnectionContext::for_tests(Arc::new(
303 InMemorySecretsController::new(),
304 )),
305 builtins_cfg: BuiltinsConfig {
306 include_continual_tasks: true,
307 },
308 helm_chart_version: None,
309 },
310 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
311 availability_zones: Default::default(),
312 system_configuration: Default::default(),
313 egress_addresses: Default::default(),
314 aws_principal_context: Default::default(),
315 aws_privatelink_availability_zones: Default::default(),
316 http_host_name: Default::default(),
317 default_privileges: Default::default(),
318 system_privileges: Default::default(),
319 comments: Default::default(),
320 source_references: Default::default(),
321 storage_metadata: Default::default(),
322 license_key: ValidatedLicenseKey::for_tests(),
323 mock_authentication_nonce: Default::default(),
324 }
325 }
326
327 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
328 let search_path = self.resolve_search_path(session);
329 let database = self
330 .database_by_name
331 .get(session.vars().database())
332 .map(|id| id.clone());
333 let state = match session.transaction().catalog_state() {
334 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
335 None => Cow::Borrowed(self),
336 };
337 ConnCatalog {
338 state,
339 unresolvable_ids: BTreeSet::new(),
340 conn_id: session.conn_id().clone(),
341 cluster: session.vars().cluster().into(),
342 database,
343 search_path,
344 role_id: session.current_role_id().clone(),
345 prepared_statements: Some(session.prepared_statements()),
346 portals: Some(session.portals()),
347 notices_tx: session.retain_notice_transmitter(),
348 }
349 }
350
351 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
352 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
353 let cluster = self.system_configuration.default_cluster();
354
355 ConnCatalog {
356 state: Cow::Borrowed(self),
357 unresolvable_ids: BTreeSet::new(),
358 conn_id: SYSTEM_CONN_ID.clone(),
359 cluster,
360 database: self
361 .resolve_database(DEFAULT_DATABASE_NAME)
362 .ok()
363 .map(|db| db.id()),
364 search_path: Vec::new(),
367 role_id,
368 prepared_statements: None,
369 portals: None,
370 notices_tx,
371 }
372 }
373
374 pub fn for_system_session(&self) -> ConnCatalog<'_> {
375 self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
376 }
377
378 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
383 struct I<'a> {
384 queue: VecDeque<CatalogItemId>,
385 seen: BTreeSet<CatalogItemId>,
386 this: &'a CatalogState,
387 }
388 impl<'a> Iterator for I<'a> {
389 type Item = CatalogItemId;
390 fn next(&mut self) -> Option<Self::Item> {
391 if let Some(next) = self.queue.pop_front() {
392 for child in self.this.get_entry(&next).item().uses() {
393 if !self.seen.contains(&child) {
394 self.queue.push_back(child);
395 self.seen.insert(child);
396 }
397 }
398 Some(next)
399 } else {
400 None
401 }
402 }
403 }
404
405 I {
406 queue: [id].into_iter().collect(),
407 seen: [id].into_iter().collect(),
408 this: self,
409 }
410 }
411
412 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
415 let mut out = Vec::new();
416 self.introspection_dependencies_inner(id, &mut out);
417 out
418 }
419
420 fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
421 match self.get_entry(&id).item() {
422 CatalogItem::Log(_) => out.push(id),
423 item @ (CatalogItem::View(_)
424 | CatalogItem::MaterializedView(_)
425 | CatalogItem::Connection(_)
426 | CatalogItem::ContinualTask(_)) => {
427 for item_id in item.references().items() {
429 self.introspection_dependencies_inner(*item_id, out);
430 }
431 }
432 CatalogItem::Sink(sink) => {
433 let from_item_id = self.get_entry_by_global_id(&sink.from).id();
434 self.introspection_dependencies_inner(from_item_id, out)
435 }
436 CatalogItem::Index(idx) => {
437 let on_item_id = self.get_entry_by_global_id(&idx.on).id();
438 self.introspection_dependencies_inner(on_item_id, out)
439 }
440 CatalogItem::Table(_)
441 | CatalogItem::Source(_)
442 | CatalogItem::Type(_)
443 | CatalogItem::Func(_)
444 | CatalogItem::Secret(_) => (),
445 }
446 }
447
448 pub(super) fn object_dependents(
454 &self,
455 object_ids: &Vec<ObjectId>,
456 conn_id: &ConnectionId,
457 seen: &mut BTreeSet<ObjectId>,
458 ) -> Vec<ObjectId> {
459 let mut dependents = Vec::new();
460 for object_id in object_ids {
461 match object_id {
462 ObjectId::Cluster(id) => {
463 dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
464 }
465 ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
466 &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
467 ),
468 ObjectId::Database(id) => {
469 dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
470 }
471 ObjectId::Schema((database_spec, schema_spec)) => {
472 dependents.extend_from_slice(&self.schema_dependents(
473 database_spec.clone(),
474 schema_spec.clone(),
475 conn_id,
476 seen,
477 ));
478 }
479 ObjectId::NetworkPolicy(id) => {
480 dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
481 }
482 id @ ObjectId::Role(_) => {
483 let unseen = seen.insert(id.clone());
484 if unseen {
485 dependents.push(id.clone());
486 }
487 }
488 ObjectId::Item(id) => {
489 dependents.extend_from_slice(&self.item_dependents(*id, seen))
490 }
491 }
492 }
493 dependents
494 }
495
496 fn cluster_dependents(
503 &self,
504 cluster_id: ClusterId,
505 seen: &mut BTreeSet<ObjectId>,
506 ) -> Vec<ObjectId> {
507 let mut dependents = Vec::new();
508 let object_id = ObjectId::Cluster(cluster_id);
509 if !seen.contains(&object_id) {
510 seen.insert(object_id.clone());
511 let cluster = self.get_cluster(cluster_id);
512 for item_id in cluster.bound_objects() {
513 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
514 }
515 for replica_id in cluster.replica_ids().values() {
516 dependents.extend_from_slice(&self.cluster_replica_dependents(
517 cluster_id,
518 *replica_id,
519 seen,
520 ));
521 }
522 dependents.push(object_id);
523 }
524 dependents
525 }
526
527 pub(super) fn cluster_replica_dependents(
534 &self,
535 cluster_id: ClusterId,
536 replica_id: ReplicaId,
537 seen: &mut BTreeSet<ObjectId>,
538 ) -> Vec<ObjectId> {
539 let mut dependents = Vec::new();
540 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
541 if !seen.contains(&object_id) {
542 seen.insert(object_id.clone());
543 dependents.push(object_id);
544 }
545 dependents
546 }
547
548 fn database_dependents(
555 &self,
556 database_id: DatabaseId,
557 conn_id: &ConnectionId,
558 seen: &mut BTreeSet<ObjectId>,
559 ) -> Vec<ObjectId> {
560 let mut dependents = Vec::new();
561 let object_id = ObjectId::Database(database_id);
562 if !seen.contains(&object_id) {
563 seen.insert(object_id.clone());
564 let database = self.get_database(&database_id);
565 for schema_id in database.schema_ids().values() {
566 dependents.extend_from_slice(&self.schema_dependents(
567 ResolvedDatabaseSpecifier::Id(database_id),
568 SchemaSpecifier::Id(*schema_id),
569 conn_id,
570 seen,
571 ));
572 }
573 dependents.push(object_id);
574 }
575 dependents
576 }
577
578 fn schema_dependents(
585 &self,
586 database_spec: ResolvedDatabaseSpecifier,
587 schema_spec: SchemaSpecifier,
588 conn_id: &ConnectionId,
589 seen: &mut BTreeSet<ObjectId>,
590 ) -> Vec<ObjectId> {
591 let mut dependents = Vec::new();
592 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
593 if !seen.contains(&object_id) {
594 seen.insert(object_id.clone());
595 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
596 for item_id in schema.item_ids() {
597 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
598 }
599 dependents.push(object_id)
600 }
601 dependents
602 }
603
604 pub(super) fn item_dependents(
611 &self,
612 item_id: CatalogItemId,
613 seen: &mut BTreeSet<ObjectId>,
614 ) -> Vec<ObjectId> {
615 let mut dependents = Vec::new();
616 let object_id = ObjectId::Item(item_id);
617 if !seen.contains(&object_id) {
618 seen.insert(object_id.clone());
619 let entry = self.get_entry(&item_id);
620 for dependent_id in entry.used_by() {
621 dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
622 }
623 dependents.push(object_id);
624 if let Some(progress_id) = entry.progress_id() {
628 dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
629 }
630 }
631 dependents
632 }
633
634 pub(super) fn network_policy_dependents(
641 &self,
642 network_policy_id: NetworkPolicyId,
643 _seen: &mut BTreeSet<ObjectId>,
644 ) -> Vec<ObjectId> {
645 let object_id = ObjectId::NetworkPolicy(network_policy_id);
646 vec![object_id]
650 }
651
652 fn is_stable(&self, id: CatalogItemId) -> bool {
656 let spec = self.get_entry(&id).name().qualifiers.schema_spec;
657 !self.is_unstable_schema_specifier(spec)
658 }
659
660 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
661 if self.system_config().unsafe_enable_unstable_dependencies() {
662 return Ok(());
663 }
664
665 let unstable_dependencies: Vec<_> = item
666 .references()
667 .items()
668 .filter(|id| !self.is_stable(**id))
669 .map(|id| self.get_entry(id).name().item.clone())
670 .collect();
671
672 if unstable_dependencies.is_empty() || item.is_temporary() {
676 Ok(())
677 } else {
678 let object_type = item.typ().to_string();
679 Err(Error {
680 kind: ErrorKind::UnstableDependency {
681 object_type,
682 unstable_dependencies,
683 },
684 })
685 }
686 }
687
688 pub fn resolve_full_name(
689 &self,
690 name: &QualifiedItemName,
691 conn_id: Option<&ConnectionId>,
692 ) -> FullItemName {
693 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
694
695 let database = match &name.qualifiers.database_spec {
696 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
697 ResolvedDatabaseSpecifier::Id(id) => {
698 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
699 }
700 };
701 let schema = match &name.qualifiers.schema_spec {
704 SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
705 _ => self
706 .get_schema(
707 &name.qualifiers.database_spec,
708 &name.qualifiers.schema_spec,
709 conn_id,
710 )
711 .name()
712 .schema
713 .clone(),
714 };
715 FullItemName {
716 database,
717 schema,
718 item: name.item.clone(),
719 }
720 }
721
722 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
723 let database = match &name.database {
724 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
725 ResolvedDatabaseSpecifier::Id(id) => {
726 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
727 }
728 };
729 FullSchemaName {
730 database,
731 schema: name.schema.clone(),
732 }
733 }
734
735 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
736 self.entry_by_id
737 .get(id)
738 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
739 }
740
741 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
742 let item_id = self
743 .entry_by_global_id
744 .get(id)
745 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
746
747 let entry = self.get_entry(item_id).clone();
748 let version = match entry.item() {
749 CatalogItem::Table(table) => {
750 let (version, _) = table
751 .collections
752 .iter()
753 .find(|(_verison, gid)| *gid == id)
754 .expect("version to exist");
755 RelationVersionSelector::Specific(*version)
756 }
757 _ => RelationVersionSelector::Latest,
758 };
759 CatalogCollectionEntry { entry, version }
760 }
761
762 pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
763 self.entry_by_id.iter()
764 }
765
766 pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
767 self.temporary_schemas
769 .get(conn)
770 .into_iter()
771 .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
772 }
773
774 pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
780 self.temporary_schemas.contains_key(conn)
781 }
782
783 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
789 let mut res = None;
790 for schema_id in self.system_schema_ids() {
791 let schema = &self.ambient_schemas_by_id[&schema_id];
792 if let Some(global_id) = schema.types.get(name) {
793 match res {
794 None => res = Some(self.get_entry(global_id)),
795 Some(_) => panic!(
796 "only call get_system_type on objects uniquely identifiable in one system schema"
797 ),
798 }
799 }
800 }
801
802 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
803 }
804
805 pub fn get_item_by_name(
806 &self,
807 name: &QualifiedItemName,
808 conn_id: &ConnectionId,
809 ) -> Option<&CatalogEntry> {
810 self.get_schema(
811 &name.qualifiers.database_spec,
812 &name.qualifiers.schema_spec,
813 conn_id,
814 )
815 .items
816 .get(&name.item)
817 .and_then(|id| self.try_get_entry(id))
818 }
819
820 pub fn get_type_by_name(
821 &self,
822 name: &QualifiedItemName,
823 conn_id: &ConnectionId,
824 ) -> Option<&CatalogEntry> {
825 self.get_schema(
826 &name.qualifiers.database_spec,
827 &name.qualifiers.schema_spec,
828 conn_id,
829 )
830 .types
831 .get(&name.item)
832 .and_then(|id| self.try_get_entry(id))
833 }
834
835 pub(super) fn find_available_name(
836 &self,
837 mut name: QualifiedItemName,
838 conn_id: &ConnectionId,
839 ) -> QualifiedItemName {
840 let mut i = 0;
841 let orig_item_name = name.item.clone();
842 while self.get_item_by_name(&name, conn_id).is_some() {
843 i += 1;
844 name.item = format!("{}{}", orig_item_name, i);
845 }
846 name
847 }
848
849 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
850 self.entry_by_id.get(id)
851 }
852
853 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
854 let item_id = self.entry_by_global_id.get(id)?;
855 self.try_get_entry(item_id)
856 }
857
858 pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
861 let entry = self.try_get_entry_by_global_id(id)?;
862 let desc = match entry.item() {
863 CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
864 other => other.relation_desc(RelationVersionSelector::Latest)?,
866 };
867 Some(desc)
868 }
869
870 pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
871 self.try_get_cluster(cluster_id)
872 .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
873 }
874
875 pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
876 self.clusters_by_id.get(&cluster_id)
877 }
878
879 pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
880 self.roles_by_id.get(id)
881 }
882
883 pub fn get_role(&self, id: &RoleId) -> &Role {
884 self.roles_by_id.get(id).expect("catalog out of sync")
885 }
886
887 pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
888 self.roles_by_id.keys()
889 }
890
891 pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
892 self.roles_by_name
893 .get(role_name)
894 .map(|id| &self.roles_by_id[id])
895 }
896
897 pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
898 self.role_auth_by_id
899 .get(id)
900 .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
901 }
902
903 pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
904 self.role_auth_by_id.get(id)
905 }
906
907 pub(super) fn try_get_network_policy_by_name(
908 &self,
909 policy_name: &str,
910 ) -> Option<&NetworkPolicy> {
911 self.network_policies_by_name
912 .get(policy_name)
913 .map(|id| &self.network_policies_by_id[id])
914 }
915
916 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
917 let mut membership = BTreeSet::new();
918 let mut queue = VecDeque::from(vec![id]);
919 while let Some(cur_id) = queue.pop_front() {
920 if !membership.contains(cur_id) {
921 membership.insert(cur_id.clone());
922 let role = self.get_role(cur_id);
923 soft_assert_no_log!(
924 !role.membership().keys().contains(id),
925 "circular membership exists in the catalog"
926 );
927 queue.extend(role.membership().keys());
928 }
929 }
930 membership.insert(RoleId::Public);
931 membership
932 }
933
934 pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
935 self.network_policies_by_id
936 .get(id)
937 .expect("catalog out of sync")
938 }
939
940 pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
941 self.network_policies_by_id.keys()
942 }
943
944 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
949 let entry = self.try_get_entry(id)?;
950 let name = self.resolve_full_name(entry.name(), None);
952 let host_name = self
953 .http_host_name
954 .as_ref()
955 .map(|x| x.as_str())
956 .unwrap_or_else(|| "HOST");
957
958 let RawDatabaseSpecifier::Name(database) = name.database else {
959 return None;
960 };
961
962 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
963 url.path_segments_mut()
964 .ok()?
965 .push(&database)
966 .push(&name.schema)
967 .push(&name.item);
968
969 Some(url)
970 }
971
972 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
980 &mut self,
983 create_sql: &str,
984 force_if_exists_skip: bool,
985 ) -> Result<(Plan, ResolvedIds), AdapterError> {
986 self.with_enable_for_item_parsing(|state| {
987 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
988 let pcx = Some(&pcx);
989 let session_catalog = state.for_system_session();
990
991 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
992 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
993 let plan =
994 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
995
996 Ok((plan, resolved_ids))
997 })
998 }
999
1000 #[mz_ore::instrument]
1002 pub(crate) fn parse_plan(
1003 create_sql: &str,
1004 pcx: Option<&PlanContext>,
1005 catalog: &ConnCatalog,
1006 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1007 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1008 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1009 let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1010
1011 Ok((plan, resolved_ids))
1012 }
1013
1014 pub(crate) fn deserialize_item(
1016 &self,
1017 global_id: GlobalId,
1018 create_sql: &str,
1019 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1020 local_expression_cache: &mut LocalExpressionCache,
1021 previous_item: Option<CatalogItem>,
1022 ) -> Result<CatalogItem, AdapterError> {
1023 self.parse_item(
1024 global_id,
1025 create_sql,
1026 extra_versions,
1027 None,
1028 false,
1029 None,
1030 local_expression_cache,
1031 previous_item,
1032 )
1033 }
1034
1035 #[mz_ore::instrument]
1037 pub(crate) fn parse_item(
1038 &self,
1039 global_id: GlobalId,
1040 create_sql: &str,
1041 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1042 pcx: Option<&PlanContext>,
1043 is_retained_metrics_object: bool,
1044 custom_logical_compaction_window: Option<CompactionWindow>,
1045 local_expression_cache: &mut LocalExpressionCache,
1046 previous_item: Option<CatalogItem>,
1047 ) -> Result<CatalogItem, AdapterError> {
1048 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1049 match self.parse_item_inner(
1050 global_id,
1051 create_sql,
1052 extra_versions,
1053 pcx,
1054 is_retained_metrics_object,
1055 custom_logical_compaction_window,
1056 cached_expr,
1057 previous_item,
1058 ) {
1059 Ok((item, uncached_expr)) => {
1060 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1061 local_expression_cache.insert_uncached_expression(
1062 global_id,
1063 uncached_expr,
1064 optimizer_features,
1065 );
1066 }
1067 Ok(item)
1068 }
1069 Err((err, cached_expr)) => {
1070 if let Some(local_expr) = cached_expr {
1071 local_expression_cache.insert_cached_expression(global_id, local_expr);
1072 }
1073 Err(err)
1074 }
1075 }
1076 }
1077
1078 #[mz_ore::instrument]
1085 pub(crate) fn parse_item_inner(
1086 &self,
1087 global_id: GlobalId,
1088 create_sql: &str,
1089 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1090 pcx: Option<&PlanContext>,
1091 is_retained_metrics_object: bool,
1092 custom_logical_compaction_window: Option<CompactionWindow>,
1093 cached_expr: Option<LocalExpressions>,
1094 previous_item: Option<CatalogItem>,
1095 ) -> Result<
1096 (
1097 CatalogItem,
1098 Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1099 ),
1100 (AdapterError, Option<LocalExpressions>),
1101 > {
1102 let session_catalog = self.for_system_session();
1103
1104 let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1105 Ok((plan, resolved_ids)) => (plan, resolved_ids),
1106 Err(err) => return Err((err, cached_expr)),
1107 };
1108
1109 let mut uncached_expr = None;
1110
1111 let item = match plan {
1112 Plan::CreateTable(CreateTablePlan { table, .. }) => {
1113 let collections = extra_versions
1114 .iter()
1115 .map(|(version, gid)| (*version, *gid))
1116 .chain([(RelationVersion::root(), global_id)].into_iter())
1117 .collect();
1118
1119 CatalogItem::Table(Table {
1120 create_sql: Some(table.create_sql),
1121 desc: table.desc,
1122 collections,
1123 conn_id: None,
1124 resolved_ids,
1125 custom_logical_compaction_window: custom_logical_compaction_window
1126 .or(table.compaction_window),
1127 is_retained_metrics_object,
1128 data_source: match table.data_source {
1129 mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1130 TableDataSource::TableWrites { defaults }
1131 }
1132 mz_sql::plan::TableDataSource::DataSource {
1133 desc: data_source_desc,
1134 timeline,
1135 } => match data_source_desc {
1136 mz_sql::plan::DataSourceDesc::IngestionExport {
1137 ingestion_id,
1138 external_reference,
1139 details,
1140 data_config,
1141 } => TableDataSource::DataSource {
1142 desc: DataSourceDesc::IngestionExport {
1143 ingestion_id,
1144 external_reference,
1145 details,
1146 data_config,
1147 },
1148 timeline,
1149 },
1150 mz_sql::plan::DataSourceDesc::Webhook {
1151 validate_using,
1152 body_format,
1153 headers,
1154 cluster_id,
1155 } => TableDataSource::DataSource {
1156 desc: DataSourceDesc::Webhook {
1157 validate_using,
1158 body_format,
1159 headers,
1160 cluster_id: cluster_id
1161 .expect("Webhook Tables must have a cluster_id set"),
1162 },
1163 timeline,
1164 },
1165 _ => {
1166 return Err((
1167 AdapterError::Unstructured(anyhow::anyhow!(
1168 "unsupported data source for table"
1169 )),
1170 cached_expr,
1171 ));
1172 }
1173 },
1174 },
1175 })
1176 }
1177 Plan::CreateSource(CreateSourcePlan {
1178 source,
1179 timeline,
1180 in_cluster,
1181 ..
1182 }) => CatalogItem::Source(Source {
1183 create_sql: Some(source.create_sql),
1184 data_source: match source.data_source {
1185 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1186 desc,
1187 cluster_id: match in_cluster {
1188 Some(id) => id,
1189 None => {
1190 return Err((
1191 AdapterError::Unstructured(anyhow::anyhow!(
1192 "ingestion-based sources must have cluster specified"
1193 )),
1194 cached_expr,
1195 ));
1196 }
1197 },
1198 },
1199 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1200 desc,
1201 progress_subsource,
1202 data_config,
1203 details,
1204 } => DataSourceDesc::OldSyntaxIngestion {
1205 desc,
1206 progress_subsource,
1207 data_config,
1208 details,
1209 cluster_id: match in_cluster {
1210 Some(id) => id,
1211 None => {
1212 return Err((
1213 AdapterError::Unstructured(anyhow::anyhow!(
1214 "ingestion-based sources must have cluster specified"
1215 )),
1216 cached_expr,
1217 ));
1218 }
1219 },
1220 },
1221 mz_sql::plan::DataSourceDesc::IngestionExport {
1222 ingestion_id,
1223 external_reference,
1224 details,
1225 data_config,
1226 } => DataSourceDesc::IngestionExport {
1227 ingestion_id,
1228 external_reference,
1229 details,
1230 data_config,
1231 },
1232 mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1233 mz_sql::plan::DataSourceDesc::Webhook {
1234 validate_using,
1235 body_format,
1236 headers,
1237 cluster_id,
1238 } => {
1239 mz_ore::soft_assert_or_log!(
1240 cluster_id.is_none(),
1241 "cluster_id set at Source level for Webhooks"
1242 );
1243 DataSourceDesc::Webhook {
1244 validate_using,
1245 body_format,
1246 headers,
1247 cluster_id: in_cluster
1248 .expect("webhook sources must use an existing cluster"),
1249 }
1250 }
1251 },
1252 desc: source.desc,
1253 global_id,
1254 timeline,
1255 resolved_ids,
1256 custom_logical_compaction_window: source
1257 .compaction_window
1258 .or(custom_logical_compaction_window),
1259 is_retained_metrics_object,
1260 }),
1261 Plan::CreateView(CreateViewPlan { view, .. }) => {
1262 let optimizer_config =
1264 optimize::OptimizerConfig::from(session_catalog.system_vars());
1265 let previous_exprs = previous_item.map(|item| match item {
1266 CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
1267 _ => None,
1268 });
1269
1270 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1271 (Some(local_expr), _)
1272 if local_expr.optimizer_features == optimizer_config.features =>
1273 {
1274 debug!("local expression cache hit for {global_id:?}");
1275 (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1276 }
1277 (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1279 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1280 }
1281 (cached_expr, _) => {
1282 let optimizer_features = optimizer_config.features.clone();
1283 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1285
1286 let raw_expr = view.expr;
1288 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1289 Ok(optimzed_expr) => optimzed_expr,
1290 Err(err) => return Err((err.into(), cached_expr)),
1291 };
1292
1293 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1294
1295 (Arc::new(raw_expr), Arc::new(optimized_expr))
1296 }
1297 };
1298
1299 let dependencies: BTreeSet<_> = raw_expr
1301 .depends_on()
1302 .into_iter()
1303 .map(|gid| self.get_entry_by_global_id(&gid).id())
1304 .collect();
1305
1306 CatalogItem::View(View {
1307 create_sql: view.create_sql,
1308 global_id,
1309 raw_expr,
1310 desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
1311 optimized_expr,
1312 conn_id: None,
1313 resolved_ids,
1314 dependencies: DependencyIds(dependencies),
1315 })
1316 }
1317 Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1318 materialized_view, ..
1319 }) => {
1320 let collections = extra_versions
1321 .iter()
1322 .map(|(version, gid)| (*version, *gid))
1323 .chain([(RelationVersion::root(), global_id)].into_iter())
1324 .collect();
1325
1326 let optimizer_config =
1328 optimize::OptimizerConfig::from(session_catalog.system_vars());
1329 let previous_exprs = previous_item.map(|item| match item {
1330 CatalogItem::MaterializedView(materialized_view) => {
1331 (materialized_view.raw_expr, materialized_view.optimized_expr)
1332 }
1333 item => unreachable!("expected materialized view, found: {item:#?}"),
1334 });
1335
1336 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1337 (Some(local_expr), _)
1338 if local_expr.optimizer_features == optimizer_config.features =>
1339 {
1340 debug!("local expression cache hit for {global_id:?}");
1341 (
1342 Arc::new(materialized_view.expr),
1343 Arc::new(local_expr.local_mir),
1344 )
1345 }
1346 (_, Some((raw_expr, optimized_expr)))
1348 if *raw_expr == materialized_view.expr =>
1349 {
1350 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1351 }
1352 (cached_expr, _) => {
1353 let optimizer_features = optimizer_config.features.clone();
1354 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1356
1357 let raw_expr = materialized_view.expr;
1358 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1359 Ok(optimized_expr) => optimized_expr,
1360 Err(err) => return Err((err.into(), cached_expr)),
1361 };
1362
1363 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1364
1365 (Arc::new(raw_expr), Arc::new(optimized_expr))
1366 }
1367 };
1368 let mut typ = optimized_expr.typ();
1369 for &i in &materialized_view.non_null_assertions {
1370 typ.column_types[i].nullable = false;
1371 }
1372 let desc = RelationDesc::new(typ, materialized_view.column_names);
1373 let desc = VersionedRelationDesc::new(desc);
1374
1375 let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1376
1377 let dependencies = raw_expr
1379 .depends_on()
1380 .into_iter()
1381 .map(|gid| self.get_entry_by_global_id(&gid).id())
1382 .collect();
1383
1384 CatalogItem::MaterializedView(MaterializedView {
1385 create_sql: materialized_view.create_sql,
1386 collections,
1387 raw_expr,
1388 optimized_expr,
1389 desc,
1390 resolved_ids,
1391 dependencies,
1392 replacement_target: materialized_view.replacement_target,
1393 cluster_id: materialized_view.cluster_id,
1394 non_null_assertions: materialized_view.non_null_assertions,
1395 custom_logical_compaction_window: materialized_view.compaction_window,
1396 refresh_schedule: materialized_view.refresh_schedule,
1397 initial_as_of,
1398 })
1399 }
1400 Plan::CreateContinualTask(plan) => {
1401 let ct =
1402 match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1403 Ok(ct) => ct,
1404 Err(err) => return Err((err, cached_expr)),
1405 };
1406 CatalogItem::ContinualTask(ct)
1407 }
1408 Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1409 create_sql: index.create_sql,
1410 global_id,
1411 on: index.on,
1412 keys: index.keys.into(),
1413 conn_id: None,
1414 resolved_ids,
1415 cluster_id: index.cluster_id,
1416 custom_logical_compaction_window: custom_logical_compaction_window
1417 .or(index.compaction_window),
1418 is_retained_metrics_object,
1419 }),
1420 Plan::CreateSink(CreateSinkPlan {
1421 sink,
1422 with_snapshot,
1423 in_cluster,
1424 ..
1425 }) => CatalogItem::Sink(Sink {
1426 create_sql: sink.create_sql,
1427 global_id,
1428 from: sink.from,
1429 connection: sink.connection,
1430 envelope: sink.envelope,
1431 version: sink.version,
1432 with_snapshot,
1433 resolved_ids,
1434 cluster_id: in_cluster,
1435 commit_interval: sink.commit_interval,
1436 }),
1437 Plan::CreateType(CreateTypePlan { typ, .. }) => {
1438 if let Err(err) = typ.inner.desc(&session_catalog) {
1442 return Err((err.into(), cached_expr));
1443 }
1444 CatalogItem::Type(Type {
1445 create_sql: Some(typ.create_sql),
1446 global_id,
1447 details: CatalogTypeDetails {
1448 array_id: None,
1449 typ: typ.inner,
1450 pg_metadata: None,
1451 },
1452 resolved_ids,
1453 })
1454 }
1455 Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1456 create_sql: secret.create_sql,
1457 global_id,
1458 }),
1459 Plan::CreateConnection(CreateConnectionPlan {
1460 connection:
1461 mz_sql::plan::Connection {
1462 create_sql,
1463 details,
1464 },
1465 ..
1466 }) => CatalogItem::Connection(Connection {
1467 create_sql,
1468 global_id,
1469 details,
1470 resolved_ids,
1471 }),
1472 _ => {
1473 return Err((
1474 Error::new(ErrorKind::Corruption {
1475 detail: "catalog entry generated inappropriate plan".to_string(),
1476 })
1477 .into(),
1478 cached_expr,
1479 ));
1480 }
1481 };
1482
1483 Ok((item, uncached_expr))
1484 }
1485
1486 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1492 let restore = self.system_configuration.clone();
1503 self.system_configuration.enable_for_item_parsing();
1504 let res = f(self);
1505 self.system_configuration = restore;
1506 res
1507 }
1508
1509 pub fn get_indexes_on(
1511 &self,
1512 id: GlobalId,
1513 cluster: ClusterId,
1514 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1515 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1516
1517 self.try_get_entry_by_global_id(&id)
1518 .into_iter()
1519 .map(move |e| {
1520 e.used_by()
1521 .iter()
1522 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1523 CatalogItem::Index(index) if index_matches(index) => {
1524 Some((index.global_id(), index))
1525 }
1526 _ => None,
1527 })
1528 })
1529 .flatten()
1530 }
1531
1532 pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1533 &self.database_by_id[database_id]
1534 }
1535
1536 pub(super) fn try_get_cluster_replica(
1541 &self,
1542 id: ClusterId,
1543 replica_id: ReplicaId,
1544 ) -> Option<&ClusterReplica> {
1545 self.try_get_cluster(id)
1546 .and_then(|cluster| cluster.replica(replica_id))
1547 }
1548
1549 pub(crate) fn get_cluster_replica(
1553 &self,
1554 cluster_id: ClusterId,
1555 replica_id: ReplicaId,
1556 ) -> &ClusterReplica {
1557 self.try_get_cluster_replica(cluster_id, replica_id)
1558 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1559 }
1560
1561 pub(super) fn resolve_replica_in_cluster(
1562 &self,
1563 cluster_id: &ClusterId,
1564 replica_name: &str,
1565 ) -> Result<&ClusterReplica, SqlCatalogError> {
1566 let cluster = self.get_cluster(*cluster_id);
1567 let replica_id = cluster
1568 .replica_id_by_name_
1569 .get(replica_name)
1570 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1571 Ok(&cluster.replicas_by_id_[replica_id])
1572 }
1573
1574 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1576 Ok(self.system_configuration.get(name)?)
1577 }
1578
1579 pub(super) fn parse_system_configuration(
1583 &self,
1584 name: &str,
1585 value: VarInput,
1586 ) -> Result<String, Error> {
1587 let value = self.system_configuration.parse(name, value)?;
1588 Ok(value.format())
1589 }
1590
1591 pub(super) fn resolve_schema_in_database(
1593 &self,
1594 database_spec: &ResolvedDatabaseSpecifier,
1595 schema_name: &str,
1596 conn_id: &ConnectionId,
1597 ) -> Result<&Schema, SqlCatalogError> {
1598 let schema = match database_spec {
1599 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1600 self.temporary_schemas.get(conn_id)
1601 }
1602 ResolvedDatabaseSpecifier::Ambient => self
1603 .ambient_schemas_by_name
1604 .get(schema_name)
1605 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1606 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1607 db.schemas_by_name
1608 .get(schema_name)
1609 .and_then(|id| db.schemas_by_id.get(id))
1610 }),
1611 };
1612 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1613 }
1614
1615 pub fn try_get_schema(
1620 &self,
1621 database_spec: &ResolvedDatabaseSpecifier,
1622 schema_spec: &SchemaSpecifier,
1623 conn_id: &ConnectionId,
1624 ) -> Option<&Schema> {
1625 match (database_spec, schema_spec) {
1627 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1628 self.temporary_schemas.get(conn_id)
1629 }
1630 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1631 self.ambient_schemas_by_id.get(id)
1632 }
1633 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1634 .database_by_id
1635 .get(database_id)
1636 .and_then(|db| db.schemas_by_id.get(schema_id)),
1637 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1638 unreachable!("temporary schemas are in the ambient database")
1639 }
1640 }
1641 }
1642
1643 pub fn get_schema(
1644 &self,
1645 database_spec: &ResolvedDatabaseSpecifier,
1646 schema_spec: &SchemaSpecifier,
1647 conn_id: &ConnectionId,
1648 ) -> &Schema {
1649 self.try_get_schema(database_spec, schema_spec, conn_id)
1651 .expect("schema must exist")
1652 }
1653
1654 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1655 self.database_by_id
1656 .values()
1657 .filter_map(|database| database.schemas_by_id.get(schema_id))
1658 .chain(self.ambient_schemas_by_id.values())
1659 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1660 .into_first()
1661 }
1662
1663 pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1664 self.temporary_schemas
1665 .values()
1666 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1667 .into_first()
1668 }
1669
1670 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1671 self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1672 }
1673
1674 pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1675 self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1676 }
1677
1678 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1679 self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1680 }
1681
1682 pub fn get_information_schema_id(&self) -> SchemaId {
1683 self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1684 }
1685
1686 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1687 self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1688 }
1689
1690 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1691 self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1692 }
1693
1694 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1695 self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1696 }
1697
1698 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1699 SYSTEM_SCHEMAS
1700 .iter()
1701 .map(|name| self.ambient_schemas_by_name[*name])
1702 }
1703
1704 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1705 self.system_schema_ids().contains(&id)
1706 }
1707
1708 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1709 match spec {
1710 SchemaSpecifier::Temporary => false,
1711 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1712 }
1713 }
1714
1715 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1716 UNSTABLE_SCHEMAS
1717 .iter()
1718 .map(|name| self.ambient_schemas_by_name[*name])
1719 }
1720
1721 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1722 self.unstable_schema_ids().contains(&id)
1723 }
1724
1725 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1726 match spec {
1727 SchemaSpecifier::Temporary => false,
1728 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1729 }
1730 }
1731
1732 pub fn create_temporary_schema(
1735 &mut self,
1736 conn_id: &ConnectionId,
1737 owner_id: RoleId,
1738 ) -> Result<(), Error> {
1739 let oid = INVALID_OID;
1744 self.temporary_schemas.insert(
1745 conn_id.clone(),
1746 Schema {
1747 name: QualifiedSchemaName {
1748 database: ResolvedDatabaseSpecifier::Ambient,
1749 schema: MZ_TEMP_SCHEMA.into(),
1750 },
1751 id: SchemaSpecifier::Temporary,
1752 oid,
1753 items: BTreeMap::new(),
1754 functions: BTreeMap::new(),
1755 types: BTreeMap::new(),
1756 owner_id,
1757 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1758 mz_sql::catalog::ObjectType::Schema,
1759 owner_id,
1760 )]),
1761 },
1762 );
1763 Ok(())
1764 }
1765
1766 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1768 std::iter::empty()
1769 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1770 if schema.id.is_temporary() {
1771 Some(schema.oid)
1772 } else {
1773 None
1774 }
1775 }))
1776 .chain(self.entry_by_id.values().filter_map(|entry| {
1777 if entry.item().is_temporary() {
1778 Some(entry.oid)
1779 } else {
1780 None
1781 }
1782 }))
1783 }
1784
1785 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1789 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1790 }
1791
1792 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1796 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1797 let log = match self.get_entry(&item_id).item() {
1798 CatalogItem::Log(log) => log,
1799 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1800 };
1801 (item_id, log.global_id)
1802 }
1803
1804 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1808 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1809 }
1810
1811 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1815 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1816 let schema = &self.ambient_schemas_by_id[schema_id];
1817 match builtin.catalog_item_type() {
1818 CatalogItemType::Type => schema.types[builtin.name()],
1819 CatalogItemType::Func => schema.functions[builtin.name()],
1820 CatalogItemType::Table
1821 | CatalogItemType::Source
1822 | CatalogItemType::Sink
1823 | CatalogItemType::View
1824 | CatalogItemType::MaterializedView
1825 | CatalogItemType::Index
1826 | CatalogItemType::Secret
1827 | CatalogItemType::Connection
1828 | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1829 }
1830 }
1831
1832 pub fn resolve_builtin_type_references(
1834 &self,
1835 builtin: &BuiltinType<NameReference>,
1836 ) -> BuiltinType<IdReference> {
1837 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1838 CatalogType::AclItem => CatalogType::AclItem,
1839 CatalogType::Array { element_reference } => CatalogType::Array {
1840 element_reference: self.get_system_type(element_reference).id,
1841 },
1842 CatalogType::List {
1843 element_reference,
1844 element_modifiers,
1845 } => CatalogType::List {
1846 element_reference: self.get_system_type(element_reference).id,
1847 element_modifiers: element_modifiers.clone(),
1848 },
1849 CatalogType::Map {
1850 key_reference,
1851 value_reference,
1852 key_modifiers,
1853 value_modifiers,
1854 } => CatalogType::Map {
1855 key_reference: self.get_system_type(key_reference).id,
1856 value_reference: self.get_system_type(value_reference).id,
1857 key_modifiers: key_modifiers.clone(),
1858 value_modifiers: value_modifiers.clone(),
1859 },
1860 CatalogType::Range { element_reference } => CatalogType::Range {
1861 element_reference: self.get_system_type(element_reference).id,
1862 },
1863 CatalogType::Record { fields } => CatalogType::Record {
1864 fields: fields
1865 .into_iter()
1866 .map(|f| CatalogRecordField {
1867 name: f.name.clone(),
1868 type_reference: self.get_system_type(f.type_reference).id,
1869 type_modifiers: f.type_modifiers.clone(),
1870 })
1871 .collect(),
1872 },
1873 CatalogType::Bool => CatalogType::Bool,
1874 CatalogType::Bytes => CatalogType::Bytes,
1875 CatalogType::Char => CatalogType::Char,
1876 CatalogType::Date => CatalogType::Date,
1877 CatalogType::Float32 => CatalogType::Float32,
1878 CatalogType::Float64 => CatalogType::Float64,
1879 CatalogType::Int16 => CatalogType::Int16,
1880 CatalogType::Int32 => CatalogType::Int32,
1881 CatalogType::Int64 => CatalogType::Int64,
1882 CatalogType::UInt16 => CatalogType::UInt16,
1883 CatalogType::UInt32 => CatalogType::UInt32,
1884 CatalogType::UInt64 => CatalogType::UInt64,
1885 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1886 CatalogType::Interval => CatalogType::Interval,
1887 CatalogType::Jsonb => CatalogType::Jsonb,
1888 CatalogType::Numeric => CatalogType::Numeric,
1889 CatalogType::Oid => CatalogType::Oid,
1890 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1891 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1892 CatalogType::Pseudo => CatalogType::Pseudo,
1893 CatalogType::RegClass => CatalogType::RegClass,
1894 CatalogType::RegProc => CatalogType::RegProc,
1895 CatalogType::RegType => CatalogType::RegType,
1896 CatalogType::String => CatalogType::String,
1897 CatalogType::Time => CatalogType::Time,
1898 CatalogType::Timestamp => CatalogType::Timestamp,
1899 CatalogType::TimestampTz => CatalogType::TimestampTz,
1900 CatalogType::Uuid => CatalogType::Uuid,
1901 CatalogType::VarChar => CatalogType::VarChar,
1902 CatalogType::Int2Vector => CatalogType::Int2Vector,
1903 CatalogType::MzAclItem => CatalogType::MzAclItem,
1904 };
1905
1906 BuiltinType {
1907 name: builtin.name,
1908 schema: builtin.schema,
1909 oid: builtin.oid,
1910 details: CatalogTypeDetails {
1911 array_id: builtin.details.array_id,
1912 typ,
1913 pg_metadata: builtin.details.pg_metadata.clone(),
1914 },
1915 }
1916 }
1917
1918 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1919 &self.config
1920 }
1921
1922 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1923 match self.database_by_name.get(database_name) {
1924 Some(id) => Ok(&self.database_by_id[id]),
1925 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1926 }
1927 }
1928
1929 pub fn resolve_schema(
1930 &self,
1931 current_database: Option<&DatabaseId>,
1932 database_name: Option<&str>,
1933 schema_name: &str,
1934 conn_id: &ConnectionId,
1935 ) -> Result<&Schema, SqlCatalogError> {
1936 let database_spec = match database_name {
1937 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1942 self.resolve_database(database)?.id().clone(),
1943 )),
1944 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1945 };
1946
1947 if let Some(database_spec) = database_spec {
1949 if let Ok(schema) =
1950 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1951 {
1952 return Ok(schema);
1953 }
1954 }
1955
1956 if let Ok(schema) = self.resolve_schema_in_database(
1958 &ResolvedDatabaseSpecifier::Ambient,
1959 schema_name,
1960 conn_id,
1961 ) {
1962 return Ok(schema);
1963 }
1964
1965 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1966 }
1967
1968 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1972 self.ambient_schemas_by_name[name]
1973 }
1974
1975 pub fn resolve_search_path(
1976 &self,
1977 session: &dyn SessionMetadata,
1978 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1979 let database = self
1980 .database_by_name
1981 .get(session.database())
1982 .map(|id| id.clone());
1983
1984 session
1985 .search_path()
1986 .iter()
1987 .map(|schema| {
1988 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1989 })
1990 .filter_map(|schema| schema.ok())
1991 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1992 .collect()
1993 }
1994
1995 pub fn effective_search_path(
1996 &self,
1997 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1998 include_temp_schema: bool,
1999 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2000 let mut v = Vec::with_capacity(search_path.len() + 3);
2001 let temp_schema = (
2003 ResolvedDatabaseSpecifier::Ambient,
2004 SchemaSpecifier::Temporary,
2005 );
2006 if include_temp_schema && !search_path.contains(&temp_schema) {
2007 v.push(temp_schema);
2008 }
2009 let default_schemas = [
2010 (
2011 ResolvedDatabaseSpecifier::Ambient,
2012 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2013 ),
2014 (
2015 ResolvedDatabaseSpecifier::Ambient,
2016 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2017 ),
2018 ];
2019 for schema in default_schemas.into_iter() {
2020 if !search_path.contains(&schema) {
2021 v.push(schema);
2022 }
2023 }
2024 v.extend_from_slice(search_path);
2025 v
2026 }
2027
2028 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2029 let id = self
2030 .clusters_by_name
2031 .get(name)
2032 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2033 Ok(&self.clusters_by_id[id])
2034 }
2035
2036 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2037 let id = self
2038 .clusters_by_name
2039 .get(cluster.name)
2040 .expect("failed to lookup BuiltinCluster by name");
2041 self.clusters_by_id
2042 .get(id)
2043 .expect("failed to lookup BuiltinCluster by ID")
2044 }
2045
2046 pub fn resolve_cluster_replica(
2047 &self,
2048 cluster_replica_name: &QualifiedReplica,
2049 ) -> Result<&ClusterReplica, SqlCatalogError> {
2050 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2051 let replica_name = cluster_replica_name.replica.as_str();
2052 let replica_id = cluster
2053 .replica_id(replica_name)
2054 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2055 Ok(cluster.replica(replica_id).expect("Must exist"))
2056 }
2057
2058 #[allow(clippy::useless_let_if_seq)]
2064 pub fn resolve(
2065 &self,
2066 get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2067 current_database: Option<&DatabaseId>,
2068 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2069 name: &PartialItemName,
2070 conn_id: &ConnectionId,
2071 err_gen: fn(String) -> SqlCatalogError,
2072 ) -> Result<&CatalogEntry, SqlCatalogError> {
2073 let schemas = match &name.schema {
2078 Some(schema_name) => {
2079 match self.resolve_schema(
2080 current_database,
2081 name.database.as_deref(),
2082 schema_name,
2083 conn_id,
2084 ) {
2085 Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2086 Err(e) => return Err(e),
2087 }
2088 }
2089 None => match self
2090 .try_get_schema(
2091 &ResolvedDatabaseSpecifier::Ambient,
2092 &SchemaSpecifier::Temporary,
2093 conn_id,
2094 )
2095 .and_then(|schema| schema.items.get(&name.item))
2096 {
2097 Some(id) => return Ok(self.get_entry(id)),
2098 None => search_path.to_vec(),
2099 },
2100 };
2101
2102 for (database_spec, schema_spec) in &schemas {
2103 let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
2106 continue;
2107 };
2108
2109 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2110 return Ok(&self.entry_by_id[id]);
2111 }
2112 }
2113
2114 let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2119 if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2120 for schema_id in [
2121 self.get_mz_catalog_unstable_schema_id(),
2122 self.get_mz_introspection_schema_id(),
2123 ] {
2124 let schema = self.get_schema(
2125 &ResolvedDatabaseSpecifier::Ambient,
2126 &SchemaSpecifier::Id(schema_id),
2127 conn_id,
2128 );
2129
2130 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2131 debug!(
2132 github_27831 = true,
2133 "encountered use of outdated schema `mz_internal` for relation: {name}",
2134 );
2135 return Ok(&self.entry_by_id[id]);
2136 }
2137 }
2138 }
2139
2140 Err(err_gen(name.to_string()))
2141 }
2142
2143 pub fn resolve_entry(
2145 &self,
2146 current_database: Option<&DatabaseId>,
2147 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2148 name: &PartialItemName,
2149 conn_id: &ConnectionId,
2150 ) -> Result<&CatalogEntry, SqlCatalogError> {
2151 self.resolve(
2152 |schema| &schema.items,
2153 current_database,
2154 search_path,
2155 name,
2156 conn_id,
2157 SqlCatalogError::UnknownItem,
2158 )
2159 }
2160
2161 pub fn resolve_function(
2163 &self,
2164 current_database: Option<&DatabaseId>,
2165 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2166 name: &PartialItemName,
2167 conn_id: &ConnectionId,
2168 ) -> Result<&CatalogEntry, SqlCatalogError> {
2169 self.resolve(
2170 |schema| &schema.functions,
2171 current_database,
2172 search_path,
2173 name,
2174 conn_id,
2175 |name| SqlCatalogError::UnknownFunction {
2176 name,
2177 alternative: None,
2178 },
2179 )
2180 }
2181
2182 pub fn resolve_type(
2184 &self,
2185 current_database: Option<&DatabaseId>,
2186 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2187 name: &PartialItemName,
2188 conn_id: &ConnectionId,
2189 ) -> Result<&CatalogEntry, SqlCatalogError> {
2190 static NON_PG_CATALOG_TYPES: LazyLock<
2191 BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2192 > = LazyLock::new(|| {
2193 BUILTINS::types()
2194 .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2195 .map(|typ| (typ.name, typ))
2196 .collect()
2197 });
2198
2199 let entry = self.resolve(
2200 |schema| &schema.types,
2201 current_database,
2202 search_path,
2203 name,
2204 conn_id,
2205 |name| SqlCatalogError::UnknownType { name },
2206 )?;
2207
2208 if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2209 if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2210 warn!(
2211 "user specified an incorrect schema of {} for the type {}, which should be in \
2212 the {} schema. This works now due to a bug but will be fixed in a later release.",
2213 PG_CATALOG_SCHEMA.quoted(),
2214 typ.name.quoted(),
2215 typ.schema.quoted(),
2216 )
2217 }
2218 }
2219
2220 Ok(entry)
2221 }
2222
2223 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2225 match object_id {
2226 ObjectId::Item(item_id) => {
2227 let entry = self.get_entry(&item_id);
2228 match entry.item_type() {
2229 CatalogItemType::Table => CommentObjectId::Table(item_id),
2230 CatalogItemType::Source => CommentObjectId::Source(item_id),
2231 CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2232 CatalogItemType::View => CommentObjectId::View(item_id),
2233 CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2234 CatalogItemType::Index => CommentObjectId::Index(item_id),
2235 CatalogItemType::Func => CommentObjectId::Func(item_id),
2236 CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2237 CatalogItemType::Type => CommentObjectId::Type(item_id),
2238 CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2239 CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2240 }
2241 }
2242 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2243 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2244 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2245 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2246 ObjectId::ClusterReplica(cluster_replica_id) => {
2247 CommentObjectId::ClusterReplica(cluster_replica_id)
2248 }
2249 ObjectId::NetworkPolicy(network_policy_id) => {
2250 CommentObjectId::NetworkPolicy(network_policy_id)
2251 }
2252 }
2253 }
2254
2255 pub fn system_config(&self) -> &SystemVars {
2257 &self.system_configuration
2258 }
2259
2260 pub fn system_config_mut(&mut self) -> &mut SystemVars {
2262 &mut self.system_configuration
2263 }
2264
2265 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2275 let mut dump = serde_json::to_value(&self).map_err(|e| {
2277 Error::new(ErrorKind::Unstructured(format!(
2278 "internal error: could not dump catalog: {}",
2281 e
2282 )))
2283 })?;
2284
2285 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2286 dump_obj.insert(
2288 "system_parameter_defaults".into(),
2289 serde_json::json!(self.system_config().defaults()),
2290 );
2291 if let Some(unfinalized_shards) = unfinalized_shards {
2293 dump_obj
2294 .get_mut("storage_metadata")
2295 .expect("known to exist")
2296 .as_object_mut()
2297 .expect("storage_metadata is an object")
2298 .insert(
2299 "unfinalized_shards".into(),
2300 serde_json::json!(unfinalized_shards),
2301 );
2302 }
2303 let temporary_gids: Vec<_> = self
2308 .entry_by_global_id
2309 .iter()
2310 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2311 .map(|(gid, _item_id)| *gid)
2312 .collect();
2313 if !temporary_gids.is_empty() {
2314 let gids = dump_obj
2315 .get_mut("entry_by_global_id")
2316 .expect("known_to_exist")
2317 .as_object_mut()
2318 .expect("entry_by_global_id is an object");
2319 for gid in temporary_gids {
2320 gids.remove(&gid.to_string());
2321 }
2322 }
2323 dump_obj.remove("role_auth_by_id");
2326
2327 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2329 }
2330
    /// Returns the availability zones stored in this catalog state as a slice.
    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }
2334
2335 pub fn concretize_replica_location(
2336 &self,
2337 location: mz_catalog::durable::ReplicaLocation,
2338 allowed_sizes: &Vec<String>,
2339 allowed_availability_zones: Option<&[String]>,
2340 ) -> Result<ReplicaLocation, Error> {
2341 let location = match location {
2342 mz_catalog::durable::ReplicaLocation::Unmanaged {
2343 storagectl_addrs,
2344 computectl_addrs,
2345 } => {
2346 if allowed_availability_zones.is_some() {
2347 return Err(Error {
2348 kind: ErrorKind::Internal(
2349 "tried concretize unmanaged replica with specific availability_zones"
2350 .to_string(),
2351 ),
2352 });
2353 }
2354 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2355 storagectl_addrs,
2356 computectl_addrs,
2357 })
2358 }
2359 mz_catalog::durable::ReplicaLocation::Managed {
2360 size,
2361 availability_zone,
2362 billed_as,
2363 internal,
2364 pending,
2365 } => {
2366 if allowed_availability_zones.is_some() && availability_zone.is_some() {
2367 let message = "tried concretize managed replica with specific availability zones and availability zone";
2368 return Err(Error {
2369 kind: ErrorKind::Internal(message.to_string()),
2370 });
2371 }
2372 self.ensure_valid_replica_size(allowed_sizes, &size)?;
2373 let cluster_replica_sizes = &self.cluster_replica_sizes;
2374
2375 ReplicaLocation::Managed(ManagedReplicaLocation {
2376 allocation: cluster_replica_sizes
2377 .0
2378 .get(&size)
2379 .expect("catalog out of sync")
2380 .clone(),
2381 availability_zones: match (availability_zone, allowed_availability_zones) {
2382 (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2383 (None, Some(azs)) if azs.is_empty() => {
2384 ManagedReplicaAvailabilityZones::FromCluster(None)
2385 }
2386 (None, Some(azs)) => {
2387 ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2388 }
2389 (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2390 },
2391 size,
2392 billed_as,
2393 internal,
2394 pending,
2395 })
2396 }
2397 };
2398 Ok(location)
2399 }
2400
2401 pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2412 let alloc = &self.cluster_replica_sizes.0[size];
2413 !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2414 }
2415
2416 pub(crate) fn ensure_valid_replica_size(
2417 &self,
2418 allowed_sizes: &[String],
2419 size: &String,
2420 ) -> Result<(), Error> {
2421 let cluster_replica_sizes = &self.cluster_replica_sizes;
2422
2423 if !cluster_replica_sizes.0.contains_key(size)
2424 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2425 || cluster_replica_sizes.0[size].disabled
2426 {
2427 let mut entries = cluster_replica_sizes
2428 .enabled_allocations()
2429 .collect::<Vec<_>>();
2430
2431 if !allowed_sizes.is_empty() {
2432 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2433 entries.retain(|(name, _)| allowed_sizes.contains(name));
2434 }
2435
2436 entries.sort_by_key(
2437 |(
2438 _name,
2439 ReplicaAllocation {
2440 scale, cpu_limit, ..
2441 },
2442 )| (scale, cpu_limit),
2443 );
2444
2445 Err(Error {
2446 kind: ErrorKind::InvalidClusterReplicaSize {
2447 size: size.to_owned(),
2448 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2449 },
2450 })
2451 } else {
2452 Ok(())
2453 }
2454 }
2455
2456 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2457 if role_id.is_builtin() {
2458 let role = self.get_role(role_id);
2459 Err(Error::new(ErrorKind::ReservedRoleName(
2460 role.name().to_string(),
2461 )))
2462 } else {
2463 Ok(())
2464 }
2465 }
2466
2467 pub fn ensure_not_reserved_network_policy(
2468 &self,
2469 network_policy_id: &NetworkPolicyId,
2470 ) -> Result<(), Error> {
2471 if network_policy_id.is_builtin() {
2472 let policy = self.get_network_policy(network_policy_id);
2473 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2474 policy.name.clone(),
2475 )))
2476 } else {
2477 Ok(())
2478 }
2479 }
2480
2481 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2482 let is_grantable = !role_id.is_public() && !role_id.is_system();
2483 if is_grantable {
2484 Ok(())
2485 } else {
2486 let role = self.get_role(role_id);
2487 Err(Error::new(ErrorKind::UngrantableRoleName(
2488 role.name().to_string(),
2489 )))
2490 }
2491 }
2492
2493 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2494 if role_id.is_system() {
2495 let role = self.get_role(role_id);
2496 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2497 role.name().to_string(),
2498 )))
2499 } else {
2500 Ok(())
2501 }
2502 }
2503
2504 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2505 if role_id.is_predefined() {
2506 let role = self.get_role(role_id);
2507 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2508 role.name().to_string(),
2509 )))
2510 } else {
2511 Ok(())
2512 }
2513 }
2514
2515 pub(crate) fn add_to_audit_log(
2518 system_configuration: &SystemVars,
2519 oracle_write_ts: mz_repr::Timestamp,
2520 session: Option<&ConnMeta>,
2521 tx: &mut mz_catalog::durable::Transaction,
2522 audit_events: &mut Vec<VersionedEvent>,
2523 event_type: EventType,
2524 object_type: ObjectType,
2525 details: EventDetails,
2526 ) -> Result<(), Error> {
2527 let user = session.map(|session| session.user().name.to_string());
2528
2529 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2532 Some(ts) => ts.into(),
2533 _ => oracle_write_ts.into(),
2534 };
2535 let id = tx.allocate_audit_log_id()?;
2536 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2537 audit_events.push(event.clone());
2538 tx.insert_audit_log_event(event);
2539 Ok(())
2540 }
2541
2542 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2543 match id {
2544 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2545 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2546 self.get_cluster_replica(*cluster_id, *replica_id)
2547 .owner_id(),
2548 ),
2549 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2550 ObjectId::Schema((database_spec, schema_spec)) => Some(
2551 self.get_schema(database_spec, schema_spec, conn_id)
2552 .owner_id(),
2553 ),
2554 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2555 ObjectId::Role(_) => None,
2556 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2557 }
2558 }
2559
2560 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2561 match object_id {
2562 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2563 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2564 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2565 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2566 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2567 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2568 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2569 }
2570 }
2571
2572 pub(super) fn get_system_object_type(
2573 &self,
2574 id: &SystemObjectId,
2575 ) -> mz_sql::catalog::SystemObjectType {
2576 match id {
2577 SystemObjectId::Object(object_id) => {
2578 SystemObjectType::Object(self.get_object_type(object_id))
2579 }
2580 SystemObjectId::System => SystemObjectType::System,
2581 }
2582 }
2583
    /// Returns a shared reference to the storage metadata held by this catalog
    /// state.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2590
2591 pub fn source_compaction_windows(
2593 &self,
2594 ids: impl IntoIterator<Item = CatalogItemId>,
2595 ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2596 let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2597 let mut seen = BTreeSet::new();
2598 for item_id in ids {
2599 if !seen.insert(item_id) {
2600 continue;
2601 }
2602 let entry = self.get_entry(&item_id);
2603 match entry.item() {
2604 CatalogItem::Source(source) => {
2605 let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2606 match source.data_source {
2607 DataSourceDesc::Ingestion { .. }
2608 | DataSourceDesc::OldSyntaxIngestion { .. }
2609 | DataSourceDesc::IngestionExport { .. } => {
2610 cws.entry(source_cw).or_default().insert(item_id);
2611 }
2612 DataSourceDesc::Introspection(_)
2613 | DataSourceDesc::Progress
2614 | DataSourceDesc::Webhook { .. } => {
2615 cws.entry(source_cw).or_default().insert(item_id);
2616 }
2617 }
2618 }
2619 CatalogItem::Table(table) => {
2620 let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2621 match &table.data_source {
2622 TableDataSource::DataSource {
2623 desc: DataSourceDesc::IngestionExport { .. },
2624 timeline: _,
2625 } => {
2626 cws.entry(table_cw).or_default().insert(item_id);
2627 }
2628 _ => {}
2629 }
2630 }
2631 _ => {
2632 continue;
2634 }
2635 }
2636 }
2637 cws
2638 }
2639
2640 pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2641 match id {
2642 CommentObjectId::Table(id)
2643 | CommentObjectId::View(id)
2644 | CommentObjectId::MaterializedView(id)
2645 | CommentObjectId::Source(id)
2646 | CommentObjectId::Sink(id)
2647 | CommentObjectId::Index(id)
2648 | CommentObjectId::Func(id)
2649 | CommentObjectId::Connection(id)
2650 | CommentObjectId::Type(id)
2651 | CommentObjectId::Secret(id)
2652 | CommentObjectId::ContinualTask(id) => Some(*id),
2653 CommentObjectId::Role(_)
2654 | CommentObjectId::Database(_)
2655 | CommentObjectId::Schema(_)
2656 | CommentObjectId::Cluster(_)
2657 | CommentObjectId::ClusterReplica(_)
2658 | CommentObjectId::NetworkPolicy(_) => None,
2659 }
2660 }
2661
    /// Returns the catalog entry a comment ID refers to.
    ///
    /// Yields `None` when [`Self::comment_id_to_item_id`] yields `None`, i.e. when
    /// the comment ID does not refer to a catalog item; otherwise the entry is
    /// fetched with `get_entry`.
    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
    }
2665
2666 pub fn comment_id_to_audit_log_name(
2667 &self,
2668 id: CommentObjectId,
2669 conn_id: &ConnectionId,
2670 ) -> String {
2671 match id {
2672 CommentObjectId::Table(id)
2673 | CommentObjectId::View(id)
2674 | CommentObjectId::MaterializedView(id)
2675 | CommentObjectId::Source(id)
2676 | CommentObjectId::Sink(id)
2677 | CommentObjectId::Index(id)
2678 | CommentObjectId::Func(id)
2679 | CommentObjectId::Connection(id)
2680 | CommentObjectId::Type(id)
2681 | CommentObjectId::Secret(id)
2682 | CommentObjectId::ContinualTask(id) => {
2683 let item = self.get_entry(&id);
2684 let name = self.resolve_full_name(item.name(), Some(conn_id));
2685 name.to_string()
2686 }
2687 CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2688 CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2689 CommentObjectId::Schema((spec, schema_id)) => {
2690 let schema = self.get_schema(&spec, &schema_id, conn_id);
2691 self.resolve_full_schema_name(&schema.name).to_string()
2692 }
2693 CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2694 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2695 let cluster = self.get_cluster(cluster_id);
2696 let replica = self.get_cluster_replica(cluster_id, replica_id);
2697 QualifiedReplica {
2698 cluster: Ident::new_unchecked(cluster.name.clone()),
2699 replica: Ident::new_unchecked(replica.name.clone()),
2700 }
2701 .to_string()
2702 }
2703 CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2704 }
2705 }
2706
    /// Returns a copy of the configured mock authentication nonce, or the default
    /// (empty) string when none is set.
    pub fn mock_authentication_nonce(&self) -> String {
        self.mock_authentication_nonce.clone().unwrap_or_default()
    }
2710}
2711
2712impl ConnectionResolver for CatalogState {
2713 fn resolve_connection(
2714 &self,
2715 id: CatalogItemId,
2716 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2717 use mz_storage_types::connections::Connection::*;
2718 match self
2719 .get_entry(&id)
2720 .connection()
2721 .expect("catalog out of sync")
2722 .details
2723 .to_connection()
2724 {
2725 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2726 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2727 Csr(conn) => Csr(conn.into_inline_connection(self)),
2728 Ssh(conn) => Ssh(conn),
2729 Aws(conn) => Aws(conn),
2730 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2731 MySql(conn) => MySql(conn.into_inline_connection(self)),
2732 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2733 IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2734 }
2735 }
2736}
2737
// Exposes `CatalogState` through the `OptimizerCatalog` trait. Every method
// forwards to a function on `CatalogState`, written in the explicit
// `CatalogState::f(self, ..)` form.
// NOTE(review): the explicit form is presumably used because several trait
// methods share their name with methods of `CatalogState` itself; confirm before
// rewriting these as `self.f(..)`.
impl OptimizerCatalog for CatalogState {
    /// Returns the collection entry for a global ID by forwarding to
    /// `CatalogState::get_entry_by_global_id`.
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }
    /// Returns the catalog entry for an item ID by forwarding to
    /// `CatalogState::get_entry`.
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }
    /// Resolves a qualified item name into a full item name by forwarding to
    /// `CatalogState::resolve_full_name`.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }
    /// Returns a boxed iterator over the `(GlobalId, &Index)` pairs produced by
    /// `CatalogState::get_indexes_on` for `id` and `cluster`.
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}
2760
// Exposes `Catalog` through the `OptimizerCatalog` trait by delegating every
// method to the catalog's `state`.
impl OptimizerCatalog for Catalog {
    /// Returns the collection entry for a global ID, looked up in `self.state`.
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    /// Returns the catalog entry for an item ID, looked up in `self.state`.
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    /// Resolves a qualified item name into a full item name using `self.state`.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    /// Returns a boxed iterator over the `(GlobalId, &Index)` pairs that
    /// `self.state.get_indexes_on` produces for `id` and `cluster`.
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}
2786
impl Catalog {
    /// Converts a shared `Catalog` handle into a shared `dyn OptimizerCatalog`
    /// trait object. The `Arc` is returned as-is and coerced to the trait object
    /// type.
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}