1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55 UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61 CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62 VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{BuiltinsConfig, CatalogConfig, EnvironmentId};
67use mz_sql::catalog::{
68 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
69 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
70 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
71 TypeReference,
72};
73use mz_sql::names::{
74 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81 Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91 ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use serde::Serialize;
94use timely::progress::Antichain;
95use tokio::sync::mpsc;
96use tracing::{debug, warn};
97
98use crate::AdapterError;
100use crate::catalog::{Catalog, ConnCatalog};
101use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
102use crate::optimize::{self, Optimize, OptimizerCatalog};
103use crate::session::Session;
104
/// The in-memory state of the catalog.
///
/// Most object kinds are indexed twice: once by name and once by id (e.g.
/// `database_by_name` / `database_by_id`). The paired maps must be kept in
/// sync with each other.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    /// Maps a database's name to its id.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    /// Maps a database's id to its in-memory representation.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    /// All catalog entries, by item id. Temporary (connection-bound) items
    /// are skipped during serialization.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    /// Maps each collection's [`GlobalId`] back to the owning item's id.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    /// Maps an ambient (database-less) schema's name to its id.
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    /// Maps an ambient schema's id to its in-memory representation.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    /// Maps a cluster's name to its id.
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    /// Maps a cluster's id to its in-memory representation.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    /// Maps a role's name to its id.
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    /// Maps a role's id to its in-memory representation.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    /// Maps a network policy's name to its id.
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    /// Maps a network policy's id to its in-memory representation.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    /// Maps a role's id to its authentication info.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    /// System configuration (`SystemVars`); not serialized.
    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    /// Default privileges applied to newly created objects.
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    /// System-wide privileges.
    pub(super) system_privileges: Arc<PrivilegeMap>,
    /// Comments attached to catalog objects.
    pub(super) comments: Arc<CommentsMap>,
    /// Maps a source's item id to its upstream references.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    /// Metadata about storage-managed collections.
    pub(super) storage_metadata: Arc<StorageMetadata>,
    /// Nonce used for mock authentication, if configured.
    pub(super) mock_authentication_nonce: Option<String>,

    /// Per-connection temporary schemas; not serialized.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    /// Static catalog configuration; not serialized.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    /// Known cluster replica size allocations.
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Availability zones replicas may be scheduled into; not serialized.
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    /// Egress IP ranges of this environment; not serialized.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    /// AWS principal context, if running in AWS.
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    /// Availability zones supported for AWS PrivateLink connections.
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Host name used when constructing HTTP (e.g. webhook) URLs.
    pub(super) http_host_name: Option<String>,

    /// The validated license key; not serialized.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
172
/// A cache of locally optimized expressions, keyed by [`GlobalId`].
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is active; reads and writes are serviced.
    Open {
        /// Expressions the cache was seeded with at creation time.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Expressions inserted during use that were not already cached.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is disabled; all operations are no-ops.
    Closed,
}
189
190impl LocalExpressionCache {
191 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
192 Self::Open {
193 cached_exprs,
194 uncached_exprs: BTreeMap::new(),
195 }
196 }
197
198 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
199 match self {
200 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
201 LocalExpressionCache::Closed => None,
202 }
203 }
204
205 pub(super) fn insert_cached_expression(
208 &mut self,
209 id: GlobalId,
210 local_expressions: LocalExpressions,
211 ) {
212 match self {
213 LocalExpressionCache::Open { cached_exprs, .. } => {
214 cached_exprs.insert(id, local_expressions);
215 }
216 LocalExpressionCache::Closed => {}
217 }
218 }
219
220 pub(super) fn insert_uncached_expression(
223 &mut self,
224 id: GlobalId,
225 local_mir: OptimizedMirRelationExpr,
226 optimizer_features: OptimizerFeatures,
227 ) {
228 match self {
229 LocalExpressionCache::Open { uncached_exprs, .. } => {
230 let local_expr = LocalExpressions {
231 local_mir,
232 optimizer_features,
233 };
234 let prev = uncached_exprs.remove(&id);
241 match prev {
242 Some(prev) if prev == local_expr => {
243 uncached_exprs.insert(id, local_expr);
244 }
245 None => {
246 uncached_exprs.insert(id, local_expr);
247 }
248 Some(_) => {}
249 }
250 }
251 LocalExpressionCache::Closed => {}
252 }
253 }
254
255 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
256 match self {
257 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
258 LocalExpressionCache::Closed => BTreeMap::new(),
259 }
260 }
261}
262
263fn skip_temp_items<S>(
264 entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
265 serializer: S,
266) -> Result<S::Ok, S::Error>
267where
268 S: serde::Serializer,
269{
270 mz_ore::serde::map_key_to_string(
271 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
272 serializer,
273 )
274}
275
276impl CatalogState {
277 pub fn empty_test() -> Self {
281 CatalogState {
282 database_by_name: Default::default(),
283 database_by_id: Default::default(),
284 entry_by_id: Default::default(),
285 entry_by_global_id: Default::default(),
286 ambient_schemas_by_name: Default::default(),
287 ambient_schemas_by_id: Default::default(),
288 temporary_schemas: Default::default(),
289 clusters_by_id: Default::default(),
290 clusters_by_name: Default::default(),
291 network_policies_by_name: Default::default(),
292 roles_by_name: Default::default(),
293 roles_by_id: Default::default(),
294 network_policies_by_id: Default::default(),
295 role_auth_by_id: Default::default(),
296 config: CatalogConfig {
297 start_time: Default::default(),
298 start_instant: Instant::now(),
299 nonce: Default::default(),
300 environment_id: EnvironmentId::for_tests(),
301 session_id: Default::default(),
302 build_info: &DUMMY_BUILD_INFO,
303 now: NOW_ZERO.clone(),
304 connection_context: ConnectionContext::for_tests(Arc::new(
305 InMemorySecretsController::new(),
306 )),
307 builtins_cfg: BuiltinsConfig {
308 include_continual_tasks: true,
309 },
310 helm_chart_version: None,
311 },
312 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
313 availability_zones: Default::default(),
314 system_configuration: Arc::new(SystemVars::default()),
315 egress_addresses: Default::default(),
316 aws_principal_context: Default::default(),
317 aws_privatelink_availability_zones: Default::default(),
318 http_host_name: Default::default(),
319 default_privileges: Arc::new(DefaultPrivileges::default()),
320 system_privileges: Arc::new(PrivilegeMap::default()),
321 comments: Arc::new(CommentsMap::default()),
322 source_references: Default::default(),
323 storage_metadata: Arc::new(StorageMetadata::default()),
324 license_key: ValidatedLicenseKey::for_tests(),
325 mock_authentication_nonce: Default::default(),
326 }
327 }
328
329 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
330 let search_path = self.resolve_search_path(session);
331 let database = self
332 .database_by_name
333 .get(session.vars().database())
334 .map(|id| id.clone());
335 let state = match session.transaction().catalog_state() {
336 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
337 None => Cow::Borrowed(self),
338 };
339 ConnCatalog {
340 state,
341 unresolvable_ids: BTreeSet::new(),
342 conn_id: session.conn_id().clone(),
343 cluster: session.vars().cluster().into(),
344 database,
345 search_path,
346 role_id: session.current_role_id().clone(),
347 prepared_statements: Some(session.prepared_statements()),
348 portals: Some(session.portals()),
349 notices_tx: session.retain_notice_transmitter(),
350 }
351 }
352
353 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
354 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
355 let cluster = self.system_configuration.default_cluster();
356
357 ConnCatalog {
358 state: Cow::Borrowed(self),
359 unresolvable_ids: BTreeSet::new(),
360 conn_id: SYSTEM_CONN_ID.clone(),
361 cluster,
362 database: self
363 .resolve_database(DEFAULT_DATABASE_NAME)
364 .ok()
365 .map(|db| db.id()),
366 search_path: Vec::new(),
369 role_id,
370 prepared_statements: None,
371 portals: None,
372 notices_tx,
373 }
374 }
375
    /// Returns a [`ConnCatalog`] acting as the `mz_system` role, with no
    /// attached session.
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }
379
380 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
385 struct I<'a> {
386 queue: VecDeque<CatalogItemId>,
387 seen: BTreeSet<CatalogItemId>,
388 this: &'a CatalogState,
389 }
390 impl<'a> Iterator for I<'a> {
391 type Item = CatalogItemId;
392 fn next(&mut self) -> Option<Self::Item> {
393 if let Some(next) = self.queue.pop_front() {
394 for child in self.this.get_entry(&next).item().uses() {
395 if !self.seen.contains(&child) {
396 self.queue.push_back(child);
397 self.seen.insert(child);
398 }
399 }
400 Some(next)
401 } else {
402 None
403 }
404 }
405 }
406
407 I {
408 queue: [id].into_iter().collect(),
409 seen: [id].into_iter().collect(),
410 this: self,
411 }
412 }
413
414 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
417 let mut out = Vec::new();
418 self.introspection_dependencies_inner(id, &mut out);
419 out
420 }
421
    /// Recursive helper for [`Self::introspection_dependencies`]: appends to
    /// `out` the log items reachable from `id`.
    fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
        match self.get_entry(&id).item() {
            // A log is itself an introspection dependency.
            CatalogItem::Log(_) => out.push(id),
            // View-like items: recurse through everything they reference.
            item @ (CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_)) => {
                for item_id in item.references().items() {
                    self.introspection_dependencies_inner(*item_id, out);
                }
            }
            // Sinks and indexes depend on exactly one upstream collection.
            CatalogItem::Sink(sink) => {
                let from_item_id = self.get_entry_by_global_id(&sink.from).id();
                self.introspection_dependencies_inner(from_item_id, out)
            }
            CatalogItem::Index(idx) => {
                let on_item_id = self.get_entry_by_global_id(&idx.on).id();
                self.introspection_dependencies_inner(on_item_id, out)
            }
            // These item kinds cannot reference logs.
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_) => (),
        }
    }
