1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55 UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61 CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62 VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{
67 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
68 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
69 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
70 TypeReference,
71};
72use mz_sql::catalog::{CatalogConfig, EnvironmentId};
73use mz_sql::names::{
74 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81 Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91 ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use mz_transform::notice::OptimizerNotice;
94use serde::Serialize;
95use timely::progress::Antichain;
96use tokio::sync::mpsc;
97use tracing::{debug, warn};
98
99use crate::AdapterError;
101use crate::catalog::{Catalog, ConnCatalog};
102use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
103use crate::optimize::{self, Optimize, OptimizerCatalog};
104use crate::session::Session;
105
/// The in-memory state of the catalog: every database, schema, item, cluster,
/// role, and network policy known to the adapter, plus ambient configuration.
///
/// NOTE(review): collections are `imbl::OrdMap`, presumably so clones of the
/// whole state are cheap copy-on-write snapshots — confirm against the
/// catalog design notes.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    // Name -> id index; must be kept in sync with `database_by_id`.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    // Temporary (connection-bound) entries are omitted from serialization.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    // Maps each collection's `GlobalId` back to its owning item.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    // Schemas that live outside any database (e.g. pg_catalog, mz_catalog).
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    // Authentication material per role; not every role necessarily has an entry.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    pub(super) system_privileges: Arc<PrivilegeMap>,
    pub(super) comments: Arc<CommentsMap>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    pub(super) storage_metadata: Arc<StorageMetadata>,
    pub(super) mock_authentication_nonce: Option<String>,

    // Optimizer notices indexed by the collection they depend on.
    #[serde(skip)]
    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,

    // One temporary schema per connection that has created temp items.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    // Networking/cloud environment details surfaced to users.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    pub(super) http_host_name: Option<String>,

    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
182
/// A per-operation cache of local (pre-global-optimization) expressions,
/// keyed by [`GlobalId`].
///
/// When `Open`, lookups are served from `cached_exprs` and newly computed
/// expressions are collected in `uncached_exprs` for later write-back; when
/// `Closed`, every operation is a no-op.
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    Open {
        // Expressions previously cached, removed as they are consumed.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        // Expressions that missed the cache, candidates for write-back.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    // Caching disabled: lookups miss and inserts are dropped.
    Closed,
}
199
200impl LocalExpressionCache {
201 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
202 Self::Open {
203 cached_exprs,
204 uncached_exprs: BTreeMap::new(),
205 }
206 }
207
208 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
209 match self {
210 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
211 LocalExpressionCache::Closed => None,
212 }
213 }
214
215 pub(super) fn insert_cached_expression(
218 &mut self,
219 id: GlobalId,
220 local_expressions: LocalExpressions,
221 ) {
222 match self {
223 LocalExpressionCache::Open { cached_exprs, .. } => {
224 cached_exprs.insert(id, local_expressions);
225 }
226 LocalExpressionCache::Closed => {}
227 }
228 }
229
230 pub(super) fn insert_uncached_expression(
233 &mut self,
234 id: GlobalId,
235 local_mir: OptimizedMirRelationExpr,
236 optimizer_features: OptimizerFeatures,
237 ) {
238 match self {
239 LocalExpressionCache::Open { uncached_exprs, .. } => {
240 let local_expr = LocalExpressions {
241 local_mir,
242 optimizer_features,
243 };
244 let prev = uncached_exprs.remove(&id);
251 match prev {
252 Some(prev) if prev == local_expr => {
253 uncached_exprs.insert(id, local_expr);
254 }
255 None => {
256 uncached_exprs.insert(id, local_expr);
257 }
258 Some(_) => {}
259 }
260 }
261 LocalExpressionCache::Closed => {}
262 }
263 }
264
265 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
266 match self {
267 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
268 LocalExpressionCache::Closed => BTreeMap::new(),
269 }
270 }
271}
272
273fn skip_temp_items<S>(
274 entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
275 serializer: S,
276) -> Result<S::Ok, S::Error>
277where
278 S: serde::Serializer,
279{
280 mz_ore::serde::map_key_to_string(
281 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
282 serializer,
283 )
284}
285
286impl CatalogState {
287 pub fn empty_test() -> Self {
291 CatalogState {
292 database_by_name: Default::default(),
293 database_by_id: Default::default(),
294 entry_by_id: Default::default(),
295 entry_by_global_id: Default::default(),
296 notices_by_dep_id: Default::default(),
297 ambient_schemas_by_name: Default::default(),
298 ambient_schemas_by_id: Default::default(),
299 temporary_schemas: Default::default(),
300 clusters_by_id: Default::default(),
301 clusters_by_name: Default::default(),
302 network_policies_by_name: Default::default(),
303 roles_by_name: Default::default(),
304 roles_by_id: Default::default(),
305 network_policies_by_id: Default::default(),
306 role_auth_by_id: Default::default(),
307 config: CatalogConfig {
308 start_time: Default::default(),
309 start_instant: Instant::now(),
310 nonce: Default::default(),
311 environment_id: EnvironmentId::for_tests(),
312 session_id: Default::default(),
313 build_info: &DUMMY_BUILD_INFO,
314 now: NOW_ZERO.clone(),
315 connection_context: ConnectionContext::for_tests(Arc::new(
316 InMemorySecretsController::new(),
317 )),
318 helm_chart_version: None,
319 },
320 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
321 availability_zones: Default::default(),
322 system_configuration: Arc::new(SystemVars::default()),
323 egress_addresses: Default::default(),
324 aws_principal_context: Default::default(),
325 aws_privatelink_availability_zones: Default::default(),
326 http_host_name: Default::default(),
327 default_privileges: Arc::new(DefaultPrivileges::default()),
328 system_privileges: Arc::new(PrivilegeMap::default()),
329 comments: Arc::new(CommentsMap::default()),
330 source_references: Default::default(),
331 storage_metadata: Arc::new(StorageMetadata::default()),
332 license_key: ValidatedLicenseKey::for_tests(),
333 mock_authentication_nonce: Default::default(),
334 }
335 }
336
337 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
338 let search_path = self.resolve_search_path(session);
339 let database = self
340 .database_by_name
341 .get(session.vars().database())
342 .map(|id| id.clone());
343 let state = match session.transaction().catalog_state() {
344 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
345 None => Cow::Borrowed(self),
346 };
347 ConnCatalog {
348 state,
349 unresolvable_ids: BTreeSet::new(),
350 conn_id: session.conn_id().clone(),
351 cluster: session.vars().cluster().into(),
352 database,
353 search_path,
354 role_id: session.current_role_id().clone(),
355 prepared_statements: Some(session.prepared_statements()),
356 portals: Some(session.portals()),
357 notices_tx: session.retain_notice_transmitter(),
358 }
359 }
360
361 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
362 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
363 let cluster = self.system_configuration.default_cluster();
364
365 ConnCatalog {
366 state: Cow::Borrowed(self),
367 unresolvable_ids: BTreeSet::new(),
368 conn_id: SYSTEM_CONN_ID.clone(),
369 cluster,
370 database: self
371 .resolve_database(DEFAULT_DATABASE_NAME)
372 .ok()
373 .map(|db| db.id()),
374 search_path: Vec::new(),
377 role_id,
378 prepared_statements: None,
379 portals: None,
380 notices_tx,
381 }
382 }
383
    /// Returns a [`ConnCatalog`] acting as the system user
    /// ([`MZ_SYSTEM_ROLE_ID`]), with no backing session.
    pub fn for_system_session(&self) -> ConnCatalog<'_> {
        self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
    }
