use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use ipnet::IpNet;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
use mz_build_info::DUMMY_BUILD_INFO;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
};
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
use mz_catalog::expr_cache::LocalExpressions;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
    Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
    NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
    TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
    UnmanagedReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::DiskLimit;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NOW_ZERO;
use mz_ore::soft_assert_no_log;
use mz_ore::str::StrExt;
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::namespaces::{
    INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
    MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
    UNSTABLE_SCHEMAS,
};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
    VersionedRelationDesc,
};
use mz_secrets::InMemorySecretsController;
use mz_sql::ast::Ident;
use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
use mz_sql::catalog::{
    CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
    CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
    CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
    TypeReference,
};
use mz_sql::names::{
    CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
    PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
    ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
};
use mz_sql::plan::{
    CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
    CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
    Plan, PlanContext,
};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_client::controller::StorageMetadata;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::inline::{
    ConnectionResolver, InlinedConnection, IntoInlineConnection,
};
use serde::Serialize;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::{debug, warn};

use crate::AdapterError;
use crate::catalog::{Catalog, ConnCatalog};
use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::session::Session;

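/// In-memory representation of the catalog: all databases, schemas, items,
/// roles, clusters, network policies, and system configuration known to this
/// environment. Name resolution and dependency queries in this module operate
/// against this state.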
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    pub(super) database_by_name: BTreeMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: BTreeMap<DatabaseId, Database>,
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: BTreeMap<CatalogItemId, CatalogEntry>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: BTreeMap<GlobalId, CatalogItemId>,
    pub(super) ambient_schemas_by_name: BTreeMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub(super) clusters_by_name: BTreeMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: BTreeMap<ClusterId, Cluster>,
    pub(super) roles_by_name: BTreeMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: BTreeMap<RoleId, Role>,
    pub(super) network_policies_by_name: BTreeMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: BTreeMap<NetworkPolicyId, NetworkPolicy>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: BTreeMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: SystemVars,
    pub(super) default_privileges: DefaultPrivileges,
    pub(super) system_privileges: PrivilegeMap,
    pub(super) comments: CommentsMap,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: BTreeMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: StorageMetadata,
    pub(super) mock_authentication_nonce: Option<String>,

    #[serde(skip)]
    pub(super) temporary_schemas: BTreeMap<ConnectionId, Schema>,

    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}

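/// A cache of optimized ("local") MIR expressions, keyed by [`GlobalId`].
/// While `Open`, hits are served from `cached_exprs` and misses are recorded
/// in `uncached_exprs` so the caller can later persist them; `Closed`
/// disables caching entirely.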
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    Open {
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    Closed,
}

impl LocalExpressionCache {
    pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
        Self::Open {
            cached_exprs,
            uncached_exprs: BTreeMap::new(),
        }
    }

    pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
            LocalExpressionCache::Closed => None,
        }
    }

    pub(super) fn insert_cached_expression(
        &mut self,
        id: GlobalId,
        local_expressions: LocalExpressions,
    ) {
        match self {
            LocalExpressionCache::Open { cached_exprs, .. } => {
                cached_exprs.insert(id, local_expressions);
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        local_mir: OptimizedMirRelationExpr,
        optimizer_features: OptimizerFeatures,
    ) {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => {
                let local_expr = LocalExpressions {
                    local_mir,
                    optimizer_features,
                };
                let prev = uncached_exprs.remove(&id);
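                // If the same id was optimized more than once, only keep the
                // entry when both attempts produced the same expression. Two
                // different results for one id are ambiguous, so neither is
                // kept.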
                match prev {
                    Some(prev) if prev == local_expr => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    None => {
                        uncached_exprs.insert(id, local_expr);
                    }
                    Some(_) => {}
                }
            }
            LocalExpressionCache::Closed => {}
        }
    }

    pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
        match self {
            LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
            LocalExpressionCache::Closed => BTreeMap::new(),
        }
    }
}

fn skip_temp_items<S>(
    entries: &BTreeMap<CatalogItemId, CatalogEntry>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    mz_ore::serde::map_key_to_string(
        entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
        serializer,
    )
}

impl CatalogState {
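    /// Returns an empty [`CatalogState`] suitable for use in tests; all maps
    /// start empty, and test-only defaults are used for the config, cluster
    /// replica size map, and license key.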
    pub fn empty_test() -> Self {
        CatalogState {
            database_by_name: Default::default(),
            database_by_id: Default::default(),
            entry_by_id: Default::default(),
            entry_by_global_id: Default::default(),
            ambient_schemas_by_name: Default::default(),
            ambient_schemas_by_id: Default::default(),
            temporary_schemas: Default::default(),
            clusters_by_id: Default::default(),
            clusters_by_name: Default::default(),
            network_policies_by_name: Default::default(),
            roles_by_name: Default::default(),
            roles_by_id: Default::default(),
            network_policies_by_id: Default::default(),
            role_auth_by_id: Default::default(),
            config: CatalogConfig {
                start_time: Default::default(),
                start_instant: Instant::now(),
                nonce: Default::default(),
                environment_id: EnvironmentId::for_tests(),
                session_id: Default::default(),
                build_info: &DUMMY_BUILD_INFO,
                timestamp_interval: Default::default(),
                now: NOW_ZERO.clone(),
                connection_context: ConnectionContext::for_tests(Arc::new(
                    InMemorySecretsController::new(),
                )),
                builtins_cfg: BuiltinsConfig {
                    include_continual_tasks: true,
                },
                helm_chart_version: None,
            },
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            availability_zones: Default::default(),
            system_configuration: Default::default(),
            egress_addresses: Default::default(),
            aws_principal_context: Default::default(),
            aws_privatelink_availability_zones: Default::default(),
            http_host_name: Default::default(),
            default_privileges: Default::default(),
            system_privileges: Default::default(),
            comments: Default::default(),
            source_references: Default::default(),
            storage_metadata: Default::default(),
            license_key: ValidatedLicenseKey::for_tests(),
            mock_authentication_nonce: Default::default(),
        }
    }

    pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
        let search_path = self.resolve_search_path(session);
        let database = self
            .database_by_name
            .get(session.vars().database())
            .map(|id| id.clone());
        let state = match session.transaction().catalog_state() {
            Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
            None => Cow::Borrowed(self),
        };
        ConnCatalog {
            state,
            unresolvable_ids: BTreeSet::new(),
            conn_id: session.conn_id().clone(),
            cluster: session.vars().cluster().into(),
            database,
            search_path,
            role_id: session.current_role_id().clone(),
            prepared_statements: Some(session.prepared_statements()),
            portals: Some(session.portals()),
            notices_tx: session.retain_notice_transmitter(),
        }
    }

    pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
        let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
        let cluster = self.system_configuration.default_cluster();

        ConnCatalog {
            state: Cow::Borrowed(self),
            unresolvable_ids: BTreeSet::new(),
            conn_id: SYSTEM_CONN_ID.clone(),
            cluster,
            database: self
                .resolve_database(DEFAULT_DATABASE_NAME)
                .ok()
                .map(|db| db.id()),
            search_path: Vec::new(),
            role_id,
            prepared_statements: None,
            portals: None,
            notices_tx,
        }
    }

    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }

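    /// Returns an iterator over the transitive closure of items used by `id`,
    /// including `id` itself, computed by a breadth-first traversal of each
    /// item's `uses()` edges.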
    pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
        struct I<'a> {
            queue: VecDeque<CatalogItemId>,
            seen: BTreeSet<CatalogItemId>,
            this: &'a CatalogState,
        }
        impl<'a> Iterator for I<'a> {
            type Item = CatalogItemId;
            fn next(&mut self) -> Option<Self::Item> {
                if let Some(next) = self.queue.pop_front() {
                    for child in self.this.get_entry(&next).item().uses() {
                        if !self.seen.contains(&child) {
                            self.queue.push_back(child);
                            self.seen.insert(child);
                        }
                    }
                    Some(next)
                } else {
                    None
                }
            }
        }

        I {
            queue: [id].into_iter().collect(),
            seen: [id].into_iter().collect(),
            this: self,
        }
    }

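    /// Returns all introspection (log) items that `id` transitively depends
    /// on.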
    pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
        let mut out = Vec::new();
        self.introspection_dependencies_inner(id, &mut out);
        out
    }

    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            CatalogItem::Log(_) => out.push(id),
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }

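    /// Returns all objects that depend on the given `object_ids`, in an order
    /// suitable for dropping: dependents always appear before the object they
    /// depend on. `seen` tracks objects already visited so that shared
    /// dependencies are reported only once.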
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }

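    /// Returns all objects that depend on the given cluster: the items bound
    /// to it, the dependents of each of its replicas, and finally the cluster
    /// itself.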
    fn cluster_dependents(
        &self,
        cluster_id: ClusterId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Cluster(cluster_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let cluster = self.get_cluster(cluster_id);
            for item_id in cluster.bound_objects() {
                dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
            }
            for replica_id in cluster.replica_ids().values() {
                dependents.extend_from_slice(&self.cluster_replica_dependents(
                    cluster_id,
                    *replica_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

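    /// Returns all objects that depend on the given cluster replica, which is
    /// currently just the replica itself.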
    pub(super) fn cluster_replica_dependents(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            dependents.push(object_id);
        }
        dependents
    }

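    /// Returns all objects that depend on the given database: the dependents
    /// of every schema it contains, followed by the database itself.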
    fn database_dependents(
        &self,
        database_id: DatabaseId,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Database(database_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let database = self.get_database(&database_id);
            for schema_id in database.schema_ids().values() {
                dependents.extend_from_slice(&self.schema_dependents(
                    ResolvedDatabaseSpecifier::Id(database_id),
                    SchemaSpecifier::Id(*schema_id),
                    conn_id,
                    seen,
                ));
            }
            dependents.push(object_id);
        }
        dependents
    }

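    /// Returns all objects that depend on the given schema: the dependents of
    /// every item it contains, followed by the schema itself.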
    fn schema_dependents(
        &self,
        database_spec: ResolvedDatabaseSpecifier,
        schema_spec: SchemaSpecifier,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
            for item_id in schema.item_ids() {
                dependents.extend_from_slice(&self.item_dependents(item_id, seen));
            }
            dependents.push(object_id)
        }
        dependents
    }

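    /// Returns all objects that transitively depend on the given item,
    /// including the item itself and, where one exists, its progress
    /// collection.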
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }

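    /// Returns all objects that depend on the given network policy, which is
    /// currently just the policy itself.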
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        vec![object_id]
    }

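    /// Reports whether the item identified by `id` lives outside of the
    /// unstable schemas.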
    fn is_stable(&self, id: CatalogItemId) -> bool {
        let spec = self.get_entry(&id).name().qualifiers.schema_spec;
        !self.is_unstable_schema_specifier(spec)
    }

    pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
        if self.system_config().unsafe_enable_unstable_dependencies() {
            return Ok(());
        }

        let unstable_dependencies: Vec<_> = item
            .references()
            .items()
            .filter(|id| !self.is_stable(**id))
            .map(|id| self.get_entry(id).name().item.clone())
            .collect();

        if unstable_dependencies.is_empty() || item.is_temporary() {
            Ok(())
        } else {
            let object_type = item.typ().to_string();
            Err(Error {
                kind: ErrorKind::UnstableDependency {
                    object_type,
                    unstable_dependencies,
                },
            })
        }
    }

    pub fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);

        let database = match &name.qualifiers.database_spec {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        let schema = match &name.qualifiers.schema_spec {
            SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
            _ => self
                .get_schema(
                    &name.qualifiers.database_spec,
                    &name.qualifiers.schema_spec,
                    conn_id,
                )
                .name()
                .schema
                .clone(),
        };
        FullItemName {
            database,
            schema,
            item: name.item.clone(),
        }
    }

    pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
        let database = match &name.database {
            ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
            ResolvedDatabaseSpecifier::Id(id) => {
                RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
            }
        };
        FullSchemaName {
            database,
            schema: name.schema.clone(),
        }
    }

    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }

    pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
        let item_id = self
            .entry_by_global_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));

        let entry = self.get_entry(item_id).clone();
        let version = match entry.item() {
            CatalogItem::Table(table) => {
                let (version, _) = table
                    .collections
                    .iter()
                    .find(|(_version, gid)| *gid == id)
                    .expect("version to exist");
                RelationVersionSelector::Specific(*version)
            }
            _ => RelationVersionSelector::Latest,
        };
        CatalogCollectionEntry { entry, version }
    }

    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }

    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        self.temporary_schemas
            .get(conn)
            .into_iter()
            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
    }

    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }

    pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
        let mut res = None;
        for schema_id in self.system_schema_ids() {
            let schema = &self.ambient_schemas_by_id[&schema_id];
            if let Some(global_id) = schema.types.get(name) {
                match res {
                    None => res = Some(self.get_entry(global_id)),
                    Some(_) => panic!(
                        "only call get_system_type on objects uniquely identifiable in one system schema"
                    ),
                }
            }
        }

        res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
    }

    pub fn get_item_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .items
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub fn get_type_by_name(
        &self,
        name: &QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> Option<&CatalogEntry> {
        self.get_schema(
            &name.qualifiers.database_spec,
            &name.qualifiers.schema_spec,
            conn_id,
        )
        .types
        .get(&name.item)
        .and_then(|id| self.try_get_entry(id))
    }

    pub(super) fn find_available_name(
        &self,
        mut name: QualifiedItemName,
        conn_id: &ConnectionId,
    ) -> QualifiedItemName {
        let mut i = 0;
        let orig_item_name = name.item.clone();
        while self.get_item_by_name(&name, conn_id).is_some() {
            i += 1;
            name.item = format!("{}{}", orig_item_name, i);
        }
        name
    }

    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }

    pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
        let item_id = self.entry_by_global_id.get(id)?;
        self.try_get_entry(item_id)
    }

    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            other => other.relation_desc(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }

    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }

    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }

    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }

    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }

    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }

    pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
        self.roles_by_name
            .get(role_name)
            .map(|id| &self.roles_by_id[id])
    }

    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
        self.role_auth_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
    }

    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }

    pub(super) fn try_get_network_policy_by_name(
        &self,
        policy_name: &str,
    ) -> Option<&NetworkPolicy> {
        self.network_policies_by_name
            .get(policy_name)
            .map(|id| &self.network_policies_by_id[id])
    }

    pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
        let mut membership = BTreeSet::new();
        let mut queue = VecDeque::from(vec![id]);
        while let Some(cur_id) = queue.pop_front() {
            if !membership.contains(cur_id) {
                membership.insert(cur_id.clone());
                let role = self.get_role(cur_id);
                soft_assert_no_log!(
                    !role.membership().keys().contains(id),
                    "circular membership exists in the catalog"
                );
                queue.extend(role.membership().keys());
            }
        }
        membership.insert(RoleId::Public);
        membership
    }

    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }

    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }

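    /// Returns the URL at which the webhook object identified by `id` accepts
    /// requests, or `None` if the URL cannot be constructed. If no HTTP host
    /// name is configured, the literal placeholder `HOST` is used instead.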
    pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
        let entry = self.try_get_entry(id)?;
        let name = self.resolve_full_name(entry.name(), None);
        let host_name = self
            .http_host_name
            .as_ref()
            .map(|x| x.as_str())
            .unwrap_or_else(|| "HOST");

        let RawDatabaseSpecifier::Name(database) = name.database else {
            return None;
        };

        let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
        url.path_segments_mut()
            .ok()?
            .push(&database)
            .push(&name.schema)
            .push(&name.item);

        Some(url)
    }

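    /// Parses and plans `create_sql` with item-parsing features temporarily
    /// enabled (see [`Self::with_enable_for_item_parsing`]). When
    /// `force_if_exists_skip` is set, `IF EXISTS`-related planning errors are
    /// ignored.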
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }

    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }

    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }

    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }

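    /// Parses and plans `create_sql`, then converts the resulting plan into a
    /// [`CatalogItem`]. On success, also returns any newly optimized
    /// expression that was not served from the cache; on failure, returns the
    /// unused `cached_expr` alongside the error so the caller can put it back
    /// into the cache.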
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(typ, view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);

                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);
                let desc = VersionedRelationDesc::new(desc);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    collections,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    replacement_target: materialized_view.replacement_target,
                    cluster_id: materialized_view.cluster_id,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
                        Ok(ct) => ct,
                        Err(err) => return Err((err, cached_expr)),
                    };
                CatalogItem::ContinualTask(ct)
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
                commit_interval: sink.commit_interval,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                if let Err(err) = typ.inner.desc(&session_catalog) {
                    return Err((err.into(), cached_expr));
                }
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        Ok((item, uncached_expr))
    }

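    /// Runs `f` with the system variables required to parse existing catalog
    /// items temporarily enabled, restoring the previous system configuration
    /// afterwards.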
1491 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1497 let restore = self.system_configuration.clone();
1508 self.system_configuration.enable_for_item_parsing();
1509 let res = f(self);
1510 self.system_configuration = restore;
1511 res
1512 }
1513
1514 pub fn get_indexes_on(
1516 &self,
1517 id: GlobalId,
1518 cluster: ClusterId,
1519 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1520 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1521
1522 self.try_get_entry_by_global_id(&id)
1523 .into_iter()
1524 .map(move |e| {
1525 e.used_by()
1526 .iter()
1527 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1528 CatalogItem::Index(index) if index_matches(index) => {
1529 Some((index.global_id(), index))
1530 }
1531 _ => None,
1532 })
1533 })
1534 .flatten()
1535 }
1536
1537 pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1538 &self.database_by_id[database_id]
1539 }
1540
1541 pub(super) fn try_get_cluster_replica(
1546 &self,
1547 id: ClusterId,
1548 replica_id: ReplicaId,
1549 ) -> Option<&ClusterReplica> {
1550 self.try_get_cluster(id)
1551 .and_then(|cluster| cluster.replica(replica_id))
1552 }
1553
1554 pub(crate) fn get_cluster_replica(
1558 &self,
1559 cluster_id: ClusterId,
1560 replica_id: ReplicaId,
1561 ) -> &ClusterReplica {
1562 self.try_get_cluster_replica(cluster_id, replica_id)
1563 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1564 }
1565
1566 pub(super) fn resolve_replica_in_cluster(
1567 &self,
1568 cluster_id: &ClusterId,
1569 replica_name: &str,
1570 ) -> Result<&ClusterReplica, SqlCatalogError> {
1571 let cluster = self.get_cluster(*cluster_id);
1572 let replica_id = cluster
1573 .replica_id_by_name_
1574 .get(replica_name)
1575 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1576 Ok(&cluster.replicas_by_id_[replica_id])
1577 }
1578
1579 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1581 Ok(self.system_configuration.get(name)?)
1582 }
1583
1584 pub(super) fn parse_system_configuration(
1588 &self,
1589 name: &str,
1590 value: VarInput,
1591 ) -> Result<String, Error> {
1592 let value = self.system_configuration.parse(name, value)?;
1593 Ok(value.format())
1594 }
1595
1596 pub(super) fn resolve_schema_in_database(
1598 &self,
1599 database_spec: &ResolvedDatabaseSpecifier,
1600 schema_name: &str,
1601 conn_id: &ConnectionId,
1602 ) -> Result<&Schema, SqlCatalogError> {
1603 let schema = match database_spec {
1604 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1605 self.temporary_schemas.get(conn_id)
1606 }
1607 ResolvedDatabaseSpecifier::Ambient => self
1608 .ambient_schemas_by_name
1609 .get(schema_name)
1610 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1611 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1612 db.schemas_by_name
1613 .get(schema_name)
1614 .and_then(|id| db.schemas_by_id.get(id))
1615 }),
1616 };
1617 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1618 }
1619
1620 pub fn try_get_schema(
1625 &self,
1626 database_spec: &ResolvedDatabaseSpecifier,
1627 schema_spec: &SchemaSpecifier,
1628 conn_id: &ConnectionId,
1629 ) -> Option<&Schema> {
1630 match (database_spec, schema_spec) {
1632 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1633 self.temporary_schemas.get(conn_id)
1634 }
1635 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1636 self.ambient_schemas_by_id.get(id)
1637 }
1638 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1639 .database_by_id
1640 .get(database_id)
1641 .and_then(|db| db.schemas_by_id.get(schema_id)),
1642 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1643 unreachable!("temporary schemas are in the ambient database")
1644 }
1645 }
1646 }
1647
1648 pub fn get_schema(
1649 &self,
1650 database_spec: &ResolvedDatabaseSpecifier,
1651 schema_spec: &SchemaSpecifier,
1652 conn_id: &ConnectionId,
1653 ) -> &Schema {
1654 self.try_get_schema(database_spec, schema_spec, conn_id)
1656 .expect("schema must exist")
1657 }
1658
1659 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1660 self.database_by_id
1661 .values()
1662 .filter_map(|database| database.schemas_by_id.get(schema_id))
1663 .chain(self.ambient_schemas_by_id.values())
1664 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1665 .into_first()
1666 }
1667
1668 pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1669 self.temporary_schemas
1670 .values()
1671 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1672 .into_first()
1673 }
1674
1675 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1676 self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1677 }
1678
1679 pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1680 self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1681 }
1682
1683 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1684 self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1685 }
1686
1687 pub fn get_information_schema_id(&self) -> SchemaId {
1688 self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1689 }
1690
1691 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1692 self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1693 }
1694
1695 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1696 self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1697 }
1698
1699 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1700 self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1701 }
1702
1703 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1704 SYSTEM_SCHEMAS
1705 .iter()
1706 .map(|name| self.ambient_schemas_by_name[*name])
1707 }
1708
1709 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1710 self.system_schema_ids().contains(&id)
1711 }
1712
1713 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1714 match spec {
1715 SchemaSpecifier::Temporary => false,
1716 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1717 }
1718 }
1719
1720 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1721 UNSTABLE_SCHEMAS
1722 .iter()
1723 .map(|name| self.ambient_schemas_by_name[*name])
1724 }
1725
1726 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1727 self.unstable_schema_ids().contains(&id)
1728 }
1729
1730 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1731 match spec {
1732 SchemaSpecifier::Temporary => false,
1733 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1734 }
1735 }
1736
1737 pub fn create_temporary_schema(
1740 &mut self,
1741 conn_id: &ConnectionId,
1742 owner_id: RoleId,
1743 ) -> Result<(), Error> {
1744 let oid = INVALID_OID;
1749 self.temporary_schemas.insert(
1750 conn_id.clone(),
1751 Schema {
1752 name: QualifiedSchemaName {
1753 database: ResolvedDatabaseSpecifier::Ambient,
1754 schema: MZ_TEMP_SCHEMA.into(),
1755 },
1756 id: SchemaSpecifier::Temporary,
1757 oid,
1758 items: BTreeMap::new(),
1759 functions: BTreeMap::new(),
1760 types: BTreeMap::new(),
1761 owner_id,
1762 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1763 mz_sql::catalog::ObjectType::Schema,
1764 owner_id,
1765 )]),
1766 },
1767 );
1768 Ok(())
1769 }
1770
1771 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1773 std::iter::empty()
1774 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1775 if schema.id.is_temporary() {
1776 Some(schema.oid)
1777 } else {
1778 None
1779 }
1780 }))
1781 .chain(self.entry_by_id.values().filter_map(|entry| {
1782 if entry.item().is_temporary() {
1783 Some(entry.oid)
1784 } else {
1785 None
1786 }
1787 }))
1788 }
1789
1790 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1794 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1795 }
1796
1797 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1801 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1802 let log = match self.get_entry(&item_id).item() {
1803 CatalogItem::Log(log) => log,
1804 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1805 };
1806 (item_id, log.global_id)
1807 }
1808
1809 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1813 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1814 }
1815
1816 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1820 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1821 let schema = &self.ambient_schemas_by_id[schema_id];
1822 match builtin.catalog_item_type() {
1823 CatalogItemType::Type => schema.types[builtin.name()],
1824 CatalogItemType::Func => schema.functions[builtin.name()],
1825 CatalogItemType::Table
1826 | CatalogItemType::Source
1827 | CatalogItemType::Sink
1828 | CatalogItemType::View
1829 | CatalogItemType::MaterializedView
1830 | CatalogItemType::Index
1831 | CatalogItemType::Secret
1832 | CatalogItemType::Connection
1833 | CatalogItemType::ContinualTask => schema.items[builtin.name()],
1834 }
1835 }
1836
1837 pub fn resolve_builtin_type_references(
1839 &self,
1840 builtin: &BuiltinType<NameReference>,
1841 ) -> BuiltinType<IdReference> {
1842 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1843 CatalogType::AclItem => CatalogType::AclItem,
1844 CatalogType::Array { element_reference } => CatalogType::Array {
1845 element_reference: self.get_system_type(element_reference).id,
1846 },
1847 CatalogType::List {
1848 element_reference,
1849 element_modifiers,
1850 } => CatalogType::List {
1851 element_reference: self.get_system_type(element_reference).id,
1852 element_modifiers: element_modifiers.clone(),
1853 },
1854 CatalogType::Map {
1855 key_reference,
1856 value_reference,
1857 key_modifiers,
1858 value_modifiers,
1859 } => CatalogType::Map {
1860 key_reference: self.get_system_type(key_reference).id,
1861 value_reference: self.get_system_type(value_reference).id,
1862 key_modifiers: key_modifiers.clone(),
1863 value_modifiers: value_modifiers.clone(),
1864 },
1865 CatalogType::Range { element_reference } => CatalogType::Range {
1866 element_reference: self.get_system_type(element_reference).id,
1867 },
1868 CatalogType::Record { fields } => CatalogType::Record {
1869 fields: fields
1870 .into_iter()
1871 .map(|f| CatalogRecordField {
1872 name: f.name.clone(),
1873 type_reference: self.get_system_type(f.type_reference).id,
1874 type_modifiers: f.type_modifiers.clone(),
1875 })
1876 .collect(),
1877 },
1878 CatalogType::Bool => CatalogType::Bool,
1879 CatalogType::Bytes => CatalogType::Bytes,
1880 CatalogType::Char => CatalogType::Char,
1881 CatalogType::Date => CatalogType::Date,
1882 CatalogType::Float32 => CatalogType::Float32,
1883 CatalogType::Float64 => CatalogType::Float64,
1884 CatalogType::Int16 => CatalogType::Int16,
1885 CatalogType::Int32 => CatalogType::Int32,
1886 CatalogType::Int64 => CatalogType::Int64,
1887 CatalogType::UInt16 => CatalogType::UInt16,
1888 CatalogType::UInt32 => CatalogType::UInt32,
1889 CatalogType::UInt64 => CatalogType::UInt64,
1890 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1891 CatalogType::Interval => CatalogType::Interval,
1892 CatalogType::Jsonb => CatalogType::Jsonb,
1893 CatalogType::Numeric => CatalogType::Numeric,
1894 CatalogType::Oid => CatalogType::Oid,
1895 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1896 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1897 CatalogType::Pseudo => CatalogType::Pseudo,
1898 CatalogType::RegClass => CatalogType::RegClass,
1899 CatalogType::RegProc => CatalogType::RegProc,
1900 CatalogType::RegType => CatalogType::RegType,
1901 CatalogType::String => CatalogType::String,
1902 CatalogType::Time => CatalogType::Time,
1903 CatalogType::Timestamp => CatalogType::Timestamp,
1904 CatalogType::TimestampTz => CatalogType::TimestampTz,
1905 CatalogType::Uuid => CatalogType::Uuid,
1906 CatalogType::VarChar => CatalogType::VarChar,
1907 CatalogType::Int2Vector => CatalogType::Int2Vector,
1908 CatalogType::MzAclItem => CatalogType::MzAclItem,
1909 };
1910
1911 BuiltinType {
1912 name: builtin.name,
1913 schema: builtin.schema,
1914 oid: builtin.oid,
1915 details: CatalogTypeDetails {
1916 array_id: builtin.details.array_id,
1917 typ,
1918 pg_metadata: builtin.details.pg_metadata.clone(),
1919 },
1920 }
1921 }
1922
1923 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1924 &self.config
1925 }
1926
1927 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1928 match self.database_by_name.get(database_name) {
1929 Some(id) => Ok(&self.database_by_id[id]),
1930 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1931 }
1932 }
1933
1934 pub fn resolve_schema(
1935 &self,
1936 current_database: Option<&DatabaseId>,
1937 database_name: Option<&str>,
1938 schema_name: &str,
1939 conn_id: &ConnectionId,
1940 ) -> Result<&Schema, SqlCatalogError> {
1941 let database_spec = match database_name {
1942 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1947 self.resolve_database(database)?.id().clone(),
1948 )),
1949 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1950 };
1951
1952 if let Some(database_spec) = database_spec {
1954 if let Ok(schema) =
1955 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1956 {
1957 return Ok(schema);
1958 }
1959 }
1960
1961 if let Ok(schema) = self.resolve_schema_in_database(
1963 &ResolvedDatabaseSpecifier::Ambient,
1964 schema_name,
1965 conn_id,
1966 ) {
1967 return Ok(schema);
1968 }
1969
1970 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1971 }
1972
1973 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
1977 self.ambient_schemas_by_name[name]
1978 }
1979
1980 pub fn resolve_search_path(
1981 &self,
1982 session: &dyn SessionMetadata,
1983 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1984 let database = self
1985 .database_by_name
1986 .get(session.database())
1987 .map(|id| id.clone());
1988
1989 session
1990 .search_path()
1991 .iter()
1992 .map(|schema| {
1993 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1994 })
1995 .filter_map(|schema| schema.ok())
1996 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
1997 .collect()
1998 }
1999
    pub fn effective_search_path(
        &self,
        search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
        include_temp_schema: bool,
    ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
        let mut v = Vec::with_capacity(search_path.len() + 3);
        let temp_schema = (
            ResolvedDatabaseSpecifier::Ambient,
            SchemaSpecifier::Temporary,
        );
        if include_temp_schema && !search_path.contains(&temp_schema) {
            v.push(temp_schema);
        }
        let default_schemas = [
            (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
            ),
            (
                ResolvedDatabaseSpecifier::Ambient,
                SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
            ),
        ];
        for schema in default_schemas.into_iter() {
            if !search_path.contains(&schema) {
                v.push(schema);
            }
        }
        v.extend_from_slice(search_path);
        v
    }

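    /// Resolves `name` to a [`Cluster`], returning an error if no such
    /// cluster exists.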
    pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
        let id = self
            .clusters_by_name
            .get(name)
            .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
        Ok(&self.clusters_by_id[id])
    }

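    /// Resolves a [`BuiltinCluster`] to its in-memory [`Cluster`].
    ///
    /// Panics if the builtin cluster is not present in the catalog.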
    pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
        let id = self
            .clusters_by_name
            .get(cluster.name)
            .expect("failed to lookup BuiltinCluster by name");
        self.clusters_by_id
            .get(id)
            .expect("failed to lookup BuiltinCluster by ID")
    }

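    /// Resolves a qualified `cluster.replica` name to a [`ClusterReplica`],
    /// returning an error if the cluster or the replica does not exist.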
    pub fn resolve_cluster_replica(
        &self,
        cluster_replica_name: &QualifiedReplica,
    ) -> Result<&ClusterReplica, SqlCatalogError> {
        let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
        let replica_name = cluster_replica_name.replica.as_str();
        let replica_id = cluster
            .replica_id(replica_name)
            .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
        Ok(cluster.replica(replica_id).expect("must exist"))
    }

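    /// Resolves a [`PartialItemName`] to a [`CatalogEntry`].
    ///
    /// If the name carries a schema, the item is looked up in that schema
    /// alone. Otherwise the connection's temporary schema is consulted first,
    /// followed by each schema in `search_path` in order.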
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // If a schema name was specified, resolve just that schema; otherwise
        // check the connection's temporary schema before falling back to the
        // search path.
        let schemas = match &name.schema {
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            None => match self
                .try_get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .and_then(|schema| schema.items.get(&name.item))
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
                continue;
            };

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Some relations that once lived in `mz_internal` have moved to
        // `mz_catalog_unstable` or `mz_introspection`. If the lookup involved
        // `mz_internal`, also check those schemas so that outdated references
        // keep resolving, and log the use for tracking.
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }

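    /// Resolves `name` to a non-function [`CatalogEntry`].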
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.items,
            current_database,
            search_path,
            name,
            conn_id,
            SqlCatalogError::UnknownItem,
        )
    }

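    /// Resolves `name` to a function [`CatalogEntry`].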
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.functions,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownFunction {
                name,
                alternative: None,
            },
        )
    }

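    /// Resolves `name` to a type [`CatalogEntry`].
    ///
    /// As a compatibility shim, warns when a user addresses a builtin type
    /// through `pg_catalog` that actually lives in another system schema.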
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        static NON_PG_CATALOG_TYPES: LazyLock<
            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
        > = LazyLock::new(|| {
            BUILTINS::types()
                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
                .map(|typ| (typ.name, typ))
                .collect()
        });

        let entry = self.resolve(
            |schema| &schema.types,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownType { name },
        )?;

        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
                warn!(
                    "user specified the incorrect schema {} for the type {}, which belongs in \
                     the {} schema. This currently works due to a bug, but will be fixed in a \
                     later release.",
                    PG_CATALOG_SCHEMA.quoted(),
                    typ.name.quoted(),
                    typ.schema.quoted(),
                )
            }
        }

        Ok(entry)
    }

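    /// Maps an [`ObjectId`] to the [`CommentObjectId`] used to attach
    /// comments to that object.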
    pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
        match object_id {
            ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
            ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
            ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
            ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
            ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
            ObjectId::ClusterReplica(cluster_replica_id) => {
                CommentObjectId::ClusterReplica(cluster_replica_id)
            }
            ObjectId::NetworkPolicy(network_policy_id) => {
                CommentObjectId::NetworkPolicy(network_policy_id)
            }
        }
    }

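    /// Returns the system configuration.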
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }

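    /// Returns a mutable reference to the system configuration.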
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        &mut self.system_configuration
    }

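    /// Serializes the catalog state to a pretty-printed JSON string, for
    /// debugging and introspection.
    ///
    /// If `unfinalized_shards` is provided, it is spliced into the dump's
    /// `storage_metadata` object. Temporary (connection-scoped) items and
    /// role authentication data are omitted from the dump.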
    pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
        let mut dump = serde_json::to_value(&self).map_err(|e| {
            Error::new(ErrorKind::Unstructured(format!(
                "internal error: could not dump catalog: {}",
                e
            )))
        })?;

        let dump_obj = dump.as_object_mut().expect("state must have been dumped");
        dump_obj.insert(
            "system_parameter_defaults".into(),
            serde_json::json!(self.system_config().defaults()),
        );
        if let Some(unfinalized_shards) = unfinalized_shards {
            dump_obj
                .get_mut("storage_metadata")
                .expect("known to exist")
                .as_object_mut()
                .expect("storage_metadata is an object")
                .insert(
                    "unfinalized_shards".into(),
                    serde_json::json!(unfinalized_shards),
                );
        }
        // Temporary items are connection-specific and transient, so remove
        // them from the dump.
        let temporary_gids: Vec<_> = self
            .entry_by_global_id
            .iter()
            .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
            .map(|(gid, _item_id)| *gid)
            .collect();
        if !temporary_gids.is_empty() {
            let gids = dump_obj
                .get_mut("entry_by_global_id")
                .expect("known to exist")
                .as_object_mut()
                .expect("entry_by_global_id is an object");
            for gid in temporary_gids {
                gids.remove(&gid.to_string());
            }
        }
        // Omit role authentication data from the dump.
        dump_obj.remove("role_auth_by_id");

        Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
    }

    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }

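    /// Converts a durable [`ReplicaLocation`](mz_catalog::durable::ReplicaLocation)
    /// into a controller [`ReplicaLocation`], validating the replica size and
    /// reconciling the replica's availability zone configuration with the
    /// cluster's.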
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried to concretize an unmanaged replica with specific \
                             availability zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried to concretize a managed replica with both a specific \
                                   availability zone and a set of allowed availability zones";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some(azs)) if azs.is_empty() => {
                            ManagedReplicaAvailabilityZones::FromCluster(None)
                        }
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }

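    /// Reports whether the named replica size is provisioned with disk, i.e.,
    /// swap is disabled and its disk limit is nonzero.
    ///
    /// Panics if `size` is not a known replica size.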
    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
        let alloc = &self.cluster_replica_sizes.0[size];
        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
    }

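    /// Ensures that `size` is a known, enabled replica size and, if
    /// `allowed_sizes` is non-empty, that it is one of the allowed sizes.
    ///
    /// On failure, the error lists the valid sizes, sorted by scale and CPU
    /// limit.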
    pub(crate) fn ensure_valid_replica_size(
        &self,
        allowed_sizes: &[String],
        size: &String,
    ) -> Result<(), Error> {
        let cluster_replica_sizes = &self.cluster_replica_sizes;

        if !cluster_replica_sizes.0.contains_key(size)
            || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
            || cluster_replica_sizes.0[size].disabled
        {
            let mut entries = cluster_replica_sizes
                .enabled_allocations()
                .collect::<Vec<_>>();

            if !allowed_sizes.is_empty() {
                let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
                entries.retain(|(name, _)| allowed_sizes.contains(name));
            }

            entries.sort_by_key(
                |(
                    _name,
                    ReplicaAllocation {
                        scale, cpu_limit, ..
                    },
                )| (scale, cpu_limit),
            );

            Err(Error {
                kind: ErrorKind::InvalidClusterReplicaSize {
                    size: size.to_owned(),
                    expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
                },
            })
        } else {
            Ok(())
        }
    }

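    /// Returns an error if `role_id` refers to a builtin role, whose name is
    /// reserved.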
    pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_builtin() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

    /// Returns an error if `network_policy_id` refers to a builtin network
    /// policy, whose name is reserved.
    pub fn ensure_not_reserved_network_policy(
        &self,
        network_policy_id: &NetworkPolicyId,
    ) -> Result<(), Error> {
        if network_policy_id.is_builtin() {
            let policy = self.get_network_policy(network_policy_id);
            Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
                policy.name.clone(),
            )))
        } else {
            Ok(())
        }
    }

    /// Returns an error if `role_id` cannot be granted: neither the `PUBLIC`
    /// pseudo-role nor system roles are grantable.
    pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
        let is_grantable = !role_id.is_public() && !role_id.is_system();
        if is_grantable {
            Ok(())
        } else {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::UngrantableRoleName(
                role.name().to_string(),
            )))
        }
    }

    /// Returns an error if `role_id` refers to a system role.
    pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_system() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

    /// Returns an error if `role_id` refers to a predefined role.
    pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
        if role_id.is_predefined() {
            let role = self.get_role(role_id);
            Err(Error::new(ErrorKind::ReservedSystemRoleName(
                role.name().to_string(),
            )))
        } else {
            Ok(())
        }
    }

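    /// Allocates an ID for, constructs, and durably records a new audit log
    /// event, also appending it to `audit_events`.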
    pub(crate) fn add_to_audit_log(
        system_configuration: &SystemVars,
        oracle_write_ts: mz_repr::Timestamp,
        session: Option<&ConnMeta>,
        tx: &mut mz_catalog::durable::Transaction,
        audit_events: &mut Vec<VersionedEvent>,
        event_type: EventType,
        object_type: ObjectType,
        details: EventDetails,
    ) -> Result<(), Error> {
        let user = session.map(|session| session.user().name.to_string());

        // Use the oracle write timestamp as the event time, unless a mock
        // timestamp has been configured (e.g., for testing).
        let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
            Some(ts) => ts.into(),
            _ => oracle_write_ts.into(),
        };
        let id = tx.allocate_audit_log_id()?;
        let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
        audit_events.push(event.clone());
        tx.insert_audit_log_event(event);
        Ok(())
    }

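    /// Returns the owning role of the given object, or `None` for objects
    /// (like roles) that have no owner.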
    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
        match id {
            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
                self.get_cluster_replica(*cluster_id, *replica_id)
                    .owner_id(),
            ),
            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
            ObjectId::Schema((database_spec, schema_spec)) => Some(
                self.get_schema(database_spec, schema_spec, conn_id)
                    .owner_id(),
            ),
            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
            ObjectId::Role(_) => None,
            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
        }
    }

    /// Returns the [`mz_sql::catalog::ObjectType`] of the given object.
    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        match object_id {
            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
        }
    }

    /// Returns the [`SystemObjectType`] of the given system object.
    pub(super) fn get_system_object_type(
        &self,
        id: &SystemObjectId,
    ) -> mz_sql::catalog::SystemObjectType {
        match id {
            SystemObjectId::Object(object_id) => {
                SystemObjectType::Object(self.get_object_type(object_id))
            }
            SystemObjectId::System => SystemObjectType::System,
        }
    }

    /// Returns the [`StorageMetadata`] for the catalog.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }

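    /// Groups the given items by their effective compaction window, keeping
    /// only sources and tables backed by an ingestion export. Duplicate IDs
    /// are visited once.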
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    match source.data_source {
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::IngestionExport { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                        DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. } => {
                            cws.entry(source_cw).or_default().insert(item_id);
                        }
                    }
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        // Only tables backed by an ingestion export
                        // participate here.
                        TableDataSource::DataSource {
                            desc: DataSourceDesc::IngestionExport { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        _ => {}
                    }
                }
                _ => {
                    continue;
                }
            }
        }
        cws
    }

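    /// Extracts the [`CatalogItemId`] from a [`CommentObjectId`], if the
    /// comment target is a catalog item.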
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }

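    /// Returns the [`CatalogEntry`] the comment is attached to, if the
    /// comment target is a catalog item.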
    pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
        Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
    }

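    /// Renders the name of the commented-on object as it should appear in the
    /// audit log, fully qualified where applicable.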
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }

    /// Returns the mock authentication nonce, or the empty string if none is
    /// configured.
    pub fn mock_authentication_nonce(&self) -> String {
        self.mock_authentication_nonce.clone().unwrap_or_default()
    }
}

impl ConnectionResolver for CatalogState {
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}

impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}

impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}

impl Catalog {
    /// Upcasts `self` to an `Arc<dyn OptimizerCatalog>`.
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}