449
    /// Returns all objects that transitively depend on any of `object_ids`,
    /// dispatching to the per-kind `*_dependents` helpers.
    ///
    /// `seen` tracks already-visited objects across the whole traversal so
    /// that shared dependents are reported only once.
    pub(super) fn object_dependents(
        &self,
        object_ids: &Vec<ObjectId>,
        conn_id: &ConnectionId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        for object_id in object_ids {
            match object_id {
                ObjectId::Cluster(id) => {
                    dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
                }
                ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
                    &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
                ),
                ObjectId::Database(id) => {
                    dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
                }
                ObjectId::Schema((database_spec, schema_spec)) => {
                    dependents.extend_from_slice(&self.schema_dependents(
                        database_spec.clone(),
                        schema_spec.clone(),
                        conn_id,
                        seen,
                    ));
                }
                ObjectId::NetworkPolicy(id) => {
                    dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
                }
                // Roles have no dependents tracked here; report only the role.
                id @ ObjectId::Role(_) => {
                    let unseen = seen.insert(id.clone());
                    if unseen {
                        dependents.push(id.clone());
                    }
                }
                ObjectId::Item(id) => {
                    dependents.extend_from_slice(&self.item_dependents(*id, seen))
                }
            }
        }
        dependents
    }
497
498 fn cluster_dependents(
505 &self,
506 cluster_id: ClusterId,
507 seen: &mut BTreeSet<ObjectId>,
508 ) -> Vec<ObjectId> {
509 let mut dependents = Vec::new();
510 let object_id = ObjectId::Cluster(cluster_id);
511 if !seen.contains(&object_id) {
512 seen.insert(object_id.clone());
513 let cluster = self.get_cluster(cluster_id);
514 for item_id in cluster.bound_objects() {
515 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
516 }
517 for replica_id in cluster.replica_ids().values() {
518 dependents.extend_from_slice(&self.cluster_replica_dependents(
519 cluster_id,
520 *replica_id,
521 seen,
522 ));
523 }
524 dependents.push(object_id);
525 }
526 dependents
527 }
528
529 pub(super) fn cluster_replica_dependents(
536 &self,
537 cluster_id: ClusterId,
538 replica_id: ReplicaId,
539 seen: &mut BTreeSet<ObjectId>,
540 ) -> Vec<ObjectId> {
541 let mut dependents = Vec::new();
542 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
543 if !seen.contains(&object_id) {
544 seen.insert(object_id.clone());
545 dependents.push(object_id);
546 }
547 dependents
548 }
549
550 fn database_dependents(
557 &self,
558 database_id: DatabaseId,
559 conn_id: &ConnectionId,
560 seen: &mut BTreeSet<ObjectId>,
561 ) -> Vec<ObjectId> {
562 let mut dependents = Vec::new();
563 let object_id = ObjectId::Database(database_id);
564 if !seen.contains(&object_id) {
565 seen.insert(object_id.clone());
566 let database = self.get_database(&database_id);
567 for schema_id in database.schema_ids().values() {
568 dependents.extend_from_slice(&self.schema_dependents(
569 ResolvedDatabaseSpecifier::Id(database_id),
570 SchemaSpecifier::Id(*schema_id),
571 conn_id,
572 seen,
573 ));
574 }
575 dependents.push(object_id);
576 }
577 dependents
578 }
579
580 fn schema_dependents(
587 &self,
588 database_spec: ResolvedDatabaseSpecifier,
589 schema_spec: SchemaSpecifier,
590 conn_id: &ConnectionId,
591 seen: &mut BTreeSet<ObjectId>,
592 ) -> Vec<ObjectId> {
593 let mut dependents = Vec::new();
594 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
595 if !seen.contains(&object_id) {
596 seen.insert(object_id.clone());
597 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
598 for item_id in schema.item_ids() {
599 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
600 }
601 dependents.push(object_id)
602 }
603 dependents
604 }
605
606 pub(super) fn item_dependents(
613 &self,
614 item_id: CatalogItemId,
615 seen: &mut BTreeSet<ObjectId>,
616 ) -> Vec<ObjectId> {
617 let mut dependents = Vec::new();
618 let object_id = ObjectId::Item(item_id);
619 if !seen.contains(&object_id) {
620 seen.insert(object_id.clone());
621 let entry = self.get_entry(&item_id);
622 for dependent_id in entry.used_by() {
623 dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
624 }
625 dependents.push(object_id);
626 if let Some(progress_id) = entry.progress_id() {
630 dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
631 }
632 }
633 dependents
634 }
635
    /// Returns the dependents of the given network policy: only the policy's
    /// own [`ObjectId`].
    ///
    /// `_seen` is accepted for signature symmetry with the other
    /// `*_dependents` helpers but is not consulted — no dependents of network
    /// policies are tracked here.
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        vec![object_id]
    }