387
388 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
393 struct I<'a> {
394 queue: VecDeque<CatalogItemId>,
395 seen: BTreeSet<CatalogItemId>,
396 this: &'a CatalogState,
397 }
398 impl<'a> Iterator for I<'a> {
399 type Item = CatalogItemId;
400 fn next(&mut self) -> Option<Self::Item> {
401 if let Some(next) = self.queue.pop_front() {
402 for child in self.this.get_entry(&next).item().uses() {
403 if !self.seen.contains(&child) {
404 self.queue.push_back(child);
405 self.seen.insert(child);
406 }
407 }
408 Some(next)
409 } else {
410 None
411 }
412 }
413 }
414
415 I {
416 queue: [id].into_iter().collect(),
417 seen: [id].into_iter().collect(),
418 this: self,
419 }
420 }
421
422 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
425 let mut out = Vec::new();
426 self.introspection_dependencies_inner(id, &mut out);
427 out
428 }
429
430 fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
431 match self.get_entry(&id).item() {
432 CatalogItem::Log(_) => out.push(id),
433 item @ (CatalogItem::View(_)
434 | CatalogItem::MaterializedView(_)
435 | CatalogItem::Connection(_)) => {
436 for item_id in item.references().items() {
438 self.introspection_dependencies_inner(*item_id, out);
439 }
440 }
441 CatalogItem::Sink(sink) => {
442 let from_item_id = self.get_entry_by_global_id(&sink.from).id();
443 self.introspection_dependencies_inner(from_item_id, out)
444 }
445 CatalogItem::Index(idx) => {
446 let on_item_id = self.get_entry_by_global_id(&idx.on).id();
447 self.introspection_dependencies_inner(on_item_id, out)
448 }
449 CatalogItem::Table(_)
450 | CatalogItem::Source(_)
451 | CatalogItem::Type(_)
452 | CatalogItem::Func(_)
453 | CatalogItem::Secret(_) => (),
454 }
455 }
456
457 pub(super) fn object_dependents(
463 &self,
464 object_ids: &Vec<ObjectId>,
465 conn_id: &ConnectionId,
466 seen: &mut BTreeSet<ObjectId>,
467 ) -> Vec<ObjectId> {
468 let mut dependents = Vec::new();
469 for object_id in object_ids {
470 match object_id {
471 ObjectId::Cluster(id) => {
472 dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
473 }
474 ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
475 &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
476 ),
477 ObjectId::Database(id) => {
478 dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
479 }
480 ObjectId::Schema((database_spec, schema_spec)) => {
481 dependents.extend_from_slice(&self.schema_dependents(
482 database_spec.clone(),
483 schema_spec.clone(),
484 conn_id,
485 seen,
486 ));
487 }
488 ObjectId::NetworkPolicy(id) => {
489 dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
490 }
491 id @ ObjectId::Role(_) => {
492 let unseen = seen.insert(id.clone());
493 if unseen {
494 dependents.push(id.clone());
495 }
496 }
497 ObjectId::Item(id) => {
498 dependents.extend_from_slice(&self.item_dependents(*id, seen))
499 }
500 }
501 }
502 dependents
503 }
504
505 fn cluster_dependents(
512 &self,
513 cluster_id: ClusterId,
514 seen: &mut BTreeSet<ObjectId>,
515 ) -> Vec<ObjectId> {
516 let mut dependents = Vec::new();
517 let object_id = ObjectId::Cluster(cluster_id);
518 if !seen.contains(&object_id) {
519 seen.insert(object_id.clone());
520 let cluster = self.get_cluster(cluster_id);
521 for item_id in cluster.bound_objects() {
522 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
523 }
524 for replica_id in cluster.replica_ids().values() {
525 dependents.extend_from_slice(&self.cluster_replica_dependents(
526 cluster_id,
527 *replica_id,
528 seen,
529 ));
530 }
531 dependents.push(object_id);
532 }
533 dependents
534 }
535
536 pub(super) fn cluster_replica_dependents(
543 &self,
544 cluster_id: ClusterId,
545 replica_id: ReplicaId,
546 seen: &mut BTreeSet<ObjectId>,
547 ) -> Vec<ObjectId> {
548 let mut dependents = Vec::new();
549 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
550 if !seen.contains(&object_id) {
551 seen.insert(object_id.clone());
552 let cluster = self.get_cluster(cluster_id);
556 for item_id in cluster.bound_objects() {
557 if let CatalogItem::MaterializedView(mv) = self.get_entry(item_id).item()
558 && mv.target_replica == Some(replica_id)
559 {
560 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
561 }
562 }
563 dependents.push(object_id);
564 }
565 dependents
566 }
567
568 fn database_dependents(
575 &self,
576 database_id: DatabaseId,
577 conn_id: &ConnectionId,
578 seen: &mut BTreeSet<ObjectId>,
579 ) -> Vec<ObjectId> {
580 let mut dependents = Vec::new();
581 let object_id = ObjectId::Database(database_id);
582 if !seen.contains(&object_id) {
583 seen.insert(object_id.clone());
584 let database = self.get_database(&database_id);
585 for schema_id in database.schema_ids().values() {
586 dependents.extend_from_slice(&self.schema_dependents(
587 ResolvedDatabaseSpecifier::Id(database_id),
588 SchemaSpecifier::Id(*schema_id),
589 conn_id,
590 seen,
591 ));
592 }
593 dependents.push(object_id);
594 }
595 dependents
596 }
597
598 fn schema_dependents(
605 &self,
606 database_spec: ResolvedDatabaseSpecifier,
607 schema_spec: SchemaSpecifier,
608 conn_id: &ConnectionId,
609 seen: &mut BTreeSet<ObjectId>,
610 ) -> Vec<ObjectId> {
611 let mut dependents = Vec::new();
612 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
613 if !seen.contains(&object_id) {
614 seen.insert(object_id.clone());
615 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
616 for item_id in schema.item_ids() {
617 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
618 }
619 dependents.push(object_id)
620 }
621 dependents
622 }
623
    /// Returns all dependents of item `item_id`, including the item itself,
    /// skipping anything already in `seen`. Dependents appear in the result
    /// before the item they depend on.
    pub(super) fn item_dependents(
        &self,
        item_id: CatalogItemId,
        seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let mut dependents = Vec::new();
        let object_id = ObjectId::Item(item_id);
        if !seen.contains(&object_id) {
            seen.insert(object_id.clone());
            let entry = self.get_entry(&item_id);
            for dependent_id in entry.used_by() {
                dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
            }
            dependents.push(object_id);
            // The item's progress collection (if any) is appended *after* the
            // item itself, unlike the dependents above.
            // NOTE(review): this ordering looks deliberate — presumably so a
            // front-to-back drop removes the item before its progress
            // collection; confirm against the drop logic before relying on it.
            if let Some(progress_id) = entry.progress_id() {
                dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
            }
        }
        dependents
    }
653
    /// Returns the dependents of network policy `network_policy_id`, which is
    /// currently just the policy itself: no other catalog objects are tracked
    /// as depending on a network policy here.
    ///
    /// `_seen` is unused but kept so the signature parallels the other
    /// `*_dependents` helpers.
    pub(super) fn network_policy_dependents(
        &self,
        network_policy_id: NetworkPolicyId,
        _seen: &mut BTreeSet<ObjectId>,
    ) -> Vec<ObjectId> {
        let object_id = ObjectId::NetworkPolicy(network_policy_id);
        vec![object_id]
    }
671
672 fn is_stable(&self, id: CatalogItemId) -> bool {
676 let spec = self.get_entry(&id).name().qualifiers.schema_spec;
677 !self.is_unstable_schema_specifier(spec)
678 }
679
680 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
681 if self.system_config().unsafe_enable_unstable_dependencies() {
682 return Ok(());
683 }
684
685 let unstable_dependencies: Vec<_> = item
686 .references()
687 .items()
688 .filter(|id| !self.is_stable(**id))
689 .map(|id| self.get_entry(id).name().item.clone())
690 .collect();
691
692 if unstable_dependencies.is_empty() || item.is_temporary() {
696 Ok(())
697 } else {
698 let object_type = item.typ().to_string();
699 Err(Error {
700 kind: ErrorKind::UnstableDependency {
701 object_type,
702 unstable_dependencies,
703 },
704 })
705 }
706 }
707
708 pub fn resolve_full_name(
709 &self,
710 name: &QualifiedItemName,
711 conn_id: Option<&ConnectionId>,
712 ) -> FullItemName {
713 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
714
715 let database = match &name.qualifiers.database_spec {
716 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
717 ResolvedDatabaseSpecifier::Id(id) => {
718 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
719 }
720 };
721 let schema = match &name.qualifiers.schema_spec {
724 SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
725 SchemaSpecifier::Id(_) => self
726 .get_schema(
727 &name.qualifiers.database_spec,
728 &name.qualifiers.schema_spec,
729 conn_id,
730 )
731 .name()
732 .schema
733 .clone(),
734 };
735 FullItemName {
736 database,
737 schema,
738 item: name.item.clone(),
739 }
740 }
741
742 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
743 let database = match &name.database {
744 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
745 ResolvedDatabaseSpecifier::Id(id) => {
746 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
747 }
748 };
749 FullSchemaName {
750 database,
751 schema: name.schema.clone(),
752 }
753 }
754
755 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
756 self.entry_by_id
757 .get(id)
758 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
759 }
760
761 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
762 let item_id = self
763 .entry_by_global_id
764 .get(id)
765 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
766
767 let entry = self.get_entry(item_id).clone();
768 let version = match entry.item() {
769 CatalogItem::Table(table) => {
770 let (version, _) = table
771 .collections
772 .iter()
773 .find(|(_verison, gid)| *gid == id)
774 .expect("version to exist");
775 RelationVersionSelector::Specific(*version)
776 }
777 _ => RelationVersionSelector::Latest,
778 };
779 CatalogCollectionEntry { entry, version }
780 }
781
    /// Returns an iterator over every item entry in the catalog, ordered by
    /// [`CatalogItemId`].
    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
        self.entry_by_id.iter()
    }
