1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55 UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::OptimizerFeatures;
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61 CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62 VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
67use mz_sql::catalog::{
68 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
69 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
70 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
71 TypeReference,
72};
73use mz_sql::names::{
74 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81 Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91 ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use serde::Serialize;
94use timely::progress::Antichain;
95use tokio::sync::mpsc;
96use tracing::{debug, warn};
97
98use crate::AdapterError;
100use crate::catalog::{Catalog, ConnCatalog};
101use crate::coord::ConnMeta;
102use crate::optimize::{self, Optimize, OptimizerCatalog};
103use crate::session::Session;
104
/// In-memory state of the catalog.
///
/// Most objects are indexed twice: a `*_by_name` map resolves a name to an id,
/// and a `*_by_id` map resolves that id to the object itself. The struct is
/// serializable; fields marked `#[serde(skip)]` are left out of the serialized
/// form.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    /// Database ids keyed by database name.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    /// Databases keyed by id (serialized via `map_key_to_string`).
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    /// Catalog entries keyed by item id. Serialized through `skip_temp_items`,
    /// which omits entries that have a connection id.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Maps a `GlobalId` to the `CatalogItemId` of the entry it belongs to.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    /// Ambient schema ids keyed by schema name.
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    /// Ambient schemas keyed by id.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Cluster ids keyed by cluster name.
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    /// Clusters keyed by id.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    /// Role ids keyed by role name.
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    /// Roles keyed by id.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    /// Network policy ids keyed by policy name.
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    /// Network policies keyed by id.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    /// Role authentication data keyed by role id.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    /// System variables; not serialized.
    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    /// Default privileges tracked by the catalog.
    pub(super) default_privileges: DefaultPrivileges,
    /// System-level privileges tracked by the catalog.
    pub(super) system_privileges: PrivilegeMap,
    /// Comments tracked by the catalog.
    pub(super) comments: CommentsMap,
    /// Source references keyed by catalog item id.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    /// Storage metadata tracked by the catalog.
    pub(super) storage_metadata: StorageMetadata,
    /// Optional nonce for mock authentication.
    // NOTE(review): exact usage is not visible in this part of the file — confirm.
    pub(super) mock_authentication_nonce: Option<String>,

    /// Temporary schemas keyed by the id of their connection; not serialized.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    /// Catalog configuration; not serialized.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    /// Known cluster replica sizes.
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Availability zone names; not serialized.
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    /// Egress network addresses; not serialized.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    /// Optional AWS principal context.
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    /// Optional set of availability zones for AWS PrivateLink.
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Optional HTTP host name; `try_get_webhook_url` falls back to `"HOST"`
    /// when this is unset.
    pub(super) http_host_name: Option<String>,

    /// Validated license key; not serialized.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
172
/// Cache of local expressions keyed by `GlobalId`.
///
/// While `Open`, lookups and insertions operate on the contained maps; while
/// `Closed`, every operation in the `impl` below is a no-op that yields nothing.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is in use.
    Open {
        /// Expressions that were handed to the cache at construction (or
        /// re-inserted via `insert_cached_expression`).
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Expressions recorded via `insert_uncached_expression`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not in use.
    Closed,
}
186
187impl LocalExpressionCache {
188 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
189 Self::Open {
190 cached_exprs,
191 uncached_exprs: BTreeMap::new(),
192 }
193 }
194
195 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
196 match self {
197 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
198 LocalExpressionCache::Closed => None,
199 }
200 }
201
202 pub(super) fn insert_cached_expression(
205 &mut self,
206 id: GlobalId,
207 local_expressions: LocalExpressions,
208 ) {
209 match self {
210 LocalExpressionCache::Open { cached_exprs, .. } => {
211 cached_exprs.insert(id, local_expressions);
212 }
213 LocalExpressionCache::Closed => {}
214 }
215 }
216
217 pub(super) fn insert_uncached_expression(
220 &mut self,
221 id: GlobalId,
222 local_mir: OptimizedMirRelationExpr,
223 optimizer_features: OptimizerFeatures,
224 ) {
225 match self {
226 LocalExpressionCache::Open { uncached_exprs, .. } => {
227 let local_expr = LocalExpressions {
228 local_mir,
229 optimizer_features,
230 };
231 let prev = uncached_exprs.remove(&id);
238 match prev {
239 Some(prev) if prev == local_expr => {
240 uncached_exprs.insert(id, local_expr);
241 }
242 None => {
243 uncached_exprs.insert(id, local_expr);
244 }
245 Some(_) => {}
246 }
247 }
248 LocalExpressionCache::Closed => {}
249 }
250 }
251
252 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
253 match self {
254 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
255 LocalExpressionCache::Closed => BTreeMap::new(),
256 }
257 }
258}
259
260fn skip_temp_items<S>(
261 entries: &BTreeMap<CatalogItemId, CatalogEntry>,
262 serializer: S,
263) -> Result<S::Ok, S::Error>
264where
265 S: serde::Serializer,
266{
267 mz_ore::serde::map_key_to_string(
268 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
269 serializer,
270 )
271}
272
273impl CatalogState {
274 pub fn empty_test() -> Self {
278 CatalogState {
279 database_by_name: Default::default(),
280 database_by_id: Default::default(),
281 entry_by_id: Default::default(),
282 entry_by_global_id: Default::default(),
283 ambient_schemas_by_name: Default::default(),
284 ambient_schemas_by_id: Default::default(),
285 temporary_schemas: Default::default(),
286 clusters_by_id: Default::default(),
287 clusters_by_name: Default::default(),
288 network_policies_by_name: Default::default(),
289 roles_by_name: Default::default(),
290 roles_by_id: Default::default(),
291 network_policies_by_id: Default::default(),
292 role_auth_by_id: Default::default(),
293 config: CatalogConfig {
294 start_time: Default::default(),
295 start_instant: Instant::now(),
296 nonce: Default::default(),
297 environment_id: EnvironmentId::for_tests(),
298 session_id: Default::default(),
299 build_info: &DUMMY_BUILD_INFO,
300 timestamp_interval: Default::default(),
301 now: NOW_ZERO.clone(),
302 connection_context: ConnectionContext::for_tests(Arc::new(
303 InMemorySecretsController::new(),
304 )),
305 builtins_cfg: BuiltinsConfig {
306 include_continual_tasks: true,
307 },
308 helm_chart_version: None,
309 },
310 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
311 availability_zones: Default::default(),
312 system_configuration: Default::default(),
313 egress_addresses: Default::default(),
314 aws_principal_context: Default::default(),
315 aws_privatelink_availability_zones: Default::default(),
316 http_host_name: Default::default(),
317 default_privileges: Default::default(),
318 system_privileges: Default::default(),
319 comments: Default::default(),
320 source_references: Default::default(),
321 storage_metadata: Default::default(),
322 license_key: ValidatedLicenseKey::for_tests(),
323 mock_authentication_nonce: Default::default(),
324 }
325 }
326
327 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
328 let search_path = self.resolve_search_path(session);
329 let database = self
330 .database_by_name
331 .get(session.vars().database())
332 .map(|id| id.clone());
333 let state = match session.transaction().catalog_state() {
334 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
335 None => Cow::Borrowed(self),
336 };
337 ConnCatalog {
338 state,
339 unresolvable_ids: BTreeSet::new(),
340 conn_id: session.conn_id().clone(),
341 cluster: session.vars().cluster().into(),
342 database,
343 search_path,
344 role_id: session.current_role_id().clone(),
345 prepared_statements: Some(session.prepared_statements()),
346 portals: Some(session.portals()),
347 notices_tx: session.retain_notice_transmitter(),
348 }
349 }
350
351 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
352 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
353 let cluster = self.system_configuration.default_cluster();
354
355 ConnCatalog {
356 state: Cow::Borrowed(self),
357 unresolvable_ids: BTreeSet::new(),
358 conn_id: SYSTEM_CONN_ID.clone(),
359 cluster,
360 database: self
361 .resolve_database(DEFAULT_DATABASE_NAME)
362 .ok()
363 .map(|db| db.id()),
364 search_path: Vec::new(),
367 role_id,
368 prepared_statements: None,
369 portals: None,
370 notices_tx,
371 }
372 }
373
374 pub fn for_system_session(&self) -> ConnCatalog<'_> {
375 self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
376 }
377
378 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
383 struct I<'a> {
384 queue: VecDeque<CatalogItemId>,
385 seen: BTreeSet<CatalogItemId>,
386 this: &'a CatalogState,
387 }
388 impl<'a> Iterator for I<'a> {
389 type Item = CatalogItemId;
390 fn next(&mut self) -> Option<Self::Item> {
391 if let Some(next) = self.queue.pop_front() {
392 for child in self.this.get_entry(&next).item().uses() {
393 if !self.seen.contains(&child) {
394 self.queue.push_back(child);
395 self.seen.insert(child);
396 }
397 }
398 Some(next)
399 } else {
400 None
401 }
402 }
403 }
404
405 I {
406 queue: [id].into_iter().collect(),
407 seen: [id].into_iter().collect(),
408 this: self,
409 }
410 }
411
412 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
415 let mut out = Vec::new();
416 self.introspection_dependencies_inner(id, &mut out);
417 out
418 }
419
420 fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
421 match self.get_entry(&id).item() {
422 CatalogItem::Log(_) => out.push(id),
423 item @ (CatalogItem::View(_)
424 | CatalogItem::MaterializedView(_)
425 | CatalogItem::Connection(_)
426 | CatalogItem::ContinualTask(_)) => {
427 for item_id in item.references().items() {
429 self.introspection_dependencies_inner(*item_id, out);
430 }
431 }
432 CatalogItem::Sink(sink) => {
433 let from_item_id = self.get_entry_by_global_id(&sink.from).id();
434 self.introspection_dependencies_inner(from_item_id, out)
435 }
436 CatalogItem::Index(idx) => {
437 let on_item_id = self.get_entry_by_global_id(&idx.on).id();
438 self.introspection_dependencies_inner(on_item_id, out)
439 }
440 CatalogItem::Table(_)
441 | CatalogItem::Source(_)
442 | CatalogItem::Type(_)
443 | CatalogItem::Func(_)
444 | CatalogItem::Secret(_) => (),
445 }
446 }
447
448 pub(super) fn object_dependents(
454 &self,
455 object_ids: &Vec<ObjectId>,
456 conn_id: &ConnectionId,
457 seen: &mut BTreeSet<ObjectId>,
458 ) -> Vec<ObjectId> {
459 let mut dependents = Vec::new();
460 for object_id in object_ids {
461 match object_id {
462 ObjectId::Cluster(id) => {
463 dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
464 }
465 ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
466 &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
467 ),
468 ObjectId::Database(id) => {
469 dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
470 }
471 ObjectId::Schema((database_spec, schema_spec)) => {
472 dependents.extend_from_slice(&self.schema_dependents(
473 database_spec.clone(),
474 schema_spec.clone(),
475 conn_id,
476 seen,
477 ));
478 }
479 ObjectId::NetworkPolicy(id) => {
480 dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
481 }
482 id @ ObjectId::Role(_) => {
483 let unseen = seen.insert(id.clone());
484 if unseen {
485 dependents.push(id.clone());
486 }
487 }
488 ObjectId::Item(id) => {
489 dependents.extend_from_slice(&self.item_dependents(*id, seen))
490 }
491 }
492 }
493 dependents
494 }
495
496 fn cluster_dependents(
503 &self,
504 cluster_id: ClusterId,
505 seen: &mut BTreeSet<ObjectId>,
506 ) -> Vec<ObjectId> {
507 let mut dependents = Vec::new();
508 let object_id = ObjectId::Cluster(cluster_id);
509 if !seen.contains(&object_id) {
510 seen.insert(object_id.clone());
511 let cluster = self.get_cluster(cluster_id);
512 for item_id in cluster.bound_objects() {
513 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
514 }
515 for replica_id in cluster.replica_ids().values() {
516 dependents.extend_from_slice(&self.cluster_replica_dependents(
517 cluster_id,
518 *replica_id,
519 seen,
520 ));
521 }
522 dependents.push(object_id);
523 }
524 dependents
525 }
526
527 pub(super) fn cluster_replica_dependents(
534 &self,
535 cluster_id: ClusterId,
536 replica_id: ReplicaId,
537 seen: &mut BTreeSet<ObjectId>,
538 ) -> Vec<ObjectId> {
539 let mut dependents = Vec::new();
540 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
541 if !seen.contains(&object_id) {
542 seen.insert(object_id.clone());
543 dependents.push(object_id);
544 }
545 dependents
546 }
547
548 fn database_dependents(
555 &self,
556 database_id: DatabaseId,
557 conn_id: &ConnectionId,
558 seen: &mut BTreeSet<ObjectId>,
559 ) -> Vec<ObjectId> {
560 let mut dependents = Vec::new();
561 let object_id = ObjectId::Database(database_id);
562 if !seen.contains(&object_id) {
563 seen.insert(object_id.clone());
564 let database = self.get_database(&database_id);
565 for schema_id in database.schema_ids().values() {
566 dependents.extend_from_slice(&self.schema_dependents(
567 ResolvedDatabaseSpecifier::Id(database_id),
568 SchemaSpecifier::Id(*schema_id),
569 conn_id,
570 seen,
571 ));
572 }
573 dependents.push(object_id);
574 }
575 dependents
576 }
577
578 fn schema_dependents(
585 &self,
586 database_spec: ResolvedDatabaseSpecifier,
587 schema_spec: SchemaSpecifier,
588 conn_id: &ConnectionId,
589 seen: &mut BTreeSet<ObjectId>,
590 ) -> Vec<ObjectId> {
591 let mut dependents = Vec::new();
592 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
593 if !seen.contains(&object_id) {
594 seen.insert(object_id.clone());
595 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
596 for item_id in schema.item_ids() {
597 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
598 }
599 dependents.push(object_id)
600 }
601 dependents
602 }
603
604 pub(super) fn item_dependents(
611 &self,
612 item_id: CatalogItemId,
613 seen: &mut BTreeSet<ObjectId>,
614 ) -> Vec<ObjectId> {
615 let mut dependents = Vec::new();
616 let object_id = ObjectId::Item(item_id);
617 if !seen.contains(&object_id) {
618 seen.insert(object_id.clone());
619 let entry = self.get_entry(&item_id);
620 for dependent_id in entry.used_by() {
621 dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
622 }
623 dependents.push(object_id);
624 if let Some(progress_id) = entry.progress_id() {
628 dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
629 }
630 }
631 dependents
632 }
633
634 pub(super) fn network_policy_dependents(
641 &self,
642 network_policy_id: NetworkPolicyId,
643 _seen: &mut BTreeSet<ObjectId>,
644 ) -> Vec<ObjectId> {
645 let object_id = ObjectId::NetworkPolicy(network_policy_id);
646 vec![object_id]
650 }
651
652 fn is_stable(&self, id: CatalogItemId) -> bool {
656 let spec = self.get_entry(&id).name().qualifiers.schema_spec;
657 !self.is_unstable_schema_specifier(spec)
658 }
659
660 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
661 if self.system_config().unsafe_enable_unstable_dependencies() {
662 return Ok(());
663 }
664
665 let unstable_dependencies: Vec<_> = item
666 .references()
667 .items()
668 .filter(|id| !self.is_stable(**id))
669 .map(|id| self.get_entry(id).name().item.clone())
670 .collect();
671
672 if unstable_dependencies.is_empty() || item.is_temporary() {
676 Ok(())
677 } else {
678 let object_type = item.typ().to_string();
679 Err(Error {
680 kind: ErrorKind::UnstableDependency {
681 object_type,
682 unstable_dependencies,
683 },
684 })
685 }
686 }
687
688 pub fn resolve_full_name(
689 &self,
690 name: &QualifiedItemName,
691 conn_id: Option<&ConnectionId>,
692 ) -> FullItemName {
693 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
694
695 let database = match &name.qualifiers.database_spec {
696 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
697 ResolvedDatabaseSpecifier::Id(id) => {
698 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
699 }
700 };
701 let schema = self
702 .get_schema(
703 &name.qualifiers.database_spec,
704 &name.qualifiers.schema_spec,
705 conn_id,
706 )
707 .name()
708 .schema
709 .clone();
710 FullItemName {
711 database,
712 schema,
713 item: name.item.clone(),
714 }
715 }
716
717 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
718 let database = match &name.database {
719 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
720 ResolvedDatabaseSpecifier::Id(id) => {
721 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
722 }
723 };
724 FullSchemaName {
725 database,
726 schema: name.schema.clone(),
727 }
728 }
729
730 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
731 self.entry_by_id
732 .get(id)
733 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
734 }
735
736 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
737 let item_id = self
738 .entry_by_global_id
739 .get(id)
740 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
741
742 let entry = self.get_entry(item_id).clone();
743 let version = match entry.item() {
744 CatalogItem::Table(table) => {
745 let (version, _) = table
746 .collections
747 .iter()
748 .find(|(_verison, gid)| *gid == id)
749 .expect("version to exist");
750 RelationVersionSelector::Specific(*version)
751 }
752 _ => RelationVersionSelector::Latest,
753 };
754 CatalogCollectionEntry { entry, version }
755 }
756
757 pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
758 self.entry_by_id.iter()
759 }
760
761 pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
762 let schema = self
763 .temporary_schemas
764 .get(conn)
765 .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
766 schema.items.values().copied().map(ObjectId::from)
767 }
768
769 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
775 let mut res = None;
776 for schema_id in self.system_schema_ids() {
777 let schema = &self.ambient_schemas_by_id[&schema_id];
778 if let Some(global_id) = schema.types.get(name) {
779 match res {
780 None => res = Some(self.get_entry(global_id)),
781 Some(_) => panic!(
782 "only call get_system_type on objects uniquely identifiable in one system schema"
783 ),
784 }
785 }
786 }
787
788 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
789 }
790
791 pub fn get_item_by_name(
792 &self,
793 name: &QualifiedItemName,
794 conn_id: &ConnectionId,
795 ) -> Option<&CatalogEntry> {
796 self.get_schema(
797 &name.qualifiers.database_spec,
798 &name.qualifiers.schema_spec,
799 conn_id,
800 )
801 .items
802 .get(&name.item)
803 .and_then(|id| self.try_get_entry(id))
804 }
805
806 pub fn get_type_by_name(
807 &self,
808 name: &QualifiedItemName,
809 conn_id: &ConnectionId,
810 ) -> Option<&CatalogEntry> {
811 self.get_schema(
812 &name.qualifiers.database_spec,
813 &name.qualifiers.schema_spec,
814 conn_id,
815 )
816 .types
817 .get(&name.item)
818 .and_then(|id| self.try_get_entry(id))
819 }
820
821 pub(super) fn find_available_name(
822 &self,
823 mut name: QualifiedItemName,
824 conn_id: &ConnectionId,
825 ) -> QualifiedItemName {
826 let mut i = 0;
827 let orig_item_name = name.item.clone();
828 while self.get_item_by_name(&name, conn_id).is_some() {
829 i += 1;
830 name.item = format!("{}{}", orig_item_name, i);
831 }
832 name
833 }
834
835 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
836 self.entry_by_id.get(id)
837 }
838
839 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
840 let item_id = self.entry_by_global_id.get(id)?;
841 self.try_get_entry(item_id)
842 }
843
844 pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
847 let entry = self.try_get_entry_by_global_id(id)?;
848 let desc = match entry.item() {
849 CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
850 other => other.desc_opt(RelationVersionSelector::Latest)?,
852 };
853 Some(desc)
854 }
855
856 pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
857 self.try_get_cluster(cluster_id)
858 .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
859 }
860
861 pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
862 self.clusters_by_id.get(&cluster_id)
863 }
864
865 pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
866 self.roles_by_id.get(id)
867 }
868
869 pub fn get_role(&self, id: &RoleId) -> &Role {
870 self.roles_by_id.get(id).expect("catalog out of sync")
871 }
872
873 pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
874 self.roles_by_id.keys()
875 }
876
877 pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
878 self.roles_by_name
879 .get(role_name)
880 .map(|id| &self.roles_by_id[id])
881 }
882
883 pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
884 self.role_auth_by_id
885 .get(id)
886 .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
887 }
888
889 pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
890 self.role_auth_by_id.get(id)
891 }
892
893 pub(super) fn try_get_network_policy_by_name(
894 &self,
895 policy_name: &str,
896 ) -> Option<&NetworkPolicy> {
897 self.network_policies_by_name
898 .get(policy_name)
899 .map(|id| &self.network_policies_by_id[id])
900 }
901
902 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
903 let mut membership = BTreeSet::new();
904 let mut queue = VecDeque::from(vec![id]);
905 while let Some(cur_id) = queue.pop_front() {
906 if !membership.contains(cur_id) {
907 membership.insert(cur_id.clone());
908 let role = self.get_role(cur_id);
909 soft_assert_no_log!(
910 !role.membership().keys().contains(id),
911 "circular membership exists in the catalog"
912 );
913 queue.extend(role.membership().keys());
914 }
915 }
916 membership.insert(RoleId::Public);
917 membership
918 }
919
920 pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
921 self.network_policies_by_id
922 .get(id)
923 .expect("catalog out of sync")
924 }
925
926 pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
927 self.network_policies_by_id.keys()
928 }
929
930 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
935 let entry = self.try_get_entry(id)?;
936 let name = self.resolve_full_name(entry.name(), None);
938 let host_name = self
939 .http_host_name
940 .as_ref()
941 .map(|x| x.as_str())
942 .unwrap_or_else(|| "HOST");
943
944 let RawDatabaseSpecifier::Name(database) = name.database else {
945 return None;
946 };
947
948 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
949 url.path_segments_mut()
950 .ok()?
951 .push(&database)
952 .push(&name.schema)
953 .push(&name.item);
954
955 Some(url)
956 }
957
958 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
966 &mut self,
969 create_sql: &str,
970 force_if_exists_skip: bool,
971 ) -> Result<(Plan, ResolvedIds), AdapterError> {
972 self.with_enable_for_item_parsing(|state| {
973 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
974 let pcx = Some(&pcx);
975 let session_catalog = state.for_system_session();
976
977 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
978 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
979 let plan =
980 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
981
982 Ok((plan, resolved_ids))
983 })
984 }
985
986 #[mz_ore::instrument]
988 pub(crate) fn parse_plan(
989 create_sql: &str,
990 pcx: Option<&PlanContext>,
991 catalog: &ConnCatalog,
992 ) -> Result<(Plan, ResolvedIds), AdapterError> {
993 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
994 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
995 let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
996
997 return Ok((plan, resolved_ids));
998 }
999
1000 pub(crate) fn deserialize_item(
1002 &self,
1003 global_id: GlobalId,
1004 create_sql: &str,
1005 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1006 local_expression_cache: &mut LocalExpressionCache,
1007 previous_item: Option<CatalogItem>,
1008 ) -> Result<CatalogItem, AdapterError> {
1009 self.parse_item(
1010 global_id,
1011 create_sql,
1012 extra_versions,
1013 None,
1014 false,
1015 None,
1016 local_expression_cache,
1017 previous_item,
1018 )
1019 }
1020
1021 #[mz_ore::instrument]
1023 pub(crate) fn parse_item(
1024 &self,
1025 global_id: GlobalId,
1026 create_sql: &str,
1027 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1028 pcx: Option<&PlanContext>,
1029 is_retained_metrics_object: bool,
1030 custom_logical_compaction_window: Option<CompactionWindow>,
1031 local_expression_cache: &mut LocalExpressionCache,
1032 previous_item: Option<CatalogItem>,
1033 ) -> Result<CatalogItem, AdapterError> {
1034 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1035 match self.parse_item_inner(
1036 global_id,
1037 create_sql,
1038 extra_versions,
1039 pcx,
1040 is_retained_metrics_object,
1041 custom_logical_compaction_window,
1042 cached_expr,
1043 previous_item,
1044 ) {
1045 Ok((item, uncached_expr)) => {
1046 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1047 local_expression_cache.insert_uncached_expression(
1048 global_id,
1049 uncached_expr,
1050 optimizer_features,
1051 );
1052 }
1053 Ok(item)
1054 }
1055 Err((err, cached_expr)) => {
1056 if let Some(local_expr) = cached_expr {
1057 local_expression_cache.insert_cached_expression(global_id, local_expr);
1058 }
1059 Err(err)
1060 }
1061 }
1062 }
1063
1064 #[mz_ore::instrument]
1071 pub(crate) fn parse_item_inner(
1072 &self,
1073 global_id: GlobalId,
1074 create_sql: &str,
1075 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1076 pcx: Option<&PlanContext>,
1077 is_retained_metrics_object: bool,
1078 custom_logical_compaction_window: Option<CompactionWindow>,
1079 cached_expr: Option<LocalExpressions>,
1080 previous_item: Option<CatalogItem>,
1081 ) -> Result<
1082 (
1083 CatalogItem,
1084 Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1085 ),
1086 (AdapterError, Option<LocalExpressions>),
1087 > {
1088 let session_catalog = self.for_system_session();
1089
1090 let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1091 Ok((plan, resolved_ids)) => (plan, resolved_ids),
1092 Err(err) => return Err((err, cached_expr)),
1093 };
1094
1095 let mut uncached_expr = None;
1096
1097 let item = match plan {
1098 Plan::CreateTable(CreateTablePlan { table, .. }) => {
1099 let collections = extra_versions
1100 .iter()
1101 .map(|(version, gid)| (*version, *gid))
1102 .chain([(RelationVersion::root(), global_id)].into_iter())
1103 .collect();
1104
1105 CatalogItem::Table(Table {
1106 create_sql: Some(table.create_sql),
1107 desc: table.desc,
1108 collections,
1109 conn_id: None,
1110 resolved_ids,
1111 custom_logical_compaction_window: custom_logical_compaction_window
1112 .or(table.compaction_window),
1113 is_retained_metrics_object,
1114 data_source: match table.data_source {
1115 mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1116 TableDataSource::TableWrites { defaults }
1117 }
1118 mz_sql::plan::TableDataSource::DataSource {
1119 desc: data_source_desc,
1120 timeline,
1121 } => match data_source_desc {
1122 mz_sql::plan::DataSourceDesc::IngestionExport {
1123 ingestion_id,
1124 external_reference,
1125 details,
1126 data_config,
1127 } => TableDataSource::DataSource {
1128 desc: DataSourceDesc::IngestionExport {
1129 ingestion_id,
1130 external_reference,
1131 details,
1132 data_config,
1133 },
1134 timeline,
1135 },
1136 mz_sql::plan::DataSourceDesc::Webhook {
1137 validate_using,
1138 body_format,
1139 headers,
1140 cluster_id,
1141 } => TableDataSource::DataSource {
1142 desc: DataSourceDesc::Webhook {
1143 validate_using,
1144 body_format,
1145 headers,
1146 cluster_id: cluster_id
1147 .expect("Webhook Tables must have a cluster_id set"),
1148 },
1149 timeline,
1150 },
1151 _ => {
1152 return Err((
1153 AdapterError::Unstructured(anyhow::anyhow!(
1154 "unsupported data source for table"
1155 )),
1156 cached_expr,
1157 ));
1158 }
1159 },
1160 },
1161 })
1162 }
1163 Plan::CreateSource(CreateSourcePlan {
1164 source,
1165 timeline,
1166 in_cluster,
1167 ..
1168 }) => CatalogItem::Source(Source {
1169 create_sql: Some(source.create_sql),
1170 data_source: match source.data_source {
1171 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1172 desc,
1173 cluster_id: match in_cluster {
1174 Some(id) => id,
1175 None => {
1176 return Err((
1177 AdapterError::Unstructured(anyhow::anyhow!(
1178 "ingestion-based sources must have cluster specified"
1179 )),
1180 cached_expr,
1181 ));
1182 }
1183 },
1184 },
1185 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1186 desc,
1187 progress_subsource,
1188 data_config,
1189 details,
1190 } => DataSourceDesc::OldSyntaxIngestion {
1191 desc,
1192 progress_subsource,
1193 data_config,
1194 details,
1195 cluster_id: match in_cluster {
1196 Some(id) => id,
1197 None => {
1198 return Err((
1199 AdapterError::Unstructured(anyhow::anyhow!(
1200 "ingestion-based sources must have cluster specified"
1201 )),
1202 cached_expr,
1203 ));
1204 }
1205 },
1206 },
1207 mz_sql::plan::DataSourceDesc::IngestionExport {
1208 ingestion_id,
1209 external_reference,
1210 details,
1211 data_config,
1212 } => DataSourceDesc::IngestionExport {
1213 ingestion_id,
1214 external_reference,
1215 details,
1216 data_config,
1217 },
1218 mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1219 mz_sql::plan::DataSourceDesc::Webhook {
1220 validate_using,
1221 body_format,
1222 headers,
1223 cluster_id,
1224 } => {
1225 mz_ore::soft_assert_or_log!(
1226 cluster_id.is_none(),
1227 "cluster_id set at Source level for Webhooks"
1228 );
1229 DataSourceDesc::Webhook {
1230 validate_using,
1231 body_format,
1232 headers,
1233 cluster_id: in_cluster
1234 .expect("webhook sources must use an existing cluster"),
1235 }
1236 }
1237 },
1238 desc: source.desc,
1239 global_id,
1240 timeline,
1241 resolved_ids,
1242 custom_logical_compaction_window: source
1243 .compaction_window
1244 .or(custom_logical_compaction_window),
1245 is_retained_metrics_object,
1246 }),
1247 Plan::CreateView(CreateViewPlan { view, .. }) => {
1248 let optimizer_config =
1250 optimize::OptimizerConfig::from(session_catalog.system_vars());
1251 let previous_exprs = previous_item.map(|item| match item {
1252 CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
1253 _ => None,
1254 });
1255
1256 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1257 (Some(local_expr), _)
1258 if local_expr.optimizer_features == optimizer_config.features =>
1259 {
1260 debug!("local expression cache hit for {global_id:?}");
1261 (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1262 }
1263 (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1265 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1266 }
1267 (cached_expr, _) => {
1268 let optimizer_features = optimizer_config.features.clone();
1269 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1271
1272 let raw_expr = view.expr;
1274 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1275 Ok(optimzed_expr) => optimzed_expr,
1276 Err(err) => return Err((err.into(), cached_expr)),
1277 };
1278
1279 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1280
1281 (Arc::new(raw_expr), Arc::new(optimized_expr))
1282 }
1283 };
1284
1285 let dependencies: BTreeSet<_> = raw_expr
1287 .depends_on()
1288 .into_iter()
1289 .map(|gid| self.get_entry_by_global_id(&gid).id())
1290 .collect();
1291
1292 CatalogItem::View(View {
1293 create_sql: view.create_sql,
1294 global_id,
1295 raw_expr,
1296 desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
1297 optimized_expr,
1298 conn_id: None,
1299 resolved_ids,
1300 dependencies: DependencyIds(dependencies),
1301 })
1302 }
1303 Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1304 materialized_view, ..
1305 }) => {
1306 let collections = extra_versions
1307 .iter()
1308 .map(|(version, gid)| (*version, *gid))
1309 .chain([(RelationVersion::root(), global_id)].into_iter())
1310 .collect();
1311
1312 let optimizer_config =
1314 optimize::OptimizerConfig::from(session_catalog.system_vars());
1315 let previous_exprs = previous_item.map(|item| match item {
1316 CatalogItem::MaterializedView(materialized_view) => {
1317 (materialized_view.raw_expr, materialized_view.optimized_expr)
1318 }
1319 item => unreachable!("expected materialized view, found: {item:#?}"),
1320 });
1321
1322 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1323 (Some(local_expr), _)
1324 if local_expr.optimizer_features == optimizer_config.features =>
1325 {
1326 debug!("local expression cache hit for {global_id:?}");
1327 (
1328 Arc::new(materialized_view.expr),
1329 Arc::new(local_expr.local_mir),
1330 )
1331 }
1332 (_, Some((raw_expr, optimized_expr)))
1334 if *raw_expr == materialized_view.expr =>
1335 {
1336 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1337 }
1338 (cached_expr, _) => {
1339 let optimizer_features = optimizer_config.features.clone();
1340 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1342
1343 let raw_expr = materialized_view.expr;
1344 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1345 Ok(optimized_expr) => optimized_expr,
1346 Err(err) => return Err((err.into(), cached_expr)),
1347 };
1348
1349 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1350
1351 (Arc::new(raw_expr), Arc::new(optimized_expr))
1352 }
1353 };
1354 let mut typ = optimized_expr.typ();
1355 for &i in &materialized_view.non_null_assertions {
1356 typ.column_types[i].nullable = false;
1357 }
1358 let desc = RelationDesc::new(typ, materialized_view.column_names);
1359 let desc = VersionedRelationDesc::new(desc);
1360
1361 let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1362
1363 let dependencies = raw_expr
1365 .depends_on()
1366 .into_iter()
1367 .map(|gid| self.get_entry_by_global_id(&gid).id())
1368 .collect();
1369
1370 CatalogItem::MaterializedView(MaterializedView {
1371 create_sql: materialized_view.create_sql,
1372 collections,
1373 raw_expr,
1374 optimized_expr,
1375 desc,
1376 resolved_ids,
1377 dependencies,
1378 cluster_id: materialized_view.cluster_id,
1379 non_null_assertions: materialized_view.non_null_assertions,
1380 custom_logical_compaction_window: materialized_view.compaction_window,
1381 refresh_schedule: materialized_view.refresh_schedule,
1382 initial_as_of,
1383 })
1384 }
1385 Plan::CreateContinualTask(plan) => {
1386 let ct =
1387 match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1388 Ok(ct) => ct,
1389 Err(err) => return Err((err, cached_expr)),
1390 };
1391 CatalogItem::ContinualTask(ct)
1392 }
1393 Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1394 create_sql: index.create_sql,
1395 global_id,
1396 on: index.on,
1397 keys: index.keys.into(),
1398 conn_id: None,
1399 resolved_ids,
1400 cluster_id: index.cluster_id,
1401 custom_logical_compaction_window: custom_logical_compaction_window
1402 .or(index.compaction_window),
1403 is_retained_metrics_object,
1404 }),
1405 Plan::CreateSink(CreateSinkPlan {
1406 sink,
1407 with_snapshot,
1408 in_cluster,
1409 ..
1410 }) => CatalogItem::Sink(Sink {
1411 create_sql: sink.create_sql,
1412 global_id,
1413 from: sink.from,
1414 connection: sink.connection,
1415 envelope: sink.envelope,
1416 version: sink.version,
1417 with_snapshot,
1418 resolved_ids,
1419 cluster_id: in_cluster,
1420 }),
1421 Plan::CreateType(CreateTypePlan { typ, .. }) => {
1422 let desc = match typ.inner.desc(&session_catalog) {
1423 Ok(desc) => desc,
1424 Err(err) => return Err((err.into(), cached_expr)),
1425 };
1426 CatalogItem::Type(Type {
1427 create_sql: Some(typ.create_sql),
1428 global_id,
1429 desc,
1430 details: CatalogTypeDetails {
1431 array_id: None,
1432 typ: typ.inner,
1433 pg_metadata: None,
1434 },
1435 resolved_ids,
1436 })
1437 }
1438 Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1439 create_sql: secret.create_sql,
1440 global_id,
1441 }),
1442 Plan::CreateConnection(CreateConnectionPlan {
1443 connection:
1444 mz_sql::plan::Connection {
1445 create_sql,
1446 details,
1447 },
1448 ..
1449 }) => CatalogItem::Connection(Connection {
1450 create_sql,
1451 global_id,
1452 details,
1453 resolved_ids,
1454 }),
1455 _ => {
1456 return Err((
1457 Error::new(ErrorKind::Corruption {
1458 detail: "catalog entry generated inappropriate plan".to_string(),
1459 })
1460 .into(),
1461 cached_expr,
1462 ));
1463 }
1464 };
1465
1466 Ok((item, uncached_expr))
1467 }
1468
1469 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1475 let restore = self.system_configuration.clone();
1486 self.system_configuration.enable_for_item_parsing();
1487 let res = f(self);
1488 self.system_configuration = restore;
1489 res
1490 }
1491
1492 pub fn get_indexes_on(
1494 &self,
1495 id: GlobalId,
1496 cluster: ClusterId,
1497 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1498 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1499
1500 self.try_get_entry_by_global_id(&id)
1501 .into_iter()
1502 .map(move |e| {
1503 e.used_by()
1504 .iter()
1505 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1506 CatalogItem::Index(index) if index_matches(index) => {
1507 Some((index.global_id(), index))
1508 }
1509 _ => None,
1510 })
1511 })
1512 .flatten()
1513 }
1514
    /// Returns the database stored under `database_id`.
    ///
    /// The lookup indexes `database_by_id` directly, so an unknown ID panics rather than
    /// returning an error.
    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }
1518
1519 pub(super) fn try_get_cluster_replica(
1524 &self,
1525 id: ClusterId,
1526 replica_id: ReplicaId,
1527 ) -> Option<&ClusterReplica> {
1528 self.try_get_cluster(id)
1529 .and_then(|cluster| cluster.replica(replica_id))
1530 }
1531
1532 pub(super) fn get_cluster_replica(
1536 &self,
1537 cluster_id: ClusterId,
1538 replica_id: ReplicaId,
1539 ) -> &ClusterReplica {
1540 self.try_get_cluster_replica(cluster_id, replica_id)
1541 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1542 }
1543
1544 pub(super) fn resolve_replica_in_cluster(
1545 &self,
1546 cluster_id: &ClusterId,
1547 replica_name: &str,
1548 ) -> Result<&ClusterReplica, SqlCatalogError> {
1549 let cluster = self.get_cluster(*cluster_id);
1550 let replica_id = cluster
1551 .replica_id_by_name_
1552 .get(replica_name)
1553 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1554 Ok(&cluster.replicas_by_id_[replica_id])
1555 }
1556
1557 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1559 Ok(self.system_configuration.get(name)?)
1560 }
1561
1562 pub(super) fn parse_system_configuration(
1566 &self,
1567 name: &str,
1568 value: VarInput,
1569 ) -> Result<String, Error> {
1570 let value = self.system_configuration.parse(name, value)?;
1571 Ok(value.format())
1572 }
1573
1574 pub(super) fn resolve_schema_in_database(
1576 &self,
1577 database_spec: &ResolvedDatabaseSpecifier,
1578 schema_name: &str,
1579 conn_id: &ConnectionId,
1580 ) -> Result<&Schema, SqlCatalogError> {
1581 let schema = match database_spec {
1582 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1583 self.temporary_schemas.get(conn_id)
1584 }
1585 ResolvedDatabaseSpecifier::Ambient => self
1586 .ambient_schemas_by_name
1587 .get(schema_name)
1588 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1589 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1590 db.schemas_by_name
1591 .get(schema_name)
1592 .and_then(|id| db.schemas_by_id.get(id))
1593 }),
1594 };
1595 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1596 }
1597
1598 pub fn get_schema(
1599 &self,
1600 database_spec: &ResolvedDatabaseSpecifier,
1601 schema_spec: &SchemaSpecifier,
1602 conn_id: &ConnectionId,
1603 ) -> &Schema {
1604 match (database_spec, schema_spec) {
1606 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1607 &self.temporary_schemas[conn_id]
1608 }
1609 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1610 &self.ambient_schemas_by_id[id]
1611 }
1612
1613 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
1614 &self.database_by_id[database_id].schemas_by_id[schema_id]
1615 }
1616 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1617 unreachable!("temporary schemas are in the ambient database")
1618 }
1619 }
1620 }
1621
1622 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1623 self.database_by_id
1624 .values()
1625 .filter_map(|database| database.schemas_by_id.get(schema_id))
1626 .chain(self.ambient_schemas_by_id.values())
1627 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1628 .into_first()
1629 }
1630
    /// Returns the ID of the ambient schema registered under `MZ_CATALOG_SCHEMA`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so a missing entry panics.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }
1634
    /// Returns the ID of the ambient schema registered under `MZ_CATALOG_UNSTABLE_SCHEMA`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so a missing entry panics.
    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }
1638
    /// Returns the ID of the ambient schema registered under `PG_CATALOG_SCHEMA`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so a missing entry panics.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }
1642
    /// Returns the ID of the ambient schema registered under `INFORMATION_SCHEMA`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so a missing entry panics.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }
1646
    /// Returns the ID of the ambient schema registered under `MZ_INTERNAL_SCHEMA`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so a missing entry panics.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }
1650
    /// Returns the ID of the ambient schema registered under `MZ_INTROSPECTION_SCHEMA`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so a missing entry panics.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }
1654
    /// Returns the ID of the ambient schema registered under `MZ_UNSAFE_SCHEMA`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so a missing entry panics.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }
1658
1659 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1660 SYSTEM_SCHEMAS
1661 .iter()
1662 .map(|name| self.ambient_schemas_by_name[*name])
1663 }
1664
1665 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1666 self.system_schema_ids().contains(&id)
1667 }
1668
1669 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1670 match spec {
1671 SchemaSpecifier::Temporary => false,
1672 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1673 }
1674 }
1675
1676 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1677 UNSTABLE_SCHEMAS
1678 .iter()
1679 .map(|name| self.ambient_schemas_by_name[*name])
1680 }
1681
1682 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1683 self.unstable_schema_ids().contains(&id)
1684 }
1685
1686 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1687 match spec {
1688 SchemaSpecifier::Temporary => false,
1689 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1690 }
1691 }
1692
1693 pub fn create_temporary_schema(
1696 &mut self,
1697 conn_id: &ConnectionId,
1698 owner_id: RoleId,
1699 ) -> Result<(), Error> {
1700 let oid = INVALID_OID;
1705 self.temporary_schemas.insert(
1706 conn_id.clone(),
1707 Schema {
1708 name: QualifiedSchemaName {
1709 database: ResolvedDatabaseSpecifier::Ambient,
1710 schema: MZ_TEMP_SCHEMA.into(),
1711 },
1712 id: SchemaSpecifier::Temporary,
1713 oid,
1714 items: BTreeMap::new(),
1715 functions: BTreeMap::new(),
1716 types: BTreeMap::new(),
1717 owner_id,
1718 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1719 mz_sql::catalog::ObjectType::Schema,
1720 owner_id,
1721 )]),
1722 },
1723 );
1724 Ok(())
1725 }
1726
1727 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1729 std::iter::empty()
1730 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1731 if schema.id.is_temporary() {
1732 Some(schema.oid)
1733 } else {
1734 None
1735 }
1736 }))
1737 .chain(self.entry_by_id.values().filter_map(|entry| {
1738 if entry.item().is_temporary() {
1739 Some(entry.oid)
1740 } else {
1741 None
1742 }
1743 }))
1744 }
1745
1746 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1750 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1751 }
1752
1753 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1757 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1758 let log = match self.get_entry(&item_id).item() {
1759 CatalogItem::Log(log) => log,
1760 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1761 };
1762 (item_id, log.global_id)
1763 }
1764
1765 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1769 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1770 }
1771
1772 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1776 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1777 let schema = &self.ambient_schemas_by_id[schema_id];
1778 match builtin.catalog_item_type() {
1779 CatalogItemType::Type => schema.types[builtin.name()],
1780 CatalogItemType::Func => schema.functions[builtin.name()],
1781 CatalogItemType::Table
1782 | CatalogItemType::Source
1783 | CatalogItemType::Sink
1784 | CatalogItemType::View
1785 | CatalogItemType::MaterializedView
1786 | CatalogItemType::Index
1787 | CatalogItemType::Secret
1788 | CatalogItemType::Connection
1789 | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1790 }
1791 }
1792
1793 pub fn resolve_builtin_type_references(
1795 &self,
1796 builtin: &BuiltinType<NameReference>,
1797 ) -> BuiltinType<IdReference> {
1798 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1799 CatalogType::AclItem => CatalogType::AclItem,
1800 CatalogType::Array { element_reference } => CatalogType::Array {
1801 element_reference: self.get_system_type(element_reference).id,
1802 },
1803 CatalogType::List {
1804 element_reference,
1805 element_modifiers,
1806 } => CatalogType::List {
1807 element_reference: self.get_system_type(element_reference).id,
1808 element_modifiers: element_modifiers.clone(),
1809 },
1810 CatalogType::Map {
1811 key_reference,
1812 value_reference,
1813 key_modifiers,
1814 value_modifiers,
1815 } => CatalogType::Map {
1816 key_reference: self.get_system_type(key_reference).id,
1817 value_reference: self.get_system_type(value_reference).id,
1818 key_modifiers: key_modifiers.clone(),
1819 value_modifiers: value_modifiers.clone(),
1820 },
1821 CatalogType::Range { element_reference } => CatalogType::Range {
1822 element_reference: self.get_system_type(element_reference).id,
1823 },
1824 CatalogType::Record { fields } => CatalogType::Record {
1825 fields: fields
1826 .into_iter()
1827 .map(|f| CatalogRecordField {
1828 name: f.name.clone(),
1829 type_reference: self.get_system_type(f.type_reference).id,
1830 type_modifiers: f.type_modifiers.clone(),
1831 })
1832 .collect(),
1833 },
1834 CatalogType::Bool => CatalogType::Bool,
1835 CatalogType::Bytes => CatalogType::Bytes,
1836 CatalogType::Char => CatalogType::Char,
1837 CatalogType::Date => CatalogType::Date,
1838 CatalogType::Float32 => CatalogType::Float32,
1839 CatalogType::Float64 => CatalogType::Float64,
1840 CatalogType::Int16 => CatalogType::Int16,
1841 CatalogType::Int32 => CatalogType::Int32,
1842 CatalogType::Int64 => CatalogType::Int64,
1843 CatalogType::UInt16 => CatalogType::UInt16,
1844 CatalogType::UInt32 => CatalogType::UInt32,
1845 CatalogType::UInt64 => CatalogType::UInt64,
1846 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1847 CatalogType::Interval => CatalogType::Interval,
1848 CatalogType::Jsonb => CatalogType::Jsonb,
1849 CatalogType::Numeric => CatalogType::Numeric,
1850 CatalogType::Oid => CatalogType::Oid,
1851 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1852 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1853 CatalogType::Pseudo => CatalogType::Pseudo,
1854 CatalogType::RegClass => CatalogType::RegClass,
1855 CatalogType::RegProc => CatalogType::RegProc,
1856 CatalogType::RegType => CatalogType::RegType,
1857 CatalogType::String => CatalogType::String,
1858 CatalogType::Time => CatalogType::Time,
1859 CatalogType::Timestamp => CatalogType::Timestamp,
1860 CatalogType::TimestampTz => CatalogType::TimestampTz,
1861 CatalogType::Uuid => CatalogType::Uuid,
1862 CatalogType::VarChar => CatalogType::VarChar,
1863 CatalogType::Int2Vector => CatalogType::Int2Vector,
1864 CatalogType::MzAclItem => CatalogType::MzAclItem,
1865 };
1866
1867 BuiltinType {
1868 name: builtin.name,
1869 schema: builtin.schema,
1870 oid: builtin.oid,
1871 details: CatalogTypeDetails {
1872 array_id: builtin.details.array_id,
1873 typ,
1874 pg_metadata: builtin.details.pg_metadata.clone(),
1875 },
1876 }
1877 }
1878
    /// Returns a reference to the catalog configuration stored in this state.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