653
654 fn is_stable(&self, id: CatalogItemId) -> bool {
658 let spec = self.get_entry(&id).name().qualifiers.schema_spec;
659 !self.is_unstable_schema_specifier(spec)
660 }
661
662 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
663 if self.system_config().unsafe_enable_unstable_dependencies() {
664 return Ok(());
665 }
666
667 let unstable_dependencies: Vec<_> = item
668 .references()
669 .items()
670 .filter(|id| !self.is_stable(**id))
671 .map(|id| self.get_entry(id).name().item.clone())
672 .collect();
673
674 if unstable_dependencies.is_empty() || item.is_temporary() {
678 Ok(())
679 } else {
680 let object_type = item.typ().to_string();
681 Err(Error {
682 kind: ErrorKind::UnstableDependency {
683 object_type,
684 unstable_dependencies,
685 },
686 })
687 }
688 }
689
690 pub fn resolve_full_name(
691 &self,
692 name: &QualifiedItemName,
693 conn_id: Option<&ConnectionId>,
694 ) -> FullItemName {
695 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
696
697 let database = match &name.qualifiers.database_spec {
698 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
699 ResolvedDatabaseSpecifier::Id(id) => {
700 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
701 }
702 };
703 let schema = match &name.qualifiers.schema_spec {
706 SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
707 _ => self
708 .get_schema(
709 &name.qualifiers.database_spec,
710 &name.qualifiers.schema_spec,
711 conn_id,
712 )
713 .name()
714 .schema
715 .clone(),
716 };
717 FullItemName {
718 database,
719 schema,
720 item: name.item.clone(),
721 }
722 }
723
724 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
725 let database = match &name.database {
726 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
727 ResolvedDatabaseSpecifier::Id(id) => {
728 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
729 }
730 };
731 FullSchemaName {
732 database,
733 schema: name.schema.clone(),
734 }
735 }
736
    /// Returns the [`CatalogEntry`] for `id`.
    ///
    /// # Panics
    /// Panics if `id` is not present in the catalog.
    pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.entry_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
    }
742
743 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
744 let item_id = self
745 .entry_by_global_id
746 .get(id)
747 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
748
749 let entry = self.get_entry(item_id).clone();
750 let version = match entry.item() {
751 CatalogItem::Table(table) => {
752 let (version, _) = table
753 .collections
754 .iter()
755 .find(|(_verison, gid)| *gid == id)
756 .expect("version to exist");
757 RelationVersionSelector::Specific(*version)
758 }
759 _ => RelationVersionSelector::Latest,
760 };
761 CatalogCollectionEntry { entry, version }
762 }
763
    /// Returns an iterator over every catalog entry, keyed by item id.
    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }
767
    /// Returns the [`ObjectId`]s of all items in `conn`'s temporary schema, or
    /// an empty iterator if the connection has no temporary schema.
    pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
        self.temporary_schemas
            .get(conn)
            .into_iter()
            .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
    }
775
    /// Reports whether a temporary schema exists for connection `conn`.
    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }
784
785 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
791 let mut res = None;
792 for schema_id in self.system_schema_ids() {
793 let schema = &self.ambient_schemas_by_id[&schema_id];
794 if let Some(global_id) = schema.types.get(name) {
795 match res {
796 None => res = Some(self.get_entry(global_id)),
797 Some(_) => panic!(
798 "only call get_system_type on objects uniquely identifiable in one system schema"
799 ),
800 }
801 }
802 }
803
804 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
805 }
806
807 pub fn get_item_by_name(
808 &self,
809 name: &QualifiedItemName,
810 conn_id: &ConnectionId,
811 ) -> Option<&CatalogEntry> {
812 self.get_schema(
813 &name.qualifiers.database_spec,
814 &name.qualifiers.schema_spec,
815 conn_id,
816 )
817 .items
818 .get(&name.item)
819 .and_then(|id| self.try_get_entry(id))
820 }
821
822 pub fn get_type_by_name(
823 &self,
824 name: &QualifiedItemName,
825 conn_id: &ConnectionId,
826 ) -> Option<&CatalogEntry> {
827 self.get_schema(
828 &name.qualifiers.database_spec,
829 &name.qualifiers.schema_spec,
830 conn_id,
831 )
832 .types
833 .get(&name.item)
834 .and_then(|id| self.try_get_entry(id))
835 }
836
837 pub(super) fn find_available_name(
838 &self,
839 mut name: QualifiedItemName,
840 conn_id: &ConnectionId,
841 ) -> QualifiedItemName {
842 let mut i = 0;
843 let orig_item_name = name.item.clone();
844 while self.get_item_by_name(&name, conn_id).is_some() {
845 i += 1;
846 name.item = format!("{}{}", orig_item_name, i);
847 }
848 name
849 }
850
    /// Returns the [`CatalogEntry`] for `id`, or `None` if it doesn't exist.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }
854
855 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
856 let item_id = self.entry_by_global_id.get(id)?;
857 self.try_get_entry(item_id)
858 }
859
    /// Returns the [`RelationDesc`] of the collection identified by `id`, if
    /// it exists and has one.
    ///
    /// For tables the desc is computed for the specific version that `id`
    /// names (owned); all other items return the latest version's desc.
    pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
        let entry = self.try_get_entry_by_global_id(id)?;
        let desc = match entry.item() {
            CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
            other => other.relation_desc(RelationVersionSelector::Latest)?,
        };
        Some(desc)
    }
871
    /// Returns the [`Cluster`] with id `cluster_id`.
    ///
    /// # Panics
    /// Panics if the cluster does not exist.
    pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
        self.try_get_cluster(cluster_id)
            .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
    }
876
    /// Returns the [`Cluster`] with id `cluster_id`, or `None` if it doesn't
    /// exist.
    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }
880
    /// Returns the [`Role`] with id `id`, or `None` if it doesn't exist.
    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }
884
    /// Returns the [`Role`] with id `id`.
    ///
    /// # Panics
    /// Panics if the role does not exist.
    pub fn get_role(&self, id: &RoleId) -> &Role {
        self.roles_by_id.get(id).expect("catalog out of sync")
    }
888
    /// Returns an iterator over the ids of all roles in the catalog.
    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }
892
893 pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
894 self.roles_by_name
895 .get(role_name)
896 .map(|id| &self.roles_by_id[id])
897 }
898
    /// Returns the [`RoleAuth`] for role `id`.
    ///
    /// # Panics
    /// Panics if no authentication info exists for the role.
    pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
        self.role_auth_by_id
            .get(id)
            .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
    }
904
    /// Returns the [`RoleAuth`] for role `id`, or `None` if none exists.
    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }
908
909 pub(super) fn try_get_network_policy_by_name(
910 &self,
911 policy_name: &str,
912 ) -> Option<&NetworkPolicy> {
913 self.network_policies_by_name
914 .get(policy_name)
915 .map(|id| &self.network_policies_by_id[id])
916 }
917
918 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
919 let mut membership = BTreeSet::new();
920 let mut queue = VecDeque::from(vec![id]);
921 while let Some(cur_id) = queue.pop_front() {
922 if !membership.contains(cur_id) {
923 membership.insert(cur_id.clone());
924 let role = self.get_role(cur_id);
925 soft_assert_no_log!(
926 !role.membership().keys().contains(id),
927 "circular membership exists in the catalog"
928 );
929 queue.extend(role.membership().keys());
930 }
931 }
932 membership.insert(RoleId::Public);
933 membership
934 }
935
    /// Returns the [`NetworkPolicy`] with id `id`.
    ///
    /// # Panics
    /// Panics if the policy does not exist.
    pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
        self.network_policies_by_id
            .get(id)
            .expect("catalog out of sync")
    }
941
    /// Returns an iterator over the ids of all network policies in the
    /// catalog.
    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }
945
946 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
951 let entry = self.try_get_entry(id)?;
952 let name = self.resolve_full_name(entry.name(), None);
954 let host_name = self
955 .http_host_name
956 .as_ref()
957 .map(|x| x.as_str())
958 .unwrap_or_else(|| "HOST");
959
960 let RawDatabaseSpecifier::Name(database) = name.database else {
961 return None;
962 };
963
964 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
965 url.path_segments_mut()
966 .ok()?
967 .push(&database)
968 .push(&name.schema)
969 .push(&name.item);
970
971 Some(url)
972 }
973
    /// Parses and plans `create_sql` as the system session, with
    /// item-parsing-specific feature flags enabled (via
    /// `with_enable_for_item_parsing`).
    ///
    /// When `force_if_exists_skip` is true, the planning context is configured
    /// to ignore `IF EXISTS`-style errors.
    ///
    /// Returns the [`Plan`] together with the ids resolved while naming the
    /// statement, or an error if the SQL fails to parse, resolve, or plan.
    pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
        &mut self,
        create_sql: &str,
        force_if_exists_skip: bool,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        self.with_enable_for_item_parsing(|state| {
            let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
            let pcx = Some(&pcx);
            let session_catalog = state.for_system_session();

            let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
            let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
            let plan =
                mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;

            Ok((plan, resolved_ids))
        })
    }
1001
    /// Parses and plans a single SQL statement against `catalog`.
    ///
    /// Returns the resulting [`Plan`] along with the ids resolved while
    /// naming the statement. Errors if `create_sql` fails to parse, resolve,
    /// or plan.
    #[mz_ore::instrument]
    pub(crate) fn parse_plan(
        create_sql: &str,
        pcx: Option<&PlanContext>,
        catalog: &ConnCatalog,
    ) -> Result<(Plan, ResolvedIds), AdapterError> {
        let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
        let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
        let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;

        Ok((plan, resolved_ids))
    }
1015
    /// Parses `create_sql` into a [`CatalogItem`].
    ///
    /// Thin wrapper over [`Self::parse_item`] that supplies defaults: no
    /// planning context, not a retained-metrics object, and no custom
    /// compaction window.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }
1036
    /// Parses `create_sql` into a [`CatalogItem`], consulting and updating
    /// `local_expression_cache`.
    ///
    /// The cached expression for `global_id` (if any) is removed up front and
    /// handed to `parse_item_inner`. On success, any newly optimized
    /// expression is recorded as uncached; on failure, the previously removed
    /// cached expression is put back so it isn't lost.
    #[mz_ore::instrument]
    pub(crate) fn parse_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
        match self.parse_item_inner(
            global_id,
            create_sql,
            extra_versions,
            pcx,
            is_retained_metrics_object,
            custom_logical_compaction_window,
            cached_expr,
            previous_item,
        ) {
            Ok((item, uncached_expr)) => {
                // A fresh optimization happened; record it for caching.
                if let Some((uncached_expr, optimizer_features)) = uncached_expr {
                    local_expression_cache.insert_uncached_expression(
                        global_id,
                        uncached_expr,
                        optimizer_features,
                    );
                }
                Ok(item)
            }
            Err((err, cached_expr)) => {
                // Restore the cached expression we removed above.
                if let Some(local_expr) = cached_expr {
                    local_expression_cache.insert_cached_expression(global_id, local_expr);
                }
                Err(err)
            }
        }
    }
1079
    /// Parses and plans `create_sql`, then converts the resulting `Plan` into the
    /// corresponding in-memory [`CatalogItem`].
    ///
    /// For views and materialized views, the optimized MIR expression is taken from
    /// `cached_expr` (when its optimizer features match), reused from `previous_item`
    /// (when the raw expression is unchanged), or re-optimized from scratch. In the
    /// last case the freshly optimized expression is returned in the second tuple
    /// element so the caller can backfill the expression cache.
    ///
    /// On error, `cached_expr` is handed back unused alongside the error so the
    /// cache entry is not lost.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        // Set only when we had to optimize an expression from scratch (see doc above).
        let mut uncached_expr = None;

        let item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                // All historical versions of the table, plus the root version at
                // `global_id`.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            // Only ingestion-export and webhook data sources are
                            // valid for tables.
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        // For webhook *sources* the cluster comes from `in_cluster`,
                        // not from the data source description (unlike webhook tables
                        // above).
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                // NOTE(review): unlike the materialized-view arm below, a previous
                // item of a different kind is tolerated here (mapped to `None`)
                // rather than treated as unreachable.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    // Cache hit: reuse the cached MIR, but only if it was produced
                    // under the same optimizer features.
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // The raw expression is unchanged from the previous item:
                    // reuse its optimized expression.
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    // Fall back to optimizing from scratch.
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimzed_expr) => optimzed_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        // Surface the fresh result so the caller can populate the cache.
                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Item-level dependencies, derived from the global IDs the raw
                // expression depends on.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(typ, view.column_names),
                    optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                // Materialized views honor per-cluster optimizer feature overrides.
                let system_vars = session_catalog.system_vars();
                let overrides = self
                    .get_cluster(materialized_view.cluster_id)
                    .config
                    .features();
                let optimizer_config =
                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => {
                        (materialized_view.raw_expr, materialized_view.optimized_expr)
                    }
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                // Same cache / reuse / re-optimize cascade as the view arm above.
                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);

                // Apply user-declared NOT NULL assertions to the inferred type.
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);
                let desc = VersionedRelationDesc::new(desc);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    collections,
                    raw_expr,
                    optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    replacement_target: materialized_view.replacement_target,
                    cluster_id: materialized_view.cluster_id,
                    target_replica: materialized_view.target_replica,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                })
            }
            Plan::CreateContinualTask(plan) => {
                let ct =
                    match crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids) {
                        Ok(ct) => ct,
                        Err(err) => return Err((err, cached_expr)),
                    };
                CatalogItem::ContinualTask(ct)
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
                commit_interval: sink.commit_interval,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                // Validate that the type can produce a description before storing it.
                if let Err(err) = typ.inner.desc(&session_catalog) {
                    return Err((err.into(), cached_expr));
                }
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            // A stored `CREATE` statement must plan to one of the item kinds above;
            // anything else indicates catalog corruption.
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        Ok((item, uncached_expr))
    }
1495
1496 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1502 let restore = Arc::clone(&self.system_configuration);
1513 Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1514 let res = f(self);
1515 self.system_configuration = restore;
1516 res
1517 }
1518
1519 pub fn get_indexes_on(
1521 &self,
1522 id: GlobalId,
1523 cluster: ClusterId,
1524 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1525 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1526
1527 self.try_get_entry_by_global_id(&id)
1528 .into_iter()
1529 .map(move |e| {
1530 e.used_by()
1531 .iter()
1532 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1533 CatalogItem::Index(index) if index_matches(index) => {
1534 Some((index.global_id(), index))
1535 }
1536 _ => None,
1537 })
1538 })
1539 .flatten()
1540 }
1541
    /// Returns the database with ID `database_id`. Panics if no such database exists.
    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }
1545
1546 pub(super) fn try_get_cluster_replica(
1551 &self,
1552 id: ClusterId,
1553 replica_id: ReplicaId,
1554 ) -> Option<&ClusterReplica> {
1555 self.try_get_cluster(id)
1556 .and_then(|cluster| cluster.replica(replica_id))
1557 }
1558
1559 pub(crate) fn get_cluster_replica(
1563 &self,
1564 cluster_id: ClusterId,
1565 replica_id: ReplicaId,
1566 ) -> &ClusterReplica {
1567 self.try_get_cluster_replica(cluster_id, replica_id)
1568 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1569 }
1570
1571 pub(super) fn resolve_replica_in_cluster(
1572 &self,
1573 cluster_id: &ClusterId,
1574 replica_name: &str,
1575 ) -> Result<&ClusterReplica, SqlCatalogError> {
1576 let cluster = self.get_cluster(*cluster_id);
1577 let replica_id = cluster
1578 .replica_id_by_name_
1579 .get(replica_name)
1580 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1581 Ok(&cluster.replicas_by_id_[replica_id])
1582 }
1583
1584 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1586 Ok(self.system_configuration.get(name)?)
1587 }
1588
1589 pub(super) fn parse_system_configuration(
1593 &self,
1594 name: &str,
1595 value: VarInput,
1596 ) -> Result<String, Error> {
1597 let value = self.system_configuration.parse(name, value)?;
1598 Ok(value.format())
1599 }
1600
1601 pub(super) fn resolve_schema_in_database(
1603 &self,
1604 database_spec: &ResolvedDatabaseSpecifier,
1605 schema_name: &str,
1606 conn_id: &ConnectionId,
1607 ) -> Result<&Schema, SqlCatalogError> {
1608 let schema = match database_spec {
1609 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1610 self.temporary_schemas.get(conn_id)
1611 }
1612 ResolvedDatabaseSpecifier::Ambient => self
1613 .ambient_schemas_by_name
1614 .get(schema_name)
1615 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1616 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1617 db.schemas_by_name
1618 .get(schema_name)
1619 .and_then(|id| db.schemas_by_id.get(id))
1620 }),
1621 };
1622 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1623 }
1624
1625 pub fn try_get_schema(
1630 &self,
1631 database_spec: &ResolvedDatabaseSpecifier,
1632 schema_spec: &SchemaSpecifier,
1633 conn_id: &ConnectionId,
1634 ) -> Option<&Schema> {
1635 match (database_spec, schema_spec) {
1637 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1638 self.temporary_schemas.get(conn_id)
1639 }
1640 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1641 self.ambient_schemas_by_id.get(id)
1642 }
1643 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1644 .database_by_id
1645 .get(database_id)
1646 .and_then(|db| db.schemas_by_id.get(schema_id)),
1647 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1648 unreachable!("temporary schemas are in the ambient database")
1649 }
1650 }
1651 }
1652
1653 pub fn get_schema(
1654 &self,
1655 database_spec: &ResolvedDatabaseSpecifier,
1656 schema_spec: &SchemaSpecifier,
1657 conn_id: &ConnectionId,
1658 ) -> &Schema {
1659 self.try_get_schema(database_spec, schema_spec, conn_id)
1661 .expect("schema must exist")
1662 }
1663
    /// Returns the non-temporary schema with ID `schema_id`, searching every
    /// database's schemas and the ambient schemas.
    // NOTE(review): `into_first` presumably panics if no schema matches — confirm
    // the mz_ore `CollectionExt` semantics.
    pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.database_by_id
            .values()
            .filter_map(|database| database.schemas_by_id.get(schema_id))
            .chain(self.ambient_schemas_by_id.values())
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }
1672
    /// Returns the temporary schema with ID `schema_id`, searching all
    /// per-connection temporary schemas.
    // NOTE(review): `into_first` presumably panics if no schema matches — confirm
    // the mz_ore `CollectionExt` semantics.
    pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
        self.temporary_schemas
            .values()
            .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
            .into_first()
    }
1679
    /// Returns the ID of the ambient `mz_catalog` schema. Panics if absent.
    pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
    }
1683
    /// Returns the ID of the ambient `mz_catalog_unstable` schema. Panics if absent.
    pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
    }
1687
    /// Returns the ID of the ambient `pg_catalog` schema. Panics if absent.
    pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
    }
1691
    /// Returns the ID of the ambient `information_schema` schema. Panics if absent.
    pub fn get_information_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[INFORMATION_SCHEMA]
    }
1695
    /// Returns the ID of the ambient `mz_internal` schema. Panics if absent.
    pub fn get_mz_internal_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
    }
1699
    /// Returns the ID of the ambient `mz_introspection` schema. Panics if absent.
    pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
    }
1703
    /// Returns the ID of the ambient `mz_unsafe` schema. Panics if absent.
    pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
        self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
    }
1707
1708 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1709 SYSTEM_SCHEMAS
1710 .iter()
1711 .map(|name| self.ambient_schemas_by_name[*name])
1712 }
1713
1714 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1715 self.system_schema_ids().contains(&id)
1716 }
1717
1718 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1719 match spec {
1720 SchemaSpecifier::Temporary => false,
1721 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1722 }
1723 }
1724
1725 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1726 UNSTABLE_SCHEMAS
1727 .iter()
1728 .map(|name| self.ambient_schemas_by_name[*name])
1729 }
1730
1731 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1732 self.unstable_schema_ids().contains(&id)
1733 }
1734
1735 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1736 match spec {
1737 SchemaSpecifier::Temporary => false,
1738 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1739 }
1740 }
1741
1742 pub fn create_temporary_schema(
1745 &mut self,
1746 conn_id: &ConnectionId,
1747 owner_id: RoleId,
1748 ) -> Result<(), Error> {
1749 let oid = INVALID_OID;
1754 self.temporary_schemas.insert(
1755 conn_id.clone(),
1756 Schema {
1757 name: QualifiedSchemaName {
1758 database: ResolvedDatabaseSpecifier::Ambient,
1759 schema: MZ_TEMP_SCHEMA.into(),
1760 },
1761 id: SchemaSpecifier::Temporary,
1762 oid,
1763 items: BTreeMap::new(),
1764 functions: BTreeMap::new(),
1765 types: BTreeMap::new(),
1766 owner_id,
1767 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1768 mz_sql::catalog::ObjectType::Schema,
1769 owner_id,
1770 )]),
1771 },
1772 );
1773 Ok(())
1774 }
1775
1776 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1778 std::iter::empty()
1779 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1780 if schema.id.is_temporary() {
1781 Some(schema.oid)
1782 } else {
1783 None
1784 }
1785 }))
1786 .chain(self.entry_by_id.values().filter_map(|entry| {
1787 if entry.item().is_temporary() {
1788 Some(entry.oid)
1789 } else {
1790 None
1791 }
1792 }))
1793 }
1794
    /// Resolves a builtin table to its catalog item ID. Panics if absent.
    pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
    }
1801
    /// Resolves a builtin log to its catalog item ID and global ID.
    ///
    /// Panics if the log is absent or the resolved entry is not a log.
    pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
        let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
        let log = match self.get_entry(&item_id).item() {
            CatalogItem::Log(log) => log,
            other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
        };
        (item_id, log.global_id)
    }
1813
    /// Resolves a builtin source to its catalog item ID. Panics if absent.
    pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
        self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
    }
1820
    /// Resolves any builtin object to its catalog item ID by looking up its
    /// name in the appropriate namespace (types, functions, or items) of its
    /// ambient schema. Panics if the schema or object is absent.
    pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
        let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
        let schema = &self.ambient_schemas_by_id[schema_id];
        match builtin.catalog_item_type() {
            // Types and functions live in separate namespaces within a schema.
            CatalogItemType::Type => schema.types[builtin.name()],
            CatalogItemType::Func => schema.functions[builtin.name()],
            // Everything else shares the item namespace.
            CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::Sink
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Index
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => schema.items[builtin.name()],
        }
    }