785
786 pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
787 self.temporary_schemas
789 .get(conn)
790 .into_iter()
791 .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
792 }
793
    /// Reports whether connection `conn` has a temporary schema allocated.
    pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
        self.temporary_schemas.contains_key(conn)
    }
802
803 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
809 let mut res = None;
810 for schema_id in self.system_schema_ids() {
811 let schema = &self.ambient_schemas_by_id[&schema_id];
812 if let Some(global_id) = schema.types.get(name) {
813 match res {
814 None => res = Some(self.get_entry(global_id)),
815 Some(_) => panic!(
816 "only call get_system_type on objects uniquely identifiable in one system schema"
817 ),
818 }
819 }
820 }
821
822 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
823 }
824
825 pub fn get_item_by_name(
826 &self,
827 name: &QualifiedItemName,
828 conn_id: &ConnectionId,
829 ) -> Option<&CatalogEntry> {
830 self.get_schema(
831 &name.qualifiers.database_spec,
832 &name.qualifiers.schema_spec,
833 conn_id,
834 )
835 .items
836 .get(&name.item)
837 .and_then(|id| self.try_get_entry(id))
838 }
839
840 pub fn get_type_by_name(
841 &self,
842 name: &QualifiedItemName,
843 conn_id: &ConnectionId,
844 ) -> Option<&CatalogEntry> {
845 self.get_schema(
846 &name.qualifiers.database_spec,
847 &name.qualifiers.schema_spec,
848 conn_id,
849 )
850 .types
851 .get(&name.item)
852 .and_then(|id| self.try_get_entry(id))
853 }
854
855 pub(super) fn find_available_name(
856 &self,
857 mut name: QualifiedItemName,
858 conn_id: &ConnectionId,
859 ) -> QualifiedItemName {
860 let mut i = 0;
861 let orig_item_name = name.item.clone();
862 while self.get_item_by_name(&name, conn_id).is_some() {
863 i += 1;
864 name.item = format!("{}{}", orig_item_name, i);
865 }
866 name
867 }
868
    /// Returns the entry for item `id`, or `None` if it does not exist.
    pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
        self.entry_by_id.get(id)
    }
872
873 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
874 let item_id = self.entry_by_global_id.get(id)?;
875 self.try_get_entry(item_id)
876 }
877
878 pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
881 let entry = self.try_get_entry_by_global_id(id)?;
882 let desc = match entry.item() {
883 CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
884 other => other.relation_desc(RelationVersionSelector::Latest)?,
886 };
887 Some(desc)
888 }
889
890 pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
891 self.try_get_cluster(cluster_id)
892 .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
893 }
894
    /// Returns the cluster `cluster_id`, or `None` if it does not exist.
    pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
        self.clusters_by_id.get(&cluster_id)
    }
898
    /// Returns the role `id`, or `None` if it does not exist.
    pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
        self.roles_by_id.get(id)
    }
902
903 pub fn get_role(&self, id: &RoleId) -> &Role {
904 self.roles_by_id.get(id).expect("catalog out of sync")
905 }
906
    /// Returns an iterator over the ids of every role in the catalog.
    pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
        self.roles_by_id.keys()
    }
910
911 pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
912 self.roles_by_name
913 .get(role_name)
914 .map(|id| &self.roles_by_id[id])
915 }
916
917 pub fn try_get_role_by_name_case_insensitive(&self, role_name: &str) -> Option<&Role> {
924 let lower = role_name.to_lowercase();
925 self.roles_by_name
926 .iter()
927 .find(|(name, _)| name.to_lowercase() == lower)
928 .map(|(_, id)| &self.roles_by_id[id])
929 }
930
931 pub fn roles_by_lowercase_name(&self) -> BTreeMap<String, &Role> {
936 self.roles_by_name
937 .iter()
938 .map(|(name, id)| (name.to_lowercase(), &self.roles_by_id[id]))
939 .collect()
940 }
941
942 pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
943 self.role_auth_by_id
944 .get(id)
945 .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
946 }
947
    /// Returns the authentication info for role `id`, or `None` if the role
    /// has no auth entry.
    pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
        self.role_auth_by_id.get(id)
    }
951
952 pub(super) fn try_get_network_policy_by_name(
953 &self,
954 policy_name: &str,
955 ) -> Option<&NetworkPolicy> {
956 self.network_policies_by_name
957 .get(policy_name)
958 .map(|id| &self.network_policies_by_id[id])
959 }
960
961 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
962 let mut membership = BTreeSet::new();
963 let mut queue = VecDeque::from(vec![id]);
964 while let Some(cur_id) = queue.pop_front() {
965 if !membership.contains(cur_id) {
966 membership.insert(cur_id.clone());
967 let role = self.get_role(cur_id);
968 soft_assert_no_log!(
969 !role.membership().keys().contains(id),
970 "circular membership exists in the catalog"
971 );
972 queue.extend(role.membership().keys());
973 }
974 }
975 membership.insert(RoleId::Public);
976 membership
977 }
978
979 pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
980 self.network_policies_by_id
981 .get(id)
982 .expect("catalog out of sync")
983 }
984
    /// Returns an iterator over the ids of every network policy in the
    /// catalog.
    pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
        self.network_policies_by_id.keys()
    }
988
989 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
994 let entry = self.try_get_entry(id)?;
995 let name = self.resolve_full_name(entry.name(), None);
997 let host_name = self
998 .http_host_name
999 .as_ref()
1000 .map(|x| x.as_str())
1001 .unwrap_or_else(|| "HOST");
1002
1003 let RawDatabaseSpecifier::Name(database) = name.database else {
1004 return None;
1005 };
1006
1007 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
1008 url.path_segments_mut()
1009 .ok()?
1010 .push(&database)
1011 .push(&name.schema)
1012 .push(&name.item);
1013
1014 Some(url)
1015 }
1016
1017 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1025 &mut self,
1028 create_sql: &str,
1029 force_if_exists_skip: bool,
1030 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1031 self.with_enable_for_item_parsing(|state| {
1032 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
1033 let pcx = Some(&pcx);
1034 let session_catalog = state.for_system_session();
1035
1036 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1037 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
1038 let plan =
1039 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
1040
1041 Ok((plan, resolved_ids))
1042 })
1043 }
1044
1045 #[mz_ore::instrument]
1047 pub(crate) fn parse_plan(
1048 create_sql: &str,
1049 pcx: Option<&PlanContext>,
1050 catalog: &ConnCatalog,
1051 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1052 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1053 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1054 let plan = mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1055
1056 Ok((plan, resolved_ids))
1057 }
1058
    /// Parses a persisted `create_sql` into a [`CatalogItem`], with default
    /// settings: no plan context, not a retained-metrics object, and no
    /// custom compaction window. See [`CatalogState::parse_item`] for the
    /// remaining parameters.
    pub(crate) fn deserialize_item(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        local_expression_cache: &mut LocalExpressionCache,
        previous_item: Option<CatalogItem>,
    ) -> Result<CatalogItem, AdapterError> {
        self.parse_item(
            global_id,
            create_sql,
            extra_versions,
            None,
            false,
            None,
            local_expression_cache,
            previous_item,
        )
    }
