1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet, VecDeque};
14use std::fmt::Debug;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Instant;
18
19use ipnet::IpNet;
20use itertools::Itertools;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent};
24use mz_build_info::DUMMY_BUILD_INFO;
25use mz_catalog::SYSTEM_CONN_ID;
26use mz_catalog::builtin::{
27 BUILTINS, Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType,
28};
29use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
30use mz_catalog::expr_cache::LocalExpressions;
31use mz_catalog::memory::error::{Error, ErrorKind};
32use mz_catalog::memory::objects::{
33 CatalogCollectionEntry, CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap,
34 Connection, DataSourceDesc, Database, DefaultPrivileges, Index, MaterializedView,
35 NetworkPolicy, Role, RoleAuth, Schema, Secret, Sink, Source, SourceReferences, Table,
36 TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaAllocation, ReplicaLocation,
40 UnmanagedReplicaLocation,
41};
42use mz_controller_types::{ClusterId, ReplicaId};
43use mz_expr::{CollectionPlan, OptimizedMirRelationExpr};
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::DiskLimit;
46use mz_ore::collections::CollectionExt;
47use mz_ore::now::NOW_ZERO;
48use mz_ore::soft_assert_no_log;
49use mz_ore::str::StrExt;
50use mz_pgrepr::oid::INVALID_OID;
51use mz_repr::adt::mz_acl_item::PrivilegeMap;
52use mz_repr::namespaces::{
53 INFORMATION_SCHEMA, MZ_CATALOG_SCHEMA, MZ_CATALOG_UNSTABLE_SCHEMA, MZ_INTERNAL_SCHEMA,
54 MZ_INTROSPECTION_SCHEMA, MZ_TEMP_SCHEMA, MZ_UNSAFE_SCHEMA, PG_CATALOG_SCHEMA, SYSTEM_SCHEMAS,
55 UNSTABLE_SCHEMAS,
56};
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
59use mz_repr::role_id::RoleId;
60use mz_repr::{
61 CatalogItemId, GlobalId, RelationDesc, RelationVersion, RelationVersionSelector,
62 VersionedRelationDesc,
63};
64use mz_secrets::InMemorySecretsController;
65use mz_sql::ast::Ident;
66use mz_sql::catalog::{
67 CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
68 CatalogItem as SqlCatalogItem, CatalogItemType, CatalogRecordField, CatalogRole, CatalogSchema,
69 CatalogType, CatalogTypeDetails, IdReference, NameReference, SessionCatalog, SystemObjectType,
70 TypeReference,
71};
72use mz_sql::catalog::{CatalogConfig, EnvironmentId};
73use mz_sql::names::{
74 CommentObjectId, DatabaseId, DependencyIds, FullItemName, FullSchemaName, ObjectId,
75 PartialItemName, QualifiedItemName, QualifiedSchemaName, RawDatabaseSpecifier,
76 ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier, SystemObjectId,
77};
78use mz_sql::plan::{
79 CreateConnectionPlan, CreateIndexPlan, CreateMaterializedViewPlan, CreateSecretPlan,
80 CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, Params,
81 Plan, PlanContext,
82};
83use mz_sql::rbac;
84use mz_sql::session::metadata::SessionMetadata;
85use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
86use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, SystemVars, Var, VarInput};
87use mz_sql_parser::ast::QualifiedReplica;
88use mz_storage_client::controller::StorageMetadata;
89use mz_storage_types::connections::ConnectionContext;
90use mz_storage_types::connections::inline::{
91 ConnectionResolver, InlinedConnection, IntoInlineConnection,
92};
93use mz_transform::notice::OptimizerNotice;
94use serde::Serialize;
95use timely::progress::Antichain;
96use tokio::sync::mpsc;
97use tracing::{debug, warn};
98
99use crate::AdapterError;
101use crate::catalog::{Catalog, ConnCatalog};
102use crate::coord::{ConnMeta, infer_sql_type_for_catalog};
103use crate::optimize::{self, Optimize, OptimizerCatalog};
104use crate::session::Session;
105
/// The in-memory state of the catalog.
///
/// Most object kinds are indexed twice: a `*_by_name` map from name to ID and
/// a `*_by_id` map from ID to the object itself. Fields annotated with
/// `#[serde(skip)]` are left out of the serialized representation.
#[derive(Debug, Clone, Serialize)]
pub struct CatalogState {
    /// Maps database names to their IDs.
    pub(super) database_by_name: imbl::OrdMap<String, DatabaseId>,
    /// Maps database IDs to the databases themselves.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) database_by_id: imbl::OrdMap<DatabaseId, Database>,
    /// Maps item IDs to their catalog entries. Serialized through
    /// `skip_temp_items`, which drops entries that report a connection ID.
    #[serde(serialize_with = "skip_temp_items")]
    pub(super) entry_by_id: imbl::OrdMap<CatalogItemId, CatalogEntry>,
    /// Maps global IDs to the ID of the catalog item they belong to.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) entry_by_global_id: imbl::OrdMap<GlobalId, CatalogItemId>,
    /// Maps ambient schema names to their IDs.
    pub(super) ambient_schemas_by_name: imbl::OrdMap<String, SchemaId>,
    /// Maps ambient schema IDs to the schemas themselves.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) ambient_schemas_by_id: imbl::OrdMap<SchemaId, Schema>,
    /// Maps cluster names to their IDs.
    pub(super) clusters_by_name: imbl::OrdMap<String, ClusterId>,
    /// Maps cluster IDs to the clusters themselves.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) clusters_by_id: imbl::OrdMap<ClusterId, Cluster>,
    /// Maps role names to their IDs.
    pub(super) roles_by_name: imbl::OrdMap<String, RoleId>,
    /// Maps role IDs to the roles themselves.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) roles_by_id: imbl::OrdMap<RoleId, Role>,
    /// Maps network policy names to their IDs.
    pub(super) network_policies_by_name: imbl::OrdMap<String, NetworkPolicyId>,
    /// Maps network policy IDs to the policies themselves.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) network_policies_by_id: imbl::OrdMap<NetworkPolicyId, NetworkPolicy>,
    /// Maps role IDs to their authentication data.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) role_auth_by_id: imbl::OrdMap<RoleId, RoleAuth>,

    /// System variables; not serialized.
    #[serde(skip)]
    pub(super) system_configuration: Arc<SystemVars>,
    /// Default privileges.
    pub(super) default_privileges: Arc<DefaultPrivileges>,
    /// System-level privileges.
    pub(super) system_privileges: Arc<PrivilegeMap>,
    /// Comments attached to catalog objects.
    pub(super) comments: Arc<CommentsMap>,
    /// Source references, keyed by catalog item ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub(super) source_references: imbl::OrdMap<CatalogItemId, SourceReferences>,
    /// Storage metadata.
    pub(super) storage_metadata: Arc<StorageMetadata>,
    /// Optional mock authentication nonce.
    pub(super) mock_authentication_nonce: Option<String>,

    /// Optimizer notices, keyed by a global ID; not serialized.
    // NOTE(review): presumably the key is the ID of an object each notice
    // depends on, going by the field name -- confirm against the writers.
    #[serde(skip)]
    pub(super) notices_by_dep_id: imbl::OrdMap<GlobalId, Vec<Arc<OptimizerNotice>>>,

    /// Temporary schemas, keyed by connection ID; not serialized.
    #[serde(skip)]
    pub(super) temporary_schemas: imbl::OrdMap<ConnectionId, Schema>,

    /// Catalog configuration; not serialized.
    #[serde(skip)]
    pub(super) config: mz_sql::catalog::CatalogConfig,
    /// The known cluster replica sizes.
    pub(super) cluster_replica_sizes: ClusterReplicaSizeMap,
    /// Availability zone names; not serialized.
    #[serde(skip)]
    pub(crate) availability_zones: Vec<String>,

    /// Egress network addresses; not serialized.
    #[serde(skip)]
    pub(super) egress_addresses: Vec<IpNet>,
    /// Optional AWS principal context.
    pub(super) aws_principal_context: Option<AwsPrincipalContext>,
    /// Optional set of AWS PrivateLink availability zones.
    pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
    /// Optional HTTP host name. `try_get_webhook_url` uses it as the host of
    /// webhook URLs and falls back to the literal `HOST` when it is unset.
    pub(super) http_host_name: Option<String>,

