1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_ore::collections::CollectionExt;
45use mz_ore::now::NOW_ZERO;
46use mz_ore::soft_assert_no_log;
47use mz_ore::str::StrExt;
48use mz_pgrepr::oid::INVALID_OID;
49use mz_repr::adt::mz_acl_item::PrivilegeMap;
50use mz_repr::namespaces::{
51 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
52 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
53 UNSTABLE_SCHEMAS,
54};
55use mz_repr::network_policy_id::NetworkPolicyId;
56use mz_repr::optimize::OptimizerFeatures;
57use mz_repr::role_id::RoleId;
58use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector};
59use mz_secrets::InMemorySecretsController;
60use mz_sql::ast::Ident;
61use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
62use mz_sql::catalog::{
63 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
64 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
65 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
66 TypeReference,
67};
68use mz_sql::names::{
69 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
70 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
71 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
72};
73use mz_sql::plan::{
74 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
75 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
76 Plan, PlanContext,
77};
78use mz_sql::rbac;
79use mz_sql::session::metadata::SessionMetadata;
80use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
81use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
82use mz_sql_parser::ast::QualifiedReplica;
83use mz_storage_client::controller::StorageMetadata;
84use mz_storage_types::connections::ConnectionContext;
85use mz_storage_types::connections::inline::{
86 ConnectionResolver, InlinedConnection, IntoInlineConnection,
87};
88use serde::Serialize;
89use timely::progress::Antichain;
90use tokio::sync::mpsc;
91use tracing::{debug, warn};
92
93use crate::AdapterError;
95use crate::catalog::{Catalog, ConnCatalog};
96use crate::coord::ConnMeta;
97use crate::optimize::{self, Optimize, OptimizerCatalog};
98use crate::session::Session;
99
/// In-memory catalog state.
///
/// Most object kinds are indexed twice: a name-to-ID map and an ID-to-object
/// map. The struct derives `Serialize`; fields marked `#[serde(skip)]` are left
/// out of the serialized form, and maps with non-string keys are serialized
/// through `mz_ore::serde::map_key_to_string`.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    /// Database IDs keyed by database name.
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    /// Databases keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    /// Catalog entries keyed by item ID. Serialization goes through
    /// `skip_temp_items`, which omits entries that have a connection ID.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    /// Maps each `GlobalId` to the `CatalogItemId` of the entry that owns it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    /// Ambient schema IDs keyed by schema name.
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    /// Ambient schemas keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Cluster IDs keyed by cluster name.
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    /// Clusters keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    /// Role IDs keyed by role name.
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    /// Roles keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    /// Network policy IDs keyed by policy name.
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    /// Network policies keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    /// Role authentication records keyed by role ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    /// System variables; not serialized.
    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    /// Default privileges.
    pub(super) default_privileges: DefaultPrivileges,
    /// System-level privileges.
    pub(super) system_privileges: PrivilegeMap,
    /// Comments attached to catalog objects.
    pub(super) comments: CommentsMap,
    /// Source references keyed by catalog item ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    /// Storage metadata.
    pub(super) storage_metadata: StorageMetadata,

    /// Temporary schemas keyed by connection ID; not serialized.
    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    /// Catalog configuration; not serialized.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    /// Map of known cluster replica sizes.
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Availability zone names; not serialized.
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    /// Egress addresses; not serialized.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    /// Optional AWS principal context.
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    /// Optional set of AWS PrivateLink availability zones.
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Optional HTTP host name; `try_get_webhook_url` substitutes `"HOST"` when unset.
    pub(super) http_host_name: Option<String>,
}
162
/// A cache of local expressions keyed by `GlobalId`, which is either in use
/// (`Open`) or disabled (`Closed`).
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is in use.
    Open {
        /// Expressions the cache was created with, plus any put back via
        /// `insert_cached_expression`.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Expressions recorded via `insert_uncached_expression`; handed back
        /// by `into_uncached_exprs`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is disabled: removals return `None` and inserts are ignored.
    Closed,
}
176
177impl LocalExpressionCache {
178 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
179 Self::Open {
180 cached_exprs,
181 uncached_exprs: BTreeMap::new(),
182 }
183 }
184
185 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
186 match self {
187 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
188 LocalExpressionCache::Closed => None,
189 }
190 }
191
192 pub(super) fn insert_cached_expression(
195 &mut self,
196 id: GlobalId,
197 local_expressions: LocalExpressions,
198 ) {
199 match self {
200 LocalExpressionCache::Open { cached_exprs, .. } => {
201 cached_exprs.insert(id, local_expressions);
202 }
203 LocalExpressionCache::Closed => {}
204 }
205 }
206
207 pub(super) fn insert_uncached_expression(
210 &mut self,
211 id: GlobalId,
212 local_mir: OptimizedMirRelationExpr,
213 optimizer_features: OptimizerFeatures,
214 ) {
215 match self {
216 LocalExpressionCache::Open { uncached_exprs, .. } => {
217 let local_expr = LocalExpressions {
218 local_mir,
219 optimizer_features,
220 };
221 let prev = uncached_exprs.remove(&id);
228 match prev {
229 Some(prev) if prev == local_expr => {
230 uncached_exprs.insert(id, local_expr);
231 }
232 None => {
233 uncached_exprs.insert(id, local_expr);
234 }
235 Some(_) => {}
236 }
237 }
238 LocalExpressionCache::Closed => {}
239 }
240 }
241
242 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
243 match self {
244 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
245 LocalExpressionCache::Closed => BTreeMap::new(),
246 }
247 }
248}
249
250fn skip_temp_items<S>(
251 entries: &BTreeMap<CatalogItemId, CatalogEntry>,
252 serializer: S,
253) -> Result<S::Ok, S::Error>
254where
255 S: serde::Serializer,
256{
257 mz_ore::serde::map_key_to_string(
258 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
259 serializer,
260 )
261}
262
263impl CatalogState {
264 pub fn empty_test() -> Self {
268 CatalogState {
269 database_by_name: Default::default(),
270 database_by_id: Default::default(),
271 entry_by_id: Default::default(),
272 entry_by_global_id: Default::default(),
273 ambient_schemas_by_name: Default::default(),
274 ambient_schemas_by_id: Default::default(),
275 temporary_schemas: Default::default(),
276 clusters_by_id: Default::default(),
277 clusters_by_name: Default::default(),
278 network_policies_by_name: Default::default(),
279 roles_by_name: Default::default(),
280 roles_by_id: Default::default(),
281 network_policies_by_id: Default::default(),
282 role_auth_by_id: Default::default(),
283 config: CatalogConfig {
284 start_time: Default::default(),
285 start_instant: Instant::now(),
286 nonce: Default::default(),
287 environment_id: EnvironmentId::for_tests(),
288 session_id: Default::default(),
289 build_info: &DUMMY_BUILD_INFO,
290 timestamp_interval: Default::default(),
291 now: NOW_ZERO.clone(),
292 connection_context: ConnectionContext::for_tests(Arc::new(
293 InMemorySecretsController::new(),
294 )),
295 builtins_cfg: BuiltinsConfig {
296 include_continual_tasks: true,
297 },
298 helm_chart_version: None,
299 },
300 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
301 availability_zones: Default::default(),
302 system_configuration: Default::default(),
303 egress_addresses: Default::default(),
304 aws_principal_context: Default::default(),
305 aws_privatelink_availability_zones: Default::default(),
306 http_host_name: Default::default(),
307 default_privileges: Default::default(),
308 system_privileges: Default::default(),
309 comments: Default::default(),
310 source_references: Default::default(),
311 storage_metadata: Default::default(),
312 }
313 }
314
315 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
316 let search_path = self.resolve_search_path(session);
317 let database = self
318 .database_by_name
319 .get(session.vars().database())
320 .map(|id| id.clone());
321 let state = match session.transaction().catalog_state() {
322 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
323 None => Cow::Borrowed(self),
324 };
325 ConnCatalog {
326 state,
327 unresolvable_ids: BTreeSet::new(),
328 conn_id: session.conn_id().clone(),
329 cluster: session.vars().cluster().into(),
330 database,
331 search_path,
332 role_id: session.current_role_id().clone(),
333 prepared_statements: Some(session.prepared_statements()),
334 notices_tx: session.retain_notice_transmitter(),
335 }
336 }
337
338 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog {
339 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
340 let cluster = self.system_configuration.default_cluster();
341
342 ConnCatalog {
343 state: Cow::Borrowed(self),
344 unresolvable_ids: BTreeSet::new(),
345 conn_id: SYSTEM_CONN_ID.clone(),
346 cluster,
347 database: self
348 .resolve_database(DEFAULT_DATABASE_NAME)
349 .ok()
350 .map(|db| db.id()),
351 search_path: Vec::new(),
354 role_id,
355 prepared_statements: None,
356 notices_tx,
357 }
358 }
359
360 pub fn for_system_session(&self) -> ConnCatalog {
361 self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
362 }
363
364 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
369 struct I<'a> {
370 queue: VecDeque<CatalogItemId>,
371 seen: BTreeSet<CatalogItemId>,
372 this: &'a CatalogState,
373 }
374 impl<'a> Iterator for I<'a> {
375 type Item = CatalogItemId;
376 fn next(&mut self) -> Option<Self::Item> {
377 if let Some(next) = self.queue.pop_front() {
378 for child in self.this.get_entry(&next).item().uses() {
379 if !self.seen.contains(&child) {
380 self.queue.push_back(child);
381 self.seen.insert(child);
382 }
383 }
384 Some(next)
385 } else {
386 None
387 }
388 }
389 }
390
391 I {
392 queue: [id].into_iter().collect(),
393 seen: [id].into_iter().collect(),
394 this: self,
395 }
396 }
397
398 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
401 let mut out = Vec::new();
402 self.introspection_dependencies_inner(id, &mut out);
403 out
404 }
405
406 fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
407 match self.get_entry(&id).item() {
408 CatalogItem::Log(_) => out.push(id),
409 item @ (CatalogItem::View(_)
410 | CatalogItem::MaterializedView(_)
411 | CatalogItem::Connection(_)
412 | CatalogItem::ContinualTask(_)) => {
413 for item_id in item.references().items() {
415 self.introspection_dependencies_inner(*item_id, out);
416 }
417 }
418 CatalogItem::Sink(sink) => {
419 let from_item_id = self.get_entry_by_global_id(&sink.from).id();
420 self.introspection_dependencies_inner(from_item_id, out)
421 }
422 CatalogItem::Index(idx) => {
423 let on_item_id = self.get_entry_by_global_id(&idx.on).id();
424 self.introspection_dependencies_inner(on_item_id, out)
425 }
426 CatalogItem::Table(_)
427 | CatalogItem::Source(_)
428 | CatalogItem::Type(_)
429 | CatalogItem::Func(_)
430 | CatalogItem::Secret(_) => (),
431 }
432 }
433
434 pub(super) fn object_dependents(
440 &self,
441 object_ids: &Vec<ObjectId>,
442 conn_id: &ConnectionId,
443 seen: &mut BTreeSet<ObjectId>,
444 ) -> Vec<ObjectId> {
445 let mut dependents = Vec::new();
446 for object_id in object_ids {
447 match object_id {
448 ObjectId::Cluster(id) => {
449 dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
450 }
451 ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
452 &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
453 ),
454 ObjectId::Database(id) => {
455 dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
456 }
457 ObjectId::Schema((database_spec, schema_spec)) => {
458 dependents.extend_from_slice(&self.schema_dependents(
459 database_spec.clone(),
460 schema_spec.clone(),
461 conn_id,
462 seen,
463 ));
464 }
465 ObjectId::NetworkPolicy(id) => {
466 dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
467 }
468 id @ ObjectId::Role(_) => {
469 let unseen = seen.insert(id.clone());
470 if unseen {
471 dependents.push(id.clone());
472 }
473 }
474 ObjectId::Item(id) => {
475 dependents.extend_from_slice(&self.item_dependents(*id, seen))
476 }
477 }
478 }
479 dependents
480 }
481
482 fn cluster_dependents(
489 &self,
490 cluster_id: ClusterId,
491 seen: &mut BTreeSet<ObjectId>,
492 ) -> Vec<ObjectId> {
493 let mut dependents = Vec::new();
494 let object_id = ObjectId::Cluster(cluster_id);
495 if !seen.contains(&object_id) {
496 seen.insert(object_id.clone());
497 let cluster = self.get_cluster(cluster_id);
498 for item_id in cluster.bound_objects() {
499 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
500 }
501 for replica_id in cluster.replica_ids().values() {
502 dependents.extend_from_slice(&self.cluster_replica_dependents(
503 cluster_id,
504 *replica_id,
505 seen,
506 ));
507 }
508 dependents.push(object_id);
509 }
510 dependents
511 }
512
513 pub(super) fn cluster_replica_dependents(
520 &self,
521 cluster_id: ClusterId,
522 replica_id: ReplicaId,
523 seen: &mut BTreeSet<ObjectId>,
524 ) -> Vec<ObjectId> {
525 let mut dependents = Vec::new();
526 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
527 if !seen.contains(&object_id) {
528 seen.insert(object_id.clone());
529 dependents.push(object_id);
530 }
531 dependents
532 }
533
534 fn database_dependents(
541 &self,
542 database_id: DatabaseId,
543 conn_id: &ConnectionId,
544 seen: &mut BTreeSet<ObjectId>,
545 ) -> Vec<ObjectId> {
546 let mut dependents = Vec::new();
547 let object_id = ObjectId::Database(database_id);
548 if !seen.contains(&object_id) {
549 seen.insert(object_id.clone());
550 let database = self.get_database(&database_id);
551 for schema_id in database.schema_ids().values() {
552 dependents.extend_from_slice(&self.schema_dependents(
553 ResolvedDatabaseSpecifier::Id(database_id),
554 SchemaSpecifier::Id(*schema_id),
555 conn_id,
556 seen,
557 ));
558 }
559 dependents.push(object_id);
560 }
561 dependents
562 }
563
564 fn schema_dependents(
571 &self,
572 database_spec: ResolvedDatabaseSpecifier,
573 schema_spec: SchemaSpecifier,
574 conn_id: &ConnectionId,
575 seen: &mut BTreeSet<ObjectId>,
576 ) -> Vec<ObjectId> {
577 let mut dependents = Vec::new();
578 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
579 if !seen.contains(&object_id) {
580 seen.insert(object_id.clone());
581 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
582 for item_id in schema.item_ids() {
583 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
584 }
585 dependents.push(object_id)
586 }
587 dependents
588 }
589
590 pub(super) fn item_dependents(
597 &self,
598 item_id: CatalogItemId,
599 seen: &mut BTreeSet<ObjectId>,
600 ) -> Vec<ObjectId> {
601 let mut dependents = Vec::new();
602 let object_id = ObjectId::Item(item_id);
603 if !seen.contains(&object_id) {
604 seen.insert(object_id.clone());
605 let entry = self.get_entry(&item_id);
606 for dependent_id in entry.used_by() {
607 dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
608 }
609 dependents.push(object_id);
610 if let Some(progress_id) = entry.progress_id() {
614 dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
615 }
616 }
617 dependents
618 }
619
620 pub(super) fn network_policy_dependents(
627 &self,
628 network_policy_id: NetworkPolicyId,
629 _seen: &mut BTreeSet<ObjectId>,
630 ) -> Vec<ObjectId> {
631 let object_id = ObjectId::NetworkPolicy(network_policy_id);
632 vec![object_id]
636 }
637
638 fn is_stable(&self, id: CatalogItemId) -> bool {
642 let spec = self.get_entry(&id).name().qualifiers.schema_spec;
643 !self.is_unstable_schema_specifier(spec)
644 }
645
646 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
647 if self.system_config().unsafe_enable_unstable_dependencies() {
648 return Ok(());
649 }
650
651 let unstable_dependencies: Vec<_> = item
652 .references()
653 .items()
654 .filter(|id| !self.is_stable(**id))
655 .map(|id| self.get_entry(id).name().item.clone())
656 .collect();
657
658 if unstable_dependencies.is_empty() || item.is_temporary() {
662 Ok(())
663 } else {
664 let object_type = item.typ().to_string();
665 Err(Error {
666 kind: ErrorKind::UnstableDependency {
667 object_type,
668 unstable_dependencies,
669 },
670 })
671 }
672 }
673
674 pub fn resolve_full_name(
675 &self,
676 name: &QualifiedItemName,
677 conn_id: Option<&ConnectionId>,
678 ) -> FullItemName {
679 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
680
681 let database = match &name.qualifiers.database_spec {
682 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
683 ResolvedDatabaseSpecifier::Id(id) => {
684 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
685 }
686 };
687 let schema = self
688 .get_schema(
689 &name.qualifiers.database_spec,
690 &name.qualifiers.schema_spec,
691 conn_id,
692 )
693 .name()
694 .schema
695 .clone();
696 FullItemName {
697 database,
698 schema,
699 item: name.item.clone(),
700 }
701 }
702
703 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
704 let database = match &name.database {
705 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
706 ResolvedDatabaseSpecifier::Id(id) => {
707 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
708 }
709 };
710 FullSchemaName {
711 database,
712 schema: name.schema.clone(),
713 }
714 }
715
716 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
717 self.entry_by_id
718 .get(id)
719 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
720 }
721
722 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
723 let item_id = self
724 .entry_by_global_id
725 .get(id)
726 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
727
728 let entry = self.get_entry(item_id).clone();
729 let version = match entry.item() {
730 CatalogItem::Table(table) => {
731 let (version, _) = table
732 .collections
733 .iter()
734 .find(|(_verison, gid)| *gid == id)
735 .expect("version to exist");
736 RelationVersionSelector::Specific(*version)
737 }
738 _ => RelationVersionSelector::Latest,
739 };
740 CatalogCollectionEntry { entry, version }
741 }
742
743 pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
744 self.entry_by_id.iter()
745 }
746
747 pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
748 let schema = self
749 .temporary_schemas
750 .get(conn)
751 .unwrap_or_else(|| panic!("catalog out of sync, missing temporary schema for {conn}"));
752 schema.items.values().copied().map(ObjectId::from)
753 }
754
755 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
761 let mut res = None;
762 for schema_id in self.system_schema_ids() {
763 let schema = &self.ambient_schemas_by_id[&schema_id];
764 if let Some(global_id) = schema.types.get(name) {
765 match res {
766 None => res = Some(self.get_entry(global_id)),
767 Some(_) => panic!(
768 "only call get_system_type on objects uniquely identifiable in one system schema"
769 ),
770 }
771 }
772 }
773
774 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
775 }
776
777 pub fn get_item_by_name(
778 &self,
779 name: &QualifiedItemName,
780 conn_id: &ConnectionId,
781 ) -> Option<&CatalogEntry> {
782 self.get_schema(
783 &name.qualifiers.database_spec,
784 &name.qualifiers.schema_spec,
785 conn_id,
786 )
787 .items
788 .get(&name.item)
789 .and_then(|id| self.try_get_entry(id))
790 }
791
792 pub fn get_type_by_name(
793 &self,
794 name: &QualifiedItemName,
795 conn_id: &ConnectionId,
796 ) -> Option<&CatalogEntry> {
797 self.get_schema(
798 &name.qualifiers.database_spec,
799 &name.qualifiers.schema_spec,
800 conn_id,
801 )
802 .types
803 .get(&name.item)
804 .and_then(|id| self.try_get_entry(id))
805 }
806
807 pub(super) fn find_available_name(
808 &self,
809 mut name: QualifiedItemName,
810 conn_id: &ConnectionId,
811 ) -> QualifiedItemName {
812 let mut i = 0;
813 let orig_item_name = name.item.clone();
814 while self.get_item_by_name(&name, conn_id).is_some() {
815 i += 1;
816 name.item = format!("{}{}", orig_item_name, i);
817 }
818 name
819 }
820
821 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
822 self.entry_by_id.get(id)
823 }
824
825 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
826 let item_id = self.entry_by_global_id.get(id)?;
827 self.try_get_entry(item_id)
828 }
829
830 pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<RelationDesc>> {
833 let entry = self.try_get_entry_by_global_id(id)?;
834 let desc = match entry.item() {
835 CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
836 other => other.desc_opt(RelationVersionSelector::Latest)?,
838 };
839 Some(desc)
840 }
841
842 pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
843 self.try_get_cluster(cluster_id)
844 .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
845 }
846
847 pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
848 self.clusters_by_id.get(&cluster_id)
849 }
850
851 pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
852 self.roles_by_id.get(id)
853 }
854
855 pub fn get_role(&self, id: &RoleId) -> &Role {
856 self.roles_by_id.get(id).expect("catalog out of sync")
857 }
858
859 pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
860 self.roles_by_id.keys()
861 }
862
863 pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
864 self.roles_by_name
865 .get(role_name)
866 .map(|id| &self.roles_by_id[id])
867 }
868
869 pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
870 self.role_auth_by_id.get(id)
871 }
872
873 pub(super) fn try_get_network_policy_by_name(
874 &self,
875 policy_name: &str,
876 ) -> Option<&NetworkPolicy> {
877 self.network_policies_by_name
878 .get(policy_name)
879 .map(|id| &self.network_policies_by_id[id])
880 }
881
882 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
883 let mut membership = BTreeSet::new();
884 let mut queue = VecDeque::from(vec![id]);
885 while let Some(cur_id) = queue.pop_front() {
886 if !membership.contains(cur_id) {
887 membership.insert(cur_id.clone());
888 let role = self.get_role(cur_id);
889 soft_assert_no_log!(
890 !role.membership().keys().contains(id),
891 "circular membership exists in the catalog"
892 );
893 queue.extend(role.membership().keys());
894 }
895 }
896 membership.insert(RoleId::Public);
897 membership
898 }
899
900 pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
901 self.network_policies_by_id
902 .get(id)
903 .expect("catalog out of sync")
904 }
905
906 pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
907 self.network_policies_by_id.keys()
908 }
909
910 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
915 let entry = self.try_get_entry(id)?;
916 let name = self.resolve_full_name(entry.name(), None);
918 let host_name = self
919 .http_host_name
920 .as_ref()
921 .map(|x| x.as_str())
922 .unwrap_or_else(|| "HOST");
923
924 let RawDatabaseSpecifier::Name(database) = name.database else {
925 return None;
926 };
927
928 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
929 url.path_segments_mut()
930 .ok()?
931 .push(&database)
932 .push(&name.schema)
933 .push(&name.item);
934
935 Some(url)
936 }
937
938 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
946 &mut self,
949 create_sql: &str,
950 force_if_exists_skip: bool,
951 ) -> Result<(Plan, ResolvedIds), AdapterError> {
952 self.with_enable_for_item_parsing(|state| {
953 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
954 let pcx = Some(&pcx);
955 let session_catalog = state.for_system_session();
956
957 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
958 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
959 let plan =
960 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
961
962 Ok((plan, resolved_ids))
963 })
964 }
965
966 #[mz_ore::instrument]
968 pub(crate) fn parse_plan(
969 create_sql: &str,
970 pcx: Option<&PlanContext>,
971 catalog: &ConnCatalog,
972 ) -> Result<(Plan, ResolvedIds), AdapterError> {
973 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
974 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
975 let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
976
977 return Ok((plan, resolved_ids));
978 }
979
980 pub(crate) fn deserialize_item(
982 &self,
983 global_id: GlobalId,
984 create_sql: &str,
985 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
986 local_expression_cache: &mut LocalExpressionCache,
987 previous_item: Option<CatalogItem>,
988 ) -> Result<CatalogItem, AdapterError> {
989 self.parse_item(
990 global_id,
991 create_sql,
992 extra_versions,
993 None,
994 false,
995 None,
996 local_expression_cache,
997 previous_item,
998 )
999 }
1000
1001 #[mz_ore::instrument]
1003 pub(crate) fn parse_item(
1004 &self,
1005 global_id: GlobalId,
1006 create_sql: &str,
1007 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1008 pcx: Option<&PlanContext>,
1009 is_retained_metrics_object: bool,
1010 custom_logical_compaction_window: Option<CompactionWindow>,
1011 local_expression_cache: &mut LocalExpressionCache,
1012 previous_item: Option<CatalogItem>,
1013 ) -> Result<CatalogItem, AdapterError> {
1014 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1015 match self.parse_item_inner(
1016 global_id,
1017 create_sql,
1018 extra_versions,
1019 pcx,
1020 is_retained_metrics_object,
1021 custom_logical_compaction_window,
1022 cached_expr,
1023 previous_item,
1024 ) {
1025 Ok((item, uncached_expr)) => {
1026 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1027 local_expression_cache.insert_uncached_expression(
1028 global_id,
1029 uncached_expr,
1030 optimizer_features,
1031 );
1032 }
1033 Ok(item)
1034 }
1035 Err((err, cached_expr)) => {
1036 if let Some(local_expr) = cached_expr {
1037 local_expression_cache.insert_cached_expression(global_id, local_expr);
1038 }
1039 Err(err)
1040 }
1041 }
1042 }
1043
1044 #[mz_ore::instrument]
1051 pub(crate) fn parse_item_inner(
1052 &self,
1053 global_id: GlobalId,
1054 create_sql: &str,
1055 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1056 pcx: Option<&PlanContext>,
1057 is_retained_metrics_object: bool,
1058 custom_logical_compaction_window: Option<CompactionWindow>,
1059 cached_expr: Option<LocalExpressions>,
1060 previous_item: Option<CatalogItem>,
1061 ) -> Result<
1062 (
1063 CatalogItem,
1064 Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
1065 ),
1066 (AdapterError, Option<LocalExpressions>),
1067 > {
1068 let session_catalog = self.for_system_session();
1069
1070 let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
1071 Ok((plan, resolved_ids)) => (plan, resolved_ids),
1072 Err(err) => return Err((err, cached_expr)),
1073 };
1074
1075 let mut uncached_expr = None;
1076
1077 let item = match plan {
1078 Plan::CreateTable(CreateTablePlan { table, .. }) => {
1079 let collections = extra_versions
1080 .iter()
1081 .map(|(version, gid)| (*version, *gid))
1082 .chain([(RelationVersion::root(), global_id)].into_iter())
1083 .collect();
1084
1085 CatalogItem::Table(Table {
1086 create_sql: Some(table.create_sql),
1087 desc: table.desc,
1088 collections,
1089 conn_id: None,
1090 resolved_ids,
1091 custom_logical_compaction_window: custom_logical_compaction_window
1092 .or(table.compaction_window),
1093 is_retained_metrics_object,
1094 data_source: match table.data_source {
1095 mz_sql::plan::TableDataSource::TableWrites { defaults } => {
1096 TableDataSource::TableWrites { defaults }
1097 }
1098 mz_sql::plan::TableDataSource::DataSource {
1099 desc: data_source_desc,
1100 timeline,
1101 } => match data_source_desc {
1102 mz_sql::plan::DataSourceDesc::IngestionExport {
1103 ingestion_id,
1104 external_reference,
1105 details,
1106 data_config,
1107 } => TableDataSource::DataSource {
1108 desc: DataSourceDesc::IngestionExport {
1109 ingestion_id,
1110 external_reference,
1111 details,
1112 data_config,
1113 },
1114 timeline,
1115 },
1116 mz_sql::plan::DataSourceDesc::Webhook {
1117 validate_using,
1118 body_format,
1119 headers,
1120 cluster_id,
1121 } => TableDataSource::DataSource {
1122 desc: DataSourceDesc::Webhook {
1123 validate_using,
1124 body_format,
1125 headers,
1126 cluster_id: cluster_id
1127 .expect("Webhook Tables must have a cluster_id set"),
1128 },
1129 timeline,
1130 },
1131 _ => {
1132 return Err((
1133 AdapterError::Unstructured(anyhow::anyhow!(
1134 "unsupported data source for table"
1135 )),
1136 cached_expr,
1137 ));
1138 }
1139 },
1140 },
1141 })
1142 }
1143 Plan::CreateSource(CreateSourcePlan {
1144 source,
1145 timeline,
1146 in_cluster,
1147 ..
1148 }) => CatalogItem::Source(Source {
1149 create_sql: Some(source.create_sql),
1150 data_source: match source.data_source {
1151 mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
1152 DataSourceDesc::Ingestion {
1153 ingestion_desc,
1154 cluster_id: match in_cluster {
1155 Some(id) => id,
1156 None => {
1157 return Err((
1158 AdapterError::Unstructured(anyhow::anyhow!(
1159 "ingestion-based sources must have cluster specified"
1160 )),
1161 cached_expr,
1162 ));
1163 }
1164 },
1165 }
1166 }
1167 mz_sql::plan::DataSourceDesc::IngestionExport {
1168 ingestion_id,
1169 external_reference,
1170 details,
1171 data_config,
1172 } => DataSourceDesc::IngestionExport {
1173 ingestion_id,
1174 external_reference,
1175 details,
1176 data_config,
1177 },
1178 mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
1179 mz_sql::plan::DataSourceDesc::Webhook {
1180 validate_using,
1181 body_format,
1182 headers,
1183 cluster_id,
1184 } => {
1185 mz_ore::soft_assert_or_log!(
1186 cluster_id.is_none(),
1187 "cluster_id set at Source level for Webhooks"
1188 );
1189 DataSourceDesc::Webhook {
1190 validate_using,
1191 body_format,
1192 headers,
1193 cluster_id: in_cluster
1194 .expect("webhook sources must use an existing cluster"),
1195 }
1196 }
1197 },
1198 desc: source.desc,
1199 global_id,
1200 timeline,
1201 resolved_ids,
1202 custom_logical_compaction_window: source
1203 .compaction_window
1204 .or(custom_logical_compaction_window),
1205 is_retained_metrics_object,
1206 }),
1207 Plan::CreateView(CreateViewPlan { view, .. }) => {
1208 let optimizer_config =
1210 optimize::OptimizerConfig::from(session_catalog.system_vars());
1211 let previous_exprs = previous_item.map(|item| match item {
1212 CatalogItem::View(view) => (view.raw_expr, view.optimized_expr),
1213 item => unreachable!("expected view, found: {item:#?}"),
1214 });
1215
1216 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1217 (Some(local_expr), _)
1218 if local_expr.optimizer_features == optimizer_config.features =>
1219 {
1220 debug!("local expression cache hit for {global_id:?}");
1221 (Arc::new(view.expr), Arc::new(local_expr.local_mir))
1222 }
1223 (_, Some((raw_expr, optimized_expr))) if *raw_expr == view.expr => {
1225 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1226 }
1227 (cached_expr, _) => {
1228 let optimizer_features = optimizer_config.features.clone();
1229 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1231
1232 let raw_expr = view.expr;
1234 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1235 Ok(optimzed_expr) => optimzed_expr,
1236 Err(err) => return Err((err.into(), cached_expr)),
1237 };
1238
1239 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1240
1241 (Arc::new(raw_expr), Arc::new(optimized_expr))
1242 }
1243 };
1244
1245 let dependencies: BTreeSet<_> = raw_expr
1247 .depends_on()
1248 .into_iter()
1249 .map(|gid| self.get_entry_by_global_id(&gid).id())
1250 .collect();
1251
1252 CatalogItem::View(View {
1253 create_sql: view.create_sql,
1254 global_id,
1255 raw_expr,
1256 desc: RelationDesc::new(optimized_expr.typ(), view.column_names),
1257 optimized_expr,
1258 conn_id: None,
1259 resolved_ids,
1260 dependencies: DependencyIds(dependencies),
1261 })
1262 }
1263 Plan::CreateMaterializedView(CreateMaterializedViewPlan {
1264 materialized_view, ..
1265 }) => {
1266 let optimizer_config =
1268 optimize::OptimizerConfig::from(session_catalog.system_vars());
1269 let previous_exprs = previous_item.map(|item| match item {
1270 CatalogItem::MaterializedView(materialized_view) => {
1271 (materialized_view.raw_expr, materialized_view.optimized_expr)
1272 }
1273 item => unreachable!("expected materialized view, found: {item:#?}"),
1274 });
1275
1276 let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
1277 (Some(local_expr), _)
1278 if local_expr.optimizer_features == optimizer_config.features =>
1279 {
1280 debug!("local expression cache hit for {global_id:?}");
1281 (
1282 Arc::new(materialized_view.expr),
1283 Arc::new(local_expr.local_mir),
1284 )
1285 }
1286 (_, Some((raw_expr, optimized_expr)))
1288 if *raw_expr == materialized_view.expr =>
1289 {
1290 (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
1291 }
1292 (cached_expr, _) => {
1293 let optimizer_features = optimizer_config.features.clone();
1294 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1296
1297 let raw_expr = materialized_view.expr;
1298 let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
1299 Ok(optimized_expr) => optimized_expr,
1300 Err(err) => return Err((err.into(), cached_expr)),
1301 };
1302
1303 uncached_expr = Some((optimized_expr.clone(), optimizer_features));
1304
1305 (Arc::new(raw_expr), Arc::new(optimized_expr))
1306 }
1307 };
1308 let mut typ = optimized_expr.typ();
1309 for &i in &materialized_view.non_null_assertions {
1310 typ.column_types[i].nullable = false;
1311 }
1312 let desc = RelationDesc::new(typ, materialized_view.column_names);
1313
1314 let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);
1315
1316 let dependencies = raw_expr
1318 .depends_on()
1319 .into_iter()
1320 .map(|gid| self.get_entry_by_global_id(&gid).id())
1321 .collect();
1322
1323 CatalogItem::MaterializedView(MaterializedView {
1324 create_sql: materialized_view.create_sql,
1325 global_id,
1326 raw_expr,
1327 optimized_expr,
1328 desc,
1329 resolved_ids,
1330 dependencies,
1331 cluster_id: materialized_view.cluster_id,
1332 non_null_assertions: materialized_view.non_null_assertions,
1333 custom_logical_compaction_window: materialized_view.compaction_window,
1334 refresh_schedule: materialized_view.refresh_schedule,
1335 initial_as_of,
1336 })
1337 }
1338 Plan::CreateContinualTask(plan) => {
1339 let ct =
1340 match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
1341 Ok(ct) => ct,
1342 Err(err) => return Err((err, cached_expr)),
1343 };
1344 CatalogItem::ContinualTask(ct)
1345 }
1346 Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
1347 create_sql: index.create_sql,
1348 global_id,
1349 on: index.on,
1350 keys: index.keys.into(),
1351 conn_id: None,
1352 resolved_ids,
1353 cluster_id: index.cluster_id,
1354 custom_logical_compaction_window: custom_logical_compaction_window
1355 .or(index.compaction_window),
1356 is_retained_metrics_object,
1357 }),
1358 Plan::CreateSink(CreateSinkPlan {
1359 sink,
1360 with_snapshot,
1361 in_cluster,
1362 ..
1363 }) => CatalogItem::Sink(Sink {
1364 create_sql: sink.create_sql,
1365 global_id,
1366 from: sink.from,
1367 connection: sink.connection,
1368 envelope: sink.envelope,
1369 version: sink.version,
1370 with_snapshot,
1371 resolved_ids,
1372 cluster_id: in_cluster,
1373 }),
1374 Plan::CreateType(CreateTypePlan { typ, .. }) => {
1375 let desc = match typ.inner.desc(&session_catalog) {
1376 Ok(desc) => desc,
1377 Err(err) => return Err((err.into(), cached_expr)),
1378 };
1379 CatalogItem::Type(Type {
1380 create_sql: Some(typ.create_sql),
1381 global_id,
1382 desc,
1383 details: CatalogTypeDetails {
1384 array_id: None,
1385 typ: typ.inner,
1386 pg_metadata: None,
1387 },
1388 resolved_ids,
1389 })
1390 }
1391 Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
1392 create_sql: secret.create_sql,
1393 global_id,
1394 }),
1395 Plan::CreateConnection(CreateConnectionPlan {
1396 connection:
1397 mz_sql::plan::Connection {
1398 create_sql,
1399 details,
1400 },
1401 ..
1402 }) => CatalogItem::Connection(Connection {
1403 create_sql,
1404 global_id,
1405 details,
1406 resolved_ids,
1407 }),
1408 _ => {
1409 return Err((
1410 Error::new(ErrorKind::Corruption {
1411 detail: "catalog entry generated inappropriate plan".to_string(),
1412 })
1413 .into(),
1414 cached_expr,
1415 ));
1416 }
1417 };
1418
1419 Ok((item, uncached_expr))
1420 }
1421
1422 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1428 let restore = self.system_configuration.clone();
1439 self.system_configuration.enable_for_item_parsing();
1440 let res = f(self);
1441 self.system_configuration = restore;
1442 res
1443 }
1444
1445 pub fn get_indexes_on(
1447 &self,
1448 id: GlobalId,
1449 cluster: ClusterId,
1450 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1451 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1452
1453 self.try_get_entry_by_global_id(&id)
1454 .into_iter()
1455 .map(move |e| {
1456 e.used_by()
1457 .iter()
1458 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1459 CatalogItem::Index(index) if index_matches(index) => {
1460 Some((index.global_id(), index))
1461 }
1462 _ => None,
1463 })
1464 })
1465 .flatten()
1466 }
1467
1468 pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1469 &self.database_by_id[database_id]
1470 }
1471
1472 pub(super) fn try_get_cluster_replica(
1477 &self,
1478 id: ClusterId,
1479 replica_id: ReplicaId,
1480 ) -> Option<&ClusterReplica> {
1481 self.try_get_cluster(id)
1482 .and_then(|cluster| cluster.replica(replica_id))
1483 }
1484
1485 pub(super) fn get_cluster_replica(
1489 &self,
1490 cluster_id: ClusterId,
1491 replica_id: ReplicaId,
1492 ) -> &ClusterReplica {
1493 self.try_get_cluster_replica(cluster_id, replica_id)
1494 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1495 }
1496
1497 pub(super) fn resolve_replica_in_cluster(
1498 &self,
1499 cluster_id: &ClusterId,
1500 replica_name: &str,
1501 ) -> Result<&ClusterReplica, SqlCatalogError> {
1502 let cluster = self.get_cluster(*cluster_id);
1503 let replica_id = cluster
1504 .replica_id_by_name_
1505 .get(replica_name)
1506 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1507 Ok(&cluster.replicas_by_id_[replica_id])
1508 }
1509
1510 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1512 Ok(self.system_configuration.get(name)?)
1513 }
1514
1515 pub(super) fn parse_system_configuration(
1519 &self,
1520 name: &str,
1521 value: VarInput,
1522 ) -> Result<String, Error> {
1523 let value = self.system_configuration.parse(name, value)?;
1524 Ok(value.format())
1525 }
1526
1527 pub(super) fn resolve_schema_in_database(
1529 &self,
1530 database_spec: &ResolvedDatabaseSpecifier,
1531 schema_name: &str,
1532 conn_id: &ConnectionId,
1533 ) -> Result<&Schema, SqlCatalogError> {
1534 let schema = match database_spec {
1535 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1536 self.temporary_schemas.get(conn_id)
1537 }
1538 ResolvedDatabaseSpecifier::Ambient => self
1539 .ambient_schemas_by_name
1540 .get(schema_name)
1541 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1542 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1543 db.schemas_by_name
1544 .get(schema_name)
1545 .and_then(|id| db.schemas_by_id.get(id))
1546 }),
1547 };
1548 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1549 }
1550
1551 pub fn get_schema(
1552 &self,
1553 database_spec: &ResolvedDatabaseSpecifier,
1554 schema_spec: &SchemaSpecifier,
1555 conn_id: &ConnectionId,
1556 ) -> &Schema {
1557 match (database_spec, schema_spec) {
1559 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1560 &self.temporary_schemas[conn_id]
1561 }
1562 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1563 &self.ambient_schemas_by_id[id]
1564 }
1565
1566 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => {
1567 &self.database_by_id[database_id].schemas_by_id[schema_id]
1568 }
1569 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1570 unreachable!("temporary schemas are in the ambient database")
1571 }
1572 }
1573 }
1574
1575 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1576 self.database_by_id
1577 .values()
1578 .filter_map(|database| database.schemas_by_id.get(schema_id))
1579 .chain(self.ambient_schemas_by_id.values())
1580 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1581 .into_first()
1582 }
1583
1584 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1585 self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1586 }
1587
1588 pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1589 self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1590 }
1591
1592 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1593 self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1594 }
1595
1596 pub fn get_information_schema_id(&self) -> SchemaId {
1597 self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1598 }
1599
1600 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1601 self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1602 }
1603
1604 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1605 self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1606 }
1607
1608 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1609 self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1610 }
1611
1612 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1613 SYSTEM_SCHEMAS
1614 .iter()
1615 .map(|name| self.ambient_schemas_by_name[*name])
1616 }
1617
1618 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1619 self.system_schema_ids().contains(&id)
1620 }
1621
1622 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1623 match spec {
1624 SchemaSpecifier::Temporary => false,
1625 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1626 }
1627 }
1628
1629 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1630 UNSTABLE_SCHEMAS
1631 .iter()
1632 .map(|name| self.ambient_schemas_by_name[*name])
1633 }
1634
1635 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1636 self.unstable_schema_ids().contains(&id)
1637 }
1638
1639 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1640 match spec {
1641 SchemaSpecifier::Temporary => false,
1642 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1643 }
1644 }
1645
1646 pub fn create_temporary_schema(
1649 &mut self,
1650 conn_id: &ConnectionId,
1651 owner_id: RoleId,
1652 ) -> Result<(), Error> {
1653 let oid = INVALID_OID;
1658 self.temporary_schemas.insert(
1659 conn_id.clone(),
1660 Schema {
1661 name: QualifiedSchemaName {
1662 database: ResolvedDatabaseSpecifier::Ambient,
1663 schema: MZ_TEMP_SCHEMA.into(),
1664 },
1665 id: SchemaSpecifier::Temporary,
1666 oid,
1667 items: BTreeMap::new(),
1668 functions: BTreeMap::new(),
1669 types: BTreeMap::new(),
1670 owner_id,
1671 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1672 mz_sql::catalog::ObjectType::Schema,
1673 owner_id,
1674 )]),
1675 },
1676 );
1677 Ok(())
1678 }
1679
1680 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1682 std::iter::empty()
1683 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1684 if schema.id.is_temporary() {
1685 Some(schema.oid)
1686 } else {
1687 None
1688 }
1689 }))
1690 .chain(self.entry_by_id.values().filter_map(|entry| {
1691 if entry.item().is_temporary() {
1692 Some(entry.oid)
1693 } else {
1694 None
1695 }
1696 }))
1697 }
1698
1699 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1703 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1704 }
1705
1706 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1710 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1711 let log = match self.get_entry(&item_id).item() {
1712 CatalogItem::Log(log) => log,
1713 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1714 };
1715 (item_id, log.global_id)
1716 }
1717
1718 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1722 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1723 }
1724
1725 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1729 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1730 let schema = &self.ambient_schemas_by_id[schema_id];
1731 match builtin.catalog_item_type() {
1732 CatalogItemType::Type => schema.types[builtin.name()],
1733 CatalogItemType::Func => schema.functions[builtin.name()],
1734 CatalogItemType::Table
1735 | CatalogItemType::Source
1736 | CatalogItemType::Sink
1737 | CatalogItemType::View
1738 | CatalogItemType::MaterializedView
1739 | CatalogItemType::Index
1740 | CatalogItemType::Secret
1741 | CatalogItemType::Connection
1742 | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1743 }
1744 }
1745
1746 pub fn resolve_builtin_type_references(
1748 &self,
1749 builtin: &BuiltinType<NameReference>,
1750 ) -> BuiltinType<IdReference> {
1751 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1752 CatalogType::AclItem => CatalogType::AclItem,
1753 CatalogType::Array { element_reference } => CatalogType::Array {
1754 element_reference: self.get_system_type(element_reference).id,
1755 },
1756 CatalogType::List {
1757 element_reference,
1758 element_modifiers,
1759 } => CatalogType::List {
1760 element_reference: self.get_system_type(element_reference).id,
1761 element_modifiers: element_modifiers.clone(),
1762 },
1763 CatalogType::Map {
1764 key_reference,
1765 value_reference,
1766 key_modifiers,
1767 value_modifiers,
1768 } => CatalogType::Map {
1769 key_reference: self.get_system_type(key_reference).id,
1770 value_reference: self.get_system_type(value_reference).id,
1771 key_modifiers: key_modifiers.clone(),
1772 value_modifiers: value_modifiers.clone(),
1773 },
1774 CatalogType::Range { element_reference } => CatalogType::Range {
1775 element_reference: self.get_system_type(element_reference).id,
1776 },
1777 CatalogType::Record { fields } => CatalogType::Record {
1778 fields: fields
1779 .into_iter()
1780 .map(|f| CatalogRecordField {
1781 name: f.name.clone(),
1782 type_reference: self.get_system_type(f.type_reference).id,
1783 type_modifiers: f.type_modifiers.clone(),
1784 })
1785 .collect(),
1786 },
1787 CatalogType::Bool => CatalogType::Bool,
1788 CatalogType::Bytes => CatalogType::Bytes,
1789 CatalogType::Char => CatalogType::Char,
1790 CatalogType::Date => CatalogType::Date,
1791 CatalogType::Float32 => CatalogType::Float32,
1792 CatalogType::Float64 => CatalogType::Float64,
1793 CatalogType::Int16 => CatalogType::Int16,
1794 CatalogType::Int32 => CatalogType::Int32,
1795 CatalogType::Int64 => CatalogType::Int64,
1796 CatalogType::UInt16 => CatalogType::UInt16,
1797 CatalogType::UInt32 => CatalogType::UInt32,
1798 CatalogType::UInt64 => CatalogType::UInt64,
1799 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1800 CatalogType::Interval => CatalogType::Interval,
1801 CatalogType::Jsonb => CatalogType::Jsonb,
1802 CatalogType::Numeric => CatalogType::Numeric,
1803 CatalogType::Oid => CatalogType::Oid,
1804 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1805 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1806 CatalogType::Pseudo => CatalogType::Pseudo,
1807 CatalogType::RegClass => CatalogType::RegClass,
1808 CatalogType::RegProc => CatalogType::RegProc,
1809 CatalogType::RegType => CatalogType::RegType,
1810 CatalogType::String => CatalogType::String,
1811 CatalogType::Time => CatalogType::Time,
1812 CatalogType::Timestamp => CatalogType::Timestamp,
1813 CatalogType::TimestampTz => CatalogType::TimestampTz,
1814 CatalogType::Uuid => CatalogType::Uuid,
1815 CatalogType::VarChar => CatalogType::VarChar,
1816 CatalogType::Int2Vector => CatalogType::Int2Vector,
1817 CatalogType::MzAclItem => CatalogType::MzAclItem,
1818 };
1819
1820 BuiltinType {
1821 name: builtin.name,
1822 schema: builtin.schema,
1823 oid: builtin.oid,
1824 details: CatalogTypeDetails {
1825 array_id: builtin.details.array_id,
1826 typ,
1827 pg_metadata: builtin.details.pg_metadata.clone(),
1828 },
1829 }
1830 }
1831
1832 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1833 &self.config
1834 }
1835
1836 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1837 match self.database_by_name.get(database_name) {
1838 Some(id) => Ok(&self.database_by_id[id]),
1839 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1840 }
1841 }
1842
1843 pub fn resolve_schema(
1844 &self,
1845 current_database: Option<&DatabaseId>,
1846 database_name: Option<&str>,
1847 schema_name: &str,
1848 conn_id: &ConnectionId,
1849 ) -> Result<&Schema, SqlCatalogError> {
1850 let database_spec = match database_name {
1851 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1856 self.resolve_database(database)?.id().clone(),
1857 )),
1858 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1859 };
1860
1861 if let Some(database_spec) = database_spec {
1863 if let Ok(schema) =
1864 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1865 {
1866 return Ok(schema);
1867 }
1868 }
1869
1870 if let Ok(schema) = self.resolve_schema_in_database(
1872 &ResolvedDatabaseSpecifier::Ambient,
1873 schema_name,
1874 conn_id,
1875 ) {
1876 return Ok(schema);
1877 }
1878
1879 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1880 }
1881
1882 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1886 self.ambient_schemas_by_name[name]
1887 }
1888
1889 pub fn resolve_search_path(
1890 &self,
1891 session: &dyn SessionMetadata,
1892 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1893 let database = self
1894 .database_by_name
1895 .get(session.database())
1896 .map(|id| id.clone());
1897
1898 session
1899 .search_path()
1900 .iter()
1901 .map(|schema| {
1902 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1903 })
1904 .filter_map(|schema| schema.ok())
1905 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1906 .collect()
1907 }
1908
1909 pub fn effective_search_path(
1910 &self,
1911 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
1912 include_temp_schema: bool,
1913 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1914 let mut v = Vec::with_capacity(search_path.len() + 3);
1915 let temp_schema = (
1917 ResolvedDatabaseSpecifier::Ambient,
1918 SchemaSpecifier::Temporary,
1919 );
1920 if include_temp_schema && !search_path.contains(&temp_schema) {
1921 v.push(temp_schema);
1922 }
1923 let default_schemas = [
1924 (
1925 ResolvedDatabaseSpecifier::Ambient,
1926 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
1927 ),
1928 (
1929 ResolvedDatabaseSpecifier::Ambient,
1930 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
1931 ),
1932 ];
1933 for schema in default_schemas.into_iter() {
1934 if !search_path.contains(&schema) {
1935 v.push(schema);
1936 }
1937 }
1938 v.extend_from_slice(search_path);
1939 v
1940 }
1941
1942 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
1943 let id = self
1944 .clusters_by_name
1945 .get(name)
1946 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
1947 Ok(&self.clusters_by_id[id])
1948 }
1949
1950 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
1951 let id = self
1952 .clusters_by_name
1953 .get(cluster.name)
1954 .expect("failed to lookup BuiltinCluster by name");
1955 self.clusters_by_id
1956 .get(id)
1957 .expect("failed to lookup BuiltinCluster by ID")
1958 }
1959
1960 pub fn resolve_cluster_replica(
1961 &self,
1962 cluster_replica_name: &QualifiedReplica,
1963 ) -> Result<&ClusterReplica, SqlCatalogError> {
1964 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
1965 let replica_name = cluster_replica_name.replica.as_str();
1966 let replica_id = cluster
1967 .replica_id(replica_name)
1968 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1969 Ok(cluster.replica(replica_id).expect("Must exist"))
1970 }
1971
1972 #[allow(clippy::useless_let_if_seq)]
1978 pub fn resolve(
1979 &self,
1980 get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
1981 current_database: Option<&DatabaseId>,
1982 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
1983 name: &PartialItemName,
1984 conn_id: &ConnectionId,
1985 err_gen: fn(String) -> SqlCatalogError,
1986 ) -> Result<&CatalogEntry, SqlCatalogError> {
1987 let schemas = match &name.schema {
1992 Some(schema_name) => {
1993 match self.resolve_schema(
1994 current_database,
1995 name.database.as_deref(),
1996 schema_name,
1997 conn_id,
1998 ) {
1999 Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2000 Err(e) => return Err(e),
2001 }
2002 }
2003 None => match self
2004 .get_schema(
2005 &ResolvedDatabaseSpecifier::Ambient,
2006 &SchemaSpecifier::Temporary,
2007 conn_id,
2008 )
2009 .items
2010 .get(&name.item)
2011 {
2012 Some(id) => return Ok(self.get_entry(id)),
2013 None => search_path.to_vec(),
2014 },
2015 };
2016
2017 for (database_spec, schema_spec) in &schemas {
2018 let schema = self.get_schema(database_spec, schema_spec, conn_id);
2019
2020 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2021 return Ok(&self.entry_by_id[id]);
2022 }
2023 }
2024
2025 let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2030 if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2031 for schema_id in [
2032 self.get_mz_catalog_unstable_schema_id(),
2033 self.get_mz_introspection_schema_id(),
2034 ] {
2035 let schema = self.get_schema(
2036 &ResolvedDatabaseSpecifier::Ambient,
2037 &SchemaSpecifier::Id(schema_id),
2038 conn_id,
2039 );
2040
2041 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2042 debug!(
2043 github_27831 = true,
2044 "encountered use of outdated schema `mz_internal` for relation: {name}",
2045 );
2046 return Ok(&self.entry_by_id[id]);
2047 }
2048 }
2049 }
2050
2051 Err(err_gen(name.to_string()))
2052 }
2053
2054 pub fn resolve_entry(
2056 &self,
2057 current_database: Option<&DatabaseId>,
2058 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2059 name: &PartialItemName,
2060 conn_id: &ConnectionId,
2061 ) -> Result<&CatalogEntry, SqlCatalogError> {
2062 self.resolve(
2063 |schema| &schema.items,
2064 current_database,
2065 search_path,
2066 name,
2067 conn_id,
2068 SqlCatalogError::UnknownItem,
2069 )
2070 }
2071
2072 pub fn resolve_function(
2074 &self,
2075 current_database: Option<&DatabaseId>,
2076 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2077 name: &PartialItemName,
2078 conn_id: &ConnectionId,
2079 ) -> Result<&CatalogEntry, SqlCatalogError> {
2080 self.resolve(
2081 |schema| &schema.functions,
2082 current_database,
2083 search_path,
2084 name,
2085 conn_id,
2086 |name| SqlCatalogError::UnknownFunction {
2087 name,
2088 alternative: None,
2089 },
2090 )
2091 }
2092
2093 pub fn resolve_type(
2095 &self,
2096 current_database: Option<&DatabaseId>,
2097 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2098 name: &PartialItemName,
2099 conn_id: &ConnectionId,
2100 ) -> Result<&CatalogEntry, SqlCatalogError> {
2101 static NON_PG_CATALOG_TYPES: LazyLock<
2102 BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2103 > = LazyLock::new(|| {
2104 BUILTINS::types()
2105 .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2106 .map(|typ| (typ.name, typ))
2107 .collect()
2108 });
2109
2110 let entry = self.resolve(
2111 |schema| &schema.types,
2112 current_database,
2113 search_path,
2114 name,
2115 conn_id,
2116 |name| SqlCatalogError::UnknownType { name },
2117 )?;
2118
2119 if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2120 if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2121 warn!(
2122 "user specified an incorrect schema of {} for the type {}, which should be in \
2123 the {} schema. This works now due to a bug but will be fixed in a later release.",
2124 PG_CATALOG_SCHEMA.quoted(),
2125 typ.name.quoted(),
2126 typ.schema.quoted(),
2127 )
2128 }
2129 }
2130
2131 Ok(entry)
2132 }
2133
2134 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2136 match object_id {
2137 ObjectId::Item(item_id) => {
2138 let entry = self.get_entry(&item_id);
2139 match entry.item_type() {
2140 CatalogItemType::Table => CommentObjectId::Table(item_id),
2141 CatalogItemType::Source => CommentObjectId::Source(item_id),
2142 CatalogItemType::Sink => CommentObjectId::Sink(item_id),
2143 CatalogItemType::View => CommentObjectId::View(item_id),
2144 CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
2145 CatalogItemType::Index => CommentObjectId::Index(item_id),
2146 CatalogItemType::Func => CommentObjectId::Func(item_id),
2147 CatalogItemType::Connection => CommentObjectId::Connection(item_id),
2148 CatalogItemType::Type => CommentObjectId::Type(item_id),
2149 CatalogItemType::Secret => CommentObjectId::Secret(item_id),
2150 CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
2151 }
2152 }
2153 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2154 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2155 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2156 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2157 ObjectId::ClusterReplica(cluster_replica_id) => {
2158 CommentObjectId::ClusterReplica(cluster_replica_id)
2159 }
2160 ObjectId::NetworkPolicy(network_policy_id) => {
2161 CommentObjectId::NetworkPolicy(network_policy_id)
2162 }
2163 }
2164 }
2165
2166 pub fn system_config(&self) -> &SystemVars {
2168 &self.system_configuration
2169 }
2170
2171 pub fn system_config_mut(&mut self) -> &mut SystemVars {
2173 &mut self.system_configuration
2174 }
2175
2176 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2186 let mut dump = serde_json::to_value(&self).map_err(|e| {
2188 Error::new(ErrorKind::Unstructured(format!(
2189 "internal error: could not dump catalog: {}",
2192 e
2193 )))
2194 })?;
2195
2196 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2197 dump_obj.insert(
2199 "system_parameter_defaults".into(),
2200 serde_json::json!(self.system_config().defaults()),
2201 );
2202 if let Some(unfinalized_shards) = unfinalized_shards {
2204 dump_obj
2205 .get_mut("storage_metadata")
2206 .expect("known to exist")
2207 .as_object_mut()
2208 .expect("storage_metadata is an object")
2209 .insert(
2210 "unfinalized_shards".into(),
2211 serde_json::json!(unfinalized_shards),
2212 );
2213 }
2214 let temporary_gids: Vec<_> = self
2219 .entry_by_global_id
2220 .iter()
2221 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2222 .map(|(gid, _item_id)| *gid)
2223 .collect();
2224 if !temporary_gids.is_empty() {
2225 let gids = dump_obj
2226 .get_mut("entry_by_global_id")
2227 .expect("known_to_exist")
2228 .as_object_mut()
2229 .expect("entry_by_global_id is an object");
2230 for gid in temporary_gids {
2231 gids.remove(&gid.to_string());
2232 }
2233 }
2234
2235 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2237 }
2238
2239 pub fn availability_zones(&self) -> &[String] {
2240 &self.availability_zones
2241 }
2242
2243 pub fn concretize_replica_location(
2244 &self,
2245 location: mz_catalog::durable::ReplicaLocation,
2246 allowed_sizes: &Vec<String>,
2247 allowed_availability_zones: Option<&[String]>,
2248 ) -> Result<ReplicaLocation, Error> {
2249 let location = match location {
2250 mz_catalog::durable::ReplicaLocation::Unmanaged {
2251 storagectl_addrs,
2252 storage_addrs,
2253 computectl_addrs,
2254 compute_addrs,
2255 workers,
2256 } => {
2257 if allowed_availability_zones.is_some() {
2258 return Err(Error {
2259 kind: ErrorKind::Internal(
2260 "tried concretize unmanaged replica with specific availability_zones"
2261 .to_string(),
2262 ),
2263 });
2264 }
2265 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2266 storagectl_addrs,
2267 storage_addrs,
2268 computectl_addrs,
2269 compute_addrs,
2270 workers,
2271 })
2272 }
2273 mz_catalog::durable::ReplicaLocation::Managed {
2274 size,
2275 availability_zone,
2276 disk,
2277 billed_as,
2278 internal,
2279 pending,
2280 } => {
2281 if allowed_availability_zones.is_some() && availability_zone.is_some() {
2282 let message = "tried concretize managed replica with specific availability zones and availability zone";
2283 return Err(Error {
2284 kind: ErrorKind::Internal(message.to_string()),
2285 });
2286 }
2287 self.ensure_valid_replica_size(allowed_sizes, &size)?;
2288 let cluster_replica_sizes = &self.cluster_replica_sizes;
2289
2290 ReplicaLocation::Managed(ManagedReplicaLocation {
2291 allocation: cluster_replica_sizes
2292 .0
2293 .get(&size)
2294 .expect("catalog out of sync")
2295 .clone(),
2296 availability_zones: match (availability_zone, allowed_availability_zones) {
2297 (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2298 (None, Some(azs)) if azs.is_empty() => {
2299 ManagedReplicaAvailabilityZones::FromCluster(None)
2300 }
2301 (None, Some(azs)) => {
2302 ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2303 }
2304 (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2305 },
2306 size,
2307 disk,
2308 billed_as,
2309 internal,
2310 pending,
2311 })
2312 }
2313 };
2314 Ok(location)
2315 }
2316
2317 pub(crate) fn ensure_valid_replica_size(
2318 &self,
2319 allowed_sizes: &[String],
2320 size: &String,
2321 ) -> Result<(), Error> {
2322 let cluster_replica_sizes = &self.cluster_replica_sizes;
2323
2324 if !cluster_replica_sizes.0.contains_key(size)
2325 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2326 || cluster_replica_sizes.0[size].disabled
2327 {
2328 let mut entries = cluster_replica_sizes
2329 .enabled_allocations()
2330 .collect::<Vec<_>>();
2331
2332 if !allowed_sizes.is_empty() {
2333 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2334 entries.retain(|(name, _)| allowed_sizes.contains(name));
2335 }
2336
2337 entries.sort_by_key(
2338 |(
2339 _name,
2340 ReplicaAllocation {
2341 scale, cpu_limit, ..
2342 },
2343 )| (scale, cpu_limit),
2344 );
2345
2346 Err(Error {
2347 kind: ErrorKind::InvalidClusterReplicaSize {
2348 size: size.to_owned(),
2349 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2350 },
2351 })
2352 } else {
2353 Ok(())
2354 }
2355 }
2356
2357 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2358 if role_id.is_builtin() {
2359 let role = self.get_role(role_id);
2360 Err(Error::new(ErrorKind::ReservedRoleName(
2361 role.name().to_string(),
2362 )))
2363 } else {
2364 Ok(())
2365 }
2366 }
2367
2368 pub fn ensure_not_reserved_network_policy(
2369 &self,
2370 network_policy_id: &NetworkPolicyId,
2371 ) -> Result<(), Error> {
2372 if network_policy_id.is_builtin() {
2373 let policy = self.get_network_policy(network_policy_id);
2374 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2375 policy.name.clone(),
2376 )))
2377 } else {
2378 Ok(())
2379 }
2380 }
2381
2382 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2383 let is_grantable = !role_id.is_public() && !role_id.is_system();
2384 if is_grantable {
2385 Ok(())
2386 } else {
2387 let role = self.get_role(role_id);
2388 Err(Error::new(ErrorKind::UngrantableRoleName(
2389 role.name().to_string(),
2390 )))
2391 }
2392 }
2393
2394 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2395 if role_id.is_system() {
2396 let role = self.get_role(role_id);
2397 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2398 role.name().to_string(),
2399 )))
2400 } else {
2401 Ok(())
2402 }
2403 }
2404
2405 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2406 if role_id.is_predefined() {
2407 let role = self.get_role(role_id);
2408 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2409 role.name().to_string(),
2410 )))
2411 } else {
2412 Ok(())
2413 }
2414 }
2415
2416 pub(crate) fn add_to_audit_log(
2419 system_configuration: &SystemVars,
2420 oracle_write_ts: mz_repr::Timestamp,
2421 session: Option<&ConnMeta>,
2422 tx: &mut mz_catalog::durable::Transaction,
2423 audit_events: &mut Vec<VersionedEvent>,
2424 event_type: EventType,
2425 object_type: ObjectType,
2426 details: EventDetails,
2427 ) -> Result<(), Error> {
2428 let user = session.map(|session| session.user().name.to_string());
2429
2430 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2433 Some(ts) => ts.into(),
2434 _ => oracle_write_ts.into(),
2435 };
2436 let id = tx.allocate_audit_log_id()?;
2437 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2438 audit_events.push(event.clone());
2439 tx.insert_audit_log_event(event);
2440 Ok(())
2441 }
2442
2443 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2444 match id {
2445 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2446 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2447 self.get_cluster_replica(*cluster_id, *replica_id)
2448 .owner_id(),
2449 ),
2450 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2451 ObjectId::Schema((database_spec, schema_spec)) => Some(
2452 self.get_schema(database_spec, schema_spec, conn_id)
2453 .owner_id(),
2454 ),
2455 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2456 ObjectId::Role(_) => None,
2457 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2458 }
2459 }
2460
2461 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2462 match object_id {
2463 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2464 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2465 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2466 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2467 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2468 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2469 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2470 }
2471 }
2472
2473 pub(super) fn get_system_object_type(
2474 &self,
2475 id: &SystemObjectId,
2476 ) -> mz_sql::catalog::SystemObjectType {
2477 match id {
2478 SystemObjectId::Object(object_id) => {
2479 SystemObjectType::Object(self.get_object_type(object_id))
2480 }
2481 SystemObjectId::System => SystemObjectType::System,
2482 }
2483 }
2484
2485 pub fn storage_metadata(&self) -> &StorageMetadata {
2489 &self.storage_metadata
2490 }
2491
2492 pub fn source_compaction_windows(
2496 &self,
2497 ids: impl IntoIterator<Item = CatalogItemId>,
2498 ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2499 let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2500 let mut ids = VecDeque::from_iter(ids);
2501 let mut seen = BTreeSet::new();
2502 while let Some(item_id) = ids.pop_front() {
2503 if !seen.insert(item_id) {
2504 continue;
2505 }
2506 let entry = self.get_entry(&item_id);
2507 match entry.item() {
2508 CatalogItem::Source(source) => {
2509 let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2510 match source.data_source {
2511 DataSourceDesc::Ingestion { .. } => {
2512 cws.entry(source_cw).or_default().insert(item_id);
2514 ids.extend(entry.used_by());
2515 }
2516 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
2517 let ingestion = self
2520 .get_entry(&ingestion_id)
2521 .source()
2522 .expect("must be source");
2523 let cw = ingestion
2524 .custom_logical_compaction_window
2525 .unwrap_or(source_cw);
2526 cws.entry(cw).or_default().insert(item_id);
2527 }
2528 DataSourceDesc::Introspection(_)
2529 | DataSourceDesc::Progress
2530 | DataSourceDesc::Webhook { .. } => {
2531 cws.entry(source_cw).or_default().insert(item_id);
2532 }
2533 }
2534 }
2535 CatalogItem::Table(table) => match &table.data_source {
2536 TableDataSource::DataSource {
2537 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
2538 timeline: _,
2539 } => {
2540 let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2541 let ingestion = self
2542 .get_entry(ingestion_id)
2543 .source()
2544 .expect("must be source");
2545 let cw = ingestion
2546 .custom_logical_compaction_window
2547 .unwrap_or(table_cw);
2548 cws.entry(cw).or_default().insert(item_id);
2549 }
2550 _ => {}
2551 },
2552 _ => {
2553 continue;
2555 }
2556 }
2557 }
2558 cws
2559 }
2560
2561 pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2562 match id {
2563 CommentObjectId::Table(id)
2564 | CommentObjectId::View(id)
2565 | CommentObjectId::MaterializedView(id)
2566 | CommentObjectId::Source(id)
2567 | CommentObjectId::Sink(id)
2568 | CommentObjectId::Index(id)
2569 | CommentObjectId::Func(id)
2570 | CommentObjectId::Connection(id)
2571 | CommentObjectId::Type(id)
2572 | CommentObjectId::Secret(id)
2573 | CommentObjectId::ContinualTask(id) => Some(*id),
2574 CommentObjectId::Role(_)
2575 | CommentObjectId::Database(_)
2576 | CommentObjectId::Schema(_)
2577 | CommentObjectId::Cluster(_)
2578 | CommentObjectId::ClusterReplica(_)
2579 | CommentObjectId::NetworkPolicy(_) => None,
2580 }
2581 }
2582
2583 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2584 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2585 }
2586
2587 pub fn comment_id_to_audit_log_name(
2588 &self,
2589 id: CommentObjectId,
2590 conn_id: &ConnectionId,
2591 ) -> String {
2592 match id {
2593 CommentObjectId::Table(id)
2594 | CommentObjectId::View(id)
2595 | CommentObjectId::MaterializedView(id)
2596 | CommentObjectId::Source(id)
2597 | CommentObjectId::Sink(id)
2598 | CommentObjectId::Index(id)
2599 | CommentObjectId::Func(id)
2600 | CommentObjectId::Connection(id)
2601 | CommentObjectId::Type(id)
2602 | CommentObjectId::Secret(id)
2603 | CommentObjectId::ContinualTask(id) => {
2604 let item = self.get_entry(&id);
2605 let name = self.resolve_full_name(item.name(), Some(conn_id));
2606 name.to_string()
2607 }
2608 CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2609 CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2610 CommentObjectId::Schema((spec, schema_id)) => {
2611 let schema = self.get_schema(&spec, &schema_id, conn_id);
2612 self.resolve_full_schema_name(&schema.name).to_string()
2613 }
2614 CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2615 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2616 let cluster = self.get_cluster(cluster_id);
2617 let replica = self.get_cluster_replica(cluster_id, replica_id);
2618 QualifiedReplica {
2619 cluster: Ident::new_unchecked(cluster.name.clone()),
2620 replica: Ident::new_unchecked(replica.name.clone()),
2621 }
2622 .to_string()
2623 }
2624 CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2625 }
2626 }
2627}
2628
2629impl ConnectionResolver for CatalogState {
2630 fn resolve_connection(
2631 &self,
2632 id: CatalogItemId,
2633 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2634 use mz_storage_types::connections::Connection::*;
2635 match self
2636 .get_entry(&id)
2637 .connection()
2638 .expect("catalog out of sync")
2639 .details
2640 .to_connection()
2641 {
2642 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2643 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2644 Csr(conn) => Csr(conn.into_inline_connection(self)),
2645 Ssh(conn) => Ssh(conn),
2646 Aws(conn) => Aws(conn),
2647 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2648 MySql(conn) => MySql(conn.into_inline_connection(self)),
2649 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2650 }
2651 }
2652}
2653
2654impl OptimizerCatalog for CatalogState {
2655 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2656 CatalogState::get_entry_by_global_id(self, id)
2657 }
2658 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2659 CatalogState::get_entry(self, id)
2660 }
2661 fn resolve_full_name(
2662 &self,
2663 name: &QualifiedItemName,
2664 conn_id: Option<&ConnectionId>,
2665 ) -> FullItemName {
2666 CatalogState::resolve_full_name(self, name, conn_id)
2667 }
2668 fn get_indexes_on(
2669 &self,
2670 id: GlobalId,
2671 cluster: ClusterId,
2672 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2673 Box::new(CatalogState::get_indexes_on(self, id, cluster))
2674 }
2675}
2676
2677impl OptimizerCatalog for Catalog {
2678 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2679 self.state.get_entry_by_global_id(id)
2680 }
2681
2682 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2683 self.state.get_entry(id)
2684 }
2685
2686 fn resolve_full_name(
2687 &self,
2688 name: &QualifiedItemName,
2689 conn_id: Option<&ConnectionId>,
2690 ) -> FullItemName {
2691 self.state.resolve_full_name(name, conn_id)
2692 }
2693
2694 fn get_indexes_on(
2695 &self,
2696 id: GlobalId,
2697 cluster: ClusterId,
2698 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2699 Box::new(self.state.get_indexes_on(id, cluster))
2700 }
2701}
2702
2703impl Catalog {
2704 pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2705 self
2706 }
2707}