1079
1080 #[mz_ore::instrument]
1082 pub(crate) fn parse_item(
1083 &self,
1084 global_id: GlobalId,
1085 create_sql: &str,
1086 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1087 pcx: Option<&PlanContext>,
1088 is_retained_metrics_object: bool,
1089 custom_logical_compaction_window: Option<CompactionWindow>,
1090 local_expression_cache: &mut LocalExpressionCache,
1091 previous_item: Option<CatalogItem>,
1092 ) -> Result<CatalogItem, AdapterError> {
1093 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1094 match self.parse_item_inner(
1095 global_id,
1096 create_sql,
1097 extra_versions,
1098 pcx,
1099 is_retained_metrics_object,
1100 custom_logical_compaction_window,
1101 cached_expr,
1102 previous_item,
1103 ) {
1104 Ok((item, uncached_expr)) => {
1105 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1106 local_expression_cache.insert_uncached_expression(
1107 global_id,
1108 uncached_expr,
1109 optimizer_features,
1110 );
1111 }
1112 Ok(item)
1113 }
1114 Err((err, cached_expr)) => {
1115 if let Some(local_expr) = cached_expr {
1116 local_expression_cache.insert_cached_expression(global_id, local_expr);
1117 }
1118 Err(err)
1119 }
1120 }
1121 }
1122
    /// Plans `create_sql` and converts the resulting [`Plan`] into a [`CatalogItem`].
    ///
    /// Expression reuse: for (materialized) views, a locally optimized MIR expression is
    /// taken from `cached_expr` when its optimizer features match the current config, or
    /// from `previous_item` when the raw expression is unchanged; otherwise the view is
    /// re-optimized and the fresh expression is returned in the second tuple element so
    /// the caller can insert it into the cache.
    ///
    /// On error, returns the error together with the (unconsumed) `cached_expr` so the
    /// caller can put it back into the local expression cache.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        // Any failure from here on must hand `cached_expr` back to the caller.
        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        // Set only by the re-optimization fallback paths below.
        let mut uncached_expr = None;

        // Snapshot the previous item's (global) plans so they can be carried over to
        // the re-parsed item at the end of this function.
        let previous_plans = previous_item.as_ref().map(|item| {
            (
                item.optimized_plan().cloned(),
                item.physical_plan().cloned(),
                item.dataflow_metainfo().cloned(),
            )
        });

        let mut item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                // All historical versions of the table plus the current (root) one.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            // Other data source kinds are not valid for tables.
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        // Ingestions run on a cluster, which the plan must supply.
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        // For sources, the cluster comes from `in_cluster`, not from
                        // the webhook desc itself (unlike webhook tables above).
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                // NOTE: unlike the materialized view branch below, a previous item of a
                // different kind is tolerated here (`_ => None`) rather than treated as
                // unreachable.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    // Cache hit: only usable if the optimizer features match.
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // Previous item had the same raw expression: reuse its optimized form.
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    // Fallback: re-optimize and report the result via `uncached_expr`.
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimzed_expr) => optimzed_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Map dependency `GlobalId`s back to catalog item IDs.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(typ, view.column_names),
                    locally_optimized_expr: optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                // Materialized views additionally honor per-cluster feature overrides.
                let system_vars = session_catalog.system_vars();
                let overrides = self
                    .get_cluster(materialized_view.cluster_id)
                    .config
                    .features();
                let optimizer_config =
                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => (
                        materialized_view.raw_expr,
                        materialized_view.locally_optimized_expr,
                    ),
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);

                // Apply user-declared NOT NULL assertions on top of the inferred type.
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);
                let desc = VersionedRelationDesc::new(desc);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    collections,
                    raw_expr,
                    locally_optimized_expr: optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    replacement_target: materialized_view.replacement_target,
                    cluster_id: materialized_view.cluster_id,
                    target_replica: materialized_view.target_replica,
                    non_null_assertions: materialized_view.non_null_assertions,
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                    optimized_plan: None,
                    physical_plan: None,
                    dataflow_metainfo: None,
                })
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
                optimized_plan: None,
                physical_plan: None,
                dataflow_metainfo: None,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
                commit_interval: sink.commit_interval,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                // Validate that the type can produce a description before cataloging it.
                if let Err(err) = typ.inner.desc(&session_catalog) {
                    return Err((err.into(), cached_expr));
                }
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            // A stored `create_sql` should only ever plan to one of the above.
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        // Carry over any global plans from the previous incarnation of this item,
        // if the new item kind has slots for them.
        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
            {
                *optimized_plan = prev_optimized;
                *physical_plan = prev_physical;
                *dataflow_metainfo = prev_metainfo;
            }
        }

        Ok((item, uncached_expr))
    }
1566
1567 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1573 let restore = Arc::clone(&self.system_configuration);
1584 Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1585 let res = f(self);
1586 self.system_configuration = restore;
1587 res
1588 }
1589
1590 pub fn get_indexes_on(
1592 &self,
1593 id: GlobalId,
1594 cluster: ClusterId,
1595 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1596 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1597
1598 self.try_get_entry_by_global_id(&id)
1599 .into_iter()
1600 .map(move |e| {
1601 e.used_by()
1602 .iter()
1603 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1604 CatalogItem::Index(index) if index_matches(index) => {
1605 Some((index.global_id(), index))
1606 }
1607 _ => None,
1608 })
1609 })
1610 .flatten()
1611 }
1612
    /// Returns the database with the given ID.
    ///
    /// Panics if `database_id` does not refer to a known database.
    pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
        &self.database_by_id[database_id]
    }