1841
1842 pub fn resolve_builtin_type_references(
1844 &self,
1845 builtin: &BuiltinType<NameReference>,
1846 ) -> BuiltinType<IdReference> {
1847 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1848 CatalogType::AclItem => CatalogType::AclItem,
1849 CatalogType::Array { element_reference } => CatalogType::Array {
1850 element_reference: self.get_system_type(element_reference).id,
1851 },
1852 CatalogType::List {
1853 element_reference,
1854 element_modifiers,
1855 } => CatalogType::List {
1856 element_reference: self.get_system_type(element_reference).id,
1857 element_modifiers: element_modifiers.clone(),
1858 },
1859 CatalogType::Map {
1860 key_reference,
1861 value_reference,
1862 key_modifiers,
1863 value_modifiers,
1864 } => CatalogType::Map {
1865 key_reference: self.get_system_type(key_reference).id,
1866 value_reference: self.get_system_type(value_reference).id,
1867 key_modifiers: key_modifiers.clone(),
1868 value_modifiers: value_modifiers.clone(),
1869 },
1870 CatalogType::Range { element_reference } => CatalogType::Range {
1871 element_reference: self.get_system_type(element_reference).id,
1872 },
1873 CatalogType::Record { fields } => CatalogType::Record {
1874 fields: fields
1875 .into_iter()
1876 .map(|f| CatalogRecordField {
1877 name: f.name.clone(),
1878 type_reference: self.get_system_type(f.type_reference).id,
1879 type_modifiers: f.type_modifiers.clone(),
1880 })
1881 .collect(),
1882 },
1883 CatalogType::Bool => CatalogType::Bool,
1884 CatalogType::Bytes => CatalogType::Bytes,
1885 CatalogType::Char => CatalogType::Char,
1886 CatalogType::Date => CatalogType::Date,
1887 CatalogType::Float32 => CatalogType::Float32,
1888 CatalogType::Float64 => CatalogType::Float64,
1889 CatalogType::Int16 => CatalogType::Int16,
1890 CatalogType::Int32 => CatalogType::Int32,
1891 CatalogType::Int64 => CatalogType::Int64,
1892 CatalogType::UInt16 => CatalogType::UInt16,
1893 CatalogType::UInt32 => CatalogType::UInt32,
1894 CatalogType::UInt64 => CatalogType::UInt64,
1895 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1896 CatalogType::Interval => CatalogType::Interval,
1897 CatalogType::Jsonb => CatalogType::Jsonb,
1898 CatalogType::Numeric => CatalogType::Numeric,
1899 CatalogType::Oid => CatalogType::Oid,
1900 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1901 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1902 CatalogType::Pseudo => CatalogType::Pseudo,
1903 CatalogType::RegClass => CatalogType::RegClass,
1904 CatalogType::RegProc => CatalogType::RegProc,
1905 CatalogType::RegType => CatalogType::RegType,
1906 CatalogType::String => CatalogType::String,
1907 CatalogType::Time => CatalogType::Time,
1908 CatalogType::Timestamp => CatalogType::Timestamp,
1909 CatalogType::TimestampTz => CatalogType::TimestampTz,
1910 CatalogType::Uuid => CatalogType::Uuid,
1911 CatalogType::VarChar => CatalogType::VarChar,
1912 CatalogType::Int2Vector => CatalogType::Int2Vector,
1913 CatalogType::MzAclItem => CatalogType::MzAclItem,
1914 };
1915
1916 BuiltinType {
1917 name: builtin.name,
1918 schema: builtin.schema,
1919 oid: builtin.oid,
1920 details: CatalogTypeDetails {
1921 array_id: builtin.details.array_id,
1922 typ,
1923 pg_metadata: builtin.details.pg_metadata.clone(),
1924 },
1925 }
1926 }
1927
    /// Returns the catalog's configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
1931
1932 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1933 match self.database_by_name.get(database_name) {
1934 Some(id) => Ok(&self.database_by_id[id]),
1935 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1936 }
1937 }
1938
1939 pub fn resolve_schema(
1940 &self,
1941 current_database: Option<&DatabaseId>,
1942 database_name: Option<&str>,
1943 schema_name: &str,
1944 conn_id: &ConnectionId,
1945 ) -> Result<&Schema, SqlCatalogError> {
1946 let database_spec = match database_name {
1947 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
1952 self.resolve_database(database)?.id().clone(),
1953 )),
1954 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
1955 };
1956
1957 if let Some(database_spec) = database_spec {
1959 if let Ok(schema) =
1960 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
1961 {
1962 return Ok(schema);
1963 }
1964 }
1965
1966 if let Ok(schema) = self.resolve_schema_in_database(
1968 &ResolvedDatabaseSpecifier::Ambient,
1969 schema_name,
1970 conn_id,
1971 ) {
1972 return Ok(schema);
1973 }
1974
1975 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
1976 }
1977
    /// Returns the ID of the ambient schema named `name`. Panics if absent.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }
1984
1985 pub fn resolve_search_path(
1986 &self,
1987 session: &dyn SessionMetadata,
1988 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
1989 let database = self
1990 .database_by_name
1991 .get(session.database())
1992 .map(|id| id.clone());
1993
1994 session
1995 .search_path()
1996 .iter()
1997 .map(|schema| {
1998 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
1999 })
2000 .filter_map(|schema| schema.ok())
2001 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2002 .collect()
2003 }
2004
2005 pub fn effective_search_path(
2006 &self,
2007 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2008 include_temp_schema: bool,
2009 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2010 let mut v = Vec::with_capacity(search_path.len() + 3);
2011 let temp_schema = (
2013 ResolvedDatabaseSpecifier::Ambient,
2014 SchemaSpecifier::Temporary,
2015 );
2016 if include_temp_schema && !search_path.contains(&temp_schema) {
2017 v.push(temp_schema);
2018 }
2019 let default_schemas = [
2020 (
2021 ResolvedDatabaseSpecifier::Ambient,
2022 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2023 ),
2024 (
2025 ResolvedDatabaseSpecifier::Ambient,
2026 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2027 ),
2028 ];
2029 for schema in default_schemas.into_iter() {
2030 if !search_path.contains(&schema) {
2031 v.push(schema);
2032 }
2033 }
2034 v.extend_from_slice(search_path);
2035 v
2036 }
2037
2038 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2039 let id = self
2040 .clusters_by_name
2041 .get(name)
2042 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2043 Ok(&self.clusters_by_id[id])
2044 }
2045
2046 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2047 let id = self
2048 .clusters_by_name
2049 .get(cluster.name)
2050 .expect("failed to lookup BuiltinCluster by name");
2051 self.clusters_by_id
2052 .get(id)
2053 .expect("failed to lookup BuiltinCluster by ID")
2054 }
2055
2056 pub fn resolve_cluster_replica(
2057 &self,
2058 cluster_replica_name: &QualifiedReplica,
2059 ) -> Result<&ClusterReplica, SqlCatalogError> {
2060 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2061 let replica_name = cluster_replica_name.replica.as_str();
2062 let replica_id = cluster
2063 .replica_id(replica_name)
2064 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2065 Ok(cluster.replica(replica_id).expect("Must exist"))
2066 }
2067
    /// Resolves `name` to a catalog entry, using `get_schema_entries` to select
    /// which namespace of each schema (items, functions, or types) to search.
    ///
    /// Resolution order: if `name` is schema-qualified, only that schema is
    /// searched; otherwise the connection's temporary schema is checked first,
    /// then each schema on `search_path` in order. Failed lookups produce the
    /// error built by `err_gen`.
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // The list of schemas to search, in order.
        let schemas = match &name.schema {
            // Qualified name: search only the named schema.
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            // Unqualified name: the temporary schema shadows the search path.
            None => match self
                .try_get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .and_then(|schema| schema.items.get(&name.item))
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            // A schema on the search path may have been dropped concurrently.
            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
                continue;
            };

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Compatibility fallback: objects that historically lived in
        // `mz_internal` may have moved to `mz_catalog_unstable` or
        // `mz_introspection`; keep resolving them under the old schema.
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }
2152
    /// Resolves `name` against each schema's *item* namespace, producing
    /// `UnknownItem` on failure. See [`Self::resolve`] for the search order.
    pub fn resolve_entry(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.items,
            current_database,
            search_path,
            name,
            conn_id,
            SqlCatalogError::UnknownItem,
        )
    }
2170
    /// Resolves `name` against each schema's *function* namespace, producing
    /// `UnknownFunction` on failure. See [`Self::resolve`] for the search order.
    pub fn resolve_function(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        self.resolve(
            |schema| &schema.functions,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownFunction {
                name,
                alternative: None,
            },
        )
    }