1882
1883 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1884 match self.database_by_name.get(database_name) {
1885 Some(id) => Ok(&self.database_by_id[id]),
1886 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1887 }
1888 }
1889
1890 pub fn resolve_schema(
1891 &self,
1892 current_database: Option<&DatabaseId>,
1893 database_name: Option<&str>,
1894 schema_name: &str,
1895 conn_id: &ConnectionId,
1896 ) -> Result<&Schema, SqlCatalogError> {
1897 let database_spec = match database_name {
1898 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1903 self.resolve_database(database)?.id().clone(),
1904 )),
1905 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1906 };
1907
1908 if let Some(database_spec) = database_spec {
1910 if let Ok(schema) =
1911 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1912 {
1913 return Ok(schema);
1914 }
1915 }
1916
1917 if let Ok(schema) = self.resolve_schema_in_database(
1919 &ResolvedDatabaseSpecifier::Ambient,
1920 schema_name,
1921 conn_id,
1922 ) {
1923 return Ok(schema);
1924 }
1925
1926 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1927 }
1928
    /// Returns the ID of the ambient schema registered under `name`.
    ///
    /// Indexes `ambient_schemas_by_name` directly, so an unknown name panics.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }
1935
1936 pub fn resolve_search_path(
1937 &self,
1938 session: &dyn SessionMetadata,
1939 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1940 let database = self
1941 .database_by_name
1942 .get(session.database())
1943 .map(|id| id.clone());
1944
1945 session
1946 .search_path()
1947 .iter()
1948 .map(|schema| {
1949 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1950 })
1951 .filter_map(|schema| schema.ok())
1952 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1953 .collect()
1954 }
1955
1956 pub fn effective_search_path(
1957 &self,
1958 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1959 include_temp_schema: bool,
1960 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1961 let mut v = Vec::with_capacity(search_path.len() + 3);
1962 let temp_schema = (
1964 ResolvedDatabaseSpecifier::Ambient,
1965 SchemaSpecifier::Temporary,
1966 );
1967 if include_temp_schema && !search_path.contains(&temp_schema) {
1968 v.push(temp_schema);
1969 }
1970 let default_schemas = [
1971 (
1972 ResolvedDatabaseSpecifier::Ambient,
1973 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
1974 ),
1975 (
1976 ResolvedDatabaseSpecifier::Ambient,
1977 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
1978 ),
1979 ];
1980 for schema in default_schemas.into_iter() {
1981 if !search_path.contains(&schema) {
1982 v.push(schema);
1983 }
1984 }
1985 v.extend_from_slice(search_path);
1986 v
1987 }
1988
1989 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
1990 let id = self
1991 .clusters_by_name
1992 .get(name)
1993 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
1994 Ok(&self.clusters_by_id[id])
1995 }
1996
1997 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
1998 let id = self
1999 .clusters_by_name
2000 .get(cluster.name)
2001 .expect("failed to lookup BuiltinCluster by name");
2002 self.clusters_by_id
2003 .get(id)
2004 .expect("failed to lookup BuiltinCluster by ID")
2005 }
2006
2007 pub fn resolve_cluster_replica(
2008 &self,
2009 cluster_replica_name: &QualifiedReplica,
2010 ) -> Result<&ClusterReplica, SqlCatalogError> {
2011 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2012 let replica_name = cluster_replica_name.replica.as_str();
2013 let replica_id = cluster
2014 .replica_id(replica_name)
2015 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2016 Ok(cluster.replica(replica_id).expect("Must exist"))
2017 }
2018
2019 #[allow(clippy::useless_let_if_seq)]
2025 pub fn resolve(
2026 &self,
2027 get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2028 current_database: Option<&DatabaseId>,
2029 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2030 name: &PartialItemName,
2031 conn_id: &ConnectionId,
2032 err_gen: fn(String) -> SqlCatalogError,
2033 ) -> Result<&CatalogEntry, SqlCatalogError> {
2034 let schemas = match &name.schema {
2039 Some(schema_name) => {
2040 match self.resolve_schema(
2041 current_database,
2042 name.database.as_deref(),
2043 schema_name,
2044 conn_id,
2045 ) {
2046 Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2047 Err(e) => return Err(e),
2048 }
2049 }
2050 None => match self
2051 .get_schema(
2052 &ResolvedDatabaseSpecifier::Ambient,
2053 &SchemaSpecifier::Temporary,
2054 conn_id,
2055 )
2056 .items
2057 .get(&name.item)
2058 {
2059 Some(id) => return Ok(self.get_entry(id)),
2060 None => search_path.to_vec(),
2061 },
2062 };
2063
2064 for (database_spec, schema_spec) in &schemas {
2065 let schema = self.get_schema(database_spec, schema_spec, conn_id);
2066
2067 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2068 return Ok(&self.entry_by_id[id]);
2069 }
2070 }
2071
2072 let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2077 if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2078 for schema_id in [
2079 self.get_mz_catalog_unstable_schema_id(),
2080 self.get_mz_introspection_schema_id(),
2081 ] {
2082 let schema = self.get_schema(
2083 &ResolvedDatabaseSpecifier::Ambient,
2084 &SchemaSpecifier::Id(schema_id),
2085 conn_id,
2086 );
2087
2088 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2089 debug!(
2090 github_27831 = true,
2091 "encountered use of outdated schema `mz_internal` for relation: {name}",
2092 );
2093 return Ok(&self.entry_by_id[id]);
2094 }
2095 }
2096 }
2097
2098 Err(err_gen(name.to_string()))
2099 }
2100
2101 pub fn resolve_entry(
2103 &self,
2104 current_database: Option<&DatabaseId>,
2105 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2106 name: &PartialItemName,
2107 conn_id: &ConnectionId,
2108 ) -> Result<&CatalogEntry, SqlCatalogError> {
2109 self.resolve(
2110 |schema| &schema.items,
2111 current_database,
2112 search_path,
2113 name,
2114 conn_id,
2115 SqlCatalogError::UnknownItem,
2116 )
2117 }
2118
2119 pub fn resolve_function(
2121 &self,
2122 current_database: Option<&DatabaseId>,
2123 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2124 name: &PartialItemName,
2125 conn_id: &ConnectionId,
2126 ) -> Result<&CatalogEntry, SqlCatalogError> {
2127 self.resolve(
2128 |schema| &schema.functions,
2129 current_database,
2130 search_path,
2131 name,
2132 conn_id,
2133 |name| SqlCatalogError::UnknownFunction {
2134 name,
2135 alternative: None,
2136 },
2137 )
2138 }
2139
2140 pub fn resolve_type(
2142 &self,
2143 current_database: Option<&DatabaseId>,
2144 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2145 name: &PartialItemName,
2146 conn_id: &ConnectionId,
2147 ) -> Result<&CatalogEntry, SqlCatalogError> {
2148 static NON_PG_CATALOG_TYPES: LazyLock<
2149 BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2150 > = LazyLock::new(|| {
2151 BUILTINS::types()
2152 .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2153 .map(|typ| (typ.name, typ))
2154 .collect()
2155 });
2156
2157 let entry = self.resolve(
2158 |schema| &schema.types,
2159 current_database,
2160 search_path,
2161 name,
2162 conn_id,
2163 |name| SqlCatalogError::UnknownType { name },
2164 )?;
2165
2166 if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2167 if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2168 warn!(
2169 "user specified an incorrect schema of {} for the type {}, which should be in \
2170 the {} schema. This works now due to a bug but will be fixed in a later release.",
2171 PG_CATALOG_SCHEMA.quoted(),
2172 typ.name.quoted(),
2173 typ.schema.quoted(),
2174 )
2175 }
2176 }
2177
2178 Ok(entry)
2179 }
2180
2181 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2183 match object_id {
2184 ObjectId::Item(item_id) => {
2185 let entry = self.get_entry(&item_id);
2186 match entry.item_type() {
2187 CatalogItemType::Table => CommentObjectId::Table(item_id),
2188 CatalogItemType::Source => CommentObjectId::Source(item_id),
2189 CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2190 CatalogItemType::View => CommentObjectId::View(item_id),
2191 CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2192 CatalogItemType::Index => CommentObjectId::Index(item_id),
2193 CatalogItemType::Func => CommentObjectId::Func(item_id),
2194 CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2195 CatalogItemType::Type => CommentObjectId::Type(item_id),
2196 CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2197 CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2198 }
2199 }
2200 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2201 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2202 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2203 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2204 ObjectId::ClusterReplica(cluster_replica_id) => {
2205 CommentObjectId::ClusterReplica(cluster_replica_id)
2206 }
2207 ObjectId::NetworkPolicy(network_policy_id) => {
2208 CommentObjectId::NetworkPolicy(network_policy_id)
2209 }
2210 }
2211 }
2212
    /// Returns a shared reference to the system configuration held by this state.
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }
2217
    /// Returns a mutable reference to the system configuration held by this state.
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        &mut self.system_configuration
    }