1616
1617 pub(super) fn try_get_cluster_replica(
1622 &self,
1623 id: ClusterId,
1624 replica_id: ReplicaId,
1625 ) -> Option<&ClusterReplica> {
1626 self.try_get_cluster(id)
1627 .and_then(|cluster| cluster.replica(replica_id))
1628 }
1629
1630 pub(crate) fn get_cluster_replica(
1634 &self,
1635 cluster_id: ClusterId,
1636 replica_id: ReplicaId,
1637 ) -> &ClusterReplica {
1638 self.try_get_cluster_replica(cluster_id, replica_id)
1639 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1640 }
1641
1642 pub(super) fn resolve_replica_in_cluster(
1643 &self,
1644 cluster_id: &ClusterId,
1645 replica_name: &str,
1646 ) -> Result<&ClusterReplica, SqlCatalogError> {
1647 let cluster = self.get_cluster(*cluster_id);
1648 let replica_id = cluster
1649 .replica_id_by_name_
1650 .get(replica_name)
1651 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1652 Ok(&cluster.replicas_by_id_[replica_id])
1653 }
1654
1655 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1657 Ok(self.system_configuration.get(name)?)
1658 }
1659
1660 pub(super) fn parse_system_configuration(
1664 &self,
1665 name: &str,
1666 value: VarInput,
1667 ) -> Result<String, Error> {
1668 let value = self.system_configuration.parse(name, value)?;
1669 Ok(value.format())
1670 }
1671
1672 pub(super) fn resolve_schema_in_database(
1674 &self,
1675 database_spec: &ResolvedDatabaseSpecifier,
1676 schema_name: &str,
1677 conn_id: &ConnectionId,
1678 ) -> Result<&Schema, SqlCatalogError> {
1679 let schema = match database_spec {
1680 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1681 self.temporary_schemas.get(conn_id)
1682 }
1683 ResolvedDatabaseSpecifier::Ambient => self
1684 .ambient_schemas_by_name
1685 .get(schema_name)
1686 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1687 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1688 db.schemas_by_name
1689 .get(schema_name)
1690 .and_then(|id| db.schemas_by_id.get(id))
1691 }),
1692 };
1693 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1694 }
1695
1696 pub fn try_get_schema(
1701 &self,
1702 database_spec: &ResolvedDatabaseSpecifier,
1703 schema_spec: &SchemaSpecifier,
1704 conn_id: &ConnectionId,
1705 ) -> Option<&Schema> {
1706 match (database_spec, schema_spec) {
1708 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1709 self.temporary_schemas.get(conn_id)
1710 }
1711 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1712 self.ambient_schemas_by_id.get(id)
1713 }
1714 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1715 .database_by_id
1716 .get(database_id)
1717 .and_then(|db| db.schemas_by_id.get(schema_id)),
1718 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1719 unreachable!("temporary schemas are in the ambient database")
1720 }
1721 }
1722 }
1723
1724 pub fn get_schema(
1725 &self,
1726 database_spec: &ResolvedDatabaseSpecifier,
1727 schema_spec: &SchemaSpecifier,
1728 conn_id: &ConnectionId,
1729 ) -> &Schema {
1730 self.try_get_schema(database_spec, schema_spec, conn_id)
1732 .expect("schema must exist")
1733 }
1734
1735 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1736 self.database_by_id
1737 .values()
1738 .filter_map(|database| database.schemas_by_id.get(schema_id))
1739 .chain(self.ambient_schemas_by_id.values())
1740 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1741 .into_first()
1742 }
1743
1744 pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1745 self.temporary_schemas
1746 .values()
1747 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1748 .into_first()
1749 }
1750
1751 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1752 self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1753 }
1754
1755 pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1756 self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1757 }
1758
1759 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1760 self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1761 }
1762
1763 pub fn get_information_schema_id(&self) -> SchemaId {
1764 self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1765 }
1766
1767 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1768 self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1769 }
1770
1771 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1772 self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1773 }
1774
1775 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1776 self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1777 }
1778
1779 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1780 SYSTEM_SCHEMAS
1781 .iter()
1782 .map(|name| self.ambient_schemas_by_name[*name])
1783 }
1784
1785 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1786 self.system_schema_ids().contains(&id)
1787 }
1788
1789 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1790 match spec {
1791 SchemaSpecifier::Temporary => false,
1792 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1793 }
1794 }
1795
1796 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1797 UNSTABLE_SCHEMAS
1798 .iter()
1799 .map(|name| self.ambient_schemas_by_name[*name])
1800 }
1801
1802 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1803 self.unstable_schema_ids().contains(&id)
1804 }
1805
1806 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1807 match spec {
1808 SchemaSpecifier::Temporary => false,
1809 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1810 }
1811 }
1812
1813 pub fn create_temporary_schema(
1816 &mut self,
1817 conn_id: &ConnectionId,
1818 owner_id: RoleId,
1819 ) -> Result<(), Error> {
1820 let oid = INVALID_OID;
1825 self.temporary_schemas.insert(
1826 conn_id.clone(),
1827 Schema {
1828 name: QualifiedSchemaName {
1829 database: ResolvedDatabaseSpecifier::Ambient,
1830 schema: MZ_TEMP_SCHEMA.into(),
1831 },
1832 id: SchemaSpecifier::Temporary,
1833 oid,
1834 items: BTreeMap::new(),
1835 functions: BTreeMap::new(),
1836 types: BTreeMap::new(),
1837 owner_id,
1838 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1839 mz_sql::catalog::ObjectType::Schema,
1840 owner_id,
1841 )]),
1842 },
1843 );
1844 Ok(())
1845 }
1846
1847 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1849 std::iter::empty()
1850 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1851 if schema.id.is_temporary() {
1852 Some(schema.oid)
1853 } else {
1854 None
1855 }
1856 }))
1857 .chain(self.entry_by_id.values().filter_map(|entry| {
1858 if entry.item().is_temporary() {
1859 Some(entry.oid)
1860 } else {
1861 None
1862 }
1863 }))
1864 }
1865
1866 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1870 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1871 }
1872
1873 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1877 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1878 let log = match self.get_entry(&item_id).item() {
1879 CatalogItem::Log(log) => log,
1880 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1881 };
1882 (item_id, log.global_id)
1883 }
1884
1885 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1889 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1890 }
1891
1892 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1896 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1897 let schema = &self.ambient_schemas_by_id[schema_id];
1898 match builtin.catalog_item_type() {
1899 CatalogItemType::Type => schema.types[builtin.name()],
1900 CatalogItemType::Func => schema.functions[builtin.name()],
1901 CatalogItemType::Table
1902 | CatalogItemType::Source
1903 | CatalogItemType::Sink
1904 | CatalogItemType::View
1905 | CatalogItemType::MaterializedView
1906 | CatalogItemType::Index
1907 | CatalogItemType::Secret
1908 | CatalogItemType::Connection => schema.items[builtin.name()],
1909 }
1910 }
1911
1912 pub fn resolve_builtin_type_references(
1914 &self,
1915 builtin: &BuiltinType<NameReference>,
1916 ) -> BuiltinType<IdReference> {
1917 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1918 CatalogType::AclItem => CatalogType::AclItem,
1919 CatalogType::Array { element_reference } => CatalogType::Array {
1920 element_reference: self.get_system_type(element_reference).id,
1921 },
1922 CatalogType::List {
1923 element_reference,
1924 element_modifiers,
1925 } => CatalogType::List {
1926 element_reference: self.get_system_type(element_reference).id,
1927 element_modifiers: element_modifiers.clone(),
1928 },
1929 CatalogType::Map {
1930 key_reference,
1931 value_reference,
1932 key_modifiers,
1933 value_modifiers,
1934 } => CatalogType::Map {
1935 key_reference: self.get_system_type(key_reference).id,
1936 value_reference: self.get_system_type(value_reference).id,
1937 key_modifiers: key_modifiers.clone(),
1938 value_modifiers: value_modifiers.clone(),
1939 },
1940 CatalogType::Range { element_reference } => CatalogType::Range {
1941 element_reference: self.get_system_type(element_reference).id,
1942 },
1943 CatalogType::Record { fields } => CatalogType::Record {
1944 fields: fields
1945 .into_iter()
1946 .map(|f| CatalogRecordField {
1947 name: f.name.clone(),
1948 type_reference: self.get_system_type(f.type_reference).id,
1949 type_modifiers: f.type_modifiers.clone(),
1950 })
1951 .collect(),
1952 },
1953 CatalogType::Bool => CatalogType::Bool,
1954 CatalogType::Bytes => CatalogType::Bytes,
1955 CatalogType::Char => CatalogType::Char,
1956 CatalogType::Date => CatalogType::Date,
1957 CatalogType::Float32 => CatalogType::Float32,
1958 CatalogType::Float64 => CatalogType::Float64,
1959 CatalogType::Int16 => CatalogType::Int16,
1960 CatalogType::Int32 => CatalogType::Int32,
1961 CatalogType::Int64 => CatalogType::Int64,
1962 CatalogType::UInt16 => CatalogType::UInt16,
1963 CatalogType::UInt32 => CatalogType::UInt32,
1964 CatalogType::UInt64 => CatalogType::UInt64,
1965 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1966 CatalogType::Interval => CatalogType::Interval,
1967 CatalogType::Jsonb => CatalogType::Jsonb,
1968 CatalogType::Numeric => CatalogType::Numeric,
1969 CatalogType::Oid => CatalogType::Oid,
1970 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1971 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1972 CatalogType::Pseudo => CatalogType::Pseudo,
1973 CatalogType::RegClass => CatalogType::RegClass,
1974 CatalogType::RegProc => CatalogType::RegProc,
1975 CatalogType::RegType => CatalogType::RegType,
1976 CatalogType::String => CatalogType::String,
1977 CatalogType::Time => CatalogType::Time,
1978 CatalogType::Timestamp => CatalogType::Timestamp,
1979 CatalogType::TimestampTz => CatalogType::TimestampTz,
1980 CatalogType::Uuid => CatalogType::Uuid,
1981 CatalogType::VarChar => CatalogType::VarChar,
1982 CatalogType::Int2Vector => CatalogType::Int2Vector,
1983 CatalogType::MzAclItem => CatalogType::MzAclItem,
1984 };
1985
1986 BuiltinType {
1987 name: builtin.name,
1988 schema: builtin.schema,
1989 oid: builtin.oid,
1990 details: CatalogTypeDetails {
1991 array_id: builtin.details.array_id,
1992 typ,
1993 pg_metadata: builtin.details.pg_metadata.clone(),
1994 },
1995 }
1996 }
1997
    /// Returns the catalog's configuration.
    pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
        &self.config
    }
2001
2002 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
2003 match self.database_by_name.get(database_name) {
2004 Some(id) => Ok(&self.database_by_id[id]),
2005 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
2006 }
2007 }
2008
2009 pub fn resolve_schema(
2010 &self,
2011 current_database: Option<&DatabaseId>,
2012 database_name: Option<&str>,
2013 schema_name: &str,
2014 conn_id: &ConnectionId,
2015 ) -> Result<&Schema, SqlCatalogError> {
2016 let database_spec = match database_name {
2017 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
2022 self.resolve_database(database)?.id().clone(),
2023 )),
2024 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2025 };
2026
2027 if let Some(database_spec) = database_spec {
2029 if let Ok(schema) =
2030 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2031 {
2032 return Ok(schema);
2033 }
2034 }
2035
2036 if let Ok(schema) = self.resolve_schema_in_database(
2038 &ResolvedDatabaseSpecifier::Ambient,
2039 schema_name,
2040 conn_id,
2041 ) {
2042 return Ok(schema);
2043 }
2044
2045 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2046 }
2047
    /// Returns the ID of the ambient schema named `name`.
    ///
    /// Panics if `name` is not a known ambient (system) schema.
    pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
        self.ambient_schemas_by_name[name]
    }
