1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55 UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::OptimizerFeatures;
59use mz_repr::role_id::RoleId;
60use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector};
61use mz_secrets::InMemorySecretsController;
62use mz_sql::ast::Ident;
63use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
64use mz_sql::catalog::{
65 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
66 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
67 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
68 TypeReference,
69};
70use mz_sql::names::{
71 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
72 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
73 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
74};
75use mz_sql::plan::{
76 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
77 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
78 Plan, PlanContext,
79};
80use mz_sql::rbac;
81use mz_sql::session::metadata::SessionMetadata;
82use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
83use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
84use mz_sql_parser::ast::QualifiedReplica;
85use mz_storage_client::controller::StorageMetadata;
86use mz_storage_types::connections::ConnectionContext;
87use mz_storage_types::connections::inline::{
88 ConnectionResolver, InlinedConnection, IntoInlineConnection,
89};
90use serde::Serialize;
91use timely::progress::Antichain;
92use tokio::sync::mpsc;
93use tracing::{debug, warn};
94
95use crate::AdapterError;
97use crate::catalog::{Catalog, ConnCatalog};
98use crate::coord::ConnMeta;
99use crate::optimize::{self, Optimize, OptimizerCatalog};
100use crate::session::Session;
101
/// In-memory state of the catalog.
///
/// Most kinds of objects are indexed twice: a `*_by_name` map from name to id
/// and a `*_by_id` map from id to the object itself. Fields annotated with
/// `#[serde(skip)]` are omitted when this struct is serialized.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    /// Database name -> database id.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    /// Database id -> database.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    /// Catalog item id -> catalog entry. Serialized through `skip_temp_items`,
    /// which leaves out entries that report a connection id.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Global id -> id of the catalog item that owns it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    /// Ambient schema name -> schema id.
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    /// Ambient schema id -> schema.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Cluster name -> cluster id.
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    /// Cluster id -> cluster.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    /// Role name -> role id.
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    /// Role id -> role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    /// Network policy name -> network policy id.
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    /// Network policy id -> network policy.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    /// Role id -> authentication data for that role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    /// System configuration variables (not serialized).
    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    /// Default privileges.
    pub(super) default_privileges: DefaultPrivileges,
    /// System-level privileges.
    pub(super) system_privileges: PrivilegeMap,
    /// Comments attached to catalog objects.
    pub(super) comments: CommentsMap,
    /// Catalog item id -> source references recorded for that item.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    /// Storage metadata.
    pub(super) storage_metadata: StorageMetadata,
    /// Optional mock authentication nonce.
    pub(super) mock_authentication_nonce: Option<String>,

    /// Temporary schema of each connection, keyed by connection id (not serialized).
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    /// Catalog configuration (not serialized).
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    /// Known cluster replica sizes.
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Availability zone names (not serialized).
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    /// Egress addresses (not serialized).
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    /// Optional AWS principal context.
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    /// Optional set of availability zones for AWS PrivateLink.
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Host name used when building webhook URLs; `try_get_webhook_url` falls
    /// back to the literal `HOST` when this is `None`.
    pub(super) http_host_name: Option<String>,

    /// Validated license key (not serialized).
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
169
/// Tracks local expressions that were, or were not, found in the expression cache.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is in use.
    Open {
        /// Expressions supplied when the cache was created (see `new`); entries
        /// are taken out with `remove_cached_expression`.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Expressions recorded through `insert_uncached_expression`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not in use: removals return `None` and insertions are ignored.
    Closed,
}
183
184impl LocalExpressionCache {
185 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
186 Self::Open {
187 cached_exprs,
188 uncached_exprs: BTreeMap::new(),
189 }
190 }
191
192 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
193 match self {
194 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
195 LocalExpressionCache::Closed => None,
196 }
197 }
198
199 pub(super) fn insert_cached_expression(
202 &mut self,
203 id: GlobalId,
204 local_expressions: LocalExpressions,
205 ) {
206 match self {
207 LocalExpressionCache::Open { cached_exprs, .. } => {
208 cached_exprs.insert(id, local_expressions);
209 }
210 LocalExpressionCache::Closed => {}
211 }
212 }
213
214 pub(super) fn insert_uncached_expression(
217 &mut self,
218 id: GlobalId,
219 local_mir: OptimizedMirRelationExpr,
220 optimizer_features: OptimizerFeatures,
221 ) {
222 match self {
223 LocalExpressionCache::Open { uncached_exprs, .. } => {
224 let local_expr = LocalExpressions {
225 local_mir,
226 optimizer_features,
227 };
228 let prev = uncached_exprs.remove(&id);
235 match prev {
236 Some(prev) if prev == local_expr => {
237 uncached_exprs.insert(id, local_expr);
238 }
239 None => {
240 uncached_exprs.insert(id, local_expr);
241 }
242 Some(_) => {}
243 }
244 }
245 LocalExpressionCache::Closed => {}
246 }
247 }
248
249 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
250 match self {
251 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
252 LocalExpressionCache::Closed => BTreeMap::new(),
253 }
254 }
255}
256
257fn skip_temp_items<S>(
258 entries: &BTreeMap<CatalogItemId, CatalogEntry>,
259 serializer: S,
260) -> Result<S::Ok, S::Error>
261where
262 S: serde::Serializer,
263{
264 mz_ore::serde::map_key_to_string(
265 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
266 serializer,
267 )
268}
269
270impl CatalogState {
271 pub fn empty_test() -> Self {
275 CatalogState {
276 database_by_name: Default::default(),
277 database_by_id: Default::default(),
278 entry_by_id: Default::default(),
279 entry_by_global_id: Default::default(),
280 ambient_schemas_by_name: Default::default(),
281 ambient_schemas_by_id: Default::default(),
282 temporary_schemas: Default::default(),
283 clusters_by_id: Default::default(),
284 clusters_by_name: Default::default(),
285 network_policies_by_name: Default::default(),
286 roles_by_name: Default::default(),
287 roles_by_id: Default::default(),
288 network_policies_by_id: Default::default(),
289 role_auth_by_id: Default::default(),
290 config: CatalogConfig {
291 start_time: Default::default(),
292 start_instant: Instant::now(),
293 nonce: Default::default(),
294 environment_id: EnvironmentId::for_tests(),
295 session_id: Default::default(),
296 build_info: &DUMMY_BUILD_INFO,
297 timestamp_interval: Default::default(),
298 now: NOW_ZERO.clone(),
299 connection_context: ConnectionContext::for_tests(Arc::new(
300 InMemorySecretsController::new(),
301 )),
302 builtins_cfg: BuiltinsConfig {
303 include_continual_tasks: true,
304 },
305 helm_chart_version: None,
306 },
307 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
308 availability_zones: Default::default(),
309 system_configuration: Default::default(),
310 egress_addresses: Default::default(),
311 aws_principal_context: Default::default(),
312 aws_privatelink_availability_zones: Default::default(),
313 http_host_name: Default::default(),
314 default_privileges: Default::default(),
315 system_privileges: Default::default(),
316 comments: Default::default(),
317 source_references: Default::default(),
318 storage_metadata: Default::default(),
319 license_key: ValidatedLicenseKey::for_tests(),
320 mock_authentication_nonce: Default::default(),
321 }
322 }
323
324 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
325 let search_path = self.resolve_search_path(session);
326 let database = self
327 .database_by_name
328 .get(session.vars().database())
329 .map(|id| id.clone());
330 let state = match session.transaction().catalog_state() {
331 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
332 None => Cow::Borrowed(self),
333 };
334 ConnCatalog {
335 state,
336 unresolvable_ids: BTreeSet::new(),
337 conn_id: session.conn_id().clone(),
338 cluster: session.vars().cluster().into(),
339 database,
340 search_path,
341 role_id: session.current_role_id().clone(),
342 prepared_statements: Some(session.prepared_statements()),
343 portals: Some(session.portals()),
344 notices_tx: session.retain_notice_transmitter(),
345 }
346 }
347
348 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
349 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
350 let cluster = self.system_configuration.default_cluster();
351
352 ConnCatalog {
353 state: Cow::Borrowed(self),
354 unresolvable_ids: BTreeSet::new(),
355 conn_id: SYSTEM_CONN_ID.clone(),
356 cluster,
357 database: self
358 .resolve_database(DEFAULT_DATABASE_NAME)
359 .ok()
360 .map(|db| db.id()),
361 search_path: Vec::new(),
364 role_id,
365 prepared_statements: None,
366 portals: None,
367 notices_tx,
368 }
369 }
370
    /// Builds a sessionless [`ConnCatalog`] for the system role
    /// (`MZ_SYSTEM_ROLE_ID`); see `for_sessionless_user`.
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }
374
375 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
380 struct I<'a> {
381 queue: VecDeque<CatalogItemId>,
382 seen: BTreeSet<CatalogItemId>,
383 this: &'a CatalogState,
384 }
385 impl<'a> Iterator for I<'a> {
386 type Item = CatalogItemId;
387 fn next(&mut self) -> Option<Self::Item> {
388 if let Some(next) = self.queue.pop_front() {
389 for child in self.this.get_entry(&next).item().uses() {
390 if !self.seen.contains(&child) {
391 self.queue.push_back(child);
392 self.seen.insert(child);
393 }
394 }
395 Some(next)
396 } else {
397 None
398 }
399 }
400 }
401
402 I {
403 queue: [id].into_iter().collect(),
404 seen: [id].into_iter().collect(),
405 this: self,
406 }
407 }
408
    /// Collects the ids of the `CatalogItem::Log` items that `id` depends on,
    /// as found by `introspection_dependencies_inner`.
    ///
    /// The result is in traversal order and may contain duplicates, because
    /// nothing in the traversal deduplicates ids.
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }
416
417 fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
418 match self.get_entry(&id).item() {
419 CatalogItem::Log(_) => out.push(id),
420 item @ (CatalogItem::View(_)
421 | CatalogItem::MaterializedView(_)
422 | CatalogItem::Connection(_)
423 | CatalogItem::ContinualTask(_)) => {
424 for item_id in item.references().items() {
426 self.introspection_dependencies_inner(*item_id, out);
427 }
428 }
429 CatalogItem::Sink(sink) => {
430 let from_item_id = self.get_entry_by_global_id(&sink.from).id();
431 self.introspection_dependencies_inner(from_item_id, out)
432 }
433 CatalogItem::Index(idx) => {
434 let on_item_id = self.get_entry_by_global_id(&idx.on).id();
435 self.introspection_dependencies_inner(on_item_id, out)
436 }
437 CatalogItem::Table(_)
438 | CatalogItem::Source(_)
439 | CatalogItem::Type(_)
440 | CatalogItem::Func(_)
441 | CatalogItem::Secret(_) => (),
442 }
443 }
444
445 pub(super) fn object_dependents(
451 &self,
452 object_ids: &Vec<ObjectId>,
453 conn_id: &ConnectionId,
454 seen: &mut BTreeSet<ObjectId>,
455 ) -> Vec<ObjectId> {
456 let mut dependents = Vec::new();
457 for object_id in object_ids {
458 match object_id {
459 ObjectId::Cluster(id) => {
460 dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
461 }
462 ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
463 &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
464 ),
465 ObjectId::Database(id) => {
466 dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
467 }
468 ObjectId::Schema((database_spec, schema_spec)) => {
469 dependents.extend_from_slice(&self.schema_dependents(
470 database_spec.clone(),
471 schema_spec.clone(),
472 conn_id,
473 seen,
474 ));
475 }
476 ObjectId::NetworkPolicy(id) => {
477 dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
478 }
479 id @ ObjectId::Role(_) => {
480 let unseen = seen.insert(id.clone());
481 if unseen {
482 dependents.push(id.clone());
483 }
484 }
485 ObjectId::Item(id) => {
486 dependents.extend_from_slice(&self.item_dependents(*id, seen))
487 }
488 }
489 }
490 dependents
491 }
492
493 fn cluster_dependents(
500 &self,
501 cluster_id: ClusterId,
502 seen: &mut BTreeSet<ObjectId>,
503 ) -> Vec<ObjectId> {
504 let mut dependents = Vec::new();
505 let object_id = ObjectId::Cluster(cluster_id);
506 if !seen.contains(&object_id) {
507 seen.insert(object_id.clone());
508 let cluster = self.get_cluster(cluster_id);
509 for item_id in cluster.bound_objects() {
510 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
511 }
512 for replica_id in cluster.replica_ids().values() {
513 dependents.extend_from_slice(&self.cluster_replica_dependents(
514 cluster_id,
515 *replica_id,
516 seen,
517 ));
518 }
519 dependents.push(object_id);
520 }
521 dependents
522 }
523
524 pub(super) fn cluster_replica_dependents(
531 &self,
532 cluster_id: ClusterId,
533 replica_id: ReplicaId,
534 seen: &mut BTreeSet<ObjectId>,
535 ) -> Vec<ObjectId> {
536 let mut dependents = Vec::new();
537 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
538 if !seen.contains(&object_id) {
539 seen.insert(object_id.clone());
540 dependents.push(object_id);
541 }
542 dependents
543 }
544
545 fn database_dependents(
552 &self,
553 database_id: DatabaseId,
554 conn_id: &ConnectionId,
555 seen: &mut BTreeSet<ObjectId>,
556 ) -> Vec<ObjectId> {
557 let mut dependents = Vec::new();
558 let object_id = ObjectId::Database(database_id);
559 if !seen.contains(&object_id) {
560 seen.insert(object_id.clone());
561 let database = self.get_database(&database_id);
562 for schema_id in database.schema_ids().values() {
563 dependents.extend_from_slice(&self.schema_dependents(
564 ResolvedDatabaseSpecifier::Id(database_id),
565 SchemaSpecifier::Id(*schema_id),
566 conn_id,
567 seen,
568 ));
569 }
570 dependents.push(object_id);
571 }
572 dependents
573 }
574
575 fn schema_dependents(
582 &self,
583 database_spec: ResolvedDatabaseSpecifier,
584 schema_spec: SchemaSpecifier,
585 conn_id: &ConnectionId,
586 seen: &mut BTreeSet<ObjectId>,
587 ) -> Vec<ObjectId> {
588 let mut dependents = Vec::new();
589 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
590 if !seen.contains(&object_id) {
591 seen.insert(object_id.clone());
592 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
593 for item_id in schema.item_ids() {
594 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
595 }
596 dependents.push(object_id)
597 }
598 dependents
599 }
600
601 pub(super) fn item_dependents(
608 &self,
609 item_id: CatalogItemId,
610 seen: &mut BTreeSet<ObjectId>,
611 ) -> Vec<ObjectId> {
612 let mut dependents = Vec::new();
613 let object_id = ObjectId::Item(item_id);
614 if !seen.contains(&object_id) {
615 seen.insert(object_id.clone());
616 let entry = self.get_entry(&item_id);
617 for dependent_id in entry.used_by() {
618 dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
619 }
620 dependents.push(object_id);
621 if let Some(progress_id) = entry.progress_id() {
625 dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
626 }
627 }
628 dependents
629 }
630
    /// Returns the network policy `network_policy_id` as its own only dependent.
    ///
    /// `_seen` is neither consulted nor updated, so the policy is returned
    /// even if it was already recorded there.
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        vec![object_id]
    }