2191
    /// Resolves `name` against each schema's *type* namespace, producing
    /// `UnknownType` on failure. See [`Self::resolve`] for the search order.
    ///
    /// Additionally warns when a user explicitly qualifies a builtin type with
    /// `pg_catalog` even though the type actually lives in another schema.
    pub fn resolve_type(
        &self,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        // Builtin types that do *not* live in pg_catalog, keyed by name;
        // computed once and cached for the process lifetime.
        static NON_PG_CATALOG_TYPES: LazyLock<
            BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
        > = LazyLock::new(|| {
            BUILTINS::types()
                .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
                .map(|typ| (typ.name, typ))
                .collect()
        });

        let entry = self.resolve(
            |schema| &schema.types,
            current_database,
            search_path,
            name,
            conn_id,
            |name| SqlCatalogError::UnknownType { name },
        )?;

        // System sessions are exempt from the deprecation warning below.
        if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
            if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
                warn!(
                    "user specified an incorrect schema of {} for the type {}, which should be in \
                    the {} schema. This works now due to a bug but will be fixed in a later release.",
                    PG_CATALOG_SCHEMA.quoted(),
                    typ.name.quoted(),
                    typ.schema.quoted(),
                )
            }
        }

        Ok(entry)
    }
2232
2233 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2235 match object_id {
2236 ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2237 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2238 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2239 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2240 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2241 ObjectId::ClusterReplica(cluster_replica_id) => {
2242 CommentObjectId::ClusterReplica(cluster_replica_id)
2243 }
2244 ObjectId::NetworkPolicy(network_policy_id) => {
2245 CommentObjectId::NetworkPolicy(network_policy_id)
2246 }
2247 }
2248 }
2249
    /// Returns a shared reference to the system configuration.
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }
2254
    /// Returns a mutable reference to the system configuration, cloning the
    /// underlying data first if it is currently shared (copy-on-write via `Arc`).
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        Arc::make_mut(&mut self.system_configuration)
    }
2259
2260 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2270 let mut dump = serde_json::to_value(&self).map_err(|e| {
2272 Error::new(ErrorKind::Unstructured(format!(
2273 "internal error: could not dump catalog: {}",
2276 e
2277 )))
2278 })?;
2279
2280 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2281 dump_obj.insert(
2283 "system_parameter_defaults".into(),
2284 serde_json::json!(self.system_config().defaults()),
2285 );
2286 if let Some(unfinalized_shards) = unfinalized_shards {
2288 dump_obj
2289 .get_mut("storage_metadata")
2290 .expect("known to exist")
2291 .as_object_mut()
2292 .expect("storage_metadata is an object")
2293 .insert(
2294 "unfinalized_shards".into(),
2295 serde_json::json!(unfinalized_shards),
2296 );
2297 }
2298 let temporary_gids: Vec<_> = self
2303 .entry_by_global_id
2304 .iter()
2305 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2306 .map(|(gid, _item_id)| *gid)
2307 .collect();
2308 if !temporary_gids.is_empty() {
2309 let gids = dump_obj
2310 .get_mut("entry_by_global_id")
2311 .expect("known_to_exist")
2312 .as_object_mut()
2313 .expect("entry_by_global_id is an object");
2314 for gid in temporary_gids {
2315 gids.remove(&gid.to_string());
2316 }
2317 }
2318 dump_obj.remove("role_auth_by_id");
2321
2322 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2324 }
2325
    /// Returns the availability zones known to the catalog.
    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }
2329
    /// Turns a durable [`mz_catalog::durable::ReplicaLocation`] into a
    /// controller [`ReplicaLocation`], validating the replica size and
    /// resolving which availability zones the replica may be placed in.
    ///
    /// `allowed_sizes` restricts acceptable sizes (empty means unrestricted);
    /// `allowed_availability_zones` are cluster-level zones, if constrained.
    ///
    /// # Errors
    ///
    /// * Internal error if `allowed_availability_zones` is supplied for an
    ///   unmanaged replica, or together with a replica-pinned zone.
    /// * If the size fails [`Self::ensure_valid_replica_size`].
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                // Availability zones only apply to managed replicas.
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried concretize unmanaged replica with specific availability_zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                // A replica-pinned zone and cluster-level zones are mutually
                // exclusive.
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried concretize managed replica with specific availability zones and availability zone";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    // The expect is safe: `ensure_valid_replica_size` just
                    // verified `size` is a known allocation.
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    // Precedence: replica-pinned zone first, then cluster
                    // zones (an empty list counts as "none"), otherwise
                    // unconstrained.
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some(azs)) if azs.is_empty() => {
                            ManagedReplicaAvailabilityZones::FromCluster(None)
                        }
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }
2395
2396 pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2407 let alloc = &self.cluster_replica_sizes.0[size];
2408 !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2409 }
2410
2411 pub(crate) fn ensure_valid_replica_size(
2412 &self,
2413 allowed_sizes: &[String],
2414 size: &String,
2415 ) -> Result<(), Error> {
2416 let cluster_replica_sizes = &self.cluster_replica_sizes;
2417
2418 if !cluster_replica_sizes.0.contains_key(size)
2419 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2420 || cluster_replica_sizes.0[size].disabled
2421 {
2422 let mut entries = cluster_replica_sizes
2423 .enabled_allocations()
2424 .collect::<Vec<_>>();
2425
2426 if !allowed_sizes.is_empty() {
2427 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2428 entries.retain(|(name, _)| allowed_sizes.contains(name));
2429 }
2430
2431 entries.sort_by_key(
2432 |(
2433 _name,
2434 ReplicaAllocation {
2435 scale, cpu_limit, ..
2436 },
2437 )| (scale, cpu_limit),
2438 );
2439
2440 Err(Error {
2441 kind: ErrorKind::InvalidClusterReplicaSize {
2442 size: size.to_owned(),
2443 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2444 },
2445 })
2446 } else {
2447 Ok(())
2448 }
2449 }
2450
2451 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2452 if role_id.is_builtin() {
2453 let role = self.get_role(role_id);
2454 Err(Error::new(ErrorKind::ReservedRoleName(
2455 role.name().to_string(),
2456 )))
2457 } else {
2458 Ok(())
2459 }
2460 }
2461
2462 pub fn ensure_not_reserved_network_policy(
2463 &self,
2464 network_policy_id: &NetworkPolicyId,
2465 ) -> Result<(), Error> {
2466 if network_policy_id.is_builtin() {
2467 let policy = self.get_network_policy(network_policy_id);
2468 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2469 policy.name.clone(),
2470 )))
2471 } else {
2472 Ok(())
2473 }
2474 }
2475
2476 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2477 let is_grantable = !role_id.is_public() && !role_id.is_system();
2478 if is_grantable {
2479 Ok(())
2480 } else {
2481 let role = self.get_role(role_id);
2482 Err(Error::new(ErrorKind::UngrantableRoleName(
2483 role.name().to_string(),
2484 )))
2485 }
2486 }
2487
2488 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2489 if role_id.is_system() {
2490 let role = self.get_role(role_id);
2491 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2492 role.name().to_string(),
2493 )))
2494 } else {
2495 Ok(())
2496 }
2497 }
2498
2499 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2500 if role_id.is_predefined() {
2501 let role = self.get_role(role_id);
2502 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2503 role.name().to_string(),
2504 )))
2505 } else {
2506 Ok(())
2507 }
2508 }
2509
2510 pub(crate) fn add_to_audit_log(
2513 system_configuration: &SystemVars,
2514 oracle_write_ts: mz_repr::Timestamp,
2515 session: Option<&ConnMeta>,
2516 tx: &mut mz_catalog::durable::Transaction,
2517 audit_events: &mut Vec<VersionedEvent>,
2518 event_type: EventType,
2519 object_type: ObjectType,
2520 details: EventDetails,
2521 ) -> Result<(), Error> {
2522 let user = session.map(|session| session.user().name.to_string());
2523
2524 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2527 Some(ts) => ts.into(),
2528 _ => oracle_write_ts.into(),
2529 };
2530 let id = tx.allocate_audit_log_id()?;
2531 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2532 audit_events.push(event.clone());
2533 tx.insert_audit_log_event(event);
2534 Ok(())
2535 }
2536
    /// Returns the owning role of the object identified by `id`, resolved
    /// relative to `conn_id` for schema lookups.
    ///
    /// Returns `None` only for roles, which have no owner.
    pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
        match id {
            ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
            ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
                self.get_cluster_replica(*cluster_id, *replica_id)
                    .owner_id(),
            ),
            ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
            ObjectId::Schema((database_spec, schema_spec)) => Some(
                self.get_schema(database_spec, schema_spec, conn_id)
                    .owner_id(),
            ),
            ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
            ObjectId::Role(_) => None,
            ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
        }
    }