2054
2055 pub fn resolve_search_path(
2056 &self,
2057 session: &dyn SessionMetadata,
2058 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2059 let database = self
2060 .database_by_name
2061 .get(session.database())
2062 .map(|id| id.clone());
2063
2064 session
2065 .search_path()
2066 .iter()
2067 .map(|schema| {
2068 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2069 })
2070 .filter_map(|schema| schema.ok())
2071 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2072 .collect()
2073 }
2074
2075 pub fn effective_search_path(
2076 &self,
2077 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2078 include_temp_schema: bool,
2079 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2080 let mut v = Vec::with_capacity(search_path.len() + 3);
2081 let temp_schema = (
2083 ResolvedDatabaseSpecifier::Ambient,
2084 SchemaSpecifier::Temporary,
2085 );
2086 if include_temp_schema && !search_path.contains(&temp_schema) {
2087 v.push(temp_schema);
2088 }
2089 let default_schemas = [
2090 (
2091 ResolvedDatabaseSpecifier::Ambient,
2092 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2093 ),
2094 (
2095 ResolvedDatabaseSpecifier::Ambient,
2096 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2097 ),
2098 ];
2099 for schema in default_schemas.into_iter() {
2100 if !search_path.contains(&schema) {
2101 v.push(schema);
2102 }
2103 }
2104 v.extend_from_slice(search_path);
2105 v
2106 }
2107
2108 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2109 let id = self
2110 .clusters_by_name
2111 .get(name)
2112 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2113 Ok(&self.clusters_by_id[id])
2114 }
2115
2116 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2117 let id = self
2118 .clusters_by_name
2119 .get(cluster.name)
2120 .expect("failed to lookup BuiltinCluster by name");
2121 self.clusters_by_id
2122 .get(id)
2123 .expect("failed to lookup BuiltinCluster by ID")
2124 }
2125
2126 pub fn resolve_cluster_replica(
2127 &self,
2128 cluster_replica_name: &QualifiedReplica,
2129 ) -> Result<&ClusterReplica, SqlCatalogError> {
2130 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2131 let replica_name = cluster_replica_name.replica.as_str();
2132 let replica_id = cluster
2133 .replica_id(replica_name)
2134 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2135 Ok(cluster.replica(replica_id).expect("Must exist"))
2136 }
2137
    /// Generic name resolution used by item, function, and type lookup.
    ///
    /// `get_schema_entries` selects which namespace of a schema to search
    /// (items, functions, or types); `err_gen` builds the "not found" error.
    ///
    /// Resolution order: an explicitly qualified schema wins; otherwise the
    /// connection's temporary schema is checked first, then each schema on the
    /// search path. As a compatibility shim, a lookup that targeted
    /// `mz_internal` also falls back to `mz_catalog_unstable` and
    /// `mz_introspection` (relations that migrated out of `mz_internal`).
    #[allow(clippy::useless_let_if_seq)]
    pub fn resolve(
        &self,
        get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
        current_database: Option<&DatabaseId>,
        search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
        name: &PartialItemName,
        conn_id: &ConnectionId,
        err_gen: fn(String) -> SqlCatalogError,
    ) -> Result<&CatalogEntry, SqlCatalogError> {
        let schemas = match &name.schema {
            // Qualified name: resolve exactly that schema or fail.
            Some(schema_name) => {
                match self.resolve_schema(
                    current_database,
                    name.database.as_deref(),
                    schema_name,
                    conn_id,
                ) {
                    Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
                    Err(e) => return Err(e),
                }
            }
            // Unqualified name: the temporary schema shadows the search path.
            // NOTE(review): this shortcut always checks `items`, regardless of
            // `get_schema_entries` — presumably intentional, since temporary
            // functions/types don't exist; confirm before relying on it.
            None => match self
                .try_get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Temporary,
                    conn_id,
                )
                .and_then(|schema| schema.items.get(&name.item))
            {
                Some(id) => return Ok(self.get_entry(id)),
                None => search_path.to_vec(),
            },
        };

        for (database_spec, schema_spec) in &schemas {
            // Schemas on the search path may have been dropped concurrently.
            let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
                continue;
            };

            if let Some(id) = get_schema_entries(schema).get(&name.item) {
                return Ok(&self.entry_by_id[id]);
            }
        }

        // Compatibility fallback for relations that moved out of `mz_internal`.
        let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
        if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
            for schema_id in [
                self.get_mz_catalog_unstable_schema_id(),
                self.get_mz_introspection_schema_id(),
            ] {
                let schema = self.get_schema(
                    &ResolvedDatabaseSpecifier::Ambient,
                    &SchemaSpecifier::Id(schema_id),
                    conn_id,
                );

                if let Some(id) = get_schema_entries(schema).get(&name.item) {
                    // Log the deprecated usage so it can be tracked.
                    debug!(
                        github_27831 = true,
                        "encountered use of outdated schema `mz_internal` for relation: {name}",
                    );
                    return Ok(&self.entry_by_id[id]);
                }
            }
        }

        Err(err_gen(name.to_string()))
    }
2222
2223 pub fn resolve_entry(
2225 &self,
2226 current_database: Option<&DatabaseId>,
2227 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2228 name: &PartialItemName,
2229 conn_id: &ConnectionId,
2230 ) -> Result<&CatalogEntry, SqlCatalogError> {
2231 self.resolve(
2232 |schema| &schema.items,
2233 current_database,
2234 search_path,
2235 name,
2236 conn_id,
2237 SqlCatalogError::UnknownItem,
2238 )
2239 }
2240
2241 pub fn resolve_function(
2243 &self,
2244 current_database: Option<&DatabaseId>,
2245 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2246 name: &PartialItemName,
2247 conn_id: &ConnectionId,
2248 ) -> Result<&CatalogEntry, SqlCatalogError> {
2249 self.resolve(
2250 |schema| &schema.functions,
2251 current_database,
2252 search_path,
2253 name,
2254 conn_id,
2255 |name| SqlCatalogError::UnknownFunction {
2256 name,
2257 alternative: None,
2258 },
2259 )
2260 }
2261
2262 pub fn resolve_type(
2264 &self,
2265 current_database: Option<&DatabaseId>,
2266 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2267 name: &PartialItemName,
2268 conn_id: &ConnectionId,
2269 ) -> Result<&CatalogEntry, SqlCatalogError> {
2270 static NON_PG_CATALOG_TYPES: LazyLock<
2271 BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2272 > = LazyLock::new(|| {
2273 BUILTINS::types()
2274 .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2275 .map(|typ| (typ.name, typ))
2276 .collect()
2277 });
2278
2279 let entry = self.resolve(
2280 |schema| &schema.types,
2281 current_database,
2282 search_path,
2283 name,
2284 conn_id,
2285 |name| SqlCatalogError::UnknownType { name },
2286 )?;
2287
2288 if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2289 if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2290 warn!(
2291 "user specified an incorrect schema of {} for the type {}, which should be in \
2292 the {} schema. This works now due to a bug but will be fixed in a later release.",
2293 PG_CATALOG_SCHEMA.quoted(),
2294 typ.name.quoted(),
2295 typ.schema.quoted(),
2296 )
2297 }
2298 }
2299
2300 Ok(entry)
2301 }
2302
2303 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2305 match object_id {
2306 ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2307 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2308 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2309 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2310 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2311 ObjectId::ClusterReplica(cluster_replica_id) => {
2312 CommentObjectId::ClusterReplica(cluster_replica_id)
2313 }
2314 ObjectId::NetworkPolicy(network_policy_id) => {
2315 CommentObjectId::NetworkPolicy(network_policy_id)
2316 }
2317 }
2318 }
2319
    /// Returns a reference to the current system configuration variables.
    pub fn system_config(&self) -> &SystemVars {
        &self.system_configuration
    }