    /// The validated license key; not serialized.
    #[serde(skip)]
    pub(super) license_key: ValidatedLicenseKey,
}
182
/// A cache of [`LocalExpressions`], keyed by [`GlobalId`].
#[derive(Debug, Clone, Serialize)]
pub(crate) enum LocalExpressionCache {
    /// The cache is in use.
    Open {
        /// Expressions the cache was constructed with (see `new`), plus any
        /// put back through `insert_cached_expression`.
        cached_exprs: BTreeMap<GlobalId, LocalExpressions>,
        /// Expressions recorded through `insert_uncached_expression`.
        uncached_exprs: BTreeMap<GlobalId, LocalExpressions>,
    },
    /// The cache is not in use: removals return `None` and insertions are
    /// ignored.
    Closed,
}
199
200impl LocalExpressionCache {
201 pub(super) fn new(cached_exprs: BTreeMap<GlobalId, LocalExpressions>) -> Self {
202 Self::Open {
203 cached_exprs,
204 uncached_exprs: BTreeMap::new(),
205 }
206 }
207
208 pub(super) fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<LocalExpressions> {
209 match self {
210 LocalExpressionCache::Open { cached_exprs, .. } => cached_exprs.remove(id),
211 LocalExpressionCache::Closed => None,
212 }
213 }
214
215 pub(super) fn insert_cached_expression(
218 &mut self,
219 id: GlobalId,
220 local_expressions: LocalExpressions,
221 ) {
222 match self {
223 LocalExpressionCache::Open { cached_exprs, .. } => {
224 cached_exprs.insert(id, local_expressions);
225 }
226 LocalExpressionCache::Closed => {}
227 }
228 }
229
230 pub(super) fn insert_uncached_expression(
233 &mut self,
234 id: GlobalId,
235 local_mir: OptimizedMirRelationExpr,
236 optimizer_features: OptimizerFeatures,
237 ) {
238 match self {
239 LocalExpressionCache::Open { uncached_exprs, .. } => {
240 let local_expr = LocalExpressions {
241 local_mir,
242 optimizer_features,
243 };
244 let prev = uncached_exprs.remove(&id);
251 match prev {
252 Some(prev) if prev == local_expr => {
253 uncached_exprs.insert(id, local_expr);
254 }
255 None => {
256 uncached_exprs.insert(id, local_expr);
257 }
258 Some(_) => {}
259 }
260 }
261 LocalExpressionCache::Closed => {}
262 }
263 }
264
265 pub(super) fn into_uncached_exprs(self) -> BTreeMap<GlobalId, LocalExpressions> {
266 match self {
267 LocalExpressionCache::Open { uncached_exprs, .. } => uncached_exprs,
268 LocalExpressionCache::Closed => BTreeMap::new(),
269 }
270 }
271}
272
273fn skip_temp_items<S>(
274 entries: &imbl::OrdMap<CatalogItemId, CatalogEntry>,
275 serializer: S,
276) -> Result<S::Ok, S::Error>
277where
278 S: serde::Serializer,
279{
280 mz_ore::serde::map_key_to_string(
281 entries.iter().filter(|(_k, v)| v.conn_id().is_none()),
282 serializer,
283 )
284}
285
286impl CatalogState {
287 pub fn empty_test() -> Self {
291 CatalogState {
292 database_by_name: Default::default(),
293 database_by_id: Default::default(),
294 entry_by_id: Default::default(),
295 entry_by_global_id: Default::default(),
296 notices_by_dep_id: Default::default(),
297 ambient_schemas_by_name: Default::default(),
298 ambient_schemas_by_id: Default::default(),
299 temporary_schemas: Default::default(),
300 clusters_by_id: Default::default(),
301 clusters_by_name: Default::default(),
302 network_policies_by_name: Default::default(),
303 roles_by_name: Default::default(),
304 roles_by_id: Default::default(),
305 network_policies_by_id: Default::default(),
306 role_auth_by_id: Default::default(),
307 config: CatalogConfig {
308 start_time: Default::default(),
309 start_instant: Instant::now(),
310 nonce: Default::default(),
311 environment_id: EnvironmentId::for_tests(),
312 session_id: Default::default(),
313 build_info: &DUMMY_BUILD_INFO,
314 now: NOW_ZERO.clone(),
315 connection_context: ConnectionContext::for_tests(Arc::new(
316 InMemorySecretsController::new(),
317 )),
318 helm_chart_version: None,
319 },
320 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
321 availability_zones: Default::default(),
322 system_configuration: Arc::new(SystemVars::default()),
323 egress_addresses: Default::default(),
324 aws_principal_context: Default::default(),
325 aws_privatelink_availability_zones: Default::default(),
326 http_host_name: Default::default(),
327 default_privileges: Arc::new(DefaultPrivileges::default()),
328 system_privileges: Arc::new(PrivilegeMap::default()),
329 comments: Arc::new(CommentsMap::default()),
330 source_references: Default::default(),
331 storage_metadata: Arc::new(StorageMetadata::default()),
332 license_key: ValidatedLicenseKey::for_tests(),
333 mock_authentication_nonce: Default::default(),
334 }
335 }
336
337 pub fn for_session<'a>(&'a self, session: &'a Session) -> ConnCatalog<'a> {
338 let search_path = self.resolve_search_path(session);
339 let database = self
340 .database_by_name
341 .get(session.vars().database())
342 .map(|id| id.clone());
343 let state = match session.transaction().catalog_state() {
344 Some(txn_catalog_state) => Cow::Borrowed(txn_catalog_state),
345 None => Cow::Borrowed(self),
346 };
347 ConnCatalog {
348 state,
349 unresolvable_ids: BTreeSet::new(),
350 conn_id: session.conn_id().clone(),
351 cluster: session.vars().cluster().into(),
352 database,
353 search_path,
354 role_id: session.current_role_id().clone(),
355 prepared_statements: Some(session.prepared_statements()),
356 portals: Some(session.portals()),
357 notices_tx: session.retain_notice_transmitter(),
358 restrict_to_user_objects: session.vars().restrict_to_user_objects(),
359 }
360 }
361
362 pub fn for_sessionless_user(&self, role_id: RoleId) -> ConnCatalog<'_> {
363 let (notices_tx, _notices_rx) = mpsc::unbounded_channel();
364 let cluster = self.system_configuration.default_cluster();
365
366 ConnCatalog {
367 state: Cow::Borrowed(self),
368 unresolvable_ids: BTreeSet::new(),
369 conn_id: SYSTEM_CONN_ID.clone(),
370 cluster,
371 database: self
372 .resolve_database(DEFAULT_DATABASE_NAME)
373 .ok()
374 .map(|db| db.id()),
375 search_path: Vec::new(),
378 role_id,
379 prepared_statements: None,
380 portals: None,
381 notices_tx,
382 restrict_to_user_objects: false,
383 }
384 }
385
386 pub fn for_system_session(&self) -> ConnCatalog<'_> {
387 self.for_sessionless_user(MZ_SYSTEM_ROLE_ID)
388 }
389
390 pub fn transitive_uses(&self, id: CatalogItemId) -> impl Iterator<Item = CatalogItemId> + '_ {
395 struct I<'a> {
396 queue: VecDeque<CatalogItemId>,
397 seen: BTreeSet<CatalogItemId>,
398 this: &'a CatalogState,
399 }
400 impl<'a> Iterator for I<'a> {
401 type Item = CatalogItemId;
402 fn next(&mut self) -> Option<Self::Item> {
403 if let Some(next) = self.queue.pop_front() {
404 for child in self.this.get_entry(&next).item().uses() {
405 if !self.seen.contains(&child) {
406 self.queue.push_back(child);
407 self.seen.insert(child);
408 }
409 }
410 Some(next)
411 } else {
412 None
413 }
414 }
415 }
416
417 I {
418 queue: [id].into_iter().collect(),
419 seen: [id].into_iter().collect(),
420 this: self,
421 }
422 }
423
424 pub fn introspection_dependencies(&self, id: CatalogItemId) -> Vec<CatalogItemId> {
427 let mut out = Vec::new();
428 self.introspection_dependencies_inner(id, &mut out);
429 out
430 }
431
432 fn introspection_dependencies_inner(&self, id: CatalogItemId, out: &mut Vec<CatalogItemId>) {
433 match self.get_entry(&id).item() {
434 CatalogItem::Log(_) => out.push(id),
435 item @ (CatalogItem::View(_)
436 | CatalogItem::MaterializedView(_)
437 | CatalogItem::Connection(_)) => {
438 for item_id in item.references().items() {
440 self.introspection_dependencies_inner(*item_id, out);
441 }
442 }
443 CatalogItem::Sink(sink) => {
444 let from_item_id = self.get_entry_by_global_id(&sink.from).id();
445 self.introspection_dependencies_inner(from_item_id, out)
446 }
447 CatalogItem::Index(idx) => {
448 let on_item_id = self.get_entry_by_global_id(&idx.on).id();
449 self.introspection_dependencies_inner(on_item_id, out)
450 }
451 CatalogItem::Table(_)
452 | CatalogItem::Source(_)
453 | CatalogItem::Type(_)
454 | CatalogItem::Func(_)
455 | CatalogItem::Secret(_) => (),
456 }
457 }
458
459 pub(super) fn object_dependents(
465 &self,
466 object_ids: &Vec<ObjectId>,
467 conn_id: &ConnectionId,
468 seen: &mut BTreeSet<ObjectId>,
469 ) -> Vec<ObjectId> {
470 let mut dependents = Vec::new();
471 for object_id in object_ids {
472 match object_id {
473 ObjectId::Cluster(id) => {
474 dependents.extend_from_slice(&self.cluster_dependents(*id, seen));
475 }
476 ObjectId::ClusterReplica((cluster_id, replica_id)) => dependents.extend_from_slice(
477 &self.cluster_replica_dependents(*cluster_id, *replica_id, seen),
478 ),
479 ObjectId::Database(id) => {
480 dependents.extend_from_slice(&self.database_dependents(*id, conn_id, seen))
481 }
482 ObjectId::Schema((database_spec, schema_spec)) => {
483 dependents.extend_from_slice(&self.schema_dependents(
484 database_spec.clone(),
485 schema_spec.clone(),
486 conn_id,
487 seen,
488 ));
489 }
490 ObjectId::NetworkPolicy(id) => {
491 dependents.extend_from_slice(&self.network_policy_dependents(*id, seen));
492 }
493 id @ ObjectId::Role(_) => {
494 let unseen = seen.insert(id.clone());
495 if unseen {
496 dependents.push(id.clone());
497 }
498 }
499 ObjectId::Item(id) => {
500 dependents.extend_from_slice(&self.item_dependents(*id, seen))
501 }
502 }
503 }
504 dependents
505 }
506
507 fn cluster_dependents(
514 &self,
515 cluster_id: ClusterId,
516 seen: &mut BTreeSet<ObjectId>,
517 ) -> Vec<ObjectId> {
518 let mut dependents = Vec::new();
519 let object_id = ObjectId::Cluster(cluster_id);
520 if !seen.contains(&object_id) {
521 seen.insert(object_id.clone());
522 let cluster = self.get_cluster(cluster_id);
523 for item_id in cluster.bound_objects() {
524 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
525 }
526 for replica_id in cluster.replica_ids().values() {
527 dependents.extend_from_slice(&self.cluster_replica_dependents(
528 cluster_id,
529 *replica_id,
530 seen,
531 ));
532 }
533 dependents.push(object_id);
534 }
535 dependents
536 }
537
538 pub(super) fn cluster_replica_dependents(
545 &self,
546 cluster_id: ClusterId,
547 replica_id: ReplicaId,
548 seen: &mut BTreeSet<ObjectId>,
549 ) -> Vec<ObjectId> {
550 let mut dependents = Vec::new();
551 let object_id = ObjectId::ClusterReplica((cluster_id, replica_id));
552 if !seen.contains(&object_id) {
553 seen.insert(object_id.clone());
554 let cluster = self.get_cluster(cluster_id);
558 for item_id in cluster.bound_objects() {
559 if let CatalogItem::MaterializedView(mv) = self.get_entry(item_id).item()
560 && mv.target_replica == Some(replica_id)
561 {
562 dependents.extend_from_slice(&self.item_dependents(*item_id, seen));
563 }
564 }
565 dependents.push(object_id);
566 }
567 dependents
568 }
569
570 fn database_dependents(
577 &self,
578 database_id: DatabaseId,
579 conn_id: &ConnectionId,
580 seen: &mut BTreeSet<ObjectId>,
581 ) -> Vec<ObjectId> {
582 let mut dependents = Vec::new();
583 let object_id = ObjectId::Database(database_id);
584 if !seen.contains(&object_id) {
585 seen.insert(object_id.clone());
586 let database = self.get_database(&database_id);
587 for schema_id in database.schema_ids().values() {
588 dependents.extend_from_slice(&self.schema_dependents(
589 ResolvedDatabaseSpecifier::Id(database_id),
590 SchemaSpecifier::Id(*schema_id),
591 conn_id,
592 seen,
593 ));
594 }
595 dependents.push(object_id);
596 }
597 dependents
598 }
599
600 fn schema_dependents(
607 &self,
608 database_spec: ResolvedDatabaseSpecifier,
609 schema_spec: SchemaSpecifier,
610 conn_id: &ConnectionId,
611 seen: &mut BTreeSet<ObjectId>,
612 ) -> Vec<ObjectId> {
613 let mut dependents = Vec::new();
614 let object_id = ObjectId::Schema((database_spec, schema_spec.clone()));
615 if !seen.contains(&object_id) {
616 seen.insert(object_id.clone());
617 let schema = self.get_schema(&database_spec, &schema_spec, conn_id);
618 for item_id in schema.item_ids() {
619 dependents.extend_from_slice(&self.item_dependents(item_id, seen));
620 }
621 dependents.push(object_id)
622 }
623 dependents
624 }
625
626 pub(super) fn item_dependents(
633 &self,
634 item_id: CatalogItemId,
635 seen: &mut BTreeSet<ObjectId>,
636 ) -> Vec<ObjectId> {
637 let mut dependents = Vec::new();
638 let object_id = ObjectId::Item(item_id);
639 if !seen.contains(&object_id) {
640 seen.insert(object_id.clone());
641 let entry = self.get_entry(&item_id);
642 for dependent_id in entry.used_by() {
643 dependents.extend_from_slice(&self.item_dependents(*dependent_id, seen));
644 }
645 dependents.push(object_id);
646 if let Some(progress_id) = entry.progress_id() {
650 dependents.extend_from_slice(&self.item_dependents(progress_id, seen));
651 }
652 }
653 dependents
654 }
655
656 pub(super) fn network_policy_dependents(
663 &self,
664 network_policy_id: NetworkPolicyId,
665 _seen: &mut BTreeSet<ObjectId>,
666 ) -> Vec<ObjectId> {
667 let object_id = ObjectId::NetworkPolicy(network_policy_id);
668 vec![object_id]
672 }
673
674 fn is_stable(&self, id: CatalogItemId) -> bool {
678 let spec = self.get_entry(&id).name().qualifiers.schema_spec;
679 !self.is_unstable_schema_specifier(spec)
680 }
681
682 pub(super) fn check_unstable_dependencies(&self, item: &CatalogItem) -> Result<(), Error> {
683 if self.system_config().unsafe_enable_unstable_dependencies() {
684 return Ok(());
685 }
686
687 let unstable_dependencies: Vec<_> = item
688 .references()
689 .items()
690 .filter(|id| !self.is_stable(**id))
691 .map(|id| self.get_entry(id).name().item.clone())
692 .collect();
693
694 if unstable_dependencies.is_empty() || item.is_temporary() {
698 Ok(())
699 } else {
700 let object_type = item.typ().to_string();
701 Err(Error {
702 kind: ErrorKind::UnstableDependency {
703 object_type,
704 unstable_dependencies,
705 },
706 })
707 }
708 }
709
710 pub fn resolve_full_name(
711 &self,
712 name: &QualifiedItemName,
713 conn_id: Option<&ConnectionId>,
714 ) -> FullItemName {
715 let conn_id = conn_id.unwrap_or(&SYSTEM_CONN_ID);
716
717 let database = match &name.qualifiers.database_spec {
718 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
719 ResolvedDatabaseSpecifier::Id(id) => {
720 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
721 }
722 };
723 let schema = match &name.qualifiers.schema_spec {
726 SchemaSpecifier::Temporary => MZ_TEMP_SCHEMA.to_string(),
727 SchemaSpecifier::Id(_) => self
728 .get_schema(
729 &name.qualifiers.database_spec,
730 &name.qualifiers.schema_spec,
731 conn_id,
732 )
733 .name()
734 .schema
735 .clone(),
736 };
737 FullItemName {
738 database,
739 schema,
740 item: name.item.clone(),
741 }
742 }
743
744 pub(super) fn resolve_full_schema_name(&self, name: &QualifiedSchemaName) -> FullSchemaName {
745 let database = match &name.database {
746 ResolvedDatabaseSpecifier::Ambient => RawDatabaseSpecifier::Ambient,
747 ResolvedDatabaseSpecifier::Id(id) => {
748 RawDatabaseSpecifier::Name(self.get_database(id).name().to_string())
749 }
750 };
751 FullSchemaName {
752 database,
753 schema: name.schema.clone(),
754 }
755 }
756
757 pub fn get_entry(&self, id: &CatalogItemId) -> &CatalogEntry {
758 self.entry_by_id
759 .get(id)
760 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"))
761 }
762
763 pub fn get_entry_by_global_id(&self, id: &GlobalId) -> CatalogCollectionEntry {
764 let item_id = self
765 .entry_by_global_id
766 .get(id)
767 .unwrap_or_else(|| panic!("catalog out of sync, missing id {id:?}"));
768
769 let entry = self.get_entry(item_id).clone();
770 let version = match entry.item() {
771 CatalogItem::Table(table) => {
772 let (version, _) = table
773 .collections
774 .iter()
775 .find(|(_verison, gid)| *gid == id)
776 .expect("version to exist");
777 RelationVersionSelector::Specific(*version)
778 }
779 _ => RelationVersionSelector::Latest,
780 };
781 CatalogCollectionEntry { entry, version }
782 }
783
784 pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
785 self.entry_by_id.iter()
786 }
787
788 pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
789 self.temporary_schemas
791 .get(conn)
792 .into_iter()
793 .flat_map(|schema| schema.items.values().copied().map(ObjectId::from))
794 }
795
796 pub fn has_temporary_schema(&self, conn: &ConnectionId) -> bool {
802 self.temporary_schemas.contains_key(conn)
803 }
804
805 pub(super) fn get_system_type(&self, name: &str) -> &CatalogEntry {
811 let mut res = None;
812 for schema_id in self.system_schema_ids() {
813 let schema = &self.ambient_schemas_by_id[&schema_id];
814 if let Some(global_id) = schema.types.get(name) {
815 match res {
816 None => res = Some(self.get_entry(global_id)),
817 Some(_) => panic!(
818 "only call get_system_type on objects uniquely identifiable in one system schema"
819 ),
820 }
821 }
822 }
823
824 res.unwrap_or_else(|| panic!("cannot find type {} in system schema", name))
825 }
826
827 pub fn get_item_by_name(
828 &self,
829 name: &QualifiedItemName,
830 conn_id: &ConnectionId,
831 ) -> Option<&CatalogEntry> {
832 self.get_schema(
833 &name.qualifiers.database_spec,
834 &name.qualifiers.schema_spec,
835 conn_id,
836 )
837 .items
838 .get(&name.item)
839 .and_then(|id| self.try_get_entry(id))
840 }
841
842 pub fn get_type_by_name(
843 &self,
844 name: &QualifiedItemName,
845 conn_id: &ConnectionId,
846 ) -> Option<&CatalogEntry> {
847 self.get_schema(
848 &name.qualifiers.database_spec,
849 &name.qualifiers.schema_spec,
850 conn_id,
851 )
852 .types
853 .get(&name.item)
854 .and_then(|id| self.try_get_entry(id))
855 }
856
857 pub(super) fn find_available_name(
858 &self,
859 mut name: QualifiedItemName,
860 conn_id: &ConnectionId,
861 ) -> QualifiedItemName {
862 let mut i = 0;
863 let orig_item_name = name.item.clone();
864 while self.get_item_by_name(&name, conn_id).is_some() {
865 i += 1;
866 name.item = format!("{}{}", orig_item_name, i);
867 }
868 name
869 }
870
871 pub fn try_get_entry(&self, id: &CatalogItemId) -> Option<&CatalogEntry> {
872 self.entry_by_id.get(id)
873 }
874
875 pub fn try_get_entry_by_global_id(&self, id: &GlobalId) -> Option<&CatalogEntry> {
876 let item_id = self.entry_by_global_id.get(id)?;
877 self.try_get_entry(item_id)
878 }
879
880 pub fn try_get_desc_by_global_id(&self, id: &GlobalId) -> Option<Cow<'_, RelationDesc>> {
883 let entry = self.try_get_entry_by_global_id(id)?;
884 let desc = match entry.item() {
885 CatalogItem::Table(table) => Cow::Owned(table.desc_for(id)),
886 other => other.relation_desc(RelationVersionSelector::Latest)?,
888 };
889 Some(desc)
890 }
891
892 pub(crate) fn get_cluster(&self, cluster_id: ClusterId) -> &Cluster {
893 self.try_get_cluster(cluster_id)
894 .unwrap_or_else(|| panic!("unknown cluster {cluster_id}"))
895 }
896
897 pub(super) fn try_get_cluster(&self, cluster_id: ClusterId) -> Option<&Cluster> {
898 self.clusters_by_id.get(&cluster_id)
899 }
900
901 pub(super) fn try_get_role(&self, id: &RoleId) -> Option<&Role> {
902 self.roles_by_id.get(id)
903 }
904
905 pub fn get_role(&self, id: &RoleId) -> &Role {
906 self.roles_by_id.get(id).expect("catalog out of sync")
907 }
908
909 pub fn get_roles(&self) -> impl Iterator<Item = &RoleId> {
910 self.roles_by_id.keys()
911 }
912
913 pub(super) fn try_get_role_by_name(&self, role_name: &str) -> Option<&Role> {
914 self.roles_by_name
915 .get(role_name)
916 .map(|id| &self.roles_by_id[id])
917 }
918
919 pub(super) fn get_role_auth(&self, id: &RoleId) -> &RoleAuth {
920 self.role_auth_by_id
921 .get(id)
922 .unwrap_or_else(|| panic!("catalog out of sync, missing role auth for {id}"))
923 }
924
925 pub(super) fn try_get_role_auth_by_id(&self, id: &RoleId) -> Option<&RoleAuth> {
926 self.role_auth_by_id.get(id)
927 }
928
929 pub(super) fn try_get_network_policy_by_name(
930 &self,
931 policy_name: &str,
932 ) -> Option<&NetworkPolicy> {
933 self.network_policies_by_name
934 .get(policy_name)
935 .map(|id| &self.network_policies_by_id[id])
936 }
937
938 pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet<RoleId> {
939 let mut membership = BTreeSet::new();
940 let mut queue = VecDeque::from(vec![id]);
941 while let Some(cur_id) = queue.pop_front() {
942 if !membership.contains(cur_id) {
943 membership.insert(cur_id.clone());
944 let role = self.get_role(cur_id);
945 soft_assert_no_log!(
946 !role.membership().keys().contains(id),
947 "circular membership exists in the catalog"
948 );
949 queue.extend(role.membership().keys());
950 }
951 }
952 membership.insert(RoleId::Public);
953 membership
954 }
955
956 pub fn get_network_policy(&self, id: &NetworkPolicyId) -> &NetworkPolicy {
957 self.network_policies_by_id
958 .get(id)
959 .expect("catalog out of sync")
960 }
961
962 pub fn get_network_policies(&self) -> impl Iterator<Item = &NetworkPolicyId> {
963 self.network_policies_by_id.keys()
964 }
965
966 pub fn try_get_webhook_url(&self, id: &CatalogItemId) -> Option<url::Url> {
971 let entry = self.try_get_entry(id)?;
972 let name = self.resolve_full_name(entry.name(), None);
974 let host_name = self
975 .http_host_name
976 .as_ref()
977 .map(|x| x.as_str())
978 .unwrap_or_else(|| "HOST");
979
980 let RawDatabaseSpecifier::Name(database) = name.database else {
981 return None;
982 };
983
984 let mut url = url::Url::parse(&format!("https://{host_name}/api/webhook")).ok()?;
985 url.path_segments_mut()
986 .ok()?
987 .push(&database)
988 .push(&name.schema)
989 .push(&name.item);
990
991 Some(url)
992 }
993
994 pub(crate) fn deserialize_plan_with_enable_for_item_parsing(
1002 &mut self,
1005 create_sql: &str,
1006 force_if_exists_skip: bool,
1007 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1008 self.with_enable_for_item_parsing(|state| {
1009 let pcx = PlanContext::zero().with_ignore_if_exists_errors(force_if_exists_skip);
1010 let pcx = Some(&pcx);
1011 let session_catalog = state.for_system_session();
1012
1013 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1014 let (stmt, resolved_ids) = mz_sql::names::resolve(&session_catalog, stmt)?;
1015 let (plan, _sql_impl_ids) =
1016 mz_sql::plan::plan(pcx, &session_catalog, stmt, &Params::empty(), &resolved_ids)?;
1017
1018 Ok((plan, resolved_ids))
1019 })
1020 }
1021
1022 #[mz_ore::instrument]
1024 pub(crate) fn parse_plan(
1025 create_sql: &str,
1026 pcx: Option<&PlanContext>,
1027 catalog: &ConnCatalog,
1028 ) -> Result<(Plan, ResolvedIds), AdapterError> {
1029 let stmt = mz_sql::parse::parse(create_sql)?.into_element().ast;
1030 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, stmt)?;
1031 let (plan, _sql_impl_ids) =
1032 mz_sql::plan::plan(pcx, catalog, stmt, &Params::empty(), &resolved_ids)?;
1033
1034 Ok((plan, resolved_ids))
1035 }
1036
1037 pub(crate) fn deserialize_item(
1039 &self,
1040 global_id: GlobalId,
1041 create_sql: &str,
1042 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1043 local_expression_cache: &mut LocalExpressionCache,
1044 previous_item: Option<CatalogItem>,
1045 ) -> Result<CatalogItem, AdapterError> {
1046 self.parse_item(
1047 global_id,
1048 create_sql,
1049 extra_versions,
1050 None,
1051 false,
1052 None,
1053 local_expression_cache,
1054 previous_item,
1055 )
1056 }
1057
1058 #[mz_ore::instrument]
1060 pub(crate) fn parse_item(
1061 &self,
1062 global_id: GlobalId,
1063 create_sql: &str,
1064 extra_versions: &BTreeMap<RelationVersion, GlobalId>,
1065 pcx: Option<&PlanContext>,
1066 is_retained_metrics_object: bool,
1067 custom_logical_compaction_window: Option<CompactionWindow>,
1068 local_expression_cache: &mut LocalExpressionCache,
1069 previous_item: Option<CatalogItem>,
1070 ) -> Result<CatalogItem, AdapterError> {
1071 let cached_expr = local_expression_cache.remove_cached_expression(&global_id);
1072 match self.parse_item_inner(
1073 global_id,
1074 create_sql,
1075 extra_versions,
1076 pcx,
1077 is_retained_metrics_object,
1078 custom_logical_compaction_window,
1079 cached_expr,
1080 previous_item,
1081 ) {
1082 Ok((item, uncached_expr)) => {
1083 if let Some((uncached_expr, optimizer_features)) = uncached_expr {
1084 local_expression_cache.insert_uncached_expression(
1085 global_id,
1086 uncached_expr,
1087 optimizer_features,
1088 );
1089 }
1090 Ok(item)
1091 }
1092 Err((err, cached_expr)) => {
1093 if let Some(local_expr) = cached_expr {
1094 local_expression_cache.insert_cached_expression(global_id, local_expr);
1095 }
1096 Err(err)
1097 }
1098 }
1099 }
1100
    /// Plans `create_sql` and converts the resulting plan into the in-memory
    /// [`CatalogItem`] for the object identified by `global_id`.
    ///
    /// # Parameters
    ///
    /// * `extra_versions`: additional relation versions and their global IDs. For tables
    ///   and materialized views they are combined with `(RelationVersion::root(), global_id)`
    ///   to form the item's `collections`.
    /// * `pcx`: optional plan context forwarded to `Self::parse_plan`.
    /// * `is_retained_metrics_object`: stored verbatim on tables, sources and indexes.
    /// * `custom_logical_compaction_window`: caller-supplied compaction window. For tables
    ///   and indexes it takes precedence over the window in the plan; for sources the
    ///   plan's window takes precedence; materialized views use only the plan's window.
    /// * `cached_expr`: a cached local expression. Views and materialized views reuse its
    ///   `local_mir` when its `optimizer_features` equal the current optimizer features.
    /// * `previous_item`: an earlier version of the item. Its optimized plan, physical plan
    ///   and dataflow metainfo are carried over to the new item, and for views and
    ///   materialized views its expressions are reused when the raw expression is unchanged.
    ///
    /// # Returns
    ///
    /// On success, the new item together with `Some((optimized_expr, optimizer_features))`
    /// if the view optimizer had to run (i.e. neither the cache nor the previous item could
    /// be used), or `None` otherwise.
    ///
    /// # Errors
    ///
    /// On failure, the error is returned together with `cached_expr` so that the caller
    /// gets the cached expression back. Failures come from planning, from optimizing a
    /// view or materialized view, from an unsupported table data source, from an
    /// ingestion-based source without a cluster, from an invalid type description, and
    /// from a plan kind that cannot be turned into a catalog item (reported as corruption).
    ///
    /// # Panics
    ///
    /// * If a webhook table plan has no `cluster_id`, or a webhook source has no `in_cluster`.
    /// * If the plan is a materialized view but `previous_item` is some other item kind.
    /// * If a non-null assertion index is out of range for the inferred column types.
    #[mz_ore::instrument]
    pub(crate) fn parse_item_inner(
        &self,
        global_id: GlobalId,
        create_sql: &str,
        extra_versions: &BTreeMap<RelationVersion, GlobalId>,
        pcx: Option<&PlanContext>,
        is_retained_metrics_object: bool,
        custom_logical_compaction_window: Option<CompactionWindow>,
        cached_expr: Option<LocalExpressions>,
        previous_item: Option<CatalogItem>,
    ) -> Result<
        (
            CatalogItem,
            Option<(OptimizedMirRelationExpr, OptimizerFeatures)>,
        ),
        (AdapterError, Option<LocalExpressions>),
    > {
        let session_catalog = self.for_system_session();

        // Planning failures hand the untouched cached expression back to the caller.
        let (plan, resolved_ids) = match Self::parse_plan(create_sql, pcx, &session_catalog) {
            Ok((plan, resolved_ids)) => (plan, resolved_ids),
            Err(err) => return Err((err, cached_expr)),
        };

        // Set only when the optimizer actually runs below (views and materialized views).
        let mut uncached_expr = None;

        // Clone the plan-related fields of the previous item up front: `previous_item`
        // itself is consumed by the view / materialized view branches further down.
        let previous_plans = previous_item.as_ref().map(|item| {
            (
                item.optimized_plan().cloned(),
                item.physical_plan().cloned(),
                item.dataflow_metainfo().cloned(),
            )
        });

        let mut item = match plan {
            Plan::CreateTable(CreateTablePlan { table, .. }) => {
                // All known versions of the table plus the root version for `global_id`.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                CatalogItem::Table(Table {
                    create_sql: Some(table.create_sql),
                    desc: table.desc,
                    collections,
                    conn_id: None,
                    resolved_ids,
                    // The caller-supplied window wins over the one in the plan.
                    custom_logical_compaction_window: custom_logical_compaction_window
                        .or(table.compaction_window),
                    is_retained_metrics_object,
                    // Translate the planner's data source description into the catalog's.
                    data_source: match table.data_source {
                        mz_sql::plan::TableDataSource::TableWrites { defaults } => {
                            TableDataSource::TableWrites { defaults }
                        }
                        mz_sql::plan::TableDataSource::DataSource {
                            desc: data_source_desc,
                            timeline,
                        } => match data_source_desc {
                            mz_sql::plan::DataSourceDesc::IngestionExport {
                                ingestion_id,
                                external_reference,
                                details,
                                data_config,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::IngestionExport {
                                    ingestion_id,
                                    external_reference,
                                    details,
                                    data_config,
                                },
                                timeline,
                            },
                            mz_sql::plan::DataSourceDesc::Webhook {
                                validate_using,
                                body_format,
                                headers,
                                cluster_id,
                            } => TableDataSource::DataSource {
                                desc: DataSourceDesc::Webhook {
                                    validate_using,
                                    body_format,
                                    headers,
                                    // For tables the cluster comes from the data source
                                    // description itself; a missing one is a panic.
                                    cluster_id: cluster_id
                                        .expect("Webhook Tables must have a cluster_id set"),
                                },
                                timeline,
                            },
                            // Any other data source kind is rejected for tables.
                            _ => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "unsupported data source for table"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                })
            }
            Plan::CreateSource(CreateSourcePlan {
                source,
                timeline,
                in_cluster,
                ..
            }) => CatalogItem::Source(Source {
                create_sql: Some(source.create_sql),
                data_source: match source.data_source {
                    mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                        desc,
                        // Ingestions need a cluster; its absence is an error, not a panic.
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                    } => DataSourceDesc::OldSyntaxIngestion {
                        desc,
                        progress_subsource,
                        data_config,
                        details,
                        // Same cluster requirement as for `Ingestion` above.
                        cluster_id: match in_cluster {
                            Some(id) => id,
                            None => {
                                return Err((
                                    AdapterError::Unstructured(anyhow::anyhow!(
                                        "ingestion-based sources must have cluster specified"
                                    )),
                                    cached_expr,
                                ));
                            }
                        },
                    },
                    mz_sql::plan::DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    } => DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    },
                    mz_sql::plan::DataSourceDesc::Progress => DataSourceDesc::Progress,
                    mz_sql::plan::DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id,
                    } => {
                        // For sources the cluster is taken from `in_cluster`; the
                        // description-level `cluster_id` is expected to be unset.
                        mz_ore::soft_assert_or_log!(
                            cluster_id.is_none(),
                            "cluster_id set at Source level for Webhooks"
                        );
                        DataSourceDesc::Webhook {
                            validate_using,
                            body_format,
                            headers,
                            cluster_id: in_cluster
                                .expect("webhook sources must use an existing cluster"),
                        }
                    }
                },
                desc: source.desc,
                global_id,
                timeline,
                resolved_ids,
                // NOTE(review): unlike tables and indexes, the window from the plan takes
                // precedence over the caller-supplied one here — confirm this is intended.
                custom_logical_compaction_window: source
                    .compaction_window
                    .or(custom_logical_compaction_window),
                is_retained_metrics_object,
            }),
            Plan::CreateView(CreateViewPlan { view, .. }) => {
                let optimizer_config =
                    optimize::OptimizerConfig::from(session_catalog.system_vars());
                // `Some(None)` means there was a previous item but it was not a view; that
                // case simply does not match the reuse arm below.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::View(view) => Some((view.raw_expr, view.locally_optimized_expr)),
                    _ => None,
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    // Cache hit: the cached expression was produced with the same optimizer
                    // features, so its local MIR is used as is.
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (Arc::new(view.expr), Arc::new(local_expr.local_mir))
                    }
                    // The raw expression is unchanged from the previous item: share its
                    // expressions instead of optimizing again.
                    (_, Some(Some((raw_expr, optimized_expr)))) if *raw_expr == view.expr => {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    // Otherwise optimize from scratch and report the result back through
                    // `uncached_expr`.
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimzed_expr) => optimzed_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };

                // Map the global IDs the raw expression depends on to catalog item IDs.
                let dependencies: BTreeSet<_> = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                let typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);
                CatalogItem::View(View {
                    create_sql: view.create_sql,
                    global_id,
                    raw_expr,
                    desc: RelationDesc::new(typ, view.column_names),
                    locally_optimized_expr: optimized_expr,
                    conn_id: None,
                    resolved_ids,
                    dependencies: DependencyIds(dependencies),
                })
            }
            Plan::CreateMaterializedView(CreateMaterializedViewPlan {
                materialized_view, ..
            }) => {
                // All known versions of the materialized view plus the root version.
                let collections = extra_versions
                    .iter()
                    .map(|(version, gid)| (*version, *gid))
                    .chain([(RelationVersion::root(), global_id)].into_iter())
                    .collect();

                let system_vars = session_catalog.system_vars();
                // Optimizer features configured on the target cluster override the
                // system-wide settings.
                let overrides = self
                    .get_cluster(materialized_view.cluster_id)
                    .config
                    .features();
                let optimizer_config =
                    optimize::OptimizerConfig::from(system_vars).override_from(&overrides);
                // Unlike the view branch, a previous item of a different kind is treated
                // as a programming error here.
                let previous_exprs = previous_item.map(|item| match item {
                    CatalogItem::MaterializedView(materialized_view) => (
                        materialized_view.raw_expr,
                        materialized_view.locally_optimized_expr,
                    ),
                    item => unreachable!("expected materialized view, found: {item:#?}"),
                });

                let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) {
                    // Cache hit: optimizer features match, reuse the cached local MIR.
                    (Some(local_expr), _)
                        if local_expr.optimizer_features == optimizer_config.features =>
                    {
                        debug!("local expression cache hit for {global_id:?}");
                        (
                            Arc::new(materialized_view.expr),
                            Arc::new(local_expr.local_mir),
                        )
                    }
                    // Unchanged raw expression: share the previous item's expressions.
                    (_, Some((raw_expr, optimized_expr)))
                        if *raw_expr == materialized_view.expr =>
                    {
                        (Arc::clone(&raw_expr), Arc::clone(&optimized_expr))
                    }
                    // Otherwise optimize from scratch and report the result back through
                    // `uncached_expr`.
                    (cached_expr, _) => {
                        let optimizer_features = optimizer_config.features.clone();
                        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);

                        let raw_expr = materialized_view.expr;
                        let optimized_expr = match optimizer.optimize(raw_expr.clone()) {
                            Ok(optimized_expr) => optimized_expr,
                            Err(err) => return Err((err.into(), cached_expr)),
                        };

                        uncached_expr = Some((optimized_expr.clone(), optimizer_features));

                        (Arc::new(raw_expr), Arc::new(optimized_expr))
                    }
                };
                let mut typ = infer_sql_type_for_catalog(&raw_expr, &optimized_expr);

                // Columns covered by a non-null assertion are marked non-nullable.
                for &i in &materialized_view.non_null_assertions {
                    typ.column_types[i].nullable = false;
                }
                let desc = RelationDesc::new(typ, materialized_view.column_names);
                let desc = VersionedRelationDesc::new(desc);

                let initial_as_of = materialized_view.as_of.map(Antichain::from_elem);

                // Map the global IDs the raw expression depends on to catalog item IDs.
                let dependencies = raw_expr
                    .depends_on()
                    .into_iter()
                    .map(|gid| self.get_entry_by_global_id(&gid).id())
                    .collect();

                CatalogItem::MaterializedView(MaterializedView {
                    create_sql: materialized_view.create_sql,
                    collections,
                    raw_expr,
                    locally_optimized_expr: optimized_expr,
                    desc,
                    resolved_ids,
                    dependencies,
                    replacement_target: materialized_view.replacement_target,
                    cluster_id: materialized_view.cluster_id,
                    target_replica: materialized_view.target_replica,
                    non_null_assertions: materialized_view.non_null_assertions,
                    // Only the plan's window is used; the caller-supplied
                    // `custom_logical_compaction_window` is not consulted here.
                    custom_logical_compaction_window: materialized_view.compaction_window,
                    refresh_schedule: materialized_view.refresh_schedule,
                    initial_as_of,
                    // Plan fields start empty and may be restored from `previous_plans` below.
                    optimized_plan: None,
                    physical_plan: None,
                    dataflow_metainfo: None,
                })
            }
            Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {
                create_sql: index.create_sql,
                global_id,
                on: index.on,
                keys: index.keys.into(),
                conn_id: None,
                resolved_ids,
                cluster_id: index.cluster_id,
                // The caller-supplied window wins over the one in the plan.
                custom_logical_compaction_window: custom_logical_compaction_window
                    .or(index.compaction_window),
                is_retained_metrics_object,
                // Plan fields start empty and may be restored from `previous_plans` below.
                optimized_plan: None,
                physical_plan: None,
                dataflow_metainfo: None,
            }),
            Plan::CreateSink(CreateSinkPlan {
                sink,
                with_snapshot,
                in_cluster,
                ..
            }) => CatalogItem::Sink(Sink {
                create_sql: sink.create_sql,
                global_id,
                from: sink.from,
                connection: sink.connection,
                envelope: sink.envelope,
                version: sink.version,
                with_snapshot,
                resolved_ids,
                cluster_id: in_cluster,
                commit_interval: sink.commit_interval,
            }),
            Plan::CreateType(CreateTypePlan { typ, .. }) => {
                // The description is computed only for validation; its value is discarded.
                if let Err(err) = typ.inner.desc(&session_catalog) {
                    return Err((err.into(), cached_expr));
                }
                CatalogItem::Type(Type {
                    create_sql: Some(typ.create_sql),
                    global_id,
                    details: CatalogTypeDetails {
                        array_id: None,
                        typ: typ.inner,
                        pg_metadata: None,
                    },
                    resolved_ids,
                })
            }
            Plan::CreateSecret(CreateSecretPlan { secret, .. }) => CatalogItem::Secret(Secret {
                create_sql: secret.create_sql,
                global_id,
            }),
            Plan::CreateConnection(CreateConnectionPlan {
                connection:
                    mz_sql::plan::Connection {
                        create_sql,
                        details,
                    },
                ..
            }) => CatalogItem::Connection(Connection {
                create_sql,
                global_id,
                details,
                resolved_ids,
            }),
            // Any other plan kind cannot back a catalog item.
            _ => {
                return Err((
                    Error::new(ErrorKind::Corruption {
                        detail: "catalog entry generated inappropriate plan".to_string(),
                    })
                    .into(),
                    cached_expr,
                ));
            }
        };

        // Carry the previous item's plans over to the new item, for item kinds that
        // expose plan fields through `plan_fields_mut`.
        if let Some((prev_optimized, prev_physical, prev_metainfo)) = previous_plans {
            if let Some((optimized_plan, physical_plan, dataflow_metainfo)) = item.plan_fields_mut()
            {
                *optimized_plan = prev_optimized;
                *physical_plan = prev_physical;
                *dataflow_metainfo = prev_metainfo;
            }
        }

        Ok((item, uncached_expr))
    }