648
    /// Returns `true` unless the item `id` lives in a schema that
    /// `is_unstable_schema_specifier` reports as unstable.
    ///
    /// Panics if `id` is not in the catalog (see `get_entry`).
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }
656
657 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
658 if self.system_config().unsafe_enable_unstable_dependencies() {
659 return Ok(());
660 }
661
662 let unstable_dependencies: Vec<_> = item
663 .references()
664 .items()
665 .filter(|id| !self.is_stable(**id))
666 .map(|id| self.get_entry(id).name().item.clone())
667 .collect();
668
669 if unstable_dependencies.is_empty() || item.is_temporary() {
673 Ok(())
674 } else {
675 let object_type = item.typ().to_string();
676 Err(Error {
677 kind: ErrorKind::UnstableDependency {
678 object_type,
679 unstable_dependencies,
680 },
681 })
682 }
683 }
684
685 pub fn resolve_full_name(
686 &self,
687 name: &QualifiedItemName,
688 conn_id: Option<&ConnectionId>,
689 ) -> FullItemName {
690 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
691
692 let database = match &name.qualifiers.database_spec {
693 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
694 ResolvedDatabaseSpecifier::Id(id) => {
695 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
696 }
697 };
698 let schema = self
699 .get_schema(
700 &name.qualifiers.database_spec,
701 &name.qualifiers.schema_spec,
702 conn_id,
703 )
704 .name()
705 .schema
706 .clone();
707 FullItemName {
708 database,
709 schema,
710 item: name.item.clone(),
711 }
712 }
713
714 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
715 let database = match &name.database {
716 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
717 ResolvedDatabaseSpecifier::Id(id) => {
718 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
719 }
720 };
721 FullSchemaName {
722 database,
723 schema: name.schema.clone(),
724 }
725 }
726
    /// Returns the catalog entry for `id`.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not present in `entry_by_id`. Use `try_get_entry`
    /// for a non-panicking lookup.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }
732
733 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
734 let item_id = self
735 .entry_by_global_id
736 .get(id)
737 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
738
739 let entry = self.get_entry(item_id).clone();
740 let version = match entry.item() {
741 CatalogItem::Table(table) => {
742 let (version, _) = table
743 .collections
744 .iter()
745 .find(|(_verison, gid)| *gid == id)
746 .expect("version to exist");
747 RelationVersionSelector::Specific(*version)
748 }
749 _ => RelationVersionSelector::Latest,
750 };
751 CatalogCollectionEntry { entry, version }
752 }
753
    /// Iterates over every `(id, entry)` pair in `entry_by_id`, in key order.
    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }
757
758 pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
759 let schema = self
760 .temporary_schemas
761 .get(conn)
762 .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
763 schema.items.values().copied().map(ObjectId::from)
764 }
765
766 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
772 let mut res = None;
773 for schema_id in self.system_schema_ids() {
774 let schema = &self.ambient_schemas_by_id[&schema_id];
775 if let Some(global_id) = schema.types.get(name) {
776 match res {
777 None => res = Some(self.get_entry(global_id)),
778 Some(_) => panic!(
779 "only call get_system_type on objects uniquely identifiable in one system schema"
780 ),
781 }
782 }
783 }
784
785 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
786 }
787
788 pub fn get_item_by_name(
789 &self,
790 name: &QualifiedItemName,
791 conn_id: &ConnectionId,
792 ) -> Option<&CatalogEntry> {
793 self.get_schema(
794 &name.qualifiers.database_spec,
795 &name.qualifiers.schema_spec,
796 conn_id,
797 )
798 .items
799 .get(&name.item)
800 .and_then(|id| self.try_get_entry(id))
801 }
802
803 pub fn get_type_by_name(
804 &self,
805 name: &QualifiedItemName,
806 conn_id: &ConnectionId,
807 ) -> Option<&CatalogEntry> {
808 self.get_schema(
809 &name.qualifiers.database_spec,
810 &name.qualifiers.schema_spec,
811 conn_id,
812 )
813 .types
814 .get(&name.item)
815 .and_then(|id| self.try_get_entry(id))
816 }
817
818 pub(super) fn find_available_name(
819 &self,
820 mut name: QualifiedItemName,
821 conn_id: &ConnectionId,
822 ) -> QualifiedItemName {
823 let mut i = 0;
824 let orig_item_name = name.item.clone();
825 while self.get_item_by_name(&name, conn_id).is_some() {
826 i += 1;
827 name.item = format!("{}{}", orig_item_name, i);
828 }
829 name
830 }
831
    /// Returns the catalog entry for `id`, or `None` if there is none.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }
835
836 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
837 let item_id = self.entry_by_global_id.get(id)?;
838 self.try_get_entry(item_id)
839 }
840
841 pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
844 let entry = self.try_get_entry_by_global_id(id)?;
845 let desc = match entry.item() {
846 CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
847 other => other.desc_opt(RelationVersionSelector::Latest)?,
849 };
850 Some(desc)
851 }
852
    /// Returns the cluster with id `cluster_id`.
    ///
    /// # Panics
    ///
    /// Panics if no such cluster exists. Use `try_get_cluster` for a
    /// non-panicking lookup.
    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }
857
    /// Returns the cluster with id `cluster_id`, or `None` if there is none.
    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }
861
    /// Returns the role with id `id`, or `None` if there is none.
    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }
865
866 pub fn get_role(&self, id: &RoleId) -> &Role {
867 self.roles_by_id.get(id).expect("catalog out of sync")
868 }
869
    /// Iterates over the ids of all roles, in key order.
    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }
873
    /// Returns the role called `role_name`, or `None` if no role has that name.
    ///
    /// # Panics
    ///
    /// Panics if `roles_by_name` points at an id that is missing from
    /// `roles_by_id` (the lookup by id uses indexing).
    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }
879
    /// Returns the authentication data for the role with id `id`.
    ///
    /// # Panics
    ///
    /// Panics if there is no authentication data for `id`. Use
    /// `try_get_role_auth_by_id` for a non-panicking lookup.
    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
        self.role_auth_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
    }
885
    /// Returns the authentication data for the role with id `id`, or `None`
    /// if there is none.
    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }
889
    /// Returns the network policy called `policy_name`, or `None` if no policy
    /// has that name.
    ///
    /// # Panics
    ///
    /// Panics if `network_policies_by_name` points at an id that is missing
    /// from `network_policies_by_id` (the lookup by id uses indexing).
    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }
898
899 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
900 let mut membership = BTreeSet::new();
901 let mut queue = VecDeque::from(vec![id]);
902 while let Some(cur_id) = queue.pop_front() {
903 if !membership.contains(cur_id) {
904 membership.insert(cur_id.clone());
905 let role = self.get_role(cur_id);
906 soft_assert_no_log!(
907 !role.membership().keys().contains(id),
908 "circular membership exists in the catalog"
909 );
910 queue.extend(role.membership().keys());
911 }
912 }
913 membership.insert(RoleId::Public);
914 membership
915 }
916
917 pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
918 self.network_policies_by_id
919 .get(id)
920 .expect("catalog out of sync")
921 }
922
    /// Iterates over the ids of all network policies, in key order.
    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }
926
927 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
932 let entry = self.try_get_entry(id)?;
933 let name = self.resolve_full_name(entry.name(), None);
935 let host_name = self
936 .http_host_name
937 .as_ref()
938 .map(|x| x.as_str())
939 .unwrap_or_else(|| "HOST");
940
941 let RawDatabaseSpecifier::Name(database) = name.database else {
942 return None;
943 };
944
945 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
946 url.path_segments_mut()
947 .ok()?
948 .push(&database)
949 .push(&name.schema)
950 .push(&name.item);
951
952 Some(url)
953 }
954
955 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
963 &mut self,
966 create_sql: &str,
967 force_if_exists_skip: bool,
968 ) -> Result<(Plan, ResolvedIds), AdapterError> {
969 self.with_enable_for_item_parsing(|state| {
970 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
971 let pcx = Some(&pcx);
972 let session_catalog = state.for_system_session();
973
974 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
975 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
976 let plan =
977 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
978
979 Ok((plan, resolved_ids))
980 })
981 }
982
983 #[mz_ore::instrument]
985 pub(crate) fn parse_plan(
986 create_sql: &str,
987 pcx: Option<&PlanContext>,
988 catalog: &ConnCatalog,
989 ) -> Result<(Plan, ResolvedIds), AdapterError> {
990 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
991 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
992 let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
993
994 return Ok((plan, resolved_ids));
995 }
996
    /// Parses `create_sql` into a [`CatalogItem`] by delegating to
    /// `parse_item` with no plan context, `is_retained_metrics_object` set to
    /// `false`, and no custom logical compaction window.
    ///
    /// # Errors
    ///
    /// Returns whatever error `parse_item` returns.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            // pcx
            None,
            // is_retained_metrics_object
            false,
            // custom_logical_compaction_window
            None,
            local_expression_cache,
            previous_item,
        )
    }