2324
    /// Returns a mutable reference to the system configuration variables.
    ///
    /// The configuration lives behind an `Arc`; `Arc::make_mut` clones it on
    /// write when other references exist (copy-on-write).
    pub fn system_config_mut(&mut self) -> &mut SystemVars {
        Arc::make_mut(&mut self.system_configuration)
    }
2329
2330 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2340 let mut dump = serde_json::to_value(&self).map_err(|e| {
2342 Error::new(ErrorKind::Unstructured(format!(
2343 "internal error: could not dump catalog: {}",
2346 e
2347 )))
2348 })?;
2349
2350 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2351 dump_obj.insert(
2353 "system_parameter_defaults".into(),
2354 serde_json::json!(self.system_config().defaults()),
2355 );
2356 if let Some(unfinalized_shards) = unfinalized_shards {
2358 dump_obj
2359 .get_mut("storage_metadata")
2360 .expect("known to exist")
2361 .as_object_mut()
2362 .expect("storage_metadata is an object")
2363 .insert(
2364 "unfinalized_shards".into(),
2365 serde_json::json!(unfinalized_shards),
2366 );
2367 }
2368 let temporary_gids: Vec<_> = self
2373 .entry_by_global_id
2374 .iter()
2375 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2376 .map(|(gid, _item_id)| *gid)
2377 .collect();
2378 if !temporary_gids.is_empty() {
2379 let gids = dump_obj
2380 .get_mut("entry_by_global_id")
2381 .expect("known_to_exist")
2382 .as_object_mut()
2383 .expect("entry_by_global_id is an object");
2384 for gid in temporary_gids {
2385 gids.remove(&gid.to_string());
2386 }
2387 }
2388 dump_obj.remove("role_auth_by_id");
2391
2392 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2394 }
2395
    /// Returns the availability zones known to the catalog.
    pub fn availability_zones(&self) -> &[String] {
        &self.availability_zones
    }
2399
    /// Converts a durable [`mz_catalog::durable::ReplicaLocation`] into a
    /// controller-facing [`ReplicaLocation`], validating it against this
    /// catalog's configuration.
    ///
    /// `allowed_sizes` restricts which managed replica sizes are acceptable
    /// (empty means no restriction). `allowed_availability_zones`, when
    /// `Some`, supplies the cluster-level availability zones a managed
    /// replica may be placed in.
    ///
    /// # Errors
    ///
    /// Returns an internal error if availability zones are supplied for an
    /// unmanaged replica, or alongside a replica-specific availability zone;
    /// propagates errors from `ensure_valid_replica_size` for unknown,
    /// disallowed, or disabled sizes.
    pub fn concretize_replica_location(
        &self,
        location: mz_catalog::durable::ReplicaLocation,
        allowed_sizes: &Vec<String>,
        allowed_availability_zones: Option<&[String]>,
    ) -> Result<ReplicaLocation, Error> {
        let location = match location {
            mz_catalog::durable::ReplicaLocation::Unmanaged {
                storagectl_addrs,
                computectl_addrs,
            } => {
                // Unmanaged replicas have fixed addresses; availability-zone
                // constraints make no sense for them and indicate a caller bug.
                if allowed_availability_zones.is_some() {
                    return Err(Error {
                        kind: ErrorKind::Internal(
                            "tried concretize unmanaged replica with specific availability_zones"
                                .to_string(),
                        ),
                    });
                }
                ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs,
                    computectl_addrs,
                })
            }
            mz_catalog::durable::ReplicaLocation::Managed {
                size,
                availability_zone,
                billed_as,
                internal,
                pending,
            } => {
                // A replica-specific AZ and cluster-level AZ choices are
                // mutually exclusive.
                if allowed_availability_zones.is_some() && availability_zone.is_some() {
                    let message = "tried concretize managed replica with specific availability zones and availability zone";
                    return Err(Error {
                        kind: ErrorKind::Internal(message.to_string()),
                    });
                }
                self.ensure_valid_replica_size(allowed_sizes, &size)?;
                let cluster_replica_sizes = &self.cluster_replica_sizes;

                ReplicaLocation::Managed(ManagedReplicaLocation {
                    // `ensure_valid_replica_size` has just verified that the
                    // size exists in the map, so the lookup cannot fail.
                    allocation: cluster_replica_sizes
                        .0
                        .get(&size)
                        .expect("catalog out of sync")
                        .clone(),
                    // AZ precedence: a replica-specific AZ always wins;
                    // otherwise use the cluster-level AZ list, where an empty
                    // slice maps to `FromCluster(None)`; with no constraint
                    // at all the choice is left open (`FromReplica(None)`).
                    availability_zones: match (availability_zone, allowed_availability_zones) {
                        (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                        (None, Some([])) => ManagedReplicaAvailabilityZones::FromCluster(None),
                        (None, Some(azs)) => {
                            ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
                        }
                        (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
                    },
                    size,
                    billed_as,
                    internal,
                    pending,
                })
            }
        };
        Ok(location)
    }
2463
    /// Reports whether the named replica size is provisioned with disk:
    /// swap is not enabled and the disk limit is not zero.
    ///
    /// Panics if `size` is not present in the replica size map; callers are
    /// expected to pass a size already validated against the catalog.
    pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
        let alloc = &self.cluster_replica_sizes.0[size];
        !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
    }
2478
2479 pub(crate) fn ensure_valid_replica_size(
2480 &self,
2481 allowed_sizes: &[String],
2482 size: &String,
2483 ) -> Result<(), Error> {
2484 let cluster_replica_sizes = &self.cluster_replica_sizes;
2485
2486 if !cluster_replica_sizes.0.contains_key(size)
2487 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2488 || cluster_replica_sizes.0[size].disabled
2489 {
2490 let mut entries = cluster_replica_sizes
2491 .enabled_allocations()
2492 .collect::<Vec<_>>();
2493
2494 if !allowed_sizes.is_empty() {
2495 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2496 entries.retain(|(name, _)| allowed_sizes.contains(name));
2497 }
2498
2499 entries.sort_by_key(
2500 |(
2501 _name,
2502 ReplicaAllocation {
2503 scale, cpu_limit, ..
2504 },
2505 )| (scale, cpu_limit),
2506 );
2507
2508 Err(Error {
2509 kind: ErrorKind::InvalidClusterReplicaSize {
2510 size: size.to_owned(),
2511 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2512 },
2513 })
2514 } else {
2515 Ok(())
2516 }
2517 }
2518
2519 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2520 if role_id.is_builtin() {
2521 let role = self.get_role(role_id);
2522 Err(Error::new(ErrorKind::ReservedRoleName(
2523 role.name().to_string(),
2524 )))
2525 } else {
2526 Ok(())
2527 }
2528 }
2529
2530 pub fn ensure_not_reserved_network_policy(
2531 &self,
2532 network_policy_id: &NetworkPolicyId,
2533 ) -> Result<(), Error> {
2534 if network_policy_id.is_builtin() {
2535 let policy = self.get_network_policy(network_policy_id);
2536 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2537 policy.name.clone(),
2538 )))
2539 } else {
2540 Ok(())
2541 }
2542 }
2543
2544 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2545 let is_grantable = !role_id.is_public() && !role_id.is_system();
2546 if is_grantable {
2547 Ok(())
2548 } else {
2549 let role = self.get_role(role_id);
2550 Err(Error::new(ErrorKind::UngrantableRoleName(
2551 role.name().to_string(),
2552 )))
2553 }
2554 }
2555
2556 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2557 if role_id.is_system() {
2558 let role = self.get_role(role_id);
2559 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2560 role.name().to_string(),
2561 )))
2562 } else {
2563 Ok(())
2564 }
2565 }
2566
2567 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2568 if role_id.is_predefined() {
2569 let role = self.get_role(role_id);
2570 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2571 role.name().to_string(),
2572 )))
2573 } else {
2574 Ok(())
2575 }
2576 }
2577
2578 pub(crate) fn add_to_audit_log(
2581 system_configuration: &SystemVars,
2582 oracle_write_ts: mz_repr::Timestamp,
2583 session: Option<&ConnMeta>,
2584 tx: &mut mz_catalog::durable::Transaction,
2585 audit_events: &mut Vec<VersionedEvent>,
2586 event_type: EventType,
2587 object_type: ObjectType,
2588 details: EventDetails,
2589 ) -> Result<(), Error> {
2590 let user = session.map(|session| session.user().name.to_string());
2591
2592 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2595 Some(ts) => ts.into(),
2596 _ => oracle_write_ts.into(),
2597 };
2598 let id = tx.allocate_audit_log_id()?;
2599 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2600 audit_events.push(event.clone());
2601 tx.insert_audit_log_event(event);
2602 Ok(())
2603 }
2604
2605 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2606 match id {
2607 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2608 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2609 self.get_cluster_replica(*cluster_id, *replica_id)
2610 .owner_id(),
2611 ),
2612 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2613 ObjectId::Schema((database_spec, schema_spec)) => Some(
2614 self.get_schema(database_spec, schema_spec, conn_id)
2615 .owner_id(),
2616 ),
2617 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2618 ObjectId::Role(_) => None,
2619 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2620 }
2621 }
2622
2623 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2624 match object_id {
2625 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2626 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2627 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2628 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2629 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2630 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2631 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2632 }
2633 }
2634
2635 pub(super) fn get_system_object_type(
2636 &self,
2637 id: &SystemObjectId,
2638 ) -> mz_sql::catalog::SystemObjectType {
2639 match id {
2640 SystemObjectId::Object(object_id) => {
2641 SystemObjectType::Object(self.get_object_type(object_id))
2642 }
2643 SystemObjectId::System => SystemObjectType::System,
2644 }
2645 }
2646
    /// Returns the catalog's storage metadata.
    pub fn storage_metadata(&self) -> &StorageMetadata {
        &self.storage_metadata
    }