2554
    /// Returns the SQL object type for `object_id`.
    ///
    /// Items derive their type from the catalog entry; all other kinds map
    /// directly to the corresponding `ObjectType` variant.
    pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
        match object_id {
            ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
            ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
            ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
            ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
            ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
            ObjectId::Item(id) => self.get_entry(id).item_type().into(),
            ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
        }
    }
2566
    /// Returns the system object type for `id`, wrapping the object type for
    /// concrete objects and passing through the `System` marker.
    pub(super) fn get_system_object_type(
        &self,
        id: &SystemObjectId,
    ) -> mz_sql::catalog::SystemObjectType {
        match id {
            SystemObjectId::Object(object_id) => {
                SystemObjectType::Object(self.get_object_type(object_id))
            }
            SystemObjectId::System => SystemObjectType::System,
        }
    }
2578
    /// Returns a reference to the catalog's storage metadata.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2585
    /// Groups the given item ids by their compaction window.
    ///
    /// Duplicate ids are processed once. Sources always contribute; tables
    /// contribute only when backed by an ingestion export or webhook data
    /// source. Items of any other kind are skipped.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) if a table carries a data-source variant that
    /// tables are not expected to have.
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        // Deduplicate the input so each item is classified at most once.
        let mut seen = BTreeSet::new();
        for item_id in ids {
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    // Fall back to the default window when none was set.
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    cws.entry(source_cw).or_default().insert(item_id);
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::IngestionExport { .. }
                                | DataSourceDesc::Webhook { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        // Plain write-path tables are not source-backed.
                        TableDataSource::TableWrites { .. } => {}
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::Ingestion { .. }
                                | DataSourceDesc::OldSyntaxIngestion { .. }
                                | DataSourceDesc::Introspection(_)
                                | DataSourceDesc::Progress
                                | DataSourceDesc::Catalog,
                            ..
                        } => {
                            unreachable!(
                                "unexpected DataSourceDesc for table {item_id}: {:?}",
                                table.data_source
                            )
                        }
                    }
                }
                _ => {
                    continue;
                }
            }
        }
        cws
    }
2642
    /// Extracts the [`CatalogItemId`] from a [`CommentObjectId`], if the
    /// comment target is a catalog item.
    ///
    /// Returns `None` for non-item targets (roles, databases, schemas,
    /// clusters, cluster replicas, network policies).
    pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => Some(*id),
            CommentObjectId::Role(_)
            | CommentObjectId::Database(_)
            | CommentObjectId::Schema(_)
            | CommentObjectId::Cluster(_)
            | CommentObjectId::ClusterReplica(_)
            | CommentObjectId::NetworkPolicy(_) => None,
        }
    }
2664
2665 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2666 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2667 }
2668
    /// Renders a human-readable name for the object targeted by `id`, for use
    /// in audit-log entries.
    ///
    /// Items and schemas are rendered as fully qualified names (resolved
    /// relative to `conn_id`); replicas as `cluster.replica`; everything else
    /// by its plain name.
    pub fn comment_id_to_audit_log_name(
        &self,
        id: CommentObjectId,
        conn_id: &ConnectionId,
    ) -> String {
        match id {
            CommentObjectId::Table(id)
            | CommentObjectId::View(id)
            | CommentObjectId::MaterializedView(id)
            | CommentObjectId::Source(id)
            | CommentObjectId::Sink(id)
            | CommentObjectId::Index(id)
            | CommentObjectId::Func(id)
            | CommentObjectId::Connection(id)
            | CommentObjectId::Type(id)
            | CommentObjectId::Secret(id)
            | CommentObjectId::ContinualTask(id) => {
                let item = self.get_entry(&id);
                let name = self.resolve_full_name(item.name(), Some(conn_id));
                name.to_string()
            }
            CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
            CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
            CommentObjectId::Schema((spec, schema_id)) => {
                let schema = self.get_schema(&spec, &schema_id, conn_id);
                self.resolve_full_schema_name(&schema.name).to_string()
            }
            CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
            CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
                let cluster = self.get_cluster(cluster_id);
                let replica = self.get_cluster_replica(cluster_id, replica_id);
                QualifiedReplica {
                    cluster: Ident::new_unchecked(cluster.name.clone()),
                    replica: Ident::new_unchecked(replica.name.clone()),
                }
                .to_string()
            }
            CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
        }
    }
2709
2710 pub fn mock_authentication_nonce(&self) -> String {
2711 self.mock_authentication_nonce.clone().unwrap_or_default()
2712 }
2713}
2714
impl ConnectionResolver for CatalogState {
    /// Resolves the connection stored under catalog item `id`, inlining any
    /// referenced connections for variants that support it (Kafka, Postgres,
    /// CSR, MySQL, SQL Server, Iceberg catalog); SSH, AWS, and AWS
    /// PrivateLink connections are returned as-is.
    ///
    /// # Panics
    ///
    /// Panics if `id` does not refer to a connection ("catalog out of sync").
    fn resolve_connection(
        &self,
        id: CatalogItemId,
    ) -> mz_storage_types::connections::Connection<InlinedConnection> {
        use mz_storage_types::connections::Connection::*;
        match self
            .get_entry(&id)
            .connection()
            .expect("catalog out of sync")
            .details
            .to_connection()
        {
            Kafka(conn) => Kafka(conn.into_inline_connection(self)),
            Postgres(conn) => Postgres(conn.into_inline_connection(self)),
            Csr(conn) => Csr(conn.into_inline_connection(self)),
            Ssh(conn) => Ssh(conn),
            Aws(conn) => Aws(conn),
            AwsPrivatelink(conn) => AwsPrivatelink(conn),
            MySql(conn) => MySql(conn.into_inline_connection(self)),
            SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
            IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
        }
    }
}
2740
/// Forwards the `OptimizerCatalog` interface to the corresponding inherent
/// methods on [`CatalogState`], using fully qualified syntax to make the
/// delegation target unambiguous.
impl OptimizerCatalog for CatalogState {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}
2763
/// Forwards the `OptimizerCatalog` interface to the embedded
/// [`CatalogState`] (`self.state`).
impl OptimizerCatalog for Catalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.state.get_entry_by_global_id(id)
    }

    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        self.state.get_entry(id)
    }

    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        self.state.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(self.state.get_indexes_on(id, cluster))
    }
}
2789
impl Catalog {
    /// Upcasts this `Arc<Catalog>` to an `Arc<dyn OptimizerCatalog>` trait
    /// object (implicit unsized coercion on return).
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}