1017
1018 #[mz_ore::instrument]
1020 pub(crate) fn parse_item(
1021 &self,
1022 global_id: GlobalId,
1023 create_sql: &str,
1024 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1025 pcx: Option<&PlanContext>,
1026 is_retained_metrics_object: bool,
1027 custom_logical_compaction_window: Option<CompactionWindow>,
1028 local_expression_cache: &mut LocalExpressionCache,
1029 previous_item: Option<CatalogItem>,
1030 ) -> Result<CatalogItem, AdapterError> {
1031 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1032 match self.parse_item_inner(
1033 global_id,
1034 create_sql,
1035 extra_versions,
1036 pcx,
1037 is_retained_metrics_object,
1038 custom_logical_compaction_window,
1039 cached_expr,
1040 previous_item,
1041 ) {
1042 Ok((item, uncached_expr)) => {
1043 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1044 local_expression_cache.insert_uncached_expression(
1045 global_id,
1046 uncached_expr,
1047 optimizer_features,
1048 );
1049 }
1050 Ok(item)
1051 }
1052 Err((err, cached_expr)) => {
1053 if let Some(local_expr) = cached_expr {
1054 local_expression_cache.insert_cached_expression(global_id, local_expr);
1055 }
1056 Err(err)
1057 }
1058 }
1059 }
1060
1061 #[mz_ore::instrument]
1068 pub(crate) fn parse_item_inner(
1069 &self,
1070 global_id: GlobalId,
1071 create_sql: &str,
1072 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1073 pcx: Option<&PlanContext>,
1074 is_retained_metrics_object: bool,
1075 custom_logical_compaction_window: Option<CompactionWindow>,
1076 cached_expr: Option<LocalExpressions>,
1077 previous_item: Option<CatalogItem>,
1078 ) -> Result<
1079 (
1080 CatalogItem,
1081 Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1082 ),
1083 (AdapterError, Option<LocalExpressions>),
1084 > {
1085 let session_catalog = self.for_system_session();
1086
1087 let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1088 Ok((plan, resolved_ids)) => (plan, resolved_ids),
1089 Err(err) => return Err((err, cached_expr)),
1090 };
1091
1092 let mut uncached_expr = None;
1093
1094 let item = match plan {
1095 Plan::CreateTable(CreateTablePlan { table, .. }) => {
1096 let collections = extra_versions
1097 .iter()
1098 .map(|(version, gid)| (*version, *gid))
1099 .chain([(RelationVersion::root(), global_id)].into_iter())
1100 .collect();
1101
1102 CatalogItem::Table(Table {
1103 create_sql: Some(table.create_sql),
1104 desc: table.desc,
1105 collections,
1106 conn_id: None,
1107 resolved_ids,
1108 custom_logical_compaction_window: custom_logical_compaction_window
1109 .or(table.compaction_window),
1110 is_retained_metrics_object,
1111 data_source: match table.data_source {
1112 mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1113 TableDataSource::TableWrites { defaults }
1114 }
1115 mz_sql::plan::TableDataSource::DataSource {
1116 desc: data_source_desc,
1117 timeline,
1118 } => match data_source_desc {
1119 mz_sql::plan::DataSourceDesc::IngestionExport {
1120 ingestion_id,
1121 external_reference,
1122 details,
1123 data_config,
1124 } => TableDataSource::DataSource {
1125 desc: DataSourceDesc::IngestionExport {
1126 ingestion_id,
1127 external_reference,
1128 details,
1129 data_config,
1130 },
1131 timeline,
1132 },
1133 mz_sql::plan::DataSourceDesc::Webhook {
1134 validate_using,
1135 body_format,
1136 headers,
1137 cluster_id,
1138 } => TableDataSource::DataSource {
1139 desc: DataSourceDesc::Webhook {
1140 validate_using,
1141 body_format,
1142 headers,
1143 cluster_id: cluster_id
1144 .expect("Webhook Tables must have a cluster_id set"),
1145 },
1146 timeline,
1147 },
1148 _ => {
1149 return Err((
1150 AdapterError::Unstructured(anyhow::anyhow!(
1151 "unsupported data source for table"
1152 )),
1153 cached_expr,
1154 ));
1155 }
1156 },
1157 },
1158 })
1159 }
1160 Plan::CreateSource(CreateSourcePlan {
1161 source,
1162 timeline,
1163 in_cluster,
1164 ..
1165 }) => CatalogItem::Source(Source {
1166 create_sql: Some(source.create_sql),
1167 data_source: match source.data_source {
1168 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1169 desc,
1170 cluster_id: match in_cluster {
1171 Some(id) => id,
1172 None => {
1173 return Err((
1174 AdapterError::Unstructured(anyhow::anyhow!(
1175 "ingestion-based sources must have cluster specified"
1176 )),
1177 cached_expr,
1178 ));
1179 }
1180 },
1181 },
1182 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1183 desc,
1184 progress_subsource,
1185 data_config,
1186 details,
1187 } => DataSourceDesc::OldSyntaxIngestion {
1188 desc,
1189 progress_subsource,
1190 data_config,
1191 details,
1192 cluster_id: match in_cluster {
1193 Some(id) => id,
1194 None => {
1195 return Err((
1196 AdapterError::Unstructured(anyhow::anyhow!(
1197 "ingestion-based sources must have cluster specified"
1198 )),
1199 cached_expr,
1200 ));
1201 }
1202 },
1203 },
1204 mz_sql::plan::DataSourceDesc::IngestionExport {
1205 ingestion_id,
1206 external_reference,
1207 details,
1208 data_config,
1209 } => DataSourceDesc::IngestionExport {
1210 ingestion_id,
1211 external_reference,
1212 details,
1213 data_config,
1214 },
1215 mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1216 mz_sql::plan::DataSourceDesc::Webhook {
1217 validate_using,
1218 body_format,
1219 headers,
1220 cluster_id,
1221 } => {
1222 mz_ore::soft_assert_or_log!(
1223 cluster_id.is_none(),
1224 "cluster_id set at Source level for Webhooks"
1225 );
1226 DataSourceDesc::Webhook {
1227 validate_using,
1228 body_format,
1229 headers,
1230 cluster_id: in_cluster
1231 .expect("webhook sources must use an existing cluster"),
1232 }
1233 }
1234 },
1235 desc: source.desc,
1236 global_id,
1237 timeline,
1238 resolved_ids,
1239 custom_logical_compaction_window: source
1240 .compaction_window
1241 .or(custom_logical_compaction_window),
1242 is_retained_metrics_object,
1243 }),
1244 Plan::CreateView(CreateViewPlan { view, .. }) => {
1245 let optimizer_config =
1247 optimize::OptimizerConfig::from(session_catalog.system_vars());
1248 let previous_exprs = previous_item.map(|item| match item {
1249 CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
1250 _ => None,
1251 });
1252
1253 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1254 (Some(local_expr), _)
1255 if local_expr.optimizer_features == optimizer_config.features =>
1256 {
1257 debug!("local expression cache hit for {global_id:?}");
1258 (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1259 }
1260 (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
1262 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1263 }
1264 (cached_expr, _) => {
1265 let optimizer_features = optimizer_config.features.clone();
1266 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1268
1269 let raw_expr = view.expr;
1271 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1272 Ok(optimzed_expr) => optimzed_expr,
1273 Err(err) => return Err((err.into(), cached_expr)),
1274 };
1275
1276 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1277
1278 (Arc::new(raw_expr), Arc::new(optimized_expr))
1279 }
1280 };
1281
1282 let dependencies: BTreeSet<_> = raw_expr
1284 .depends_on()
1285 .into_iter()
1286 .map(|gid| self.get_entry_by_global_id(&gid).id())
1287 .collect();
1288
1289 CatalogItem::View(View {
1290 create_sql: view.create_sql,
1291 global_id,
1292 raw_expr,
1293 desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
1294 optimized_expr,
1295 conn_id: None,
1296 resolved_ids,
1297 dependencies: DependencyIds(dependencies),
1298 })
1299 }
1300 Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1301 materialized_view, ..
1302 }) => {
1303 let optimizer_config =
1305 optimize::OptimizerConfig::from(session_catalog.system_vars());
1306 let previous_exprs = previous_item.map(|item| match item {
1307 CatalogItem::MaterializedView(materialized_view) => {
1308 (materialized_view.raw_expr, materialized_view.optimized_expr)
1309 }
1310 item => unreachable!("expected materialized view, found: {item:#?}"),
1311 });
1312
1313 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1314 (Some(local_expr), _)
1315 if local_expr.optimizer_features == optimizer_config.features =>
1316 {
1317 debug!("local expression cache hit for {global_id:?}");
1318 (
1319 Arc::new(materialized_view.expr),
1320 Arc::new(local_expr.local_mir),
1321 )
1322 }
1323 (_, Some((raw_expr, optimized_expr)))
1325 if *raw_expr == materialized_view.expr =>
1326 {
1327 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1328 }
1329 (cached_expr, _) => {
1330 let optimizer_features = optimizer_config.features.clone();
1331 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1333
1334 let raw_expr = materialized_view.expr;
1335 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1336 Ok(optimized_expr) => optimized_expr,
1337 Err(err) => return Err((err.into(), cached_expr)),
1338 };
1339
1340 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1341
1342 (Arc::new(raw_expr), Arc::new(optimized_expr))
1343 }
1344 };
1345 let mut typ = optimized_expr.typ();
1346 for &i in &materialized_view.non_null_assertions {
1347 typ.column_types[i].nullable = false;
1348 }
1349 let desc = RelationDesc::new(typ, materialized_view.column_names);
1350
1351 let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1352
1353 let dependencies = raw_expr
1355 .depends_on()
1356 .into_iter()
1357 .map(|gid| self.get_entry_by_global_id(&gid).id())
1358 .collect();
1359
1360 CatalogItem::MaterializedView(MaterializedView {
1361 create_sql: materialized_view.create_sql,
1362 global_id,
1363 raw_expr,
1364 optimized_expr,
1365 desc,
1366 resolved_ids,
1367 dependencies,
1368 cluster_id: materialized_view.cluster_id,
1369 non_null_assertions: materialized_view.non_null_assertions,
1370 custom_logical_compaction_window: materialized_view.compaction_window,
1371 refresh_schedule: materialized_view.refresh_schedule,
1372 initial_as_of,
1373 })
1374 }
1375 Plan::CreateContinualTask(plan) => {
1376 let ct =
1377 match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1378 Ok(ct) => ct,
1379 Err(err) => return Err((err, cached_expr)),
1380 };
1381 CatalogItem::ContinualTask(ct)
1382 }
1383 Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1384 create_sql: index.create_sql,
1385 global_id,
1386 on: index.on,
1387 keys: index.keys.into(),
1388 conn_id: None,
1389 resolved_ids,
1390 cluster_id: index.cluster_id,
1391 custom_logical_compaction_window: custom_logical_compaction_window
1392 .or(index.compaction_window),
1393 is_retained_metrics_object,
1394 }),
1395 Plan::CreateSink(CreateSinkPlan {
1396 sink,
1397 with_snapshot,
1398 in_cluster,
1399 ..
1400 }) => CatalogItem::Sink(Sink {
1401 create_sql: sink.create_sql,
1402 global_id,
1403 from: sink.from,
1404 connection: sink.connection,
1405 envelope: sink.envelope,
1406 version: sink.version,
1407 with_snapshot,
1408 resolved_ids,
1409 cluster_id: in_cluster,
1410 }),
1411 Plan::CreateType(CreateTypePlan { typ, .. }) => {
1412 let desc = match typ.inner.desc(&session_catalog) {
1413 Ok(desc) => desc,
1414 Err(err) => return Err((err.into(), cached_expr)),
1415 };
1416 CatalogItem::Type(Type {
1417 create_sql: Some(typ.create_sql),
1418 global_id,
1419 desc,
1420 details: CatalogTypeDetails {
1421 array_id: None,
1422 typ: typ.inner,
1423 pg_metadata: None,
1424 },
1425 resolved_ids,
1426 })
1427 }
1428 Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1429 create_sql: secret.create_sql,
1430 global_id,
1431 }),
1432 Plan::CreateConnection(CreateConnectionPlan {
1433 connection:
1434 mz_sql::plan::Connection {
1435 create_sql,
1436 details,
1437 },
1438 ..
1439 }) => CatalogItem::Connection(Connection {
1440 create_sql,
1441 global_id,
1442 details,
1443 resolved_ids,
1444 }),
1445 _ => {
1446 return Err((
1447 Error::new(ErrorKind::Corruption {
1448 detail: "catalog entry generated inappropriate plan".to_string(),
1449 })
1450 .into(),
1451 cached_expr,
1452 ));
1453 }
1454 };
1455
1456 Ok((item, uncached_expr))
1457 }
1458
1459 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1465 let restore = self.system_configuration.clone();
1476 self.system_configuration.enable_for_item_parsing();
1477 let res = f(self);
1478 self.system_configuration = restore;
1479 res
1480 }
1481
1482 pub fn get_indexes_on(
1484 &self,
1485 id: GlobalId,
1486 cluster: ClusterId,
1487 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1488 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1489
1490 self.try_get_entry_by_global_id(&id)
1491 .into_iter()
1492 .map(move |e| {
1493 e.used_by()
1494 .iter()
1495 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1496 CatalogItem::Index(index) if index_matches(index) => {
1497 Some((index.global_id(), index))
1498 }
1499 _ => None,
1500 })
1501 })
1502 .flatten()
1503 }
1504
1505 pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1506 &self.database_by_id[database_id]
1507 }
1508
1509 pub(super) fn try_get_cluster_replica(
1514 &self,
1515 id: ClusterId,
1516 replica_id: ReplicaId,
1517 ) -> Option<&ClusterReplica> {
1518 self.try_get_cluster(id)
1519 .and_then(|cluster| cluster.replica(replica_id))
1520 }
1521
1522 pub(super) fn get_cluster_replica(
1526 &self,
1527 cluster_id: ClusterId,
1528 replica_id: ReplicaId,
1529 ) -> &ClusterReplica {
1530 self.try_get_cluster_replica(cluster_id, replica_id)
1531 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1532 }
1533
1534 pub(super) fn resolve_replica_in_cluster(
1535 &self,
1536 cluster_id: &ClusterId,
1537 replica_name: &str,
1538 ) -> Result<&ClusterReplica, SqlCatalogError> {
1539 let cluster = self.get_cluster(*cluster_id);
1540 let replica_id = cluster
1541 .replica_id_by_name_
1542 .get(replica_name)
1543 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1544 Ok(&cluster.replicas_by_id_[replica_id])
1545 }
1546
1547 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1549 Ok(self.system_configuration.get(name)?)
1550 }
1551
1552 pub(super) fn parse_system_configuration(
1556 &self,
1557 name: &str,
1558 value: VarInput,
1559 ) -> Result<String, Error> {
1560 let value = self.system_configuration.parse(name, value)?;
1561 Ok(value.format())
1562 }
1563
1564 pub(super) fn resolve_schema_in_database(
1566 &self,
1567 database_spec: &ResolvedDatabaseSpecifier,
1568 schema_name: &str,
1569 conn_id: &ConnectionId,
1570 ) -> Result<&Schema, SqlCatalogError> {
1571 let schema = match database_spec {
1572 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1573 self.temporary_schemas.get(conn_id)
1574 }
1575 ResolvedDatabaseSpecifier::Ambient => self
1576 .ambient_schemas_by_name
1577 .get(schema_name)
1578 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1579 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1580 db.schemas_by_name
1581 .get(schema_name)
1582 .and_then(|id| db.schemas_by_id.get(id))
1583 }),
1584 };
1585 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1586 }
1587
1588 pub fn get_schema(
1589 &self,
1590 database_spec: &ResolvedDatabaseSpecifier,
1591 schema_spec: &SchemaSpecifier,
1592 conn_id: &ConnectionId,
1593 ) -> &Schema {
1594 match (database_spec, schema_spec) {
1596 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1597 &self.temporary_schemas[conn_id]
1598 }
1599 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1600 &self.ambient_schemas_by_id[id]
1601 }
1602
1603 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
1604 &self.database_by_id[database_id].schemas_by_id[schema_id]
1605 }
1606 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1607 unreachable!("temporary schemas are in the ambient database")
1608 }
1609 }
1610 }
1611
1612 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1613 self.database_by_id
1614 .values()
1615 .filter_map(|database| database.schemas_by_id.get(schema_id))
1616 .chain(self.ambient_schemas_by_id.values())
1617 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1618 .into_first()
1619 }
1620
1621 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1622 self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1623 }
1624
1625 pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1626 self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1627 }
1628
1629 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1630 self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1631 }
1632
1633 pub fn get_information_schema_id(&self) -> SchemaId {
1634 self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1635 }
1636
1637 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1638 self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1639 }
1640
1641 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1642 self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1643 }
1644
1645 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1646 self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1647 }
1648
1649 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1650 SYSTEM_SCHEMAS
1651 .iter()
1652 .map(|name| self.ambient_schemas_by_name[*name])
1653 }
1654
1655 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1656 self.system_schema_ids().contains(&id)
1657 }
1658
1659 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1660 match spec {
1661 SchemaSpecifier::Temporary => false,
1662 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1663 }
1664 }
1665
1666 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1667 UNSTABLE_SCHEMAS
1668 .iter()
1669 .map(|name| self.ambient_schemas_by_name[*name])
1670 }
1671
1672 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1673 self.unstable_schema_ids().contains(&id)
1674 }
1675
1676 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1677 match spec {
1678 SchemaSpecifier::Temporary => false,
1679 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1680 }
1681 }
1682
1683 pub fn create_temporary_schema(
1686 &mut self,
1687 conn_id: &ConnectionId,
1688 owner_id: RoleId,
1689 ) -> Result<(), Error> {
1690 let oid = INVALID_OID;
1695 self.temporary_schemas.insert(
1696 conn_id.clone(),
1697 Schema {
1698 name: QualifiedSchemaName {
1699 database: ResolvedDatabaseSpecifier::Ambient,
1700 schema: MZ_TEMP_SCHEMA.into(),
1701 },
1702 id: SchemaSpecifier::Temporary,
1703 oid,
1704 items: BTreeMap::new(),
1705 functions: BTreeMap::new(),
1706 types: BTreeMap::new(),
1707 owner_id,
1708 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1709 mz_sql::catalog::ObjectType::Schema,
1710 owner_id,
1711 )]),
1712 },
1713 );
1714 Ok(())
1715 }
1716
1717 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1719 std::iter::empty()
1720 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1721 if schema.id.is_temporary() {
1722 Some(schema.oid)
1723 } else {
1724 None
1725 }
1726 }))
1727 .chain(self.entry_by_id.values().filter_map(|entry| {
1728 if entry.item().is_temporary() {
1729 Some(entry.oid)
1730 } else {
1731 None
1732 }
1733 }))
1734 }
1735
1736 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1740 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1741 }
1742
1743 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1747 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1748 let log = match self.get_entry(&item_id).item() {
1749 CatalogItem::Log(log) => log,
1750 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1751 };
1752 (item_id, log.global_id)
1753 }
1754
1755 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1759 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1760 }
1761
1762 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1766 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1767 let schema = &self.ambient_schemas_by_id[schema_id];
1768 match builtin.catalog_item_type() {
1769 CatalogItemType::Type => schema.types[builtin.name()],
1770 CatalogItemType::Func => schema.functions[builtin.name()],
1771 CatalogItemType::Table
1772 | CatalogItemType::Source
1773 | CatalogItemType::Sink
1774 | CatalogItemType::View
1775 | CatalogItemType::MaterializedView
1776 | CatalogItemType::Index
1777 | CatalogItemType::Secret
1778 | CatalogItemType::Connection
1779 | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1780 }
1781 }
1782
1783 pub fn resolve_builtin_type_references(
1785 &self,
1786 builtin: &BuiltinType<NameReference>,
1787 ) -> BuiltinType<IdReference> {
1788 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1789 CatalogType::AclItem => CatalogType::AclItem,
1790 CatalogType::Array { element_reference } => CatalogType::Array {
1791 element_reference: self.get_system_type(element_reference).id,
1792 },
1793 CatalogType::List {
1794 element_reference,
1795 element_modifiers,
1796 } => CatalogType::List {
1797 element_reference: self.get_system_type(element_reference).id,
1798 element_modifiers: element_modifiers.clone(),
1799 },
1800 CatalogType::Map {
1801 key_reference,
1802 value_reference,
1803 key_modifiers,
1804 value_modifiers,
1805 } => CatalogType::Map {
1806 key_reference: self.get_system_type(key_reference).id,
1807 value_reference: self.get_system_type(value_reference).id,
1808 key_modifiers: key_modifiers.clone(),
1809 value_modifiers: value_modifiers.clone(),
1810 },
1811 CatalogType::Range { element_reference } => CatalogType::Range {
1812 element_reference: self.get_system_type(element_reference).id,
1813 },
1814 CatalogType::Record { fields } => CatalogType::Record {
1815 fields: fields
1816 .into_iter()
1817 .map(|f| CatalogRecordField {
1818 name: f.name.clone(),
1819 type_reference: self.get_system_type(f.type_reference).id,
1820 type_modifiers: f.type_modifiers.clone(),
1821 })
1822 .collect(),
1823 },
1824 CatalogType::Bool => CatalogType::Bool,
1825 CatalogType::Bytes => CatalogType::Bytes,
1826 CatalogType::Char => CatalogType::Char,
1827 CatalogType::Date => CatalogType::Date,
1828 CatalogType::Float32 => CatalogType::Float32,
1829 CatalogType::Float64 => CatalogType::Float64,
1830 CatalogType::Int16 => CatalogType::Int16,
1831 CatalogType::Int32 => CatalogType::Int32,
1832 CatalogType::Int64 => CatalogType::Int64,
1833 CatalogType::UInt16 => CatalogType::UInt16,
1834 CatalogType::UInt32 => CatalogType::UInt32,
1835 CatalogType::UInt64 => CatalogType::UInt64,
1836 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1837 CatalogType::Interval => CatalogType::Interval,
1838 CatalogType::Jsonb => CatalogType::Jsonb,
1839 CatalogType::Numeric => CatalogType::Numeric,
1840 CatalogType::Oid => CatalogType::Oid,
1841 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1842 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1843 CatalogType::Pseudo => CatalogType::Pseudo,
1844 CatalogType::RegClass => CatalogType::RegClass,
1845 CatalogType::RegProc => CatalogType::RegProc,
1846 CatalogType::RegType => CatalogType::RegType,
1847 CatalogType::String => CatalogType::String,
1848 CatalogType::Time => CatalogType::Time,
1849 CatalogType::Timestamp => CatalogType::Timestamp,
1850 CatalogType::TimestampTz => CatalogType::TimestampTz,
1851 CatalogType::Uuid => CatalogType::Uuid,
1852 CatalogType::VarChar => CatalogType::VarChar,
1853 CatalogType::Int2Vector => CatalogType::Int2Vector,
1854 CatalogType::MzAclItem => CatalogType::MzAclItem,
1855 };
1856
1857 BuiltinType {
1858 name: builtin.name,
1859 schema: builtin.schema,
1860 oid: builtin.oid,
1861 details: CatalogTypeDetails {
1862 array_id: builtin.details.array_id,
1863 typ,
1864 pg_metadata: builtin.details.pg_metadata.clone(),
1865 },
1866 }
1867 }
1868
1869 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1870 &self.config
1871 }
1872
1873 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1874 match self.database_by_name.get(database_name) {
1875 Some(id) => Ok(&self.database_by_id[id]),
1876 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1877 }
1878 }
1879
1880 pub fn resolve_schema(
1881 &self,
1882 current_database: Option<&DatabaseId>,
1883 database_name: Option<&str>,
1884 schema_name: &str,
1885 conn_id: &ConnectionId,
1886 ) -> Result<&Schema, SqlCatalogError> {
1887 let database_spec = match database_name {
1888 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1893 self.resolve_database(database)?.id().clone(),
1894 )),
1895 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1896 };
1897
1898 if let Some(database_spec) = database_spec {
1900 if let Ok(schema) =
1901 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1902 {
1903 return Ok(schema);
1904 }
1905 }
1906
1907 if let Ok(schema) = self.resolve_schema_in_database(
1909 &ResolvedDatabaseSpecifier::Ambient,
1910 schema_name,
1911 conn_id,
1912 ) {
1913 return Ok(schema);
1914 }
1915
1916 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1917 }
1918
1919 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1923 self.ambient_schemas_by_name[name]
1924 }
1925
1926 pub fn resolve_search_path(
1927 &self,
1928 session: &dyn SessionMetadata,
1929 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1930 let database = self
1931 .database_by_name
1932 .get(session.database())
1933 .map(|id| id.clone());
1934
1935 session
1936 .search_path()
1937 .iter()
1938 .map(|schema| {
1939 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1940 })
1941 .filter_map(|schema| schema.ok())
1942 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1943 .collect()
1944 }
1945
1946 pub fn effective_search_path(
1947 &self,
1948 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1949 include_temp_schema: bool,
1950 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1951 let mut v = Vec::with_capacity(search_path.len() + 3);
1952 let temp_schema = (
1954 ResolvedDatabaseSpecifier::Ambient,
1955 SchemaSpecifier::Temporary,
1956 );
1957 if include_temp_schema && !search_path.contains(&temp_schema) {
1958 v.push(temp_schema);
1959 }
1960 let default_schemas = [
1961 (
1962 ResolvedDatabaseSpecifier::Ambient,
1963 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
1964 ),
1965 (
1966 ResolvedDatabaseSpecifier::Ambient,
1967 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
1968 ),
1969 ];
1970 for schema in default_schemas.into_iter() {
1971 if !search_path.contains(&schema) {
1972 v.push(schema);
1973 }
1974 }
1975 v.extend_from_slice(search_path);
1976 v
1977 }
1978
1979 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
1980 let id = self
1981 .clusters_by_name
1982 .get(name)
1983 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
1984 Ok(&self.clusters_by_id[id])
1985 }
1986
1987 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
1988 let id = self
1989 .clusters_by_name
1990 .get(cluster.name)
1991 .expect("failed to lookup BuiltinCluster by name");
1992 self.clusters_by_id
1993 .get(id)
1994 .expect("failed to lookup BuiltinCluster by ID")
1995 }
1996
1997 pub fn resolve_cluster_replica(
1998 &self,
1999 cluster_replica_name: &QualifiedReplica,
2000 ) -> Result<&ClusterReplica, SqlCatalogError> {
2001 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2002 let replica_name = cluster_replica_name.replica.as_str();
2003 let replica_id = cluster
2004 .replica_id(replica_name)
2005 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2006 Ok(cluster.replica(replica_id).expect("Must exist"))
2007 }
2008
2009 #[allow(clippy::useless_let_if_seq)]
2015 pub fn resolve(
2016 &self,
2017 get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2018 current_database: Option<&DatabaseId>,
2019 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2020 name: &PartialItemName,
2021 conn_id: &ConnectionId,
2022 err_gen: fn(String) -> SqlCatalogError,
2023 ) -> Result<&CatalogEntry, SqlCatalogError> {
2024 let schemas = match &name.schema {
2029 Some(schema_name) => {
2030 match self.resolve_schema(
2031 current_database,
2032 name.database.as_deref(),
2033 schema_name,
2034 conn_id,
2035 ) {
2036 Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2037 Err(e) => return Err(e),
2038 }
2039 }
2040 None => match self
2041 .get_schema(
2042 &ResolvedDatabaseSpecifier::Ambient,
2043 &SchemaSpecifier::Temporary,
2044 conn_id,
2045 )
2046 .items
2047 .get(&name.item)
2048 {
2049 Some(id) => return Ok(self.get_entry(id)),
2050 None => search_path.to_vec(),
2051 },
2052 };
2053
2054 for (database_spec, schema_spec) in &schemas {
2055 let schema = self.get_schema(database_spec, schema_spec, conn_id);
2056
2057 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2058 return Ok(&self.entry_by_id[id]);
2059 }
2060 }
2061
2062 let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2067 if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2068 for schema_id in [
2069 self.get_mz_catalog_unstable_schema_id(),
2070 self.get_mz_introspection_schema_id(),
2071 ] {
2072 let schema = self.get_schema(
2073 &ResolvedDatabaseSpecifier::Ambient,
2074 &SchemaSpecifier::Id(schema_id),
2075 conn_id,
2076 );
2077
2078 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2079 debug!(
2080 github_27831 = true,
2081 "encountered use of outdated schema `mz_internal` for relation: {name}",
2082 );
2083 return Ok(&self.entry_by_id[id]);
2084 }
2085 }
2086 }
2087
2088 Err(err_gen(name.to_string()))
2089 }
2090
2091 pub fn resolve_entry(
2093 &self,
2094 current_database: Option<&DatabaseId>,
2095 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2096 name: &PartialItemName,
2097 conn_id: &ConnectionId,
2098 ) -> Result<&CatalogEntry, SqlCatalogError> {
2099 self.resolve(
2100 |schema| &schema.items,
2101 current_database,
2102 search_path,
2103 name,
2104 conn_id,
2105 SqlCatalogError::UnknownItem,
2106 )
2107 }
2108
2109 pub fn resolve_function(
2111 &self,
2112 current_database: Option<&DatabaseId>,
2113 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2114 name: &PartialItemName,
2115 conn_id: &ConnectionId,
2116 ) -> Result<&CatalogEntry, SqlCatalogError> {
2117 self.resolve(
2118 |schema| &schema.functions,
2119 current_database,
2120 search_path,
2121 name,
2122 conn_id,
2123 |name| SqlCatalogError::UnknownFunction {
2124 name,
2125 alternative: None,
2126 },
2127 )
2128 }
2129
2130 pub fn resolve_type(
2132 &self,
2133 current_database: Option<&DatabaseId>,
2134 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2135 name: &PartialItemName,
2136 conn_id: &ConnectionId,
2137 ) -> Result<&CatalogEntry, SqlCatalogError> {
2138 static NON_PG_CATALOG_TYPES: LazyLock<
2139 BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2140 > = LazyLock::new(|| {
2141 BUILTINS::types()
2142 .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2143 .map(|typ| (typ.name, typ))
2144 .collect()
2145 });
2146
2147 let entry = self.resolve(
2148 |schema| &schema.types,
2149 current_database,
2150 search_path,
2151 name,
2152 conn_id,
2153 |name| SqlCatalogError::UnknownType { name },
2154 )?;
2155
2156 if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2157 if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2158 warn!(
2159 "user specified an incorrect schema of {} for the type {}, which should be in \
2160 the {} schema. This works now due to a bug but will be fixed in a later release.",
2161 PG_CATALOG_SCHEMA.quoted(),
2162 typ.name.quoted(),
2163 typ.schema.quoted(),
2164 )
2165 }
2166 }
2167
2168 Ok(entry)
2169 }
2170
2171 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2173 match object_id {
2174 ObjectId::Item(item_id) => {
2175 let entry = self.get_entry(&item_id);
2176 match entry.item_type() {
2177 CatalogItemType::Table => CommentObjectId::Table(item_id),
2178 CatalogItemType::Source => CommentObjectId::Source(item_id),
2179 CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2180 CatalogItemType::View => CommentObjectId::View(item_id),
2181 CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2182 CatalogItemType::Index => CommentObjectId::Index(item_id),
2183 CatalogItemType::Func => CommentObjectId::Func(item_id),
2184 CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2185 CatalogItemType::Type => CommentObjectId::Type(item_id),
2186 CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2187 CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2188 }
2189 }
2190 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2191 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2192 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2193 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2194 ObjectId::ClusterReplica(cluster_replica_id) => {
2195 CommentObjectId::ClusterReplica(cluster_replica_id)
2196 }
2197 ObjectId::NetworkPolicy(network_policy_id) => {
2198 CommentObjectId::NetworkPolicy(network_policy_id)
2199 }
2200 }
2201 }
2202
2203 pub fn system_config(&self) -> &SystemVars {
2205 &self.system_configuration
2206 }
2207
2208 pub fn system_config_mut(&mut self) -> &mut SystemVars {
2210 &mut self.system_configuration
2211 }
2212
2213 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2223 let mut dump = serde_json::to_value(&self).map_err(|e| {
2225 Error::new(ErrorKind::Unstructured(format!(
2226 "internal error: could not dump catalog: {}",
2229 e
2230 )))
2231 })?;
2232
2233 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2234 dump_obj.insert(
2236 "system_parameter_defaults".into(),
2237 serde_json::json!(self.system_config().defaults()),
2238 );
2239 if let Some(unfinalized_shards) = unfinalized_shards {
2241 dump_obj
2242 .get_mut("storage_metadata")
2243 .expect("known to exist")
2244 .as_object_mut()
2245 .expect("storage_metadata is an object")
2246 .insert(
2247 "unfinalized_shards".into(),
2248 serde_json::json!(unfinalized_shards),
2249 );
2250 }
2251 let temporary_gids: Vec<_> = self
2256 .entry_by_global_id
2257 .iter()
2258 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2259 .map(|(gid, _item_id)| *gid)
2260 .collect();
2261 if !temporary_gids.is_empty() {
2262 let gids = dump_obj
2263 .get_mut("entry_by_global_id")
2264 .expect("known_to_exist")
2265 .as_object_mut()
2266 .expect("entry_by_global_id is an object");
2267 for gid in temporary_gids {
2268 gids.remove(&gid.to_string());
2269 }
2270 }
2271 dump_obj.remove("role_auth_by_id");
2274
2275 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2277 }
2278
2279 pub fn availability_zones(&self) -> &[String] {
2280 &self.availability_zones
2281 }
2282
2283 pub fn concretize_replica_location(
2284 &self,
2285 location: mz_catalog::durable::ReplicaLocation,
2286 allowed_sizes: &Vec<String>,
2287 allowed_availability_zones: Option<&[String]>,
2288 ) -> Result<ReplicaLocation, Error> {
2289 let location = match location {
2290 mz_catalog::durable::ReplicaLocation::Unmanaged {
2291 storagectl_addrs,
2292 computectl_addrs,
2293 } => {
2294 if allowed_availability_zones.is_some() {
2295 return Err(Error {
2296 kind: ErrorKind::Internal(
2297 "tried concretize unmanaged replica with specific availability_zones"
2298 .to_string(),
2299 ),
2300 });
2301 }
2302 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2303 storagectl_addrs,
2304 computectl_addrs,
2305 })
2306 }
2307 mz_catalog::durable::ReplicaLocation::Managed {
2308 size,
2309 availability_zone,
2310 billed_as,
2311 internal,
2312 pending,
2313 } => {
2314 if allowed_availability_zones.is_some() && availability_zone.is_some() {
2315 let message = "tried concretize managed replica with specific availability zones and availability zone";
2316 return Err(Error {
2317 kind: ErrorKind::Internal(message.to_string()),
2318 });
2319 }
2320 self.ensure_valid_replica_size(allowed_sizes, &size)?;
2321 let cluster_replica_sizes = &self.cluster_replica_sizes;
2322
2323 ReplicaLocation::Managed(ManagedReplicaLocation {
2324 allocation: cluster_replica_sizes
2325 .0
2326 .get(&size)
2327 .expect("catalog out of sync")
2328 .clone(),
2329 availability_zones: match (availability_zone, allowed_availability_zones) {
2330 (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2331 (None, Some(azs)) if azs.is_empty() => {
2332 ManagedReplicaAvailabilityZones::FromCluster(None)
2333 }
2334 (None, Some(azs)) => {
2335 ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2336 }
2337 (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2338 },
2339 size,
2340 billed_as,
2341 internal,
2342 pending,
2343 })
2344 }
2345 };
2346 Ok(location)
2347 }
2348
2349 pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2360 let alloc = &self.cluster_replica_sizes.0[size];
2361 !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2362 }
2363
2364 pub(crate) fn ensure_valid_replica_size(
2365 &self,
2366 allowed_sizes: &[String],
2367 size: &String,
2368 ) -> Result<(), Error> {
2369 let cluster_replica_sizes = &self.cluster_replica_sizes;
2370
2371 if !cluster_replica_sizes.0.contains_key(size)
2372 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2373 || cluster_replica_sizes.0[size].disabled
2374 {
2375 let mut entries = cluster_replica_sizes
2376 .enabled_allocations()
2377 .collect::<Vec<_>>();
2378
2379 if !allowed_sizes.is_empty() {
2380 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2381 entries.retain(|(name, _)| allowed_sizes.contains(name));
2382 }
2383
2384 entries.sort_by_key(
2385 |(
2386 _name,
2387 ReplicaAllocation {
2388 scale, cpu_limit, ..
2389 },
2390 )| (scale, cpu_limit),
2391 );
2392
2393 Err(Error {
2394 kind: ErrorKind::InvalidClusterReplicaSize {
2395 size: size.to_owned(),
2396 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2397 },
2398 })
2399 } else {
2400 Ok(())
2401 }
2402 }
2403
2404 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2405 if role_id.is_builtin() {
2406 let role = self.get_role(role_id);
2407 Err(Error::new(ErrorKind::ReservedRoleName(
2408 role.name().to_string(),
2409 )))
2410 } else {
2411 Ok(())
2412 }
2413 }
2414
2415 pub fn ensure_not_reserved_network_policy(
2416 &self,
2417 network_policy_id: &NetworkPolicyId,
2418 ) -> Result<(), Error> {
2419 if network_policy_id.is_builtin() {
2420 let policy = self.get_network_policy(network_policy_id);
2421 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2422 policy.name.clone(),
2423 )))
2424 } else {
2425 Ok(())
2426 }
2427 }
2428
2429 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2430 let is_grantable = !role_id.is_public() && !role_id.is_system();
2431 if is_grantable {
2432 Ok(())
2433 } else {
2434 let role = self.get_role(role_id);
2435 Err(Error::new(ErrorKind::UngrantableRoleName(
2436 role.name().to_string(),
2437 )))
2438 }
2439 }
2440
2441 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2442 if role_id.is_system() {
2443 let role = self.get_role(role_id);
2444 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2445 role.name().to_string(),
2446 )))
2447 } else {
2448 Ok(())
2449 }
2450 }
2451
2452 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2453 if role_id.is_predefined() {
2454 let role = self.get_role(role_id);
2455 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2456 role.name().to_string(),
2457 )))
2458 } else {
2459 Ok(())
2460 }
2461 }
2462
2463 pub(crate) fn add_to_audit_log(
2466 system_configuration: &SystemVars,
2467 oracle_write_ts: mz_repr::Timestamp,
2468 session: Option<&ConnMeta>,
2469 tx: &mut mz_catalog::durable::Transaction,
2470 audit_events: &mut Vec<VersionedEvent>,
2471 event_type: EventType,
2472 object_type: ObjectType,
2473 details: EventDetails,
2474 ) -> Result<(), Error> {
2475 let user = session.map(|session| session.user().name.to_string());
2476
2477 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2480 Some(ts) => ts.into(),
2481 _ => oracle_write_ts.into(),
2482 };
2483 let id = tx.allocate_audit_log_id()?;
2484 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2485 audit_events.push(event.clone());
2486 tx.insert_audit_log_event(event);
2487 Ok(())
2488 }
2489
2490 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2491 match id {
2492 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2493 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2494 self.get_cluster_replica(*cluster_id, *replica_id)
2495 .owner_id(),
2496 ),
2497 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2498 ObjectId::Schema((database_spec, schema_spec)) => Some(
2499 self.get_schema(database_spec, schema_spec, conn_id)
2500 .owner_id(),
2501 ),
2502 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2503 ObjectId::Role(_) => None,
2504 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2505 }
2506 }
2507
2508 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2509 match object_id {
2510 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2511 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2512 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2513 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2514 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2515 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2516 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2517 }
2518 }
2519
2520 pub(super) fn get_system_object_type(
2521 &self,
2522 id: &SystemObjectId,
2523 ) -> mz_sql::catalog::SystemObjectType {
2524 match id {
2525 SystemObjectId::Object(object_id) => {
2526 SystemObjectType::Object(self.get_object_type(object_id))
2527 }
2528 SystemObjectId::System => SystemObjectType::System,
2529 }
2530 }
2531
2532 pub fn storage_metadata(&self) -> &StorageMetadata {
2536 &self.storage_metadata
2537 }
2538
2539 pub fn source_compaction_windows(
2541 &self,
2542 ids: impl IntoIterator<Item = CatalogItemId>,
2543 ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2544 let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2545 let mut seen = BTreeSet::new();
2546 for item_id in ids {
2547 if !seen.insert(item_id) {
2548 continue;
2549 }
2550 let entry = self.get_entry(&item_id);
2551 match entry.item() {
2552 CatalogItem::Source(source) => {
2553 let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2554 match source.data_source {
2555 DataSourceDesc::Ingestion { .. }
2556 | DataSourceDesc::OldSyntaxIngestion { .. }
2557 | DataSourceDesc::IngestionExport { .. } => {
2558 cws.entry(source_cw).or_default().insert(item_id);
2559 }
2560 DataSourceDesc::Introspection(_)
2561 | DataSourceDesc::Progress
2562 | DataSourceDesc::Webhook { .. } => {
2563 cws.entry(source_cw).or_default().insert(item_id);
2564 }
2565 }
2566 }
2567 CatalogItem::Table(table) => {
2568 let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2569 match &table.data_source {
2570 TableDataSource::DataSource {
2571 desc: DataSourceDesc::IngestionExport { .. },
2572 timeline: _,
2573 } => {
2574 cws.entry(table_cw).or_default().insert(item_id);
2575 }
2576 _ => {}
2577 }
2578 }
2579 _ => {
2580 continue;
2582 }
2583 }
2584 }
2585 cws
2586 }
2587
2588 pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2589 match id {
2590 CommentObjectId::Table(id)
2591 | CommentObjectId::View(id)
2592 | CommentObjectId::MaterializedView(id)
2593 | CommentObjectId::Source(id)
2594 | CommentObjectId::Sink(id)
2595 | CommentObjectId::Index(id)
2596 | CommentObjectId::Func(id)
2597 | CommentObjectId::Connection(id)
2598 | CommentObjectId::Type(id)
2599 | CommentObjectId::Secret(id)
2600 | CommentObjectId::ContinualTask(id) => Some(*id),
2601 CommentObjectId::Role(_)
2602 | CommentObjectId::Database(_)
2603 | CommentObjectId::Schema(_)
2604 | CommentObjectId::Cluster(_)
2605 | CommentObjectId::ClusterReplica(_)
2606 | CommentObjectId::NetworkPolicy(_) => None,
2607 }
2608 }
2609
2610 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2611 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2612 }
2613
2614 pub fn comment_id_to_audit_log_name(
2615 &self,
2616 id: CommentObjectId,
2617 conn_id: &ConnectionId,
2618 ) -> String {
2619 match id {
2620 CommentObjectId::Table(id)
2621 | CommentObjectId::View(id)
2622 | CommentObjectId::MaterializedView(id)
2623 | CommentObjectId::Source(id)
2624 | CommentObjectId::Sink(id)
2625 | CommentObjectId::Index(id)
2626 | CommentObjectId::Func(id)
2627 | CommentObjectId::Connection(id)
2628 | CommentObjectId::Type(id)
2629 | CommentObjectId::Secret(id)
2630 | CommentObjectId::ContinualTask(id) => {
2631 let item = self.get_entry(&id);
2632 let name = self.resolve_full_name(item.name(), Some(conn_id));
2633 name.to_string()
2634 }
2635 CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2636 CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2637 CommentObjectId::Schema((spec, schema_id)) => {
2638 let schema = self.get_schema(&spec, &schema_id, conn_id);
2639 self.resolve_full_schema_name(&schema.name).to_string()
2640 }
2641 CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2642 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2643 let cluster = self.get_cluster(cluster_id);
2644 let replica = self.get_cluster_replica(cluster_id, replica_id);
2645 QualifiedReplica {
2646 cluster: Ident::new_unchecked(cluster.name.clone()),
2647 replica: Ident::new_unchecked(replica.name.clone()),
2648 }
2649 .to_string()
2650 }
2651 CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2652 }
2653 }
2654
2655 pub fn mock_authentication_nonce(&self) -> String {
2656 self.mock_authentication_nonce.clone().unwrap_or_default()
2657 }
2658}
2659
2660impl ConnectionResolver for CatalogState {
2661 fn resolve_connection(
2662 &self,
2663 id: CatalogItemId,
2664 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2665 use mz_storage_types::connections::Connection::*;
2666 match self
2667 .get_entry(&id)
2668 .connection()
2669 .expect("catalog out of sync")
2670 .details
2671 .to_connection()
2672 {
2673 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2674 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2675 Csr(conn) => Csr(conn.into_inline_connection(self)),
2676 Ssh(conn) => Ssh(conn),
2677 Aws(conn) => Aws(conn),
2678 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2679 MySql(conn) => MySql(conn.into_inline_connection(self)),
2680 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2681 IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2682 }
2683 }
2684}
2685
2686impl OptimizerCatalog for CatalogState {
2687 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2688 CatalogState::get_entry_by_global_id(self, id)
2689 }
2690 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2691 CatalogState::get_entry(self, id)
2692 }
2693 fn resolve_full_name(
2694 &self,
2695 name: &QualifiedItemName,
2696 conn_id: Option<&ConnectionId>,
2697 ) -> FullItemName {
2698 CatalogState::resolve_full_name(self, name, conn_id)
2699 }
2700 fn get_indexes_on(
2701 &self,
2702 id: GlobalId,
2703 cluster: ClusterId,
2704 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2705 Box::new(CatalogState::get_indexes_on(self, id, cluster))
2706 }
2707}
2708
2709impl OptimizerCatalog for Catalog {
2710 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2711 self.state.get_entry_by_global_id(id)
2712 }
2713
2714 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2715 self.state.get_entry(id)
2716 }
2717
2718 fn resolve_full_name(
2719 &self,
2720 name: &QualifiedItemName,
2721 conn_id: Option<&ConnectionId>,
2722 ) -> FullItemName {
2723 self.state.resolve_full_name(name, conn_id)
2724 }
2725
2726 fn get_indexes_on(
2727 &self,
2728 id: GlobalId,
2729 cluster: ClusterId,
2730 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2731 Box::new(self.state.get_indexes_on(id, cluster))
2732 }
2733}
2734
2735impl Catalog {
2736 pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2737 self
2738 }
2739}