2653
    /// Groups the given items by their effective compaction window.
    ///
    /// Only sources and source-fed tables (ingestion exports and webhook
    /// tables) are included; duplicate IDs and all other item types are
    /// skipped. Items without a custom compaction window are grouped under
    /// the default window.
    ///
    /// # Panics
    ///
    /// Panics if a table is encountered whose data source is anything other
    /// than an ingestion export or webhook, which indicates a caller bug.
    pub fn source_compaction_windows(
        &self,
        ids: impl IntoIterator<Item = CatalogItemId>,
    ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
        let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
        let mut seen = BTreeSet::new();
        for item_id in ids {
            // Process each ID at most once.
            if !seen.insert(item_id) {
                continue;
            }
            let entry = self.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Source(source) => {
                    let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
                    cws.entry(source_cw).or_default().insert(item_id);
                }
                CatalogItem::Table(table) => {
                    let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
                    match &table.data_source {
                        // Tables fed by an ingestion export or a webhook
                        // participate in compaction-window grouping.
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::IngestionExport { .. }
                                | DataSourceDesc::Webhook { .. },
                            timeline: _,
                        } => {
                            cws.entry(table_cw).or_default().insert(item_id);
                        }
                        // Ordinary user tables are not source-fed; skip.
                        TableDataSource::TableWrites { .. } => {}
                        // Any other data source on a table is unexpected.
                        TableDataSource::DataSource {
                            desc:
                                DataSourceDesc::Ingestion { .. }
                                | DataSourceDesc::OldSyntaxIngestion { .. }
                                | DataSourceDesc::Introspection(_)
                                | DataSourceDesc::Progress
                                | DataSourceDesc::Catalog,
                            ..
                        } => {
                            unreachable!(
                                "unexpected DataSourceDesc for table {item_id}: {:?}",
                                table.data_source
                            )
                        }
                    }
                }
                // Other item types carry no source compaction window.
                _ => {
                    continue;
                }
            }
        }
        cws
    }
2710
2711 pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2712 match id {
2713 CommentObjectId::Table(id)
2714 | CommentObjectId::View(id)
2715 | CommentObjectId::MaterializedView(id)
2716 | CommentObjectId::Source(id)
2717 | CommentObjectId::Sink(id)
2718 | CommentObjectId::Index(id)
2719 | CommentObjectId::Func(id)
2720 | CommentObjectId::Connection(id)
2721 | CommentObjectId::Type(id)
2722 | CommentObjectId::Secret(id) => Some(*id),
2723 CommentObjectId::Role(_)
2724 | CommentObjectId::Database(_)
2725 | CommentObjectId::Schema(_)
2726 | CommentObjectId::Cluster(_)
2727 | CommentObjectId::ClusterReplica(_)
2728 | CommentObjectId::NetworkPolicy(_) => None,
2729 }
2730 }
2731
2732 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2733 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2734 }
2735
2736 pub fn comment_id_to_audit_log_name(
2737 &self,
2738 id: CommentObjectId,
2739 conn_id: &ConnectionId,
2740 ) -> String {
2741 match id {
2742 CommentObjectId::Table(id)
2743 | CommentObjectId::View(id)
2744 | CommentObjectId::MaterializedView(id)
2745 | CommentObjectId::Source(id)
2746 | CommentObjectId::Sink(id)
2747 | CommentObjectId::Index(id)
2748 | CommentObjectId::Func(id)
2749 | CommentObjectId::Connection(id)
2750 | CommentObjectId::Type(id)
2751 | CommentObjectId::Secret(id) => {
2752 let item = self.get_entry(&id);
2753 let name = self.resolve_full_name(item.name(), Some(conn_id));
2754 name.to_string()
2755 }
2756 CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2757 CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2758 CommentObjectId::Schema((spec, schema_id)) => {
2759 let schema = self.get_schema(&spec, &schema_id, conn_id);
2760 self.resolve_full_schema_name(&schema.name).to_string()
2761 }
2762 CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2763 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2764 let cluster = self.get_cluster(cluster_id);
2765 let replica = self.get_cluster_replica(cluster_id, replica_id);
2766 QualifiedReplica {
2767 cluster: Ident::new_unchecked(cluster.name.clone()),
2768 replica: Ident::new_unchecked(replica.name.clone()),
2769 }
2770 .to_string()
2771 }
2772 CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2773 }
2774 }
2775
    /// Returns the mock authentication nonce, or an empty string if none is
    /// configured.
    pub fn mock_authentication_nonce(&self) -> String {
        self.mock_authentication_nonce.clone().unwrap_or_default()
    }
2779}
2780
2781impl ConnectionResolver for CatalogState {
2782 fn resolve_connection(
2783 &self,
2784 id: CatalogItemId,
2785 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2786 use mz_storage_types::connections::Connection::*;
2787 match self
2788 .get_entry(&id)
2789 .connection()
2790 .expect("catalog out of sync")
2791 .details
2792 .to_connection()
2793 {
2794 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2795 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2796 Csr(conn) => Csr(conn.into_inline_connection(self)),
2797 Ssh(conn) => Ssh(conn),
2798 Aws(conn) => Aws(conn),
2799 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2800 MySql(conn) => MySql(conn.into_inline_connection(self)),
2801 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2802 IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2803 }
2804 }
2805}
2806
impl OptimizerCatalog for CatalogState {
    // NOTE: fully qualified `CatalogState::...` syntax is used throughout to
    // reach the inherent methods, whose names overlap with this trait's
    // methods (e.g. the trait's `get_entry` takes a `GlobalId` while the
    // inherent `get_entry` takes a `CatalogItemId`).
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        CatalogState::get_entry_by_global_id(self, id)
    }
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
        CatalogState::get_entry(self, id)
    }
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName {
        CatalogState::resolve_full_name(self, name, conn_id)
    }
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
        Box::new(CatalogState::get_indexes_on(self, id, cluster))
    }
}
2829
2830impl OptimizerCatalog for Catalog {
2831 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2832 self.state.get_entry_by_global_id(id)
2833 }
2834
2835 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2836 self.state.get_entry(id)
2837 }
2838
2839 fn resolve_full_name(
2840 &self,
2841 name: &QualifiedItemName,
2842 conn_id: Option<&ConnectionId>,
2843 ) -> FullItemName {
2844 self.state.resolve_full_name(name, conn_id)
2845 }
2846
2847 fn get_indexes_on(
2848 &self,
2849 id: GlobalId,
2850 cluster: ClusterId,
2851 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2852 Box::new(self.state.get_indexes_on(id, cluster))
2853 }
2854}
2855
impl Catalog {
    /// Upcasts an `Arc<Catalog>` into an `Arc<dyn OptimizerCatalog>` trait
    /// object (a no-op unsizing coercion).
    pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
        self
    }
}