1544
1545 pub fn with_enable_for_item_parsing<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
1551 let restore = Arc::clone(&self.system_configuration);
1562 Arc::make_mut(&mut self.system_configuration).enable_for_item_parsing();
1563 let res = f(self);
1564 self.system_configuration = restore;
1565 res
1566 }
1567
1568 pub fn get_indexes_on(
1570 &self,
1571 id: GlobalId,
1572 cluster: ClusterId,
1573 ) -> impl Iterator<Item = (GlobalId, &Index)> {
1574 let index_matches = move |idx: &Index| idx.on == id && idx.cluster_id == cluster;
1575
1576 self.try_get_entry_by_global_id(&id)
1577 .into_iter()
1578 .map(move |e| {
1579 e.used_by()
1580 .iter()
1581 .filter_map(move |uses_id| match self.get_entry(uses_id).item() {
1582 CatalogItem::Index(index) if index_matches(index) => {
1583 Some((index.global_id(), index))
1584 }
1585 _ => None,
1586 })
1587 })
1588 .flatten()
1589 }
1590
1591 pub(super) fn get_database(&self, database_id: &DatabaseId) -> &Database {
1592 &self.database_by_id[database_id]
1593 }
1594
1595 pub(super) fn try_get_cluster_replica(
1600 &self,
1601 id: ClusterId,
1602 replica_id: ReplicaId,
1603 ) -> Option<&ClusterReplica> {
1604 self.try_get_cluster(id)
1605 .and_then(|cluster| cluster.replica(replica_id))
1606 }
1607
1608 pub(crate) fn get_cluster_replica(
1612 &self,
1613 cluster_id: ClusterId,
1614 replica_id: ReplicaId,
1615 ) -> &ClusterReplica {
1616 self.try_get_cluster_replica(cluster_id, replica_id)
1617 .unwrap_or_else(|| panic!("unknown cluster replica: {cluster_id}.{replica_id}"))
1618 }
1619
1620 pub(super) fn resolve_replica_in_cluster(
1621 &self,
1622 cluster_id: &ClusterId,
1623 replica_name: &str,
1624 ) -> Result<&ClusterReplica, SqlCatalogError> {
1625 let cluster = self.get_cluster(*cluster_id);
1626 let replica_id = cluster
1627 .replica_id_by_name_
1628 .get(replica_name)
1629 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
1630 Ok(&cluster.replicas_by_id_[replica_id])
1631 }
1632
1633 pub fn get_system_configuration(&self, name: &str) -> Result<&dyn Var, Error> {
1635 Ok(self.system_configuration.get(name)?)
1636 }
1637
1638 pub(super) fn parse_system_configuration(
1642 &self,
1643 name: &str,
1644 value: VarInput,
1645 ) -> Result<String, Error> {
1646 let value = self.system_configuration.parse(name, value)?;
1647 Ok(value.format())
1648 }
1649
1650 pub(super) fn resolve_schema_in_database(
1652 &self,
1653 database_spec: &ResolvedDatabaseSpecifier,
1654 schema_name: &str,
1655 conn_id: &ConnectionId,
1656 ) -> Result<&Schema, SqlCatalogError> {
1657 let schema = match database_spec {
1658 ResolvedDatabaseSpecifier::Ambient if schema_name == MZ_TEMP_SCHEMA => {
1659 self.temporary_schemas.get(conn_id)
1660 }
1661 ResolvedDatabaseSpecifier::Ambient => self
1662 .ambient_schemas_by_name
1663 .get(schema_name)
1664 .and_then(|id| self.ambient_schemas_by_id.get(id)),
1665 ResolvedDatabaseSpecifier::Id(id) => self.database_by_id.get(id).and_then(|db| {
1666 db.schemas_by_name
1667 .get(schema_name)
1668 .and_then(|id| db.schemas_by_id.get(id))
1669 }),
1670 };
1671 schema.ok_or_else(|| SqlCatalogError::UnknownSchema(schema_name.into()))
1672 }
1673
1674 pub fn try_get_schema(
1679 &self,
1680 database_spec: &ResolvedDatabaseSpecifier,
1681 schema_spec: &SchemaSpecifier,
1682 conn_id: &ConnectionId,
1683 ) -> Option<&Schema> {
1684 match (database_spec, schema_spec) {
1686 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => {
1687 self.temporary_schemas.get(conn_id)
1688 }
1689 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(id)) => {
1690 self.ambient_schemas_by_id.get(id)
1691 }
1692 (ResolvedDatabaseSpecifier::Id(database_id), SchemaSpecifier::Id(schema_id)) => self
1693 .database_by_id
1694 .get(database_id)
1695 .and_then(|db| db.schemas_by_id.get(schema_id)),
1696 (ResolvedDatabaseSpecifier::Id(_), SchemaSpecifier::Temporary) => {
1697 unreachable!("temporary schemas are in the ambient database")
1698 }
1699 }
1700 }
1701
1702 pub fn get_schema(
1703 &self,
1704 database_spec: &ResolvedDatabaseSpecifier,
1705 schema_spec: &SchemaSpecifier,
1706 conn_id: &ConnectionId,
1707 ) -> &Schema {
1708 self.try_get_schema(database_spec, schema_spec, conn_id)
1710 .expect("schema must exist")
1711 }
1712
1713 pub(super) fn find_non_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1714 self.database_by_id
1715 .values()
1716 .filter_map(|database| database.schemas_by_id.get(schema_id))
1717 .chain(self.ambient_schemas_by_id.values())
1718 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1719 .into_first()
1720 }
1721
1722 pub(super) fn find_temp_schema(&self, schema_id: &SchemaId) -> &Schema {
1723 self.temporary_schemas
1724 .values()
1725 .filter(|schema| schema.id() == &SchemaSpecifier::from(*schema_id))
1726 .into_first()
1727 }
1728
1729 pub fn get_mz_catalog_schema_id(&self) -> SchemaId {
1730 self.ambient_schemas_by_name[MZ_CATALOG_SCHEMA]
1731 }
1732
1733 pub fn get_mz_catalog_unstable_schema_id(&self) -> SchemaId {
1734 self.ambient_schemas_by_name[MZ_CATALOG_UNSTABLE_SCHEMA]
1735 }
1736
1737 pub fn get_pg_catalog_schema_id(&self) -> SchemaId {
1738 self.ambient_schemas_by_name[PG_CATALOG_SCHEMA]
1739 }
1740
1741 pub fn get_information_schema_id(&self) -> SchemaId {
1742 self.ambient_schemas_by_name[INFORMATION_SCHEMA]
1743 }
1744
1745 pub fn get_mz_internal_schema_id(&self) -> SchemaId {
1746 self.ambient_schemas_by_name[MZ_INTERNAL_SCHEMA]
1747 }
1748
1749 pub fn get_mz_introspection_schema_id(&self) -> SchemaId {
1750 self.ambient_schemas_by_name[MZ_INTROSPECTION_SCHEMA]
1751 }
1752
1753 pub fn get_mz_unsafe_schema_id(&self) -> SchemaId {
1754 self.ambient_schemas_by_name[MZ_UNSAFE_SCHEMA]
1755 }
1756
1757 pub fn system_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1758 SYSTEM_SCHEMAS
1759 .iter()
1760 .map(|name| self.ambient_schemas_by_name[*name])
1761 }
1762
1763 pub fn is_system_schema_id(&self, id: SchemaId) -> bool {
1764 self.system_schema_ids().contains(&id)
1765 }
1766
1767 pub fn is_system_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1768 match spec {
1769 SchemaSpecifier::Temporary => false,
1770 SchemaSpecifier::Id(id) => self.is_system_schema_id(id),
1771 }
1772 }
1773
1774 pub fn unstable_schema_ids(&self) -> impl Iterator<Item = SchemaId> + '_ {
1775 UNSTABLE_SCHEMAS
1776 .iter()
1777 .map(|name| self.ambient_schemas_by_name[*name])
1778 }
1779
1780 pub fn is_unstable_schema_id(&self, id: SchemaId) -> bool {
1781 self.unstable_schema_ids().contains(&id)
1782 }
1783
1784 pub fn is_unstable_schema_specifier(&self, spec: SchemaSpecifier) -> bool {
1785 match spec {
1786 SchemaSpecifier::Temporary => false,
1787 SchemaSpecifier::Id(id) => self.is_unstable_schema_id(id),
1788 }
1789 }
1790
1791 pub fn create_temporary_schema(
1794 &mut self,
1795 conn_id: &ConnectionId,
1796 owner_id: RoleId,
1797 ) -> Result<(), Error> {
1798 let oid = INVALID_OID;
1803 self.temporary_schemas.insert(
1804 conn_id.clone(),
1805 Schema {
1806 name: QualifiedSchemaName {
1807 database: ResolvedDatabaseSpecifier::Ambient,
1808 schema: MZ_TEMP_SCHEMA.into(),
1809 },
1810 id: SchemaSpecifier::Temporary,
1811 oid,
1812 items: BTreeMap::new(),
1813 functions: BTreeMap::new(),
1814 types: BTreeMap::new(),
1815 owner_id,
1816 privileges: PrivilegeMap::from_mz_acl_items(vec![rbac::owner_privilege(
1817 mz_sql::catalog::ObjectType::Schema,
1818 owner_id,
1819 )]),
1820 },
1821 );
1822 Ok(())
1823 }
1824
1825 pub(crate) fn get_temporary_oids(&self) -> impl Iterator<Item = u32> + '_ {
1827 std::iter::empty()
1828 .chain(self.ambient_schemas_by_id.values().filter_map(|schema| {
1829 if schema.id.is_temporary() {
1830 Some(schema.oid)
1831 } else {
1832 None
1833 }
1834 }))
1835 .chain(self.entry_by_id.values().filter_map(|entry| {
1836 if entry.item().is_temporary() {
1837 Some(entry.oid)
1838 } else {
1839 None
1840 }
1841 }))
1842 }
1843
1844 pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> CatalogItemId {
1848 self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
1849 }
1850
1851 pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> (CatalogItemId, GlobalId) {
1855 let item_id = self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin));
1856 let log = match self.get_entry(&item_id).item() {
1857 CatalogItem::Log(log) => log,
1858 other => unreachable!("programming error, expected BuiltinLog, found {other:?}"),
1859 };
1860 (item_id, log.global_id)
1861 }
1862
1863 pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> CatalogItemId {
1867 self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
1868 }
1869
1870 pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> CatalogItemId {
1874 let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
1875 let schema = &self.ambient_schemas_by_id[schema_id];
1876 match builtin.catalog_item_type() {
1877 CatalogItemType::Type => schema.types[builtin.name()],
1878 CatalogItemType::Func => schema.functions[builtin.name()],
1879 CatalogItemType::Table
1880 | CatalogItemType::Source
1881 | CatalogItemType::Sink
1882 | CatalogItemType::View
1883 | CatalogItemType::MaterializedView
1884 | CatalogItemType::Index
1885 | CatalogItemType::Secret
1886 | CatalogItemType::Connection => schema.items[builtin.name()],
1887 }
1888 }
1889
1890 pub fn resolve_builtin_type_references(
1892 &self,
1893 builtin: &BuiltinType<NameReference>,
1894 ) -> BuiltinType<IdReference> {
1895 let typ: CatalogType<IdReference> = match &builtin.details.typ {
1896 CatalogType::AclItem => CatalogType::AclItem,
1897 CatalogType::Array { element_reference } => CatalogType::Array {
1898 element_reference: self.get_system_type(element_reference).id,
1899 },
1900 CatalogType::List {
1901 element_reference,
1902 element_modifiers,
1903 } => CatalogType::List {
1904 element_reference: self.get_system_type(element_reference).id,
1905 element_modifiers: element_modifiers.clone(),
1906 },
1907 CatalogType::Map {
1908 key_reference,
1909 value_reference,
1910 key_modifiers,
1911 value_modifiers,
1912 } => CatalogType::Map {
1913 key_reference: self.get_system_type(key_reference).id,
1914 value_reference: self.get_system_type(value_reference).id,
1915 key_modifiers: key_modifiers.clone(),
1916 value_modifiers: value_modifiers.clone(),
1917 },
1918 CatalogType::Range { element_reference } => CatalogType::Range {
1919 element_reference: self.get_system_type(element_reference).id,
1920 },
1921 CatalogType::Record { fields } => CatalogType::Record {
1922 fields: fields
1923 .into_iter()
1924 .map(|f| CatalogRecordField {
1925 name: f.name.clone(),
1926 type_reference: self.get_system_type(f.type_reference).id,
1927 type_modifiers: f.type_modifiers.clone(),
1928 })
1929 .collect(),
1930 },
1931 CatalogType::Bool => CatalogType::Bool,
1932 CatalogType::Bytes => CatalogType::Bytes,
1933 CatalogType::Char => CatalogType::Char,
1934 CatalogType::Date => CatalogType::Date,
1935 CatalogType::Float32 => CatalogType::Float32,
1936 CatalogType::Float64 => CatalogType::Float64,
1937 CatalogType::Int16 => CatalogType::Int16,
1938 CatalogType::Int32 => CatalogType::Int32,
1939 CatalogType::Int64 => CatalogType::Int64,
1940 CatalogType::UInt16 => CatalogType::UInt16,
1941 CatalogType::UInt32 => CatalogType::UInt32,
1942 CatalogType::UInt64 => CatalogType::UInt64,
1943 CatalogType::MzTimestamp => CatalogType::MzTimestamp,
1944 CatalogType::Interval => CatalogType::Interval,
1945 CatalogType::Jsonb => CatalogType::Jsonb,
1946 CatalogType::Numeric => CatalogType::Numeric,
1947 CatalogType::Oid => CatalogType::Oid,
1948 CatalogType::PgLegacyChar => CatalogType::PgLegacyChar,
1949 CatalogType::PgLegacyName => CatalogType::PgLegacyName,
1950 CatalogType::Pseudo => CatalogType::Pseudo,
1951 CatalogType::RegClass => CatalogType::RegClass,
1952 CatalogType::RegProc => CatalogType::RegProc,
1953 CatalogType::RegType => CatalogType::RegType,
1954 CatalogType::String => CatalogType::String,
1955 CatalogType::Time => CatalogType::Time,
1956 CatalogType::Timestamp => CatalogType::Timestamp,
1957 CatalogType::TimestampTz => CatalogType::TimestampTz,
1958 CatalogType::Uuid => CatalogType::Uuid,
1959 CatalogType::VarChar => CatalogType::VarChar,
1960 CatalogType::Int2Vector => CatalogType::Int2Vector,
1961 CatalogType::MzAclItem => CatalogType::MzAclItem,
1962 };
1963
1964 BuiltinType {
1965 name: builtin.name,
1966 schema: builtin.schema,
1967 oid: builtin.oid,
1968 details: CatalogTypeDetails {
1969 array_id: builtin.details.array_id,
1970 typ,
1971 pg_metadata: builtin.details.pg_metadata.clone(),
1972 },
1973 }
1974 }
1975
1976 pub fn config(&self) -> &mz_sql::catalog::CatalogConfig {
1977 &self.config
1978 }
1979
1980 pub fn resolve_database(&self, database_name: &str) -> Result<&Database, SqlCatalogError> {
1981 match self.database_by_name.get(database_name) {
1982 Some(id) => Ok(&self.database_by_id[id]),
1983 None => Err(SqlCatalogError::UnknownDatabase(database_name.into())),
1984 }
1985 }
1986
1987 pub fn resolve_schema(
1988 &self,
1989 current_database: Option<&DatabaseId>,
1990 database_name: Option<&str>,
1991 schema_name: &str,
1992 conn_id: &ConnectionId,
1993 ) -> Result<&Schema, SqlCatalogError> {
1994 let database_spec = match database_name {
1995 Some(database) => Some(ResolvedDatabaseSpecifier::Id(
2000 self.resolve_database(database)?.id().clone(),
2001 )),
2002 None => current_database.map(|id| ResolvedDatabaseSpecifier::Id(id.clone())),
2003 };
2004
2005 if let Some(database_spec) = database_spec {
2007 if let Ok(schema) =
2008 self.resolve_schema_in_database(&database_spec, schema_name, conn_id)
2009 {
2010 return Ok(schema);
2011 }
2012 }
2013
2014 if let Ok(schema) = self.resolve_schema_in_database(
2016 &ResolvedDatabaseSpecifier::Ambient,
2017 schema_name,
2018 conn_id,
2019 ) {
2020 return Ok(schema);
2021 }
2022
2023 Err(SqlCatalogError::UnknownSchema(schema_name.into()))
2024 }
2025
2026 pub fn resolve_system_schema(&self, name: &'static str) -> SchemaId {
2030 self.ambient_schemas_by_name[name]
2031 }
2032
2033 pub fn resolve_search_path(
2034 &self,
2035 session: &dyn SessionMetadata,
2036 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2037 let database = self
2038 .database_by_name
2039 .get(session.database())
2040 .map(|id| id.clone());
2041
2042 session
2043 .search_path()
2044 .iter()
2045 .map(|schema| {
2046 self.resolve_schema(database.as_ref(), None, schema.as_str(), session.conn_id())
2047 })
2048 .filter_map(|schema| schema.ok())
2049 .map(|schema| (schema.name().database.clone(), schema.id().clone()))
2050 .collect()
2051 }
2052
2053 pub fn effective_search_path(
2054 &self,
2055 search_path: &[(ResolvedDatabaseSpecifier, SchemaSpecifier)],
2056 include_temp_schema: bool,
2057 ) -> Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)> {
2058 let mut v = Vec::with_capacity(search_path.len() + 3);
2059 let temp_schema = (
2061 ResolvedDatabaseSpecifier::Ambient,
2062 SchemaSpecifier::Temporary,
2063 );
2064 if include_temp_schema && !search_path.contains(&temp_schema) {
2065 v.push(temp_schema);
2066 }
2067 let default_schemas = [
2068 (
2069 ResolvedDatabaseSpecifier::Ambient,
2070 SchemaSpecifier::Id(self.get_mz_catalog_schema_id()),
2071 ),
2072 (
2073 ResolvedDatabaseSpecifier::Ambient,
2074 SchemaSpecifier::Id(self.get_pg_catalog_schema_id()),
2075 ),
2076 ];
2077 for schema in default_schemas.into_iter() {
2078 if !search_path.contains(&schema) {
2079 v.push(schema);
2080 }
2081 }
2082 v.extend_from_slice(search_path);
2083 v
2084 }
2085
2086 pub fn resolve_cluster(&self, name: &str) -> Result<&Cluster, SqlCatalogError> {
2087 let id = self
2088 .clusters_by_name
2089 .get(name)
2090 .ok_or_else(|| SqlCatalogError::UnknownCluster(name.to_string()))?;
2091 Ok(&self.clusters_by_id[id])
2092 }
2093
2094 pub fn resolve_builtin_cluster(&self, cluster: &BuiltinCluster) -> &Cluster {
2095 let id = self
2096 .clusters_by_name
2097 .get(cluster.name)
2098 .expect("failed to lookup BuiltinCluster by name");
2099 self.clusters_by_id
2100 .get(id)
2101 .expect("failed to lookup BuiltinCluster by ID")
2102 }
2103
2104 pub fn resolve_cluster_replica(
2105 &self,
2106 cluster_replica_name: &QualifiedReplica,
2107 ) -> Result<&ClusterReplica, SqlCatalogError> {
2108 let cluster = self.resolve_cluster(cluster_replica_name.cluster.as_str())?;
2109 let replica_name = cluster_replica_name.replica.as_str();
2110 let replica_id = cluster
2111 .replica_id(replica_name)
2112 .ok_or_else(|| SqlCatalogError::UnknownClusterReplica(replica_name.to_string()))?;
2113 Ok(cluster.replica(replica_id).expect("Must exist"))
2114 }
2115
2116 #[allow(clippy::useless_let_if_seq)]
2122 pub fn resolve(
2123 &self,
2124 get_schema_entries: fn(&Schema) -> &BTreeMap<String, CatalogItemId>,
2125 current_database: Option<&DatabaseId>,
2126 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2127 name: &PartialItemName,
2128 conn_id: &ConnectionId,
2129 err_gen: fn(String) -> SqlCatalogError,
2130 ) -> Result<&CatalogEntry, SqlCatalogError> {
2131 let schemas = match &name.schema {
2136 Some(schema_name) => {
2137 match self.resolve_schema(
2138 current_database,
2139 name.database.as_deref(),
2140 schema_name,
2141 conn_id,
2142 ) {
2143 Ok(schema) => vec![(schema.name.database.clone(), schema.id.clone())],
2144 Err(e) => return Err(e),
2145 }
2146 }
2147 None => match self
2148 .try_get_schema(
2149 &ResolvedDatabaseSpecifier::Ambient,
2150 &SchemaSpecifier::Temporary,
2151 conn_id,
2152 )
2153 .and_then(|schema| schema.items.get(&name.item))
2154 {
2155 Some(id) => return Ok(self.get_entry(id)),
2156 None => search_path.to_vec(),
2157 },
2158 };
2159
2160 for (database_spec, schema_spec) in &schemas {
2161 let Some(schema) = self.try_get_schema(database_spec, schema_spec, conn_id) else {
2164 continue;
2165 };
2166
2167 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2168 return Ok(&self.entry_by_id[id]);
2169 }
2170 }
2171
2172 let mz_internal_schema = SchemaSpecifier::Id(self.get_mz_internal_schema_id());
2177 if schemas.iter().any(|(_, spec)| *spec == mz_internal_schema) {
2178 for schema_id in [
2179 self.get_mz_catalog_unstable_schema_id(),
2180 self.get_mz_introspection_schema_id(),
2181 ] {
2182 let schema = self.get_schema(
2183 &ResolvedDatabaseSpecifier::Ambient,
2184 &SchemaSpecifier::Id(schema_id),
2185 conn_id,
2186 );
2187
2188 if let Some(id) = get_schema_entries(schema).get(&name.item) {
2189 debug!(
2190 github_27831 = true,
2191 "encountered use of outdated schema `mz_internal` for relation: {name}",
2192 );
2193 return Ok(&self.entry_by_id[id]);
2194 }
2195 }
2196 }
2197
2198 Err(err_gen(name.to_string()))
2199 }
2200
2201 pub fn resolve_entry(
2203 &self,
2204 current_database: Option<&DatabaseId>,
2205 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2206 name: &PartialItemName,
2207 conn_id: &ConnectionId,
2208 ) -> Result<&CatalogEntry, SqlCatalogError> {
2209 self.resolve(
2210 |schema| &schema.items,
2211 current_database,
2212 search_path,
2213 name,
2214 conn_id,
2215 SqlCatalogError::UnknownItem,
2216 )
2217 }
2218
2219 pub fn resolve_function(
2221 &self,
2222 current_database: Option<&DatabaseId>,
2223 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2224 name: &PartialItemName,
2225 conn_id: &ConnectionId,
2226 ) -> Result<&CatalogEntry, SqlCatalogError> {
2227 self.resolve(
2228 |schema| &schema.functions,
2229 current_database,
2230 search_path,
2231 name,
2232 conn_id,
2233 |name| SqlCatalogError::UnknownFunction {
2234 name,
2235 alternative: None,
2236 },
2237 )
2238 }
2239
2240 pub fn resolve_type(
2242 &self,
2243 current_database: Option<&DatabaseId>,
2244 search_path: &Vec<(ResolvedDatabaseSpecifier, SchemaSpecifier)>,
2245 name: &PartialItemName,
2246 conn_id: &ConnectionId,
2247 ) -> Result<&CatalogEntry, SqlCatalogError> {
2248 static NON_PG_CATALOG_TYPES: LazyLock<
2249 BTreeMap<&'static str, &'static BuiltinType<NameReference>>,
2250 > = LazyLock::new(|| {
2251 BUILTINS::types()
2252 .filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
2253 .map(|typ| (typ.name, typ))
2254 .collect()
2255 });
2256
2257 let entry = self.resolve(
2258 |schema| &schema.types,
2259 current_database,
2260 search_path,
2261 name,
2262 conn_id,
2263 |name| SqlCatalogError::UnknownType { name },
2264 )?;
2265
2266 if conn_id != &SYSTEM_CONN_ID && name.schema.as_deref() == Some(PG_CATALOG_SCHEMA) {
2267 if let Some(typ) = NON_PG_CATALOG_TYPES.get(entry.name().item.as_str()) {
2268 warn!(
2269 "user specified an incorrect schema of {} for the type {}, which should be in \
2270 the {} schema. This works now due to a bug but will be fixed in a later release.",
2271 PG_CATALOG_SCHEMA.quoted(),
2272 typ.name.quoted(),
2273 typ.schema.quoted(),
2274 )
2275 }
2276 }
2277
2278 Ok(entry)
2279 }
2280
2281 pub(super) fn get_comment_id(&self, object_id: ObjectId) -> CommentObjectId {
2283 match object_id {
2284 ObjectId::Item(item_id) => self.get_entry(&item_id).comment_object_id(),
2285 ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
2286 ObjectId::Database(database_id) => CommentObjectId::Database(database_id),
2287 ObjectId::Schema((database, schema)) => CommentObjectId::Schema((database, schema)),
2288 ObjectId::Cluster(cluster_id) => CommentObjectId::Cluster(cluster_id),
2289 ObjectId::ClusterReplica(cluster_replica_id) => {
2290 CommentObjectId::ClusterReplica(cluster_replica_id)
2291 }
2292 ObjectId::NetworkPolicy(network_policy_id) => {
2293 CommentObjectId::NetworkPolicy(network_policy_id)
2294 }
2295 }
2296 }
2297
2298 pub fn system_config(&self) -> &SystemVars {
2300 &self.system_configuration
2301 }
2302
2303 pub fn system_config_mut(&mut self) -> &mut SystemVars {
2305 Arc::make_mut(&mut self.system_configuration)
2306 }
2307
2308 pub fn dump(&self, unfinalized_shards: Option<BTreeSet<String>>) -> Result<String, Error> {
2318 let mut dump = serde_json::to_value(&self).map_err(|e| {
2320 Error::new(ErrorKind::Unstructured(format!(
2321 "internal error: could not dump catalog: {}",
2324 e
2325 )))
2326 })?;
2327
2328 let dump_obj = dump.as_object_mut().expect("state must have been dumped");
2329 dump_obj.insert(
2331 "system_parameter_defaults".into(),
2332 serde_json::json!(self.system_config().defaults()),
2333 );
2334 if let Some(unfinalized_shards) = unfinalized_shards {
2336 dump_obj
2337 .get_mut("storage_metadata")
2338 .expect("known to exist")
2339 .as_object_mut()
2340 .expect("storage_metadata is an object")
2341 .insert(
2342 "unfinalized_shards".into(),
2343 serde_json::json!(unfinalized_shards),
2344 );
2345 }
2346 let temporary_gids: Vec<_> = self
2351 .entry_by_global_id
2352 .iter()
2353 .filter(|(_gid, item_id)| self.get_entry(item_id).conn_id().is_some())
2354 .map(|(gid, _item_id)| *gid)
2355 .collect();
2356 if !temporary_gids.is_empty() {
2357 let gids = dump_obj
2358 .get_mut("entry_by_global_id")
2359 .expect("known_to_exist")
2360 .as_object_mut()
2361 .expect("entry_by_global_id is an object");
2362 for gid in temporary_gids {
2363 gids.remove(&gid.to_string());
2364 }
2365 }
2366 dump_obj.remove("role_auth_by_id");
2369
2370 Ok(serde_json::to_string_pretty(&dump).expect("cannot fail on serde_json::Value"))
2372 }
2373
2374 pub fn availability_zones(&self) -> &[String] {
2375 &self.availability_zones
2376 }
2377
2378 pub fn concretize_replica_location(
2379 &self,
2380 location: mz_catalog::durable::ReplicaLocation,
2381 allowed_sizes: &Vec<String>,
2382 allowed_availability_zones: Option<&[String]>,
2383 ) -> Result<ReplicaLocation, Error> {
2384 let location = match location {
2385 mz_catalog::durable::ReplicaLocation::Unmanaged {
2386 storagectl_addrs,
2387 computectl_addrs,
2388 } => {
2389 if allowed_availability_zones.is_some() {
2390 return Err(Error {
2391 kind: ErrorKind::Internal(
2392 "tried concretize unmanaged replica with specific availability_zones"
2393 .to_string(),
2394 ),
2395 });
2396 }
2397 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
2398 storagectl_addrs,
2399 computectl_addrs,
2400 })
2401 }
2402 mz_catalog::durable::ReplicaLocation::Managed {
2403 size,
2404 availability_zone,
2405 billed_as,
2406 internal,
2407 pending,
2408 } => {
2409 if allowed_availability_zones.is_some() && availability_zone.is_some() {
2410 let message = "tried concretize managed replica with specific availability zones and availability zone";
2411 return Err(Error {
2412 kind: ErrorKind::Internal(message.to_string()),
2413 });
2414 }
2415 self.ensure_valid_replica_size(allowed_sizes, &size)?;
2416 let cluster_replica_sizes = &self.cluster_replica_sizes;
2417
2418 ReplicaLocation::Managed(ManagedReplicaLocation {
2419 allocation: cluster_replica_sizes
2420 .0
2421 .get(&size)
2422 .expect("catalog out of sync")
2423 .clone(),
2424 availability_zones: match (availability_zone, allowed_availability_zones) {
2425 (Some(az), _) => ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
2426 (None, Some([])) => ManagedReplicaAvailabilityZones::FromCluster(None),
2427 (None, Some(azs)) => {
2428 ManagedReplicaAvailabilityZones::FromCluster(Some(azs.to_vec()))
2429 }
2430 (None, None) => ManagedReplicaAvailabilityZones::FromReplica(None),
2431 },
2432 size,
2433 billed_as,
2434 internal,
2435 pending,
2436 })
2437 }
2438 };
2439 Ok(location)
2440 }
2441
2442 pub(crate) fn cluster_replica_size_has_disk(&self, size: &str) -> bool {
2453 let alloc = &self.cluster_replica_sizes.0[size];
2454 !alloc.swap_enabled && alloc.disk_limit != Some(DiskLimit::ZERO)
2455 }
2456
2457 pub(crate) fn ensure_valid_replica_size(
2458 &self,
2459 allowed_sizes: &[String],
2460 size: &String,
2461 ) -> Result<(), Error> {
2462 let cluster_replica_sizes = &self.cluster_replica_sizes;
2463
2464 if !cluster_replica_sizes.0.contains_key(size)
2465 || (!allowed_sizes.is_empty() && !allowed_sizes.contains(size))
2466 || cluster_replica_sizes.0[size].disabled
2467 {
2468 let mut entries = cluster_replica_sizes
2469 .enabled_allocations()
2470 .collect::<Vec<_>>();
2471
2472 if !allowed_sizes.is_empty() {
2473 let allowed_sizes = BTreeSet::<&String>::from_iter(allowed_sizes.iter());
2474 entries.retain(|(name, _)| allowed_sizes.contains(name));
2475 }
2476
2477 entries.sort_by_key(
2478 |(
2479 _name,
2480 ReplicaAllocation {
2481 scale, cpu_limit, ..
2482 },
2483 )| (scale, cpu_limit),
2484 );
2485
2486 Err(Error {
2487 kind: ErrorKind::InvalidClusterReplicaSize {
2488 size: size.to_owned(),
2489 expected: entries.into_iter().map(|(name, _)| name.clone()).collect(),
2490 },
2491 })
2492 } else {
2493 Ok(())
2494 }
2495 }
2496
2497 pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
2498 if role_id.is_builtin() {
2499 let role = self.get_role(role_id);
2500 Err(Error::new(ErrorKind::ReservedRoleName(
2501 role.name().to_string(),
2502 )))
2503 } else {
2504 Ok(())
2505 }
2506 }
2507
2508 pub fn ensure_not_reserved_network_policy(
2509 &self,
2510 network_policy_id: &NetworkPolicyId,
2511 ) -> Result<(), Error> {
2512 if network_policy_id.is_builtin() {
2513 let policy = self.get_network_policy(network_policy_id);
2514 Err(Error::new(ErrorKind::ReservedNetworkPolicyName(
2515 policy.name.clone(),
2516 )))
2517 } else {
2518 Ok(())
2519 }
2520 }
2521
2522 pub fn ensure_grantable_role(&self, role_id: &RoleId) -> Result<(), Error> {
2523 let is_grantable = !role_id.is_public() && !role_id.is_system();
2524 if is_grantable {
2525 Ok(())
2526 } else {
2527 let role = self.get_role(role_id);
2528 Err(Error::new(ErrorKind::UngrantableRoleName(
2529 role.name().to_string(),
2530 )))
2531 }
2532 }
2533
2534 pub fn ensure_not_system_role(&self, role_id: &RoleId) -> Result<(), Error> {
2535 if role_id.is_system() {
2536 let role = self.get_role(role_id);
2537 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2538 role.name().to_string(),
2539 )))
2540 } else {
2541 Ok(())
2542 }
2543 }
2544
2545 pub fn ensure_not_predefined_role(&self, role_id: &RoleId) -> Result<(), Error> {
2546 if role_id.is_predefined() {
2547 let role = self.get_role(role_id);
2548 Err(Error::new(ErrorKind::ReservedSystemRoleName(
2549 role.name().to_string(),
2550 )))
2551 } else {
2552 Ok(())
2553 }
2554 }
2555
2556 pub(crate) fn add_to_audit_log(
2559 system_configuration: &SystemVars,
2560 oracle_write_ts: mz_repr::Timestamp,
2561 session: Option<&ConnMeta>,
2562 tx: &mut mz_catalog::durable::Transaction,
2563 audit_events: &mut Vec<VersionedEvent>,
2564 event_type: EventType,
2565 object_type: ObjectType,
2566 details: EventDetails,
2567 ) -> Result<(), Error> {
2568 let user = session.map(|session| session.user().name.to_string());
2569
2570 let occurred_at = match system_configuration.unsafe_mock_audit_event_timestamp() {
2573 Some(ts) => ts.into(),
2574 _ => oracle_write_ts.into(),
2575 };
2576 let id = tx.allocate_audit_log_id()?;
2577 let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
2578 audit_events.push(event.clone());
2579 tx.insert_audit_log_event(event);
2580 Ok(())
2581 }
2582
2583 pub(super) fn get_owner_id(&self, id: &ObjectId, conn_id: &ConnectionId) -> Option<RoleId> {
2584 match id {
2585 ObjectId::Cluster(id) => Some(self.get_cluster(*id).owner_id()),
2586 ObjectId::ClusterReplica((cluster_id, replica_id)) => Some(
2587 self.get_cluster_replica(*cluster_id, *replica_id)
2588 .owner_id(),
2589 ),
2590 ObjectId::Database(id) => Some(self.get_database(id).owner_id()),
2591 ObjectId::Schema((database_spec, schema_spec)) => Some(
2592 self.get_schema(database_spec, schema_spec, conn_id)
2593 .owner_id(),
2594 ),
2595 ObjectId::Item(id) => Some(*self.get_entry(id).owner_id()),
2596 ObjectId::Role(_) => None,
2597 ObjectId::NetworkPolicy(id) => Some(self.get_network_policy(id).owner_id.clone()),
2598 }
2599 }
2600
2601 pub(super) fn get_object_type(&self, object_id: &ObjectId) -> mz_sql::catalog::ObjectType {
2602 match object_id {
2603 ObjectId::Cluster(_) => mz_sql::catalog::ObjectType::Cluster,
2604 ObjectId::ClusterReplica(_) => mz_sql::catalog::ObjectType::ClusterReplica,
2605 ObjectId::Database(_) => mz_sql::catalog::ObjectType::Database,
2606 ObjectId::Schema(_) => mz_sql::catalog::ObjectType::Schema,
2607 ObjectId::Role(_) => mz_sql::catalog::ObjectType::Role,
2608 ObjectId::Item(id) => self.get_entry(id).item_type().into(),
2609 ObjectId::NetworkPolicy(_) => mz_sql::catalog::ObjectType::NetworkPolicy,
2610 }
2611 }
2612
2613 pub(super) fn get_system_object_type(
2614 &self,
2615 id: &SystemObjectId,
2616 ) -> mz_sql::catalog::SystemObjectType {
2617 match id {
2618 SystemObjectId::Object(object_id) => {
2619 SystemObjectType::Object(self.get_object_type(object_id))
2620 }
2621 SystemObjectId::System => SystemObjectType::System,
2622 }
2623 }
2624
2625 pub fn storage_metadata(&self) -> &StorageMetadata {
2629 &self.storage_metadata
2630 }
2631
2632 pub fn source_compaction_windows(
2634 &self,
2635 ids: impl IntoIterator<Item = CatalogItemId>,
2636 ) -> BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> {
2637 let mut cws: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>> = BTreeMap::new();
2638 let mut seen = BTreeSet::new();
2639 for item_id in ids {
2640 if !seen.insert(item_id) {
2641 continue;
2642 }
2643 let entry = self.get_entry(&item_id);
2644 match entry.item() {
2645 CatalogItem::Source(source) => {
2646 let source_cw = source.custom_logical_compaction_window.unwrap_or_default();
2647 cws.entry(source_cw).or_default().insert(item_id);
2648 }
2649 CatalogItem::Table(table) => {
2650 let table_cw = table.custom_logical_compaction_window.unwrap_or_default();
2651 match &table.data_source {
2652 TableDataSource::DataSource {
2653 desc:
2654 DataSourceDesc::IngestionExport { .. }
2655 | DataSourceDesc::Webhook { .. },
2657 timeline: _,
2658 } => {
2659 cws.entry(table_cw).or_default().insert(item_id);
2660 }
2661 TableDataSource::TableWrites { .. } => {}
2664 TableDataSource::DataSource {
2665 desc:
2666 DataSourceDesc::Ingestion { .. }
2667 | DataSourceDesc::OldSyntaxIngestion { .. }
2668 | DataSourceDesc::Introspection(_)
2669 | DataSourceDesc::Progress
2670 | DataSourceDesc::Catalog,
2671 ..
2672 } => {
2673 unreachable!(
2674 "unexpected DataSourceDesc for table {item_id}: {:?}",
2675 table.data_source
2676 )
2677 }
2678 }
2679 }
2680 _ => {
2681 continue;
2683 }
2684 }
2685 }
2686 cws
2687 }
2688
2689 pub fn comment_id_to_item_id(id: &CommentObjectId) -> Option<CatalogItemId> {
2690 match id {
2691 CommentObjectId::Table(id)
2692 | CommentObjectId::View(id)
2693 | CommentObjectId::MaterializedView(id)
2694 | CommentObjectId::Source(id)
2695 | CommentObjectId::Sink(id)
2696 | CommentObjectId::Index(id)
2697 | CommentObjectId::Func(id)
2698 | CommentObjectId::Connection(id)
2699 | CommentObjectId::Type(id)
2700 | CommentObjectId::Secret(id) => Some(*id),
2701 CommentObjectId::Role(_)
2702 | CommentObjectId::Database(_)
2703 | CommentObjectId::Schema(_)
2704 | CommentObjectId::Cluster(_)
2705 | CommentObjectId::ClusterReplica(_)
2706 | CommentObjectId::NetworkPolicy(_) => None,
2707 }
2708 }
2709
2710 pub fn get_comment_id_entry(&self, id: &CommentObjectId) -> Option<&CatalogEntry> {
2711 Self::comment_id_to_item_id(id).map(|id| self.get_entry(&id))
2712 }
2713
2714 pub fn comment_id_to_audit_log_name(
2715 &self,
2716 id: CommentObjectId,
2717 conn_id: &ConnectionId,
2718 ) -> String {
2719 match id {
2720 CommentObjectId::Table(id)
2721 | CommentObjectId::View(id)
2722 | CommentObjectId::MaterializedView(id)
2723 | CommentObjectId::Source(id)
2724 | CommentObjectId::Sink(id)
2725 | CommentObjectId::Index(id)
2726 | CommentObjectId::Func(id)
2727 | CommentObjectId::Connection(id)
2728 | CommentObjectId::Type(id)
2729 | CommentObjectId::Secret(id) => {
2730 let item = self.get_entry(&id);
2731 let name = self.resolve_full_name(item.name(), Some(conn_id));
2732 name.to_string()
2733 }
2734 CommentObjectId::Role(id) => self.get_role(&id).name.clone(),
2735 CommentObjectId::Database(id) => self.get_database(&id).name.clone(),
2736 CommentObjectId::Schema((spec, schema_id)) => {
2737 let schema = self.get_schema(&spec, &schema_id, conn_id);
2738 self.resolve_full_schema_name(&schema.name).to_string()
2739 }
2740 CommentObjectId::Cluster(id) => self.get_cluster(id).name.clone(),
2741 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
2742 let cluster = self.get_cluster(cluster_id);
2743 let replica = self.get_cluster_replica(cluster_id, replica_id);
2744 QualifiedReplica {
2745 cluster: Ident::new_unchecked(cluster.name.clone()),
2746 replica: Ident::new_unchecked(replica.name.clone()),
2747 }
2748 .to_string()
2749 }
2750 CommentObjectId::NetworkPolicy(id) => self.get_network_policy(&id).name.clone(),
2751 }
2752 }
2753
2754 pub fn mock_authentication_nonce(&self) -> String {
2755 self.mock_authentication_nonce.clone().unwrap_or_default()
2756 }
2757}
2758
2759impl ConnectionResolver for CatalogState {
2760 fn resolve_connection(
2761 &self,
2762 id: CatalogItemId,
2763 ) -> mz_storage_types::connections::Connection<InlinedConnection> {
2764 use mz_storage_types::connections::Connection::*;
2765 match self
2766 .get_entry(&id)
2767 .connection()
2768 .expect("catalog out of sync")
2769 .details
2770 .to_connection()
2771 {
2772 Kafka(conn) => Kafka(conn.into_inline_connection(self)),
2773 Postgres(conn) => Postgres(conn.into_inline_connection(self)),
2774 Csr(conn) => Csr(conn.into_inline_connection(self)),
2775 Ssh(conn) => Ssh(conn),
2776 Aws(conn) => Aws(conn),
2777 AwsPrivatelink(conn) => AwsPrivatelink(conn),
2778 MySql(conn) => MySql(conn.into_inline_connection(self)),
2779 SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
2780 IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
2781 }
2782 }
2783}
2784
2785impl OptimizerCatalog for CatalogState {
2786 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2787 CatalogState::get_entry_by_global_id(self, id)
2788 }
2789 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2790 CatalogState::get_entry(self, id)
2791 }
2792 fn resolve_full_name(
2793 &self,
2794 name: &QualifiedItemName,
2795 conn_id: Option<&ConnectionId>,
2796 ) -> FullItemName {
2797 CatalogState::resolve_full_name(self, name, conn_id)
2798 }
2799 fn get_indexes_on(
2800 &self,
2801 id: GlobalId,
2802 cluster: ClusterId,
2803 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2804 Box::new(CatalogState::get_indexes_on(self, id, cluster))
2805 }
2806}
2807
2808impl OptimizerCatalog for Catalog {
2809 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
2810 self.state.get_entry_by_global_id(id)
2811 }
2812
2813 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry {
2814 self.state.get_entry(id)
2815 }
2816
2817 fn resolve_full_name(
2818 &self,
2819 name: &QualifiedItemName,
2820 conn_id: Option<&ConnectionId>,
2821 ) -> FullItemName {
2822 self.state.resolve_full_name(name, conn_id)
2823 }
2824
2825 fn get_indexes_on(
2826 &self,
2827 id: GlobalId,
2828 cluster: ClusterId,
2829 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_> {
2830 Box::new(self.state.get_indexes_on(id, cluster))
2831 }
2832}
2833
2834impl Catalog {
2835 pub fn as_optimizer_catalog(self: Arc<Self>) -> Arc<dyn OptimizerCatalog> {
2836 self
2837 }
2838}