2222
2223 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2233 let mut dump = serde_json::to_value(&self).map_err(|e| {
2235 Error::new(ErrorKind::Unstructured(format!(
2236 "internal error: could not dump catalog: {}",
2239 e
2240 )))
2241 })?;
2242
2243 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2244 dump_obj.insert(
2246 "system_parameter_defaults".into(),
2247 serde_json::json!(self.system_config().defaults()),
2248 );
2249 if let Some(unfinalized_shards) = unfinalized_shards {
2251 dump_obj
2252 .get_mut("storage_metadata")
2253 .expect("known to exist")
2254 .as_object_mut()
2255 .expect("storage_metadata is an object")
2256 .insert(
2257 "unfinalized_shards".into(),
2258 serde_json::json!(unfinalized_shards),
2259 );
2260 }
2261 let temporary_gids: Vec<_> = self
2266 .entry_by_global_id
2267 .iter()
2268 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2269 .map(|(gid, _item_id)| *gid)
2270 .collect();
2271 if !temporary_gids.is_empty() {
2272 let gids = dump_obj
2273 .get_mut("entry_by_global_id")
2274 .expect("known_to_exist")
2275 .as_object_mut()
2276 .expect("entry_by_global_id is an object");
2277 for gid in temporary_gids {
2278 gids.remove(&gid.to_string());
2279 }
2280 }
2281 dump_obj.remove("role_auth_by_id");
2284
2285 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2287 }
2288
2289 pub fn availability_zones(&self) -> &[String] {
2290 &self.availability_zones
2291 }
2292
2293 pub fn concretize_replica_location(
2294 &self,
2295 location: mz_catalog::durable::ReplicaLocation,
2296 allowed_sizes: &Vec<String>,
2297 allowed_availability_zones: Option<&[String]>,
2298 ) -> Result<ReplicaLocation, Error> {
2299 let location = match location {
2300 mz_catalog::durable::ReplicaLocation::Unmanaged {
2301 storagectl_addrs,
2302 computectl_addrs,
2303 } => {
2304 if allowed_availability_zones.is_some() {
2305 return Err(Error {
2306 kind: ErrorKind::Internal(
2307 "tried concretize unmanaged replica with specific availability_zones"
2308 .to_string(),
2309 ),
2310 });
2311 }
2312 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2313 storagectl_addrs,
2314 computectl_addrs,
2315 })
2316 }
2317 mz_catalog::durable::ReplicaLocation::Managed {
2318 size,
2319 availability_zone,
2320 billed_as,
2321 internal,
2322 pending,
2323 } => {
2324 if allowed_availability_zones.is_some() && availability_zone.is_some() {
2325 let message = "tried concretize managed replica with specific availability zones and availability zone";
2326 return Err(Error {
2327 kind: ErrorKind::Internal(message.to_string()),
2328 });
2329 }
2330 self.ensure_valid_replica_size(allowed_sizes, &size)?;
2331 let cluster_replica_sizes = &self.cluster_replica_sizes;
2332
2333 ReplicaLocation::Managed(ManagedReplicaLocation {
2334 allocation: cluster_replica_sizes
2335 .0
2336 .get(&size)
2337 .expect("catalog out of sync")
2338 .clone(),
2339 availability_zones: match (availability_zone, allowed_availability_zones) {
2340 (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2341 (None, Some(azs)) if azs.is_empty() => {
2342 ManagedReplicaAvailabilityZones::FromCluster(None)
2343 }
2344 (None, Some(azs)) => {
2345 ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2346 }
2347 (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2348 },
2349 size,
2350 billed_as,
2351 internal,
2352 pending,
2353 })
2354 }
2355 };
2356 Ok(location)
2357 }
2358
2359 pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2370 let alloc = &self.cluster_replica_sizes.0[size];
2371 !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2372 }
2373
2374 pub(crate) fn ensure_valid_replica_size(
2375 &self,
2376 allowed_sizes: &[String],
2377 size: &String,
2378 ) -> Result<(), Error> {
2379 let cluster_replica_sizes = &self.cluster_replica_sizes;
2380
2381 if !cluster_replica_sizes.0.contains_key(size)
2382 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2383 || cluster_replica_sizes.0[size].disabled
2384 {
2385 let mut entries = cluster_replica_sizes
2386 .enabled_allocations()
2387 .collect::<Vec<_>>();
2388
2389 if !allowed_sizes.is_empty() {
2390 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2391 entries.retain(|(name, _)| allowed_sizes.contains(name));
2392 }
2393
2394 entries.sort_by_key(
2395 |(
2396 _name,
2397 ReplicaAllocation {
2398 scale, cpu_limit, ..
2399 },
2400 )| (scale, cpu_limit),
2401 );
2402
2403 Err(Error {
2404 kind: ErrorKind::InvalidClusterReplicaSize {
2405 size: size.to_owned(),
2406 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2407 },
2408 })
2409 } else {
2410 Ok(())
2411 }
2412 }
2413
2414 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2415 if role_id.is_builtin() {
2416 let role = self.get_role(role_id);
2417 Err(Error::new(ErrorKind::ReservedRoleName(
2418 role.name().to_string(),
2419 )))
2420 } else {
2421 Ok(())
2422 }
2423 }
2424
2425 pub fn ensure_not_reserved_network_policy(
2426 &self,
2427 network_policy_id: &NetworkPolicyId,
2428 ) -> Result<(), Error> {
2429 if network_policy_id.is_builtin() {
2430 let policy = self.get_network_policy(network_policy_id);
2431 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2432 policy.name.clone(),
2433 )))
2434 } else {
2435 Ok(())
2436 }
2437 }
2438
2439 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2440 let is_grantable = !role_id.is_public() && !role_id.is_system();
2441 if is_grantable {
2442 Ok(())
2443 } else {
2444 let role = self.get_role(role_id);
2445 Err(Error::new(ErrorKind::UngrantableRoleName(
2446 role.name().to_string(),
2447 )))
2448 }
2449 }
2450
2451 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2452 if role_id.is_system() {
2453 let role = self.get_role(role_id);
2454 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2455 role.name().to_string(),
2456 )))
2457 } else {
2458 Ok(())
2459 }
2460 }
2461
2462 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2463 if role_id.is_predefined() {
2464 let role = self.get_role(role_id);
2465 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2466 role.name().to_string(),
2467 )))
2468 } else {
2469 Ok(())
2470 }
2471 }
2472
2473 pub(crate) fn add_to_audit_log(
2476 system_configuration: &SystemVars,
2477 oracle_write_ts: mz_repr::Timestamp,
2478 session: Option<&ConnMeta>,
2479 tx: &mut mz_catalog::durable::Transaction,
2480 audit_events: &mut Vec<VersionedEvent>,
2481 event_type: EventType,
2482 object_type: ObjectType,
2483 details: EventDetails,
2484 ) -> Result<(), Error> {
2485 let user = session.map(|session| session.user().name.to_string());
2486
2487 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2490 Some(ts) => ts.into(),
2491 _ => oracle_write_ts.into(),
2492 };
2493 let id = tx.allocate_audit_log_id()?;
2494 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2495 audit_events.push(event.clone());
2496 tx.insert_audit_log_event(event);
2497 Ok(())
2498 }
2499
2500 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2501 match id {
2502 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2503 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2504 self.get_cluster_replica(*cluster_id, *replica_id)
2505 .owner_id(),
2506 ),
2507 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2508 ObjectId::Schema((database_spec, schema_spec)) => Some(
2509 self.get_schema(database_spec, schema_spec, conn_id)
2510 .owner_id(),
2511 ),
2512 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2513 ObjectId::Role(_) => None,
2514 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2515 }
2516 }
2517
2518 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2519 match object_id {
2520 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2521 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2522 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2523 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2524 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2525 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2526 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2527 }
2528 }
2529
2530 pub(super) fn get_system_object_type(
2531 &self,
2532 id: &SystemObjectId,
2533 ) -> mz_sql::catalog::SystemObjectType {
2534 match id {
2535 SystemObjectId::Object(object_id) => {
2536 SystemObjectType::Object(self.get_object_type(object_id))
2537 }
2538 SystemObjectId::System => SystemObjectType::System,
2539 }
2540 }
2541
2542 pub fn storage_metadata(&self) -> &StorageMetadata {
2546 &self.storage_metadata
2547 }
2548
2549 pub fn source_compaction_windows(
2551 &self,
2552 ids: impl IntoIterator<Item = CatalogItemId>,
2553 ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2554 let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2555 let mut seen = BTreeSet::new();
2556 for item_id in ids {
2557 if !seen.insert(item_id) {
2558 continue;
2559 }
2560 let entry = self.get_entry(&item_id);
2561 match entry.item() {
2562 CatalogItem::Source(source) => {
2563 let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2564 match source.data_source {
2565 DataSourceDesc::Ingestion { .. }
2566 | DataSourceDesc::OldSyntaxIngestion { .. }
2567 | DataSourceDesc::IngestionExport { .. } => {
2568 cws.entry(source_cw).or_default().insert(item_id);
2569 }
2570 DataSourceDesc::Introspection(_)
2571 | DataSourceDesc::Progress
2572 | DataSourceDesc::Webhook { .. } => {
2573 cws.entry(source_cw).or_default().insert(item_id);
2574 }
2575 }
2576 }
2577 CatalogItem::Table(table) => {
2578 let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2579 match &table.data_source {
2580 TableDataSource::DataSource {
2581 desc: DataSourceDesc::IngestionExport { .. },
2582 timeline: _,
2583 } => {
2584 cws.entry(table_cw).or_default().insert(item_id);
2585 }
2586 _ => {}
2587 }
2588 }
2589 _ => {
2590 continue;
2592 }
2593 }
2594 }
2595 cws
2596 }
2597
2598 pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2599 match id {
2600 CommentObjectId::Table(id)
2601 | CommentObjectId::View(id)
2602 | CommentObjectId::MaterializedView(id)
2603 | CommentObjectId::Source(id)
2604 | CommentObjectId::Sink(id)
2605 | CommentObjectId::Index(id)
2606 | CommentObjectId::Func(id)
2607 | CommentObjectId::Connection(id)
2608 | CommentObjectId::Type(id)
2609 | CommentObjectId::Secret(id)
2610 | CommentObjectId::ContinualTask(id) => Some(*id),
2611 CommentObjectId::Role(_)
2612 | CommentObjectId::Database(_)
2613 | CommentObjectId::Schema(_)
2614 | CommentObjectId::Cluster(_)
2615 | CommentObjectId::ClusterReplica(_)
2616 | CommentObjectId::NetworkPolicy(_) => None,
2617 }
2618 }
2619
2620 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2621 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2622 }
2623
2624 pub fn comment_id_to_audit_log_name(
2625 &self,
2626 id: CommentObjectId,
2627 conn_id: &ConnectionId,
2628 ) -> String {
2629 match id {
2630 CommentObjectId::Table(id)
2631 | CommentObjectId::View(id)
2632 | CommentObjectId::MaterializedView(id)
2633 | CommentObjectId::Source(id)
2634 | CommentObjectId::Sink(id)
2635 | CommentObjectId::Index(id)
2636 | CommentObjectId::Func(id)
2637 | CommentObjectId::Connection(id)
2638 | CommentObjectId::Type(id)
2639 | CommentObjectId::Secret(id)
2640 | CommentObjectId::ContinualTask(id) => {
2641 let item = self.get_entry(&id);
2642 let name = self.resolve_full_name(item.name(), Some(conn_id));
2643 name.to_string()
2644 }
2645 CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2646 CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2647 CommentObjectId::Schema((spec, schema_id)) => {
2648 let schema = self.get_schema(&spec, &schema_id, conn_id);
2649 self.resolve_full_schema_name(&schema.name).to_string()
2650 }
2651 CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2652 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2653 let cluster = self.get_cluster(cluster_id);
2654 let replica = self.get_cluster_replica(cluster_id, replica_id);
2655 QualifiedReplica {
2656 cluster: Ident::new_unchecked(cluster.name.clone()),
2657 replica: Ident::new_unchecked(replica.name.clone()),
2658 }
2659 .to_string()
2660 }
2661 CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2662 }
2663 }
2664
2665 pub fn mock_authentication_nonce(&self) -> String {
2666 self.mock_authentication_nonce.clone().unwrap_or_default()
2667 }
2668}
2669
2670impl ConnectionResolver for CatalogState {
2671 fn resolve_connection(
2672 &self,
2673 id: CatalogItemId,
2674 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2675 use mz_storage_types::connections::Connection::*;
2676 match self
2677 .get_entry(&id)
2678 .connection()
2679 .expect("catalog out of sync")
2680 .details
2681 .to_connection()
2682 {
2683 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2684 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2685 Csr(conn) => Csr(conn.into_inline_connection(self)),
2686 Ssh(conn) => Ssh(conn),
2687 Aws(conn) => Aws(conn),
2688 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2689 MySql(conn) => MySql(conn.into_inline_connection(self)),
2690 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2691 IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2692 }
2693 }
2694}
2695
2696impl OptimizerCatalog for CatalogState {
2697 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2698 CatalogState::get_entry_by_global_id(self, id)
2699 }
2700 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2701 CatalogState::get_entry(self, id)
2702 }
2703 fn resolve_full_name(
2704 &self,
2705 name: &QualifiedItemName,
2706 conn_id: Option<&ConnectionId>,
2707 ) -> FullItemName {
2708 CatalogState::resolve_full_name(self, name, conn_id)
2709 }
2710 fn get_indexes_on(
2711 &self,
2712 id: GlobalId,
2713 cluster: ClusterId,
2714 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2715 Box::new(CatalogState::get_indexes_on(self, id, cluster))
2716 }
2717}
2718
2719impl OptimizerCatalog for Catalog {
2720 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2721 self.state.get_entry_by_global_id(id)
2722 }
2723
2724 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2725 self.state.get_entry(id)
2726 }
2727
2728 fn resolve_full_name(
2729 &self,
2730 name: &QualifiedItemName,
2731 conn_id: Option<&ConnectionId>,
2732 ) -> FullItemName {
2733 self.state.resolve_full_name(name, conn_id)
2734 }
2735
2736 fn get_indexes_on(
2737 &self,
2738 id: GlobalId,
2739 cluster: ClusterId,
2740 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2741 Box::new(self.state.get_indexes_on(id, cluster))
2742 }
2743}
2744
2745impl Catalog {
2746 pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2747 self
2748 }
2749}