mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! The current types used by the in-memory Catalog. Many of the objects in this module are
//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
//! more easily consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39    UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
44    CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
45    RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
55    WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::load_generator::LoadGenerator;
63use mz_storage_types::sources::{
64    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
65    SourceExportDetails, Timeline,
66};
67use serde::ser::SerializeSeq;
68use serde::{Deserialize, Serialize};
69use timely::progress::Antichain;
70use tracing::debug;
71
72use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
73use crate::durable;
74use crate::durable::objects::item_type;
75
/// Used to update `self` from the input value while consuming the input value.
///
/// [`From<T>`] is a supertrait, so every implementor can both be built fresh from a `T` and be
/// refreshed in place from one.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` in place using `from`, which is consumed by the call.
    fn update_from(&mut self, from: T);
}
80
/// In-memory catalog representation of a database.
///
/// Converts to and from [`durable::Database`]. The `schemas_by_id` and `schemas_by_name` maps
/// have no counterpart in the durable type: they start out empty when converting from it and
/// are dropped when converting back.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    pub name: String,
    pub id: DatabaseId,
    pub oid: u32,
    /// Schemas keyed by their [`SchemaId`]; serialized via `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// NOTE(review): presumably maps a schema name to its key in `schemas_by_id` — confirm
    /// against the code that populates it.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
92
93impl From<Database> for durable::Database {
94    fn from(database: Database) -> durable::Database {
95        durable::Database {
96            id: database.id,
97            oid: database.oid,
98            name: database.name,
99            owner_id: database.owner_id,
100            privileges: database.privileges.into_all_values().collect(),
101        }
102    }
103}
104
105impl From<durable::Database> for Database {
106    fn from(
107        durable::Database {
108            id,
109            oid,
110            name,
111            owner_id,
112            privileges,
113        }: durable::Database,
114    ) -> Database {
115        Database {
116            id,
117            oid,
118            schemas_by_id: BTreeMap::new(),
119            schemas_by_name: BTreeMap::new(),
120            name,
121            owner_id,
122            privileges: PrivilegeMap::from_mz_acl_items(privileges),
123        }
124    }
125}
126
127impl UpdateFrom<durable::Database> for Database {
128    fn update_from(
129        &mut self,
130        durable::Database {
131            id,
132            oid,
133            name,
134            owner_id,
135            privileges,
136        }: durable::Database,
137    ) {
138        self.id = id;
139        self.oid = oid;
140        self.name = name;
141        self.owner_id = owner_id;
142        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
143    }
144}
145
/// In-memory catalog representation of a schema.
///
/// Converts to and from [`durable::Schema`]. The `items`, `functions`, and `types` maps have no
/// counterpart in the durable type: they start out empty when converting from it and are
/// dropped when converting back.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// The schema's name together with the database it is qualified by.
    pub name: QualifiedSchemaName,
    pub id: SchemaSpecifier,
    pub oid: u32,
    // NOTE(review): the three maps below are presumably keyed by object name, with functions
    // and types tracked separately from other items — confirm against the code that fills them.
    pub items: BTreeMap<String, CatalogItemId>,
    pub functions: BTreeMap<String, CatalogItemId>,
    pub types: BTreeMap<String, CatalogItemId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
157
158impl From<Schema> for durable::Schema {
159    fn from(schema: Schema) -> durable::Schema {
160        durable::Schema {
161            id: schema.id.into(),
162            oid: schema.oid,
163            name: schema.name.schema,
164            database_id: schema.name.database.id(),
165            owner_id: schema.owner_id,
166            privileges: schema.privileges.into_all_values().collect(),
167        }
168    }
169}
170
171impl From<durable::Schema> for Schema {
172    fn from(
173        durable::Schema {
174            id,
175            oid,
176            name,
177            database_id,
178            owner_id,
179            privileges,
180        }: durable::Schema,
181    ) -> Schema {
182        Schema {
183            name: QualifiedSchemaName {
184                database: database_id.into(),
185                schema: name,
186            },
187            id: id.into(),
188            oid,
189            items: BTreeMap::new(),
190            functions: BTreeMap::new(),
191            types: BTreeMap::new(),
192            owner_id,
193            privileges: PrivilegeMap::from_mz_acl_items(privileges),
194        }
195    }
196}
197
198impl UpdateFrom<durable::Schema> for Schema {
199    fn update_from(
200        &mut self,
201        durable::Schema {
202            id,
203            oid,
204            name,
205            database_id,
206            owner_id,
207            privileges,
208        }: durable::Schema,
209    ) {
210        self.name = QualifiedSchemaName {
211            database: database_id.into(),
212            schema: name,
213        };
214        self.id = id.into();
215        self.oid = oid;
216        self.owner_id = owner_id;
217        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
218    }
219}
220
/// In-memory catalog representation of a role.
///
/// Carries the same fields as [`durable::Role`] and converts to and from it field for field.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    pub name: String,
    pub id: RoleId,
    pub oid: u32,
    pub attributes: RoleAttributes,
    pub membership: RoleMembership,
    /// Variables stored for this role; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
230
231impl Role {
232    pub fn is_user(&self) -> bool {
233        self.id.is_user()
234    }
235
236    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
237        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
238    }
239}
240
241impl From<Role> for durable::Role {
242    fn from(role: Role) -> durable::Role {
243        durable::Role {
244            id: role.id,
245            oid: role.oid,
246            name: role.name,
247            attributes: role.attributes,
248            membership: role.membership,
249            vars: role.vars,
250        }
251    }
252}
253
254impl From<durable::Role> for Role {
255    fn from(
256        durable::Role {
257            id,
258            oid,
259            name,
260            attributes,
261            membership,
262            vars,
263        }: durable::Role,
264    ) -> Self {
265        Role {
266            name,
267            id,
268            oid,
269            attributes,
270            membership,
271            vars,
272        }
273    }
274}
275
276impl UpdateFrom<durable::Role> for Role {
277    fn update_from(
278        &mut self,
279        durable::Role {
280            id,
281            oid,
282            name,
283            attributes,
284            membership,
285            vars,
286        }: durable::Role,
287    ) {
288        self.id = id;
289        self.oid = oid;
290        self.name = name;
291        self.attributes = attributes;
292        self.membership = membership;
293        self.vars = vars;
294    }
295}
296
/// In-memory catalog representation of a role's authentication data.
///
/// Carries the same fields as [`durable::RoleAuth`] and converts to and from it field for field.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// The role this authentication data belongs to.
    pub role_id: RoleId,
    /// The stored password hash, if there is one.
    pub password_hash: Option<String>,
    // NOTE(review): the unit and epoch of `updated_at` are not visible in this file — confirm.
    pub updated_at: u64,
}
303
304impl From<RoleAuth> for durable::RoleAuth {
305    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
306        durable::RoleAuth {
307            role_id: role_auth.role_id,
308            password_hash: role_auth.password_hash,
309            updated_at: role_auth.updated_at,
310        }
311    }
312}
313
314impl From<durable::RoleAuth> for RoleAuth {
315    fn from(
316        durable::RoleAuth {
317            role_id,
318            password_hash,
319            updated_at,
320        }: durable::RoleAuth,
321    ) -> RoleAuth {
322        RoleAuth {
323            role_id,
324            password_hash,
325            updated_at,
326        }
327    }
328}
329
330impl UpdateFrom<durable::RoleAuth> for RoleAuth {
331    fn update_from(&mut self, from: durable::RoleAuth) {
332        self.role_id = from.role_id;
333        self.password_hash = from.password_hash;
334    }
335}
336
/// In-memory catalog representation of a cluster.
///
/// Converts to and from [`durable::Cluster`]. `log_indexes`, `bound_objects`, and the two
/// replica maps have no counterpart in the durable type: they start out empty when converting
/// from it and are dropped when converting back.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    pub name: String,
    pub id: ClusterId,
    pub config: ClusterConfig,
    /// Not serialized.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Replica name to replica ID; read through [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by ID; read through [`Cluster::replica`] and [`Cluster::replicas`].
    /// Serialized via `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
353
354impl Cluster {
355    /// The role of the cluster. Currently used to set alert severity.
356    pub fn role(&self) -> ClusterRole {
357        // NOTE - These roles power monitoring systems. Do not change
358        // them without talking to the cloud or observability groups.
359        if self.name == MZ_SYSTEM_CLUSTER.name {
360            ClusterRole::SystemCritical
361        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
362            ClusterRole::System
363        } else {
364            ClusterRole::User
365        }
366    }
367
368    /// Returns `true` if the cluster is a managed cluster.
369    pub fn is_managed(&self) -> bool {
370        matches!(self.config.variant, ClusterVariant::Managed { .. })
371    }
372
373    /// Lists the user replicas, which are those that do not have the internal flag set.
374    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
375        self.replicas().filter(|r| !r.config.location.internal())
376    }
377
378    /// Lists all replicas in the cluster
379    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
380        self.replicas_by_id_.values()
381    }
382
383    /// Lookup a replica by ID.
384    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
385        self.replicas_by_id_.get(&replica_id)
386    }
387
388    /// Lookup a replica ID by name.
389    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
390        self.replica_id_by_name_.get(name).copied()
391    }
392
393    /// Returns the availability zones of this cluster, if they exist.
394    pub fn availability_zones(&self) -> Option<&[String]> {
395        match &self.config.variant {
396            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
397            ClusterVariant::Unmanaged => None,
398        }
399    }
400
401    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
402        let name = self.name.clone();
403        let variant = match &self.config.variant {
404            ClusterVariant::Managed(ClusterVariantManaged {
405                size,
406                availability_zones,
407                logging,
408                replication_factor,
409                optimizer_feature_overrides,
410                schedule,
411            }) => {
412                let introspection = match logging {
413                    ReplicaLogging {
414                        log_logging,
415                        interval: Some(interval),
416                    } => Some(ComputeReplicaIntrospectionConfig {
417                        debugging: *log_logging,
418                        interval: interval.clone(),
419                    }),
420                    ReplicaLogging {
421                        log_logging: _,
422                        interval: None,
423                    } => None,
424                };
425                let compute = ComputeReplicaConfig { introspection };
426                CreateClusterVariant::Managed(CreateClusterManagedPlan {
427                    replication_factor: replication_factor.clone(),
428                    size: size.clone(),
429                    availability_zones: availability_zones.clone(),
430                    compute,
431                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
432                    schedule: schedule.clone(),
433                })
434            }
435            ClusterVariant::Unmanaged => {
436                // Unmanaged clusters are deprecated, so hopefully we can remove
437                // them before we have to implement this.
438                return Err(PlanError::Unsupported {
439                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
440                    discussion_no: None,
441                });
442            }
443        };
444        let workload_class = self.config.workload_class.clone();
445        Ok(CreateClusterPlan {
446            name,
447            variant,
448            workload_class,
449        })
450    }
451}
452
453impl From<Cluster> for durable::Cluster {
454    fn from(cluster: Cluster) -> durable::Cluster {
455        durable::Cluster {
456            id: cluster.id,
457            name: cluster.name,
458            owner_id: cluster.owner_id,
459            privileges: cluster.privileges.into_all_values().collect(),
460            config: cluster.config.into(),
461        }
462    }
463}
464
465impl From<durable::Cluster> for Cluster {
466    fn from(
467        durable::Cluster {
468            id,
469            name,
470            owner_id,
471            privileges,
472            config,
473        }: durable::Cluster,
474    ) -> Self {
475        Cluster {
476            name: name.clone(),
477            id,
478            bound_objects: BTreeSet::new(),
479            log_indexes: BTreeMap::new(),
480            replica_id_by_name_: BTreeMap::new(),
481            replicas_by_id_: BTreeMap::new(),
482            owner_id,
483            privileges: PrivilegeMap::from_mz_acl_items(privileges),
484            config: config.into(),
485        }
486    }
487}
488
489impl UpdateFrom<durable::Cluster> for Cluster {
490    fn update_from(
491        &mut self,
492        durable::Cluster {
493            id,
494            name,
495            owner_id,
496            privileges,
497            config,
498        }: durable::Cluster,
499    ) {
500        self.id = id;
501        self.name = name;
502        self.owner_id = owner_id;
503        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
504        self.config = config.into();
505    }
506}
507
/// In-memory catalog representation of a cluster replica.
///
/// Converts into [`durable::ClusterReplica`] field for field, with `config` going through
/// `Into`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    pub name: String,
    /// The cluster this replica belongs to.
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub config: ReplicaConfig,
    pub owner_id: RoleId,
}
516
517impl From<ClusterReplica> for durable::ClusterReplica {
518    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
519        durable::ClusterReplica {
520            cluster_id: replica.cluster_id,
521            replica_id: replica.replica_id,
522            name: replica.name,
523            config: replica.config.into(),
524            owner_id: replica.owner_id,
525        }
526    }
527}
528
/// A [`ClusterStatus`] paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    pub status: ClusterStatus,
    // NOTE(review): presumably the time at which `status` was observed — confirm with the
    // code that constructs this value.
    pub time: DateTime<Utc>,
}
534
/// In-memory collection of [`SourceReference`]s together with an `updated_at` marker.
///
/// Converts to and from `mz_sql::plan::SourceReferences` and from
/// [`durable::SourceReferences`]. The durable form additionally carries the owning source's
/// ID, which is supplied separately via [`SourceReferences::to_durable`].
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    // NOTE(review): the unit and epoch of `updated_at` are not visible in this file — confirm.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
540
/// In-memory description of a single source reference.
///
/// Carries the same fields as [`durable::SourceReference`] and
/// `mz_sql::plan::SourceReference`, and converts to and from both field for field.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    pub name: String,
    /// Optional namespace qualifying `name`.
    pub namespace: Option<String>,
    // NOTE(review): presumably the column names of the referenced object — confirm.
    pub columns: Vec<String>,
}
547
548impl From<SourceReference> for durable::SourceReference {
549    fn from(source_reference: SourceReference) -> durable::SourceReference {
550        durable::SourceReference {
551            name: source_reference.name,
552            namespace: source_reference.namespace,
553            columns: source_reference.columns,
554        }
555    }
556}
557
558impl SourceReferences {
559    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
560        durable::SourceReferences {
561            source_id,
562            updated_at: self.updated_at,
563            references: self.references.into_iter().map(Into::into).collect(),
564        }
565    }
566}
567
568impl From<durable::SourceReference> for SourceReference {
569    fn from(source_reference: durable::SourceReference) -> SourceReference {
570        SourceReference {
571            name: source_reference.name,
572            namespace: source_reference.namespace,
573            columns: source_reference.columns,
574        }
575    }
576}
577
578impl From<durable::SourceReferences> for SourceReferences {
579    fn from(source_references: durable::SourceReferences) -> SourceReferences {
580        SourceReferences {
581            updated_at: source_references.updated_at,
582            references: source_references
583                .references
584                .into_iter()
585                .map(|source_reference| source_reference.into())
586                .collect(),
587        }
588    }
589}
590
591impl From<mz_sql::plan::SourceReference> for SourceReference {
592    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
593        SourceReference {
594            name: source_reference.name,
595            namespace: source_reference.namespace,
596            columns: source_reference.columns,
597        }
598    }
599}
600
601impl From<mz_sql::plan::SourceReferences> for SourceReferences {
602    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
603        SourceReferences {
604            updated_at: source_references.updated_at,
605            references: source_references
606                .references
607                .into_iter()
608                .map(|source_reference| source_reference.into())
609                .collect(),
610        }
611    }
612}
613
614impl From<SourceReferences> for mz_sql::plan::SourceReferences {
615    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
616        mz_sql::plan::SourceReferences {
617            updated_at: source_references.updated_at,
618            references: source_references
619                .references
620                .into_iter()
621                .map(|source_reference| source_reference.into())
622                .collect(),
623        }
624    }
625}
626
627impl From<SourceReference> for mz_sql::plan::SourceReference {
628    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
629        mz_sql::plan::SourceReference {
630            name: source_reference.name,
631            namespace: source_reference.namespace,
632            columns: source_reference.columns,
633        }
634    }
635}
636
/// An item in the in-memory catalog: the type-specific [`CatalogItem`] plus the metadata that
/// every item carries (ID, OID, name, owner, and privileges).
///
/// Converts into [`durable::Item`]; `referenced_by` and `used_by` are not part of that
/// conversion and are also skipped when serializing.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    pub item: CatalogItem,
    // NOTE(review): presumably the IDs of items that reference this entry — confirm against
    // the code that maintains it.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    // TODO(database-issues#7922)––this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    pub id: CatalogItemId,
    pub oid: u32,
    pub name: QualifiedItemName,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
653
/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
///
/// TODO(ct): Add a note here if we end up using this for associating continual
/// tasks with a single catalog item.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry; also reachable through `Deref`.
    pub entry: CatalogEntry,
    /// Selects which version of `entry` this value refers to when resolving its
    /// [`RelationDesc`] and [`GlobalId`].
    pub version: RelationVersionSelector,
}
676
677impl CatalogCollectionEntry {
678    pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
679        self.item().desc(name, self.version)
680    }
681
682    pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
683        self.item().desc_opt(self.version)
684    }
685}
686
/// Exposes the version-specific view of the entry to the SQL layer.
impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
    /// Returns the [`RelationDesc`] at this entry's version.
    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
        // Called by type path rather than as `self.desc(..)`.
        // NOTE(review): presumably this makes explicit that the inherent
        // `CatalogCollectionEntry::desc` is meant, not this trait method — confirm.
        CatalogCollectionEntry::desc(self, name)
    }

    /// Returns the [`GlobalId`] of the underlying item at this entry's version.
    ///
    /// # Panics
    ///
    /// Panics if the item has no [`GlobalId`] for the selected version.
    fn global_id(&self) -> GlobalId {
        self.entry
            .item()
            .global_id_for_version(self.version)
            .expect("catalog corruption, missing version!")
    }
}
699
700impl Deref for CatalogCollectionEntry {
701    type Target = CatalogEntry;
702
703    fn deref(&self) -> &CatalogEntry {
704        &self.entry
705    }
706}
707
/// Implements the SQL layer's `CatalogItem` trait by delegating to the wrapped
/// [`CatalogEntry`].
///
/// Every method forwards to `self.entry`. The only one that deals with the version is
/// `at_version`, which produces a new [`CatalogCollectionEntry`] for the requested version.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Forwarded through the trait path, i.e. to the `CatalogItem` implementation of
        // `CatalogEntry`, rather than through method-call syntax.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        // The entry's accessor is dereferenced here because the trait returns the ID by value.
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Returns a boxed copy of this entry that refers to `version` instead of `self.version`.
    ///
    /// The wrapped [`CatalogEntry`] is cloned.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
831
/// The type-specific part of a [`CatalogEntry`].
///
/// Each variant wraps the struct that holds the definition of that kind of item.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}
847
848impl From<CatalogEntry> for durable::Item {
849    fn from(entry: CatalogEntry) -> durable::Item {
850        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
851        durable::Item {
852            id: entry.id,
853            oid: entry.oid,
854            global_id,
855            schema_id: entry.name.qualifiers.schema_spec.into(),
856            name: entry.name.item,
857            create_sql,
858            owner_id: entry.owner_id,
859            privileges: entry.privileges.into_all_values().collect(),
860            extra_versions,
861        }
862    }
863}
864
/// The definition of a table in the in-memory catalog.
///
/// A table can exist in several versions, each with its own [`GlobalId`]; see `collections`.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
889
890impl Table {
891    pub fn timeline(&self) -> Timeline {
892        match &self.data_source {
893            // The Coordinator controls insertions for writable tables
894            // (including system tables), so they are realtime.
895            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
896            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
897        }
898    }
899
900    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
901    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
902        self.collections.values().copied()
903    }
904
905    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
906    pub fn global_id_writes(&self) -> GlobalId {
907        *self
908            .collections
909            .last_key_value()
910            .expect("at least one version of a table")
911            .1
912    }
913
914    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
915    pub fn collection_descs(
916        &self,
917    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
918        self.collections.iter().map(|(version, gid)| {
919            let desc = self
920                .desc
921                .at_version(RelationVersionSelector::Specific(*version));
922            (*gid, *version, desc)
923        })
924    }
925
926    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
927    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
928        let (version, _gid) = self
929            .collections
930            .iter()
931            .find(|(_version, gid)| *gid == id)
932            .expect("GlobalId to exist");
933        self.desc
934            .at_version(RelationVersionSelector::Specific(*version))
935    }
936}
937
/// Describes where a [`Table`]'s data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        // Not serialized.
        // NOTE(review): presumably the default expressions for the table's columns — confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        desc: DataSourceDesc,
        /// The timeline reported by [`Table::timeline`] for this table.
        timeline: Timeline,
    },
}
953
/// Describes how a source (or a table fed by a source) obtains its data.
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion is associated with.
        cluster_id: ClusterId,
    },
    /// Receives data from an external system, for sources that use the old syntax.
    ///
    /// Unlike [`DataSourceDesc::Ingestion`], this variant also carries export information
    /// (`data_config` and `details`).
    OldSyntaxIngestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion is associated with.
        cluster_id: ClusterId,
        // If we're dealing with an old syntax ingestion, the progress ID will be some other
        // collection, and the ingestion itself will have the data from an external reference.
        progress_subsource: CatalogItemId,
        /// Export configuration; provides the encoding and envelope reported by
        /// [`DataSourceDesc::formats`] and [`DataSourceDesc::envelope`].
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Source-specific details of the export.
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        /// The ingestion that this export receives its data from.
        ingestion_id: CatalogItemId,
        /// Human-readable name of the upstream object; for display only (see above).
        external_reference: UnresolvedItemName,
        /// Source-specific details of the export, as produced by purification.
        details: SourceExportDetails,
        /// Export configuration; provides the encoding and envelope reported by
        /// [`DataSourceDesc::formats`] and [`DataSourceDesc::envelope`].
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system.
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
}
1000
1001impl DataSourceDesc {
1002    /// The key and value formats of the data source.
1003    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1004        match &self {
1005            DataSourceDesc::Ingestion { .. } => (None, None),
1006            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1007                match &data_config.encoding.as_ref() {
1008                    Some(encoding) => match &encoding.key {
1009                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1010                        None => (None, Some(encoding.value.type_())),
1011                    },
1012                    None => (None, None),
1013                }
1014            }
1015            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1016                Some(encoding) => match &encoding.key {
1017                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1018                    None => (None, Some(encoding.value.type_())),
1019                },
1020                None => (None, None),
1021            },
1022            DataSourceDesc::Introspection(_)
1023            | DataSourceDesc::Webhook { .. }
1024            | DataSourceDesc::Progress => (None, None),
1025        }
1026    }
1027
1028    /// Envelope of the data source.
1029    pub fn envelope(&self) -> Option<&str> {
1030        // Note how "none"/"append-only" is different from `None`. Source
1031        // sources don't have an envelope (internal logs, for example), while
1032        // other sources have an envelope that we call the "NONE"-envelope.
1033
1034        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1035            match envelope {
1036                SourceEnvelope::None(_) => "none",
1037                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1038                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1039                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1040                        // NOTE(aljoscha): Should we somehow mark that this is
1041                        // using upsert internally? See note above about
1042                        // DEBEZIUM.
1043                        "debezium"
1044                    }
1045                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1046                        "upsert-value-err-inline"
1047                    }
1048                },
1049                SourceEnvelope::CdcV2 => {
1050                    // TODO(aljoscha): Should we even report this? It's
1051                    // currently not exposed.
1052                    "materialize"
1053                }
1054            }
1055        }
1056
1057        match self {
1058            // NOTE(aljoscha): We could move the block for ingestions into
1059            // `SourceEnvelope` itself, but that one feels more like an internal
1060            // thing and adapter should own how we represent envelopes as a
1061            // string? It would not be hard to convince me otherwise, though.
1062            DataSourceDesc::Ingestion { .. } => None,
1063            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1064                Some(envelope_string(&data_config.envelope))
1065            }
1066            DataSourceDesc::IngestionExport { data_config, .. } => {
1067                Some(envelope_string(&data_config.envelope))
1068            }
1069            DataSourceDesc::Introspection(_)
1070            | DataSourceDesc::Webhook { .. }
1071            | DataSourceDesc::Progress => None,
1072        }
1073    }
1074}
1075
/// In-memory catalog representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source, if there is any.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Describes where this source gets its data from. Not serialized.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION.
    pub is_retained_metrics_object: bool,
}
1099
1100impl Source {
1101    /// Creates a new `Source`.
1102    ///
1103    /// # Panics
1104    /// - If an ingestion-based plan is not given a cluster_id.
1105    /// - If a non-ingestion-based source has a defined cluster config in its plan.
1106    /// - If a non-ingestion-based source is given a cluster_id.
1107    pub fn new(
1108        plan: CreateSourcePlan,
1109        global_id: GlobalId,
1110        resolved_ids: ResolvedIds,
1111        custom_logical_compaction_window: Option<CompactionWindow>,
1112        is_retained_metrics_object: bool,
1113    ) -> Source {
1114        Source {
1115            create_sql: Some(plan.source.create_sql),
1116            data_source: match plan.source.data_source {
1117                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1118                    desc,
1119                    cluster_id: plan
1120                        .in_cluster
1121                        .expect("ingestion-based sources must be given a cluster ID"),
1122                },
1123                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1124                    desc,
1125                    progress_subsource,
1126                    data_config,
1127                    details,
1128                } => DataSourceDesc::OldSyntaxIngestion {
1129                    desc,
1130                    cluster_id: plan
1131                        .in_cluster
1132                        .expect("ingestion-based sources must be given a cluster ID"),
1133                    progress_subsource,
1134                    data_config,
1135                    details,
1136                },
1137                mz_sql::plan::DataSourceDesc::Progress => {
1138                    assert!(
1139                        plan.in_cluster.is_none(),
1140                        "subsources must not have a host config or cluster_id defined"
1141                    );
1142                    DataSourceDesc::Progress
1143                }
1144                mz_sql::plan::DataSourceDesc::IngestionExport {
1145                    ingestion_id,
1146                    external_reference,
1147                    details,
1148                    data_config,
1149                } => {
1150                    assert!(
1151                        plan.in_cluster.is_none(),
1152                        "subsources must not have a host config or cluster_id defined"
1153                    );
1154                    DataSourceDesc::IngestionExport {
1155                        ingestion_id,
1156                        external_reference,
1157                        details,
1158                        data_config,
1159                    }
1160                }
1161                mz_sql::plan::DataSourceDesc::Webhook {
1162                    validate_using,
1163                    body_format,
1164                    headers,
1165                    cluster_id,
1166                } => {
1167                    mz_ore::soft_assert_or_log!(
1168                        cluster_id.is_none(),
1169                        "cluster_id set at Source level for Webhooks"
1170                    );
1171                    DataSourceDesc::Webhook {
1172                        validate_using,
1173                        body_format,
1174                        headers,
1175                        cluster_id: plan
1176                            .in_cluster
1177                            .expect("webhook sources must be given a cluster ID"),
1178                    }
1179                }
1180            },
1181            desc: plan.source.desc,
1182            global_id,
1183            timeline: plan.timeline,
1184            resolved_ids,
1185            custom_logical_compaction_window: plan
1186                .source
1187                .compaction_window
1188                .or(custom_logical_compaction_window),
1189            is_retained_metrics_object,
1190        }
1191    }
1192
1193    /// Type of the source.
1194    pub fn source_type(&self) -> &str {
1195        match &self.data_source {
1196            DataSourceDesc::Ingestion { desc, .. }
1197            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1198            DataSourceDesc::Progress => "progress",
1199            DataSourceDesc::IngestionExport { .. } => "subsource",
1200            DataSourceDesc::Introspection(_) => "source",
1201            DataSourceDesc::Webhook { .. } => "webhook",
1202        }
1203    }
1204
1205    /// Connection ID of the source, if one exists.
1206    pub fn connection_id(&self) -> Option<CatalogItemId> {
1207        match &self.data_source {
1208            DataSourceDesc::Ingestion { desc, .. }
1209            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1210            DataSourceDesc::IngestionExport { .. }
1211            | DataSourceDesc::Introspection(_)
1212            | DataSourceDesc::Webhook { .. }
1213            | DataSourceDesc::Progress => None,
1214        }
1215    }
1216
1217    /// The single [`GlobalId`] that refers to this Source.
1218    pub fn global_id(&self) -> GlobalId {
1219        self.global_id
1220    }
1221
1222    /// The expensive resource that each source consumes is persist shards. To
1223    /// prevent abuse, we want to prevent users from creating sources that use an
1224    /// unbounded number of persist shards. But we also don't want to count
1225    /// persist shards that are mandated by teh system (e.g., the progress
1226    /// shard) so that future versions of Materialize can introduce additional
1227    /// per-source shards (e.g., a per-source status shard) without impacting
1228    /// the limit calculation.
1229    pub fn user_controllable_persist_shard_count(&self) -> i64 {
1230        match &self.data_source {
1231            DataSourceDesc::Ingestion { .. } => 0,
1232            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1233                match &desc.connection {
1234                    // These multi-output sources do not use their primary
1235                    // source's data shard, so we don't include it in accounting
1236                    // for users.
1237                    GenericSourceConnection::Postgres(_)
1238                    | GenericSourceConnection::MySql(_)
1239                    | GenericSourceConnection::SqlServer(_) => 0,
1240                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1241                        // Load generators that output data in their primary shard
1242                        LoadGenerator::Clock
1243                        | LoadGenerator::Counter { .. }
1244                        | LoadGenerator::Datums
1245                        | LoadGenerator::KeyValue(_) => 1,
1246                        LoadGenerator::Auction
1247                        | LoadGenerator::Marketing
1248                        | LoadGenerator::Tpch { .. } => 0,
1249                    },
1250                    GenericSourceConnection::Kafka(_) => 1,
1251                }
1252            }
1253            //  DataSourceDesc::IngestionExport represents a subsource, which
1254            //  use a data shard.
1255            DataSourceDesc::IngestionExport { .. } => 1,
1256            DataSourceDesc::Webhook { .. } => 1,
1257            // Introspection and progress subsources are not under the user's control, so shouldn't
1258            // count toward their quota.
1259            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1260        }
1261    }
1262}
1263
/// In-memory catalog representation of a log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}
1271
1272impl Log {
1273    /// The single [`GlobalId`] that refers to this Log.
1274    pub fn global_id(&self) -> GlobalId {
1275        self.global_id
1276    }
1277}
1278
/// In-memory catalog representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g. storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
}
1302
1303impl Sink {
1304    pub fn sink_type(&self) -> &str {
1305        self.connection.name()
1306    }
1307
1308    /// Envelope of the sink.
1309    pub fn envelope(&self) -> Option<&str> {
1310        match &self.envelope {
1311            SinkEnvelope::Debezium => Some("debezium"),
1312            SinkEnvelope::Upsert => Some("upsert"),
1313        }
1314    }
1315
1316    /// Output a combined format string of the sink. For legacy reasons
1317    /// if the key-format is none or the key & value formats are
1318    /// both the same (either avro or json), we return the value format name,
1319    /// otherwise we return a composite name.
1320    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1321        match &self.connection {
1322            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1323            _ => None,
1324        }
1325    }
1326
1327    /// Output distinct key_format and value_format of the sink.
1328    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1329        match &self.connection {
1330            StorageSinkConnection::Kafka(connection) => {
1331                let key_format = connection
1332                    .format
1333                    .key_format
1334                    .as_ref()
1335                    .map(|f| f.get_format_name());
1336                let value_format = connection.format.value_format.get_format_name();
1337                Some((key_format, value_format))
1338            }
1339            _ => None,
1340        }
1341    }
1342
1343    pub fn connection_id(&self) -> Option<CatalogItemId> {
1344        self.connection.connection_id()
1345    }
1346
1347    /// The single [`GlobalId`] that this Sink can be referenced by.
1348    pub fn global_id(&self) -> GlobalId {
1349        self.global_id
1350    }
1351}
1352
/// In-memory catalog representation of a view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}
1372
1373impl View {
1374    /// The single [`GlobalId`] this [`View`] can be referenced by.
1375    pub fn global_id(&self) -> GlobalId {
1376        self.global_id
1377    }
1378}
1379
/// In-memory catalog representation of a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
    ///
    /// Serialized with the map keys converted to strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this materialized view.
    pub dependencies: DependencyIds,
    /// ID of the materialized view this materialized view is intended to replace.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1415
1416impl MaterializedView {
1417    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
1418    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1419        self.collections.values().copied()
1420    }
1421
1422    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
1423    /// version.
1424    pub fn global_id_writes(&self) -> GlobalId {
1425        *self
1426            .collections
1427            .last_key_value()
1428            .expect("at least one version of a materialized view")
1429            .1
1430    }
1431
1432    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
1433    pub fn collection_descs(
1434        &self,
1435    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1436        self.collections.iter().map(|(version, gid)| {
1437            let desc = self
1438                .desc
1439                .at_version(RelationVersionSelector::Specific(*version));
1440            (*gid, *version, desc)
1441        })
1442    }
1443
1444    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
1445    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1446        let (version, _gid) = self
1447            .collections
1448            .iter()
1449            .find(|(_version, gid)| *gid == id)
1450            .expect("GlobalId to exist");
1451        self.desc
1452            .at_version(RelationVersionSelector::Specific(*version))
1453    }
1454
1455    /// Apply the given replacement materialized view to this [`MaterializedView`].
1456    pub fn apply_replacement(&mut self, replacement: Self) {
1457        let target_id = replacement
1458            .replacement_target
1459            .expect("replacement has target");
1460
1461        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1462            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1463                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1464            });
1465            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1466                cmvs
1467            } else {
1468                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1469            }
1470        }
1471
1472        let old_stmt = parse(&self.create_sql);
1473        let rpl_stmt = parse(&replacement.create_sql);
1474        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1475            if_exists: old_stmt.if_exists,
1476            name: old_stmt.name,
1477            columns: rpl_stmt.columns,
1478            replacing: None,
1479            in_cluster: rpl_stmt.in_cluster,
1480            query: rpl_stmt.query,
1481            as_of: rpl_stmt.as_of,
1482            with_options: rpl_stmt.with_options,
1483        };
1484        let create_sql = new_stmt.to_ast_string_stable();
1485
1486        let mut collections = std::mem::take(&mut self.collections);
1487        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
1488        // necessary evolve the relation schema, so that version might be lower than the actual
1489        // latest version.
1490        let latest_version = collections.keys().max().expect("at least one version");
1491        let new_version = latest_version.bump();
1492        collections.insert(new_version, replacement.global_id_writes());
1493
1494        let mut resolved_ids = replacement.resolved_ids;
1495        resolved_ids.remove_item(&target_id);
1496        let mut dependencies = replacement.dependencies;
1497        dependencies.0.remove(&target_id);
1498
1499        *self = Self {
1500            create_sql,
1501            collections,
1502            raw_expr: replacement.raw_expr,
1503            optimized_expr: replacement.optimized_expr,
1504            desc: replacement.desc,
1505            resolved_ids,
1506            dependencies,
1507            replacement_target: None,
1508            cluster_id: replacement.cluster_id,
1509            non_null_assertions: replacement.non_null_assertions,
1510            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1511            refresh_schedule: replacement.refresh_schedule,
1512            initial_as_of: replacement.initial_as_of,
1513        };
1514    }
1515}
1516
/// In-memory catalog representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the
    /// [`metrics_retention`] session variable.
    ///
    /// [`metrics_retention`]: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1541
1542impl Index {
1543    /// The [`GlobalId`] that refers to this Index.
1544    pub fn global_id(&self) -> GlobalId {
1545        self.global_id
1546    }
1547}
1548
/// In-memory catalog representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type, if there is any.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Details of the type. Not serialized.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1560
/// In-memory catalog representation of a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function. Not serialized.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1569
/// In-memory catalog representation of a secret.
///
/// Only the defining SQL and the ID are stored here; this struct has no field for the
/// secret's contents.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1577
/// In-memory catalog representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}
1589
1590impl Connection {
1591    /// The single [`GlobalId`] used to reference this connection.
1592    pub fn global_id(&self) -> GlobalId {
1593        self.global_id
1594    }
1595}
1596
/// In-memory catalog representation of a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// Parse-able SQL that defines this continual task.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this continual task from outside the catalog.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection that we read into this continual task.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the task starts from a snapshot of its input;
    /// confirm against the planner.
    pub with_snapshot: bool,
    /// ContinualTasks are self-referential. We make this work by using a
    /// placeholder `LocalId` for the CT itself through name resolution and
    /// planning. Then we fill in the real `GlobalId` before constructing this
    /// catalog item.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Columns for this continual task.
    pub desc: RelationDesc,
    /// Other catalog items that this continual task references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this continual task.
    pub dependencies: DependencyIds,
    /// Cluster that this continual task runs on.
    pub cluster_id: ClusterId,
    /// See the comment on [MaterializedView::initial_as_of].
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1622
1623impl ContinualTask {
1624    /// The single [`GlobalId`] used to reference this continual task.
1625    pub fn global_id(&self) -> GlobalId {
1626        self.global_id
1627    }
1628}
1629
/// In-memory catalog representation of a network policy.
///
/// Converts to and from [`durable::NetworkPolicy`], which has the same fields but stores the
/// privileges in a different representation.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the network policy.
    pub name: String,
    /// ID of the network policy.
    pub id: NetworkPolicyId,
    /// OID of the network policy.
    pub oid: u32,
    /// Rules that make up the network policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the network policy.
    pub owner_id: RoleId,
    /// Privileges on the network policy.
    pub privileges: PrivilegeMap,
}
1639
1640impl From<NetworkPolicy> for durable::NetworkPolicy {
1641    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1642        durable::NetworkPolicy {
1643            id: policy.id,
1644            oid: policy.oid,
1645            name: policy.name,
1646            rules: policy.rules,
1647            owner_id: policy.owner_id,
1648            privileges: policy.privileges.into_all_values().collect(),
1649        }
1650    }
1651}
1652
1653impl From<durable::NetworkPolicy> for NetworkPolicy {
1654    fn from(
1655        durable::NetworkPolicy {
1656            id,
1657            oid,
1658            name,
1659            rules,
1660            owner_id,
1661            privileges,
1662        }: durable::NetworkPolicy,
1663    ) -> Self {
1664        NetworkPolicy {
1665            id,
1666            oid,
1667            name,
1668            rules,
1669            owner_id,
1670            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1671        }
1672    }
1673}
1674
1675impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1676    fn update_from(
1677        &mut self,
1678        durable::NetworkPolicy {
1679            id,
1680            oid,
1681            name,
1682            rules,
1683            owner_id,
1684            privileges,
1685        }: durable::NetworkPolicy,
1686    ) {
1687        self.id = id;
1688        self.oid = oid;
1689        self.name = name;
1690        self.rules = rules;
1691        self.owner_id = owner_id;
1692        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1693    }
1694}
1695
1696impl CatalogItem {
1697    /// Returns a string indicating the type of this catalog entry.
1698    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1699        match self {
1700            CatalogItem::Table(_) => CatalogItemType::Table,
1701            CatalogItem::Source(_) => CatalogItemType::Source,
1702            CatalogItem::Log(_) => CatalogItemType::Source,
1703            CatalogItem::Sink(_) => CatalogItemType::Sink,
1704            CatalogItem::View(_) => CatalogItemType::View,
1705            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1706            CatalogItem::Index(_) => CatalogItemType::Index,
1707            CatalogItem::Type(_) => CatalogItemType::Type,
1708            CatalogItem::Func(_) => CatalogItemType::Func,
1709            CatalogItem::Secret(_) => CatalogItemType::Secret,
1710            CatalogItem::Connection(_) => CatalogItemType::Connection,
1711            CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1712        }
1713    }
1714
1715    /// Returns the [`GlobalId`]s that reference this item, if any.
1716    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1717        let gid = match self {
1718            CatalogItem::Source(source) => source.global_id,
1719            CatalogItem::Log(log) => log.global_id,
1720            CatalogItem::Sink(sink) => sink.global_id,
1721            CatalogItem::View(view) => view.global_id,
1722            CatalogItem::MaterializedView(mv) => {
1723                return itertools::Either::Left(mv.collections.values().copied());
1724            }
1725            CatalogItem::ContinualTask(ct) => ct.global_id,
1726            CatalogItem::Index(index) => index.global_id,
1727            CatalogItem::Func(func) => func.global_id,
1728            CatalogItem::Type(ty) => ty.global_id,
1729            CatalogItem::Secret(secret) => secret.global_id,
1730            CatalogItem::Connection(conn) => conn.global_id,
1731            CatalogItem::Table(table) => {
1732                return itertools::Either::Left(table.collections.values().copied());
1733            }
1734        };
1735        itertools::Either::Right(std::iter::once(gid))
1736    }
1737
1738    /// Returns the most up-to-date [`GlobalId`] for this item.
1739    ///
1740    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1741    pub fn latest_global_id(&self) -> GlobalId {
1742        match self {
1743            CatalogItem::Source(source) => source.global_id,
1744            CatalogItem::Log(log) => log.global_id,
1745            CatalogItem::Sink(sink) => sink.global_id,
1746            CatalogItem::View(view) => view.global_id,
1747            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1748            CatalogItem::ContinualTask(ct) => ct.global_id,
1749            CatalogItem::Index(index) => index.global_id,
1750            CatalogItem::Func(func) => func.global_id,
1751            CatalogItem::Type(ty) => ty.global_id,
1752            CatalogItem::Secret(secret) => secret.global_id,
1753            CatalogItem::Connection(conn) => conn.global_id,
1754            CatalogItem::Table(table) => table.global_id_writes(),
1755        }
1756    }
1757
1758    /// Whether this item represents a storage collection.
1759    pub fn is_storage_collection(&self) -> bool {
1760        match self {
1761            CatalogItem::Table(_)
1762            | CatalogItem::Source(_)
1763            | CatalogItem::MaterializedView(_)
1764            | CatalogItem::Sink(_)
1765            | CatalogItem::ContinualTask(_) => true,
1766            CatalogItem::Log(_)
1767            | CatalogItem::View(_)
1768            | CatalogItem::Index(_)
1769            | CatalogItem::Type(_)
1770            | CatalogItem::Func(_)
1771            | CatalogItem::Secret(_)
1772            | CatalogItem::Connection(_) => false,
1773        }
1774    }
1775
1776    /// Returns the [`RelationDesc`] for items that yield rows, at the requested
1777    /// version.
1778    ///
1779    /// Some item types honor `version` so callers can ask for the schema that
1780    /// matches a specific [`GlobalId`] or historical definition. Other relation
1781    /// types ignore `version` because they have a single shape. Non-relational
1782    /// items ( for example functions, indexes, sinks, secrets, and connections)
1783    /// return [`SqlCatalogError::InvalidDependency`].
1784    pub fn desc(
1785        &self,
1786        name: &FullItemName,
1787        version: RelationVersionSelector,
1788    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1789        self.desc_opt(version)
1790            .ok_or_else(|| SqlCatalogError::InvalidDependency {
1791                name: name.to_string(),
1792                typ: self.typ(),
1793            })
1794    }
1795
1796    pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1797        match &self {
1798            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1799            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1800            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1801            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1802            CatalogItem::MaterializedView(mview) => {
1803                Some(Cow::Owned(mview.desc.at_version(version)))
1804            }
1805            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1806            CatalogItem::Func(_)
1807            | CatalogItem::Index(_)
1808            | CatalogItem::Sink(_)
1809            | CatalogItem::Secret(_)
1810            | CatalogItem::Connection(_)
1811            | CatalogItem::Type(_) => None,
1812        }
1813    }
1814
1815    pub fn func(
1816        &self,
1817        entry: &CatalogEntry,
1818    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1819        match &self {
1820            CatalogItem::Func(func) => Ok(func.inner),
1821            _ => Err(SqlCatalogError::UnexpectedType {
1822                name: entry.name().item.to_string(),
1823                actual_type: entry.item_type(),
1824                expected_type: CatalogItemType::Func,
1825            }),
1826        }
1827    }
1828
1829    pub fn source_desc(
1830        &self,
1831        entry: &CatalogEntry,
1832    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1833        match &self {
1834            CatalogItem::Source(source) => match &source.data_source {
1835                DataSourceDesc::Ingestion { desc, .. }
1836                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1837                DataSourceDesc::IngestionExport { .. }
1838                | DataSourceDesc::Introspection(_)
1839                | DataSourceDesc::Webhook { .. }
1840                | DataSourceDesc::Progress => Ok(None),
1841            },
1842            _ => Err(SqlCatalogError::UnexpectedType {
1843                name: entry.name().item.to_string(),
1844                actual_type: entry.item_type(),
1845                expected_type: CatalogItemType::Source,
1846            }),
1847        }
1848    }
1849
1850    /// Reports whether this catalog entry is a progress source.
1851    pub fn is_progress_source(&self) -> bool {
1852        matches!(
1853            self,
1854            CatalogItem::Source(Source {
1855                data_source: DataSourceDesc::Progress,
1856                ..
1857            })
1858        )
1859    }
1860
1861    /// Collects the identifiers of the objects that were encountered when resolving names in the
1862    /// item's DDL statement.
1863    pub fn references(&self) -> &ResolvedIds {
1864        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1865        match self {
1866            CatalogItem::Func(_) => &*EMPTY,
1867            CatalogItem::Index(idx) => &idx.resolved_ids,
1868            CatalogItem::Sink(sink) => &sink.resolved_ids,
1869            CatalogItem::Source(source) => &source.resolved_ids,
1870            CatalogItem::Log(_) => &*EMPTY,
1871            CatalogItem::Table(table) => &table.resolved_ids,
1872            CatalogItem::Type(typ) => &typ.resolved_ids,
1873            CatalogItem::View(view) => &view.resolved_ids,
1874            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1875            CatalogItem::Secret(_) => &*EMPTY,
1876            CatalogItem::Connection(connection) => &connection.resolved_ids,
1877            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1878        }
1879    }
1880
1881    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1882    ///
1883    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1884    /// referenced. For example this will include any catalog objects used to implement functions
1885    /// and casts in the item.
1886    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1887        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1888        match self {
1889            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1890            // their implementation. However, currently there's no way to get that information.
1891            CatalogItem::Func(_) => {}
1892            CatalogItem::Index(_) => {}
1893            CatalogItem::Sink(_) => {}
1894            CatalogItem::Source(_) => {}
1895            CatalogItem::Log(_) => {}
1896            CatalogItem::Table(_) => {}
1897            CatalogItem::Type(_) => {}
1898            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1899            CatalogItem::MaterializedView(mview) => {
1900                uses.extend(mview.dependencies.0.iter().copied())
1901            }
1902            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1903            CatalogItem::Secret(_) => {}
1904            CatalogItem::Connection(_) => {}
1905        }
1906        uses
1907    }
1908
1909    /// Returns the connection ID that this item belongs to, if this item is
1910    /// temporary.
1911    pub fn conn_id(&self) -> Option<&ConnectionId> {
1912        match self {
1913            CatalogItem::View(view) => view.conn_id.as_ref(),
1914            CatalogItem::Index(index) => index.conn_id.as_ref(),
1915            CatalogItem::Table(table) => table.conn_id.as_ref(),
1916            CatalogItem::Log(_)
1917            | CatalogItem::Source(_)
1918            | CatalogItem::Sink(_)
1919            | CatalogItem::MaterializedView(_)
1920            | CatalogItem::Secret(_)
1921            | CatalogItem::Type(_)
1922            | CatalogItem::Func(_)
1923            | CatalogItem::Connection(_)
1924            | CatalogItem::ContinualTask(_) => None,
1925        }
1926    }
1927
1928    /// Sets the connection ID that this item belongs to, which makes it a
1929    /// temporary item.
1930    pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1931        match self {
1932            CatalogItem::View(view) => view.conn_id = conn_id,
1933            CatalogItem::Index(index) => index.conn_id = conn_id,
1934            CatalogItem::Table(table) => table.conn_id = conn_id,
1935            CatalogItem::Log(_)
1936            | CatalogItem::Source(_)
1937            | CatalogItem::Sink(_)
1938            | CatalogItem::MaterializedView(_)
1939            | CatalogItem::Secret(_)
1940            | CatalogItem::Type(_)
1941            | CatalogItem::Func(_)
1942            | CatalogItem::Connection(_)
1943            | CatalogItem::ContinualTask(_) => (),
1944        }
1945    }
1946
1947    /// Indicates whether this item is temporary or not.
1948    pub fn is_temporary(&self) -> bool {
1949        self.conn_id().is_some()
1950    }
1951
1952    pub fn rename_schema_refs(
1953        &self,
1954        database_name: &str,
1955        cur_schema_name: &str,
1956        new_schema_name: &str,
1957    ) -> Result<CatalogItem, (String, String)> {
1958        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1959            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1960                .expect("invalid create sql persisted to catalog")
1961                .into_element()
1962                .ast;
1963
1964            // Rename all references to cur_schema_name.
1965            mz_sql::ast::transform::create_stmt_rename_schema_refs(
1966                &mut create_stmt,
1967                database_name,
1968                cur_schema_name,
1969                new_schema_name,
1970            )?;
1971
1972            Ok(create_stmt.to_ast_string_stable())
1973        };
1974
1975        match self {
1976            CatalogItem::Table(i) => {
1977                let mut i = i.clone();
1978                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1979                Ok(CatalogItem::Table(i))
1980            }
1981            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1982            CatalogItem::Source(i) => {
1983                let mut i = i.clone();
1984                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1985                Ok(CatalogItem::Source(i))
1986            }
1987            CatalogItem::Sink(i) => {
1988                let mut i = i.clone();
1989                i.create_sql = do_rewrite(i.create_sql)?;
1990                Ok(CatalogItem::Sink(i))
1991            }
1992            CatalogItem::View(i) => {
1993                let mut i = i.clone();
1994                i.create_sql = do_rewrite(i.create_sql)?;
1995                Ok(CatalogItem::View(i))
1996            }
1997            CatalogItem::MaterializedView(i) => {
1998                let mut i = i.clone();
1999                i.create_sql = do_rewrite(i.create_sql)?;
2000                Ok(CatalogItem::MaterializedView(i))
2001            }
2002            CatalogItem::Index(i) => {
2003                let mut i = i.clone();
2004                i.create_sql = do_rewrite(i.create_sql)?;
2005                Ok(CatalogItem::Index(i))
2006            }
2007            CatalogItem::Secret(i) => {
2008                let mut i = i.clone();
2009                i.create_sql = do_rewrite(i.create_sql)?;
2010                Ok(CatalogItem::Secret(i))
2011            }
2012            CatalogItem::Connection(i) => {
2013                let mut i = i.clone();
2014                i.create_sql = do_rewrite(i.create_sql)?;
2015                Ok(CatalogItem::Connection(i))
2016            }
2017            CatalogItem::Type(i) => {
2018                let mut i = i.clone();
2019                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2020                Ok(CatalogItem::Type(i))
2021            }
2022            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2023            CatalogItem::ContinualTask(i) => {
2024                let mut i = i.clone();
2025                i.create_sql = do_rewrite(i.create_sql)?;
2026                Ok(CatalogItem::ContinualTask(i))
2027            }
2028        }
2029    }
2030
2031    /// Returns a clone of `self` with all instances of `from` renamed to `to`
2032    /// (with the option of including the item's own name) or errors if request
2033    /// is ambiguous.
2034    pub fn rename_item_refs(
2035        &self,
2036        from: FullItemName,
2037        to_item_name: String,
2038        rename_self: bool,
2039    ) -> Result<CatalogItem, String> {
2040        let do_rewrite = |create_sql: String| -> Result<String, String> {
2041            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2042                .expect("invalid create sql persisted to catalog")
2043                .into_element()
2044                .ast;
2045            if rename_self {
2046                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2047            }
2048            // Determination of what constitutes an ambiguous request is done here.
2049            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2050            Ok(create_stmt.to_ast_string_stable())
2051        };
2052
2053        match self {
2054            CatalogItem::Table(i) => {
2055                let mut i = i.clone();
2056                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2057                Ok(CatalogItem::Table(i))
2058            }
2059            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2060            CatalogItem::Source(i) => {
2061                let mut i = i.clone();
2062                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2063                Ok(CatalogItem::Source(i))
2064            }
2065            CatalogItem::Sink(i) => {
2066                let mut i = i.clone();
2067                i.create_sql = do_rewrite(i.create_sql)?;
2068                Ok(CatalogItem::Sink(i))
2069            }
2070            CatalogItem::View(i) => {
2071                let mut i = i.clone();
2072                i.create_sql = do_rewrite(i.create_sql)?;
2073                Ok(CatalogItem::View(i))
2074            }
2075            CatalogItem::MaterializedView(i) => {
2076                let mut i = i.clone();
2077                i.create_sql = do_rewrite(i.create_sql)?;
2078                Ok(CatalogItem::MaterializedView(i))
2079            }
2080            CatalogItem::Index(i) => {
2081                let mut i = i.clone();
2082                i.create_sql = do_rewrite(i.create_sql)?;
2083                Ok(CatalogItem::Index(i))
2084            }
2085            CatalogItem::Secret(i) => {
2086                let mut i = i.clone();
2087                i.create_sql = do_rewrite(i.create_sql)?;
2088                Ok(CatalogItem::Secret(i))
2089            }
2090            CatalogItem::Func(_) | CatalogItem::Type(_) => {
2091                unreachable!("{}s cannot be renamed", self.typ())
2092            }
2093            CatalogItem::Connection(i) => {
2094                let mut i = i.clone();
2095                i.create_sql = do_rewrite(i.create_sql)?;
2096                Ok(CatalogItem::Connection(i))
2097            }
2098            CatalogItem::ContinualTask(i) => {
2099                let mut i = i.clone();
2100                i.create_sql = do_rewrite(i.create_sql)?;
2101                Ok(CatalogItem::ContinualTask(i))
2102            }
2103        }
2104    }
2105
    /// Updates the retain history for an item. Returns the previous retain history value. Returns
    /// an error if this item does not support retain history.
    ///
    /// When `value` is `Some`, the `RETAIN HISTORY` option in the item's `create_sql` is replaced
    /// (or appended if absent); when `value` is `None`, the option is removed if present. In both
    /// cases the item's custom logical compaction window is then set to `Some(window)`.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if [`CatalogItem::update_sql`] fails or if the parsed statement is not a
    /// `CREATE TABLE`, `CREATE INDEX`, `CREATE SOURCE`, or `CREATE MATERIALIZED VIEW`.
    ///
    /// # Panics
    ///
    /// Panics if the SQL update succeeded but the item exposes no custom compaction window.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        // `value` is moved into the closure and consumed by whichever macro expansion runs.
        let update = |mut ast: &mut Statement<Raw>| {
            // Each statement type has unique option types. This macro handles them commonly.
            // `$stmt` is the statement binding, `$opt` the option struct, and `$name` the option
            // name enum for that statement type.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // Replace or add the option.
                    let pos = $stmt
                        .with_options
                        .iter()
                        // In case there are ever multiple, look for the last one.
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            // Overwrite in place and hand back the value that was there before.
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        if let Some(idx) = pos {
                            // `swap_remove` fills the hole with the last option, so the order of
                            // the remaining options may change.
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            let previous = match &mut ast {
                Statement::CreateTable(stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                // Any other statement type is rejected.
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };

        // Rewrite the SQL first; the compaction window is only touched if that succeeded.
        let res = self.update_sql(update)?;
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
2172
2173    pub fn add_column(
2174        &mut self,
2175        name: ColumnName,
2176        typ: SqlColumnType,
2177        sql: RawDataType,
2178    ) -> Result<RelationVersion, PlanError> {
2179        let CatalogItem::Table(table) = self else {
2180            return Err(PlanError::Unsupported {
2181                feature: "adding columns to a non-Table".to_string(),
2182                discussion_no: None,
2183            });
2184        };
2185        let next_version = table.desc.add_column(name.clone(), typ);
2186
2187        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2188            Statement::CreateTable(stmt) => {
2189                let version = ColumnOptionDef {
2190                    name: None,
2191                    option: ColumnOption::Versioned {
2192                        action: ColumnVersioned::Added,
2193                        version: next_version.into(),
2194                    },
2195                };
2196                let column = ColumnDef {
2197                    name: name.into(),
2198                    data_type: sql,
2199                    collation: None,
2200                    options: vec![version],
2201                };
2202                stmt.columns.push(column);
2203                Ok(())
2204            }
2205            _ => Err(()),
2206        };
2207
2208        self.update_sql(update)
2209            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2210        Ok(next_version)
2211    }
2212
2213    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
2214    /// otherwise returns f's result.
2215    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2216    where
2217        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2218    {
2219        let create_sql = match self {
2220            CatalogItem::Table(Table { create_sql, .. })
2221            | CatalogItem::Type(Type { create_sql, .. })
2222            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2223            CatalogItem::Sink(Sink { create_sql, .. })
2224            | CatalogItem::View(View { create_sql, .. })
2225            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2226            | CatalogItem::Index(Index { create_sql, .. })
2227            | CatalogItem::Secret(Secret { create_sql, .. })
2228            | CatalogItem::Connection(Connection { create_sql, .. })
2229            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2230            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2231        };
2232        let Some(create_sql) = create_sql else {
2233            return Err(());
2234        };
2235        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2236            .expect("non-system items must be parseable")
2237            .into_element()
2238            .ast;
2239        debug!("rewrite: {}", ast.to_ast_string_redacted());
2240        let t = f(&mut ast)?;
2241        *create_sql = ast.to_ast_string_stable();
2242        debug!("rewrote: {}", ast.to_ast_string_redacted());
2243        Ok(t)
2244    }
2245
2246    /// If the object is considered a "compute object"
2247    /// (i.e., it is managed by the compute controller),
2248    /// this function returns its cluster ID. Otherwise, it returns nothing.
2249    ///
2250    /// This function differs from `cluster_id` because while all
2251    /// compute objects run on a cluster, the converse is not true.
2252    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2253        match self {
2254            CatalogItem::Index(index) => Some(index.cluster_id),
2255            CatalogItem::Table(_)
2256            | CatalogItem::Source(_)
2257            | CatalogItem::Log(_)
2258            | CatalogItem::View(_)
2259            | CatalogItem::MaterializedView(_)
2260            | CatalogItem::Sink(_)
2261            | CatalogItem::Type(_)
2262            | CatalogItem::Func(_)
2263            | CatalogItem::Secret(_)
2264            | CatalogItem::Connection(_)
2265            | CatalogItem::ContinualTask(_) => None,
2266        }
2267    }
2268
2269    pub fn cluster_id(&self) -> Option<ClusterId> {
2270        match self {
2271            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2272            CatalogItem::Index(index) => Some(index.cluster_id),
2273            CatalogItem::Source(source) => match &source.data_source {
2274                DataSourceDesc::Ingestion { cluster_id, .. }
2275                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2276                // This is somewhat of a lie because the export runs on the same
2277                // cluster as its ingestion but we don't yet have a way of
2278                // cross-referencing the items
2279                DataSourceDesc::IngestionExport { .. } => None,
2280                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2281                DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2282            },
2283            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2284            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2285            CatalogItem::Table(_)
2286            | CatalogItem::Log(_)
2287            | CatalogItem::View(_)
2288            | CatalogItem::Type(_)
2289            | CatalogItem::Func(_)
2290            | CatalogItem::Secret(_)
2291            | CatalogItem::Connection(_) => None,
2292        }
2293    }
2294
2295    /// The custom compaction window, if any has been set. This does not reflect any propagated
2296    /// compaction window (i.e., source -> subsource).
2297    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2298        match self {
2299            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2300            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2301            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2302            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2303            CatalogItem::Log(_)
2304            | CatalogItem::View(_)
2305            | CatalogItem::Sink(_)
2306            | CatalogItem::Type(_)
2307            | CatalogItem::Func(_)
2308            | CatalogItem::Secret(_)
2309            | CatalogItem::Connection(_)
2310            | CatalogItem::ContinualTask(_) => None,
2311        }
2312    }
2313
2314    /// Mutable access to the custom compaction window, or None if this type does not support custom
2315    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2316    /// subsource).
2317    pub fn custom_logical_compaction_window_mut(
2318        &mut self,
2319    ) -> Option<&mut Option<CompactionWindow>> {
2320        let cw = match self {
2321            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2322            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2323            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2324            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2325            CatalogItem::Log(_)
2326            | CatalogItem::View(_)
2327            | CatalogItem::Sink(_)
2328            | CatalogItem::Type(_)
2329            | CatalogItem::Func(_)
2330            | CatalogItem::Secret(_)
2331            | CatalogItem::Connection(_)
2332            | CatalogItem::ContinualTask(_) => return None,
2333        };
2334        Some(cw)
2335    }
2336
2337    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2338    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2339    ///
2340    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2341    /// sensible default (currently 1s).
2342    ///
2343    /// For objects that do not have the concept of compaction window, return None.
2344    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2345        let custom_logical_compaction_window = match self {
2346            CatalogItem::Table(_)
2347            | CatalogItem::Source(_)
2348            | CatalogItem::Index(_)
2349            | CatalogItem::MaterializedView(_)
2350            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2351            CatalogItem::Log(_)
2352            | CatalogItem::View(_)
2353            | CatalogItem::Sink(_)
2354            | CatalogItem::Type(_)
2355            | CatalogItem::Func(_)
2356            | CatalogItem::Secret(_)
2357            | CatalogItem::Connection(_) => return None,
2358        };
2359        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2360    }
2361
2362    /// Whether the item's logical compaction window
2363    /// is controlled by the METRICS_RETENTION
2364    /// system var.
2365    pub fn is_retained_metrics_object(&self) -> bool {
2366        match self {
2367            CatalogItem::Table(table) => table.is_retained_metrics_object,
2368            CatalogItem::Source(source) => source.is_retained_metrics_object,
2369            CatalogItem::Index(index) => index.is_retained_metrics_object,
2370            CatalogItem::Log(_)
2371            | CatalogItem::View(_)
2372            | CatalogItem::MaterializedView(_)
2373            | CatalogItem::Sink(_)
2374            | CatalogItem::Type(_)
2375            | CatalogItem::Func(_)
2376            | CatalogItem::Secret(_)
2377            | CatalogItem::Connection(_)
2378            | CatalogItem::ContinualTask(_) => false,
2379        }
2380    }
2381
2382    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2383        match self {
2384            CatalogItem::Table(table) => {
2385                let create_sql = table
2386                    .create_sql
2387                    .clone()
2388                    .expect("builtin tables cannot be serialized");
2389                let mut collections = table.collections.clone();
2390                let global_id = collections
2391                    .remove(&RelationVersion::root())
2392                    .expect("at least one version");
2393                (create_sql, global_id, collections)
2394            }
2395            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2396            CatalogItem::Source(source) => {
2397                assert!(
2398                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2399                    "cannot serialize introspection/builtin sources",
2400                );
2401                let create_sql = source
2402                    .create_sql
2403                    .clone()
2404                    .expect("builtin sources cannot be serialized");
2405                (create_sql, source.global_id, BTreeMap::new())
2406            }
2407            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2408            CatalogItem::MaterializedView(mview) => {
2409                let mut collections = mview.collections.clone();
2410                let global_id = collections
2411                    .remove(&RelationVersion::root())
2412                    .expect("at least one version");
2413                (mview.create_sql.clone(), global_id, collections)
2414            }
2415            CatalogItem::Index(index) => {
2416                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2417            }
2418            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2419            CatalogItem::Type(typ) => {
2420                let create_sql = typ
2421                    .create_sql
2422                    .clone()
2423                    .expect("builtin types cannot be serialized");
2424                (create_sql, typ.global_id, BTreeMap::new())
2425            }
2426            CatalogItem::Secret(secret) => {
2427                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2428            }
2429            CatalogItem::Connection(connection) => (
2430                connection.create_sql.clone(),
2431                connection.global_id,
2432                BTreeMap::new(),
2433            ),
2434            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2435            CatalogItem::ContinualTask(ct) => {
2436                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2437            }
2438        }
2439    }
2440
2441    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2442        match self {
2443            CatalogItem::Table(mut table) => {
2444                let create_sql = table
2445                    .create_sql
2446                    .expect("builtin tables cannot be serialized");
2447                let global_id = table
2448                    .collections
2449                    .remove(&RelationVersion::root())
2450                    .expect("at least one version");
2451                (create_sql, global_id, table.collections)
2452            }
2453            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2454            CatalogItem::Source(source) => {
2455                assert!(
2456                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2457                    "cannot serialize introspection/builtin sources",
2458                );
2459                let create_sql = source
2460                    .create_sql
2461                    .expect("builtin sources cannot be serialized");
2462                (create_sql, source.global_id, BTreeMap::new())
2463            }
2464            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2465            CatalogItem::MaterializedView(mut mview) => {
2466                let global_id = mview
2467                    .collections
2468                    .remove(&RelationVersion::root())
2469                    .expect("at least one version");
2470                (mview.create_sql, global_id, mview.collections)
2471            }
2472            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2473            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2474            CatalogItem::Type(typ) => {
2475                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2476                (create_sql, typ.global_id, BTreeMap::new())
2477            }
2478            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2479            CatalogItem::Connection(connection) => {
2480                (connection.create_sql, connection.global_id, BTreeMap::new())
2481            }
2482            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2483            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2484        }
2485    }
2486
2487    /// Returns a global ID for a specific version selector. Returns `None` if the item does
2488    /// not have versions or if the version does not exist.
2489    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2490        let collections = match self {
2491            CatalogItem::MaterializedView(mv) => &mv.collections,
2492            CatalogItem::Table(table) => &table.collections,
2493            CatalogItem::Source(source) => return Some(source.global_id),
2494            CatalogItem::Log(log) => return Some(log.global_id),
2495            CatalogItem::View(view) => return Some(view.global_id),
2496            CatalogItem::Sink(sink) => return Some(sink.global_id),
2497            CatalogItem::Index(index) => return Some(index.global_id),
2498            CatalogItem::Type(ty) => return Some(ty.global_id),
2499            CatalogItem::Func(func) => return Some(func.global_id),
2500            CatalogItem::Secret(secret) => return Some(secret.global_id),
2501            CatalogItem::Connection(conn) => return Some(conn.global_id),
2502            CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2503        };
2504        match version {
2505            RelationVersionSelector::Latest => collections.values().last().copied(),
2506            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2507        }
2508    }
2509}
2510
2511impl CatalogEntry {
2512    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`],
2513    /// returning an error if this [`CatalogEntry`] does not produce rows.
2514    ///
2515    /// If you need to get the [`RelationDesc`] for a specific version, see [`CatalogItem::desc`].
2516    pub fn desc_latest(
2517        &self,
2518        name: &FullItemName,
2519    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2520        self.item.desc(name, RelationVersionSelector::Latest)
2521    }
2522
2523    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
2524    /// produces rows.
2525    pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2526        self.item.desc_opt(RelationVersionSelector::Latest)
2527    }
2528
2529    /// Reports if the item has columns.
2530    pub fn has_columns(&self) -> bool {
2531        match self.item() {
2532            CatalogItem::Type(Type { details, .. }) => {
2533                matches!(details.typ, CatalogType::Record { .. })
2534            }
2535            _ => self.desc_opt_latest().is_some(),
2536        }
2537    }
2538
2539    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
2540    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2541        self.item.func(self)
2542    }
2543
2544    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2545    pub fn index(&self) -> Option<&Index> {
2546        match self.item() {
2547            CatalogItem::Index(idx) => Some(idx),
2548            _ => None,
2549        }
2550    }
2551
2552    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2553    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2554        match self.item() {
2555            CatalogItem::MaterializedView(mv) => Some(mv),
2556            _ => None,
2557        }
2558    }
2559
2560    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2561    pub fn table(&self) -> Option<&Table> {
2562        match self.item() {
2563            CatalogItem::Table(tbl) => Some(tbl),
2564            _ => None,
2565        }
2566    }
2567
2568    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2569    pub fn source(&self) -> Option<&Source> {
2570        match self.item() {
2571            CatalogItem::Source(src) => Some(src),
2572            _ => None,
2573        }
2574    }
2575
2576    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2577    pub fn sink(&self) -> Option<&Sink> {
2578        match self.item() {
2579            CatalogItem::Sink(sink) => Some(sink),
2580            _ => None,
2581        }
2582    }
2583
2584    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2585    pub fn secret(&self) -> Option<&Secret> {
2586        match self.item() {
2587            CatalogItem::Secret(secret) => Some(secret),
2588            _ => None,
2589        }
2590    }
2591
2592    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2593        match self.item() {
2594            CatalogItem::Connection(connection) => Ok(connection),
2595            _ => {
2596                let db_name = match self.name().qualifiers.database_spec {
2597                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2598                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2599                };
2600                Err(SqlCatalogError::UnknownConnection(format!(
2601                    "{}{}.{}",
2602                    db_name,
2603                    self.name().qualifiers.schema_spec,
2604                    self.name().item
2605                )))
2606            }
2607        }
2608    }
2609
2610    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2611    /// this `CatalogEntry`, if any.
2612    pub fn source_desc(
2613        &self,
2614    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2615        self.item.source_desc(self)
2616    }
2617
2618    /// Reports whether this catalog entry is a connection.
2619    pub fn is_connection(&self) -> bool {
2620        matches!(self.item(), CatalogItem::Connection(_))
2621    }
2622
2623    /// Reports whether this catalog entry is a table.
2624    pub fn is_table(&self) -> bool {
2625        matches!(self.item(), CatalogItem::Table(_))
2626    }
2627
2628    /// Reports whether this catalog entry is a source. Note that this includes
2629    /// subsources.
2630    pub fn is_source(&self) -> bool {
2631        matches!(self.item(), CatalogItem::Source(_))
2632    }
2633
2634    /// Reports whether this catalog entry is a subsource and, if it is, the
2635    /// ingestion it is an export of, as well as the item it exports.
2636    pub fn subsource_details(
2637        &self,
2638    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2639        match &self.item() {
2640            CatalogItem::Source(source) => match &source.data_source {
2641                DataSourceDesc::IngestionExport {
2642                    ingestion_id,
2643                    external_reference,
2644                    details,
2645                    data_config: _,
2646                } => Some((*ingestion_id, external_reference, details)),
2647                _ => None,
2648            },
2649            _ => None,
2650        }
2651    }
2652
2653    /// Reports whether this catalog entry is a source export and, if it is, the
2654    /// ingestion it is an export of, as well as the item it exports.
2655    pub fn source_export_details(
2656        &self,
2657    ) -> Option<(
2658        CatalogItemId,
2659        &UnresolvedItemName,
2660        &SourceExportDetails,
2661        &SourceExportDataConfig<ReferencedConnection>,
2662    )> {
2663        match &self.item() {
2664            CatalogItem::Source(source) => match &source.data_source {
2665                DataSourceDesc::IngestionExport {
2666                    ingestion_id,
2667                    external_reference,
2668                    details,
2669                    data_config,
2670                } => Some((*ingestion_id, external_reference, details, data_config)),
2671                _ => None,
2672            },
2673            CatalogItem::Table(table) => match &table.data_source {
2674                TableDataSource::DataSource {
2675                    desc:
2676                        DataSourceDesc::IngestionExport {
2677                            ingestion_id,
2678                            external_reference,
2679                            details,
2680                            data_config,
2681                        },
2682                    timeline: _,
2683                } => Some((*ingestion_id, external_reference, details, data_config)),
2684                _ => None,
2685            },
2686            _ => None,
2687        }
2688    }
2689
2690    /// Reports whether this catalog entry is a progress source.
2691    pub fn is_progress_source(&self) -> bool {
2692        self.item().is_progress_source()
2693    }
2694
2695    /// Returns the `GlobalId` of all of this entry's progress ID.
2696    pub fn progress_id(&self) -> Option<CatalogItemId> {
2697        match &self.item() {
2698            CatalogItem::Source(source) => match &source.data_source {
2699                DataSourceDesc::Ingestion { .. } => Some(self.id),
2700                DataSourceDesc::OldSyntaxIngestion {
2701                    progress_subsource, ..
2702                } => Some(*progress_subsource),
2703                DataSourceDesc::IngestionExport { .. }
2704                | DataSourceDesc::Introspection(_)
2705                | DataSourceDesc::Progress
2706                | DataSourceDesc::Webhook { .. } => None,
2707            },
2708            CatalogItem::Table(_)
2709            | CatalogItem::Log(_)
2710            | CatalogItem::View(_)
2711            | CatalogItem::MaterializedView(_)
2712            | CatalogItem::Sink(_)
2713            | CatalogItem::Index(_)
2714            | CatalogItem::Type(_)
2715            | CatalogItem::Func(_)
2716            | CatalogItem::Secret(_)
2717            | CatalogItem::Connection(_)
2718            | CatalogItem::ContinualTask(_) => None,
2719        }
2720    }
2721
2722    /// Reports whether this catalog entry is a sink.
2723    pub fn is_sink(&self) -> bool {
2724        matches!(self.item(), CatalogItem::Sink(_))
2725    }
2726
2727    /// Reports whether this catalog entry is a materialized view.
2728    pub fn is_materialized_view(&self) -> bool {
2729        matches!(self.item(), CatalogItem::MaterializedView(_))
2730    }
2731
2732    /// Reports whether this catalog entry is a view.
2733    pub fn is_view(&self) -> bool {
2734        matches!(self.item(), CatalogItem::View(_))
2735    }
2736
2737    /// Reports whether this catalog entry is a secret.
2738    pub fn is_secret(&self) -> bool {
2739        matches!(self.item(), CatalogItem::Secret(_))
2740    }
2741
2742    /// Reports whether this catalog entry is an introspection source.
2743    pub fn is_introspection_source(&self) -> bool {
2744        matches!(self.item(), CatalogItem::Log(_))
2745    }
2746
2747    /// Reports whether this catalog entry is an index.
2748    pub fn is_index(&self) -> bool {
2749        matches!(self.item(), CatalogItem::Index(_))
2750    }
2751
2752    /// Reports whether this catalog entry is a continual task.
2753    pub fn is_continual_task(&self) -> bool {
2754        matches!(self.item(), CatalogItem::ContinualTask(_))
2755    }
2756
2757    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
2758    pub fn is_relation(&self) -> bool {
2759        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2760    }
2761
2762    /// Collects the identifiers of the objects that were encountered when
2763    /// resolving names in the item's DDL statement.
2764    pub fn references(&self) -> &ResolvedIds {
2765        self.item.references()
2766    }
2767
2768    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2769    ///
2770    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2771    /// referenced. For example this will include any catalog objects used to implement functions
2772    /// and casts in the item.
2773    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2774        self.item.uses()
2775    }
2776
2777    /// Returns the `CatalogItem` associated with this catalog entry.
2778    pub fn item(&self) -> &CatalogItem {
2779        &self.item
2780    }
2781
2782    /// Returns the [`CatalogItemId`] of this catalog entry.
2783    pub fn id(&self) -> CatalogItemId {
2784        self.id
2785    }
2786
2787    /// Returns all of the [`GlobalId`]s associated with this item.
2788    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2789        self.item().global_ids()
2790    }
2791
2792    pub fn latest_global_id(&self) -> GlobalId {
2793        self.item().latest_global_id()
2794    }
2795
2796    /// Returns the OID of this catalog entry.
2797    pub fn oid(&self) -> u32 {
2798        self.oid
2799    }
2800
2801    /// Returns the fully qualified name of this catalog entry.
2802    pub fn name(&self) -> &QualifiedItemName {
2803        &self.name
2804    }
2805
2806    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
2807    pub fn referenced_by(&self) -> &[CatalogItemId] {
2808        &self.referenced_by
2809    }
2810
2811    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2812    pub fn used_by(&self) -> &[CatalogItemId] {
2813        &self.used_by
2814    }
2815
2816    /// Returns the connection ID that this item belongs to, if this item is
2817    /// temporary.
2818    pub fn conn_id(&self) -> Option<&ConnectionId> {
2819        self.item.conn_id()
2820    }
2821
2822    /// Returns the role ID of the entry owner.
2823    pub fn owner_id(&self) -> &RoleId {
2824        &self.owner_id
2825    }
2826
2827    /// Returns the privileges of the entry.
2828    pub fn privileges(&self) -> &PrivilegeMap {
2829        &self.privileges
2830    }
2831}
2832
2833#[derive(Debug, Clone, Default)]
2834pub struct CommentsMap {
2835    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2836}
2837
2838impl CommentsMap {
2839    pub fn update_comment(
2840        &mut self,
2841        object_id: CommentObjectId,
2842        sub_component: Option<usize>,
2843        comment: Option<String>,
2844    ) -> Option<String> {
2845        let object_comments = self.map.entry(object_id).or_default();
2846
2847        // Either replace the existing comment, or remove it if comment is None/NULL.
2848        let (empty, prev) = if let Some(comment) = comment {
2849            let prev = object_comments.insert(sub_component, comment);
2850            (false, prev)
2851        } else {
2852            let prev = object_comments.remove(&sub_component);
2853            (object_comments.is_empty(), prev)
2854        };
2855
2856        // Cleanup entries that are now empty.
2857        if empty {
2858            self.map.remove(&object_id);
2859        }
2860
2861        // Return the previous comment, if there was one, for easy removal.
2862        prev
2863    }
2864
2865    /// Remove all comments for `object_id` from the map.
2866    ///
2867    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
2868    /// relations you can also have comments on the individual columns. Dropping the comments for a
2869    /// relation will also drop all of the comments on any columns.
2870    pub fn drop_comments(
2871        &mut self,
2872        object_ids: &BTreeSet<CommentObjectId>,
2873    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2874        let mut removed_comments = Vec::new();
2875
2876        for object_id in object_ids {
2877            if let Some(comments) = self.map.remove(object_id) {
2878                let removed = comments
2879                    .into_iter()
2880                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2881                removed_comments.extend(removed);
2882            }
2883        }
2884
2885        removed_comments
2886    }
2887
2888    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2889        self.map
2890            .iter()
2891            .map(|(id, comments)| {
2892                comments
2893                    .iter()
2894                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2895            })
2896            .flatten()
2897    }
2898
2899    pub fn get_object_comments(
2900        &self,
2901        object_id: CommentObjectId,
2902    ) -> Option<&BTreeMap<Option<usize>, String>> {
2903        self.map.get(&object_id)
2904    }
2905}
2906
2907impl Serialize for CommentsMap {
2908    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2909    where
2910        S: serde::Serializer,
2911    {
2912        let comment_count = self
2913            .map
2914            .iter()
2915            .map(|(_object_id, comments)| comments.len())
2916            .sum();
2917
2918        let mut seq = serializer.serialize_seq(Some(comment_count))?;
2919        for (object_id, sub) in &self.map {
2920            for (sub_component, comment) in sub {
2921                seq.serialize_element(&(
2922                    format!("{object_id:?}"),
2923                    format!("{sub_component:?}"),
2924                    comment,
2925                ))?;
2926            }
2927        }
2928        seq.end()
2929    }
2930}
2931
/// The set of all default privileges.
///
/// Entries are keyed by the [`DefaultPrivilegeObject`] they apply to; each value holds the
/// privileges for that object, keyed by grantee.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    // Map keys are serialized through `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
2937
/// The default privileges recorded for a single [`DefaultPrivilegeObject`], keyed by the
/// role they are granted to.
///
/// Dereferences to the wrapped map.
// Use a new type here because otherwise we have two levels of BTreeMap, both needing
// map_key_to_string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized, the key is the grantee Role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
2946
2947impl Deref for RoleDefaultPrivileges {
2948    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2949
2950    fn deref(&self) -> &Self::Target {
2951        &self.0
2952    }
2953}
2954
2955impl DerefMut for RoleDefaultPrivileges {
2956    fn deref_mut(&mut self) -> &mut Self::Target {
2957        &mut self.0
2958    }
2959}
2960
2961impl DefaultPrivileges {
2962    /// Add a new default privilege into the set of all default privileges.
2963    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2964        if privilege.acl_mode.is_empty() {
2965            return;
2966        }
2967
2968        let privileges = self.privileges.entry(object).or_default();
2969        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2970            default_privilege.acl_mode |= privilege.acl_mode;
2971        } else {
2972            privileges.insert(privilege.grantee, privilege);
2973        }
2974    }
2975
2976    /// Revoke a default privilege from the set of all default privileges.
2977    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2978        if let Some(privileges) = self.privileges.get_mut(object) {
2979            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2980                default_privilege.acl_mode =
2981                    default_privilege.acl_mode.difference(privilege.acl_mode);
2982                if default_privilege.acl_mode.is_empty() {
2983                    privileges.remove(&privilege.grantee);
2984                }
2985            }
2986            if privileges.is_empty() {
2987                self.privileges.remove(object);
2988            }
2989        }
2990    }
2991
2992    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
2993    /// any exist.
2994    pub fn get_privileges_for_grantee(
2995        &self,
2996        object: &DefaultPrivilegeObject,
2997        grantee: &RoleId,
2998    ) -> Option<&AclMode> {
2999        self.privileges
3000            .get(object)
3001            .and_then(|privileges| privileges.get(grantee))
3002            .map(|privilege| &privilege.acl_mode)
3003    }
3004
3005    /// Get all default privileges that apply to the provided object details.
3006    pub fn get_applicable_privileges(
3007        &self,
3008        role_id: RoleId,
3009        database_id: Option<DatabaseId>,
3010        schema_id: Option<SchemaId>,
3011        object_type: mz_sql::catalog::ObjectType,
3012    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3013        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
3014        // don't require the caller to worry about that and we will map their `object_type` to the
3015        // correct type for privileges.
3016        let privilege_object_type = if object_type.is_relation() {
3017            mz_sql::catalog::ObjectType::Table
3018        } else {
3019            object_type
3020        };
3021        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3022
3023        // Collect all entries that apply to the provided object details.
3024        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
3025        // entries in the vec below. That's OK because we consolidate the results after.
3026        [
3027            DefaultPrivilegeObject {
3028                role_id,
3029                database_id,
3030                schema_id,
3031                object_type: privilege_object_type,
3032            },
3033            DefaultPrivilegeObject {
3034                role_id,
3035                database_id,
3036                schema_id: None,
3037                object_type: privilege_object_type,
3038            },
3039            DefaultPrivilegeObject {
3040                role_id,
3041                database_id: None,
3042                schema_id: None,
3043                object_type: privilege_object_type,
3044            },
3045            DefaultPrivilegeObject {
3046                role_id: RoleId::Public,
3047                database_id,
3048                schema_id,
3049                object_type: privilege_object_type,
3050            },
3051            DefaultPrivilegeObject {
3052                role_id: RoleId::Public,
3053                database_id,
3054                schema_id: None,
3055                object_type: privilege_object_type,
3056            },
3057            DefaultPrivilegeObject {
3058                role_id: RoleId::Public,
3059                database_id: None,
3060                schema_id: None,
3061                object_type: privilege_object_type,
3062            },
3063        ]
3064        .into_iter()
3065        .filter_map(|object| self.privileges.get(&object))
3066        .flat_map(|acl_map| acl_map.values())
3067        // Consolidate privileges with a common grantee.
3068        .fold(
3069            BTreeMap::new(),
3070            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3071                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3072                *accum_acl_mode |= *acl_mode;
3073                accum
3074            },
3075        )
3076        .into_iter()
3077        // Restrict the acl_mode to only privileges valid for the provided object type. If the
3078        // default privilege has an object type of Table, then it may contain privileges valid for
3079        // tables but not other relations. If the passed in object type is another relation, then
3080        // we need to remove any privilege that is not valid for the specified relation.
3081        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3082        // Filter out empty privileges.
3083        .filter(|(_, acl_mode)| !acl_mode.is_empty())
3084        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3085            grantee: *grantee,
3086            acl_mode,
3087        })
3088    }
3089
3090    pub fn iter(
3091        &self,
3092    ) -> impl Iterator<
3093        Item = (
3094            &DefaultPrivilegeObject,
3095            impl Iterator<Item = &DefaultPrivilegeAclItem>,
3096        ),
3097    > {
3098        self.privileges
3099            .iter()
3100            .map(|(object, acl_map)| (object, acl_map.values()))
3101    }
3102}
3103
/// In-memory configuration of a cluster.
///
/// Converts to and from `durable::ClusterConfig` field by field.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed (with its managed settings) or unmanaged.
    pub variant: ClusterVariant,
    /// Optional workload class of the cluster.
    // NOTE(review): the meaning of the label is not visible here — confirm with its consumers.
    pub workload_class: Option<String>,
}
3109
3110impl ClusterConfig {
3111    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3112        match &self.variant {
3113            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3114            ClusterVariant::Unmanaged => None,
3115        }
3116    }
3117}
3118
3119impl From<ClusterConfig> for durable::ClusterConfig {
3120    fn from(config: ClusterConfig) -> Self {
3121        Self {
3122            variant: config.variant.into(),
3123            workload_class: config.workload_class,
3124        }
3125    }
3126}
3127
3128impl From<durable::ClusterConfig> for ClusterConfig {
3129    fn from(config: durable::ClusterConfig) -> Self {
3130        Self {
3131            variant: config.variant.into(),
3132            workload_class: config.workload_class,
3133        }
3134    }
3135}
3136
/// Settings of a managed cluster, i.e. the payload of [`ClusterVariant::Managed`].
///
/// Converts to and from `durable::ClusterVariantManaged` field by field.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Size of the cluster, as a string.
    // NOTE(review): presumably the name of a replica size — confirm against the size catalog.
    pub size: String,
    /// Availability zones configured for the cluster.
    pub availability_zones: Vec<String>,
    /// Logging configuration.
    // NOTE(review): presumably applied to the cluster's replicas, given the `ReplicaLogging`
    // type — confirm.
    pub logging: ReplicaLogging,
    /// Replication factor of the cluster.
    // NOTE(review): presumably the number of replicas to maintain — confirm.
    pub replication_factor: u32,
    /// Optimizer feature overrides; exposed through [`ClusterConfig::features`].
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule of the cluster.
    pub schedule: ClusterSchedule,
}
3146
3147impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3148    fn from(managed: ClusterVariantManaged) -> Self {
3149        Self {
3150            size: managed.size,
3151            availability_zones: managed.availability_zones,
3152            logging: managed.logging,
3153            replication_factor: managed.replication_factor,
3154            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3155            schedule: managed.schedule,
3156        }
3157    }
3158}
3159
3160impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3161    fn from(managed: durable::ClusterVariantManaged) -> Self {
3162        Self {
3163            size: managed.size,
3164            availability_zones: managed.availability_zones,
3165            logging: managed.logging,
3166            replication_factor: managed.replication_factor,
3167            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3168            schedule: managed.schedule,
3169        }
3170    }
3171}
3172
/// Distinguishes managed clusters from unmanaged ones.
///
/// `From` conversions exist in both directions between this type and `durable::ClusterVariant`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster, together with its configuration.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; no additional configuration is stored in this variant.
    Unmanaged,
}
3178
3179impl From<ClusterVariant> for durable::ClusterVariant {
3180    fn from(variant: ClusterVariant) -> Self {
3181        match variant {
3182            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3183            ClusterVariant::Unmanaged => Self::Unmanaged,
3184        }
3185    }
3186}
3187
3188impl From<durable::ClusterVariant> for ClusterVariant {
3189    fn from(variant: durable::ClusterVariant) -> Self {
3190        match variant {
3191            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3192            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3193        }
3194    }
3195}
3196
3197impl mz_sql::catalog::CatalogDatabase for Database {
3198    fn name(&self) -> &str {
3199        &self.name
3200    }
3201
3202    fn id(&self) -> DatabaseId {
3203        self.id
3204    }
3205
3206    fn has_schemas(&self) -> bool {
3207        !self.schemas_by_name.is_empty()
3208    }
3209
3210    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3211        &self.schemas_by_name
3212    }
3213
3214    // `as` is ok to use to cast to a trait object.
3215    #[allow(clippy::as_conversions)]
3216    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3217        self.schemas_by_id
3218            .values()
3219            .map(|schema| schema as &dyn CatalogSchema)
3220            .collect()
3221    }
3222
3223    fn owner_id(&self) -> RoleId {
3224        self.owner_id
3225    }
3226
3227    fn privileges(&self) -> &PrivilegeMap {
3228        &self.privileges
3229    }
3230}
3231
3232impl mz_sql::catalog::CatalogSchema for Schema {
3233    fn database(&self) -> &ResolvedDatabaseSpecifier {
3234        &self.name.database
3235    }
3236
3237    fn name(&self) -> &QualifiedSchemaName {
3238        &self.name
3239    }
3240
3241    fn id(&self) -> &SchemaSpecifier {
3242        &self.id
3243    }
3244
3245    fn has_items(&self) -> bool {
3246        !self.items.is_empty()
3247    }
3248
3249    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3250        Box::new(
3251            self.items
3252                .values()
3253                .chain(self.functions.values())
3254                .chain(self.types.values())
3255                .copied(),
3256        )
3257    }
3258
3259    fn owner_id(&self) -> RoleId {
3260        self.owner_id
3261    }
3262
3263    fn privileges(&self) -> &PrivilegeMap {
3264        &self.privileges
3265    }
3266}
3267
3268impl mz_sql::catalog::CatalogRole for Role {
3269    fn name(&self) -> &str {
3270        &self.name
3271    }
3272
3273    fn id(&self) -> RoleId {
3274        self.id
3275    }
3276
3277    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3278        &self.membership.map
3279    }
3280
3281    fn attributes(&self) -> &RoleAttributes {
3282        &self.attributes
3283    }
3284
3285    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3286        &self.vars.map
3287    }
3288}
3289
3290impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3291    fn name(&self) -> &str {
3292        &self.name
3293    }
3294
3295    fn id(&self) -> NetworkPolicyId {
3296        self.id
3297    }
3298
3299    fn owner_id(&self) -> RoleId {
3300        self.owner_id
3301    }
3302
3303    fn privileges(&self) -> &PrivilegeMap {
3304        &self.privileges
3305    }
3306}
3307
3308impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3309    fn name(&self) -> &str {
3310        &self.name
3311    }
3312
3313    fn id(&self) -> ClusterId {
3314        self.id
3315    }
3316
3317    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3318        &self.bound_objects
3319    }
3320
3321    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3322        &self.replica_id_by_name_
3323    }
3324
3325    // `as` is ok to use to cast to a trait object.
3326    #[allow(clippy::as_conversions)]
3327    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3328        self.replicas()
3329            .map(|replica| replica as &dyn CatalogClusterReplica)
3330            .collect()
3331    }
3332
3333    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3334        self.replica(id).expect("catalog out of sync")
3335    }
3336
3337    fn owner_id(&self) -> RoleId {
3338        self.owner_id
3339    }
3340
3341    fn privileges(&self) -> &PrivilegeMap {
3342        &self.privileges
3343    }
3344
3345    fn is_managed(&self) -> bool {
3346        self.is_managed()
3347    }
3348
3349    fn managed_size(&self) -> Option<&str> {
3350        match &self.config.variant {
3351            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3352            _ => None,
3353        }
3354    }
3355
3356    fn schedule(&self) -> Option<&ClusterSchedule> {
3357        match &self.config.variant {
3358            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3359            _ => None,
3360        }
3361    }
3362
3363    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3364        self.try_to_plan()
3365    }
3366}
3367
3368impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3369    fn name(&self) -> &str {
3370        &self.name
3371    }
3372
3373    fn cluster_id(&self) -> ClusterId {
3374        self.cluster_id
3375    }
3376
3377    fn replica_id(&self) -> ReplicaId {
3378        self.replica_id
3379    }
3380
3381    fn owner_id(&self) -> RoleId {
3382        self.owner_id
3383    }
3384
3385    fn internal(&self) -> bool {
3386        self.config.location.internal()
3387    }
3388}
3389
3390impl mz_sql::catalog::CatalogItem for CatalogEntry {
3391    fn name(&self) -> &QualifiedItemName {
3392        self.name()
3393    }
3394
3395    fn id(&self) -> CatalogItemId {
3396        self.id()
3397    }
3398
3399    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3400        Box::new(self.global_ids())
3401    }
3402
3403    fn oid(&self) -> u32 {
3404        self.oid()
3405    }
3406
3407    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3408        self.func()
3409    }
3410
3411    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3412        self.source_desc()
3413    }
3414
3415    fn connection(
3416        &self,
3417    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3418    {
3419        Ok(self.connection()?.details.to_connection())
3420    }
3421
3422    fn create_sql(&self) -> &str {
3423        match self.item() {
3424            CatalogItem::Table(Table { create_sql, .. }) => {
3425                create_sql.as_deref().unwrap_or("<builtin>")
3426            }
3427            CatalogItem::Source(Source { create_sql, .. }) => {
3428                create_sql.as_deref().unwrap_or("<builtin>")
3429            }
3430            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3431            CatalogItem::View(View { create_sql, .. }) => create_sql,
3432            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3433            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3434            CatalogItem::Type(Type { create_sql, .. }) => {
3435                create_sql.as_deref().unwrap_or("<builtin>")
3436            }
3437            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3438            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3439            CatalogItem::Func(_) => "<builtin>",
3440            CatalogItem::Log(_) => "<builtin>",
3441            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3442        }
3443    }
3444
3445    fn item_type(&self) -> SqlCatalogItemType {
3446        self.item().typ()
3447    }
3448
3449    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3450        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3451            Some((keys, *on))
3452        } else {
3453            None
3454        }
3455    }
3456
3457    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3458        if let CatalogItem::Table(Table {
3459            data_source: TableDataSource::TableWrites { defaults },
3460            ..
3461        }) = self.item()
3462        {
3463            Some(defaults.as_slice())
3464        } else {
3465            None
3466        }
3467    }
3468
3469    fn replacement_target(&self) -> Option<CatalogItemId> {
3470        if let CatalogItem::MaterializedView(mv) = self.item() {
3471            mv.replacement_target
3472        } else {
3473            None
3474        }
3475    }
3476
3477    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3478        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3479            Some(details)
3480        } else {
3481            None
3482        }
3483    }
3484
3485    fn references(&self) -> &ResolvedIds {
3486        self.references()
3487    }
3488
3489    fn uses(&self) -> BTreeSet<CatalogItemId> {
3490        self.uses()
3491    }
3492
3493    fn referenced_by(&self) -> &[CatalogItemId] {
3494        self.referenced_by()
3495    }
3496
3497    fn used_by(&self) -> &[CatalogItemId] {
3498        self.used_by()
3499    }
3500
3501    fn subsource_details(
3502        &self,
3503    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3504        self.subsource_details()
3505    }
3506
3507    fn source_export_details(
3508        &self,
3509    ) -> Option<(
3510        CatalogItemId,
3511        &UnresolvedItemName,
3512        &SourceExportDetails,
3513        &SourceExportDataConfig<ReferencedConnection>,
3514    )> {
3515        self.source_export_details()
3516    }
3517
3518    fn is_progress_source(&self) -> bool {
3519        self.is_progress_source()
3520    }
3521
3522    fn progress_id(&self) -> Option<CatalogItemId> {
3523        self.progress_id()
3524    }
3525
3526    fn owner_id(&self) -> RoleId {
3527        self.owner_id
3528    }
3529
3530    fn privileges(&self) -> &PrivilegeMap {
3531        &self.privileges
3532    }
3533
3534    fn cluster_id(&self) -> Option<ClusterId> {
3535        self.item().cluster_id()
3536    }
3537
3538    fn at_version(
3539        &self,
3540        version: RelationVersionSelector,
3541    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3542        Box::new(CatalogCollectionEntry {
3543            entry: self.clone(),
3544            version,
3545        })
3546    }
3547
3548    fn latest_version(&self) -> Option<RelationVersion> {
3549        self.table().map(|t| t.desc.latest_version())
3550    }
3551}
3552
/// A single update to the catalog state.
// The derived `PartialOrd`/`Ord` compare fields in declaration order: `kind`, then `ts`, then
// `diff`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    /// The object the update applies to.
    pub kind: StateUpdateKind,
    /// The timestamp attached to the update.
    pub ts: Timestamp,
    /// Whether `kind` is being added or retracted.
    pub diff: StateDiff,
}
3560
/// The contents of a single state update.
///
/// Variants are listed in dependency order.
// The derived `PartialOrd`/`Ord` rank variants by declaration position, so sorting values of this
// type groups them in the order listed here. Reordering variants changes that sort order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // Temporary items are not actually updated via the durable catalog, but
    // this allows us to model them the same way as all other items in parts of
    // the pipeline.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3590
/// Valid diffs for catalog state updates.
///
/// Converts into [`Diff`] via `From`, and back from [`Diff`] via `TryFrom`.
// With the derived `Ord`, `Retraction` sorts before `Addition` because it is declared first.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// Corresponds to `Diff::MINUS_ONE`.
    Retraction,
    /// Corresponds to `Diff::ONE`.
    Addition,
}
3597
3598impl From<StateDiff> for Diff {
3599    fn from(diff: StateDiff) -> Self {
3600        match diff {
3601            StateDiff::Retraction => Diff::MINUS_ONE,
3602            StateDiff::Addition => Diff::ONE,
3603        }
3604    }
3605}
3606impl TryFrom<Diff> for StateDiff {
3607    type Error = String;
3608
3609    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3610        match diff {
3611            Diff::MINUS_ONE => Ok(Self::Retraction),
3612            Diff::ONE => Ok(Self::Addition),
3613            diff => Err(format!("invalid diff {diff}")),
3614        }
3615    }
3616}
3617
/// Information needed to process an update to a temporary item.
///
/// Built from a [`CatalogEntry`] via `From`.
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    /// Catalog item ID of the entry.
    pub id: CatalogItemId,
    /// OID of the entry.
    pub oid: u32,
    /// Global ID produced when serializing the entry's item.
    pub global_id: GlobalId,
    /// Schema the item lives in, taken from the entry's qualified name.
    pub schema_id: SchemaId,
    /// Unqualified item name.
    pub name: String,
    /// Connection ID reported by the entry, if any.
    pub conn_id: Option<ConnectionId>,
    /// `CREATE` SQL produced when serializing the entry's item; `item_type` is derived from it.
    pub create_sql: String,
    /// ID of the owning role.
    pub owner_id: RoleId,
    /// All privilege entries of the item, flattened into a list.
    pub privileges: Vec<MzAclItem>,
    /// Additional version-to-global-ID mappings produced when serializing the entry's item.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3632
3633impl From<CatalogEntry> for TemporaryItem {
3634    fn from(entry: CatalogEntry) -> Self {
3635        let conn_id = entry.conn_id().cloned();
3636        let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3637
3638        TemporaryItem {
3639            id: entry.id,
3640            oid: entry.oid,
3641            global_id,
3642            schema_id: entry.name.qualifiers.schema_spec.into(),
3643            name: entry.name.item,
3644            conn_id,
3645            create_sql,
3646            owner_id: entry.owner_id,
3647            privileges: entry.privileges.into_all_values().collect(),
3648            extra_versions,
3649        }
3650    }
3651}
3652
3653impl TemporaryItem {
3654    pub fn item_type(&self) -> CatalogItemType {
3655        item_type(&self.create_sql)
3656    }
3657}
3658
/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
///
/// Converts into [`StateUpdateKind`] via `From`; the reverse `TryFrom` conversion fails only for
/// `StateUpdateKind::TemporaryItem`, handing the [`TemporaryItem`] back as the error.
// NOTE(review): `StateUpdateKind` and `TemporaryItem` both derive `Ord` in this file, so the
// stated reason for excluding `TemporaryItem` may be out of date — confirm.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3682
3683impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3684    fn from(value: BootstrapStateUpdateKind) -> Self {
3685        match value {
3686            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3687            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3688            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3689            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3690            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3691                StateUpdateKind::DefaultPrivilege(kind)
3692            }
3693            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3694                StateUpdateKind::SystemPrivilege(kind)
3695            }
3696            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3697                StateUpdateKind::SystemConfiguration(kind)
3698            }
3699            BootstrapStateUpdateKind::SourceReferences(kind) => {
3700                StateUpdateKind::SourceReferences(kind)
3701            }
3702            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3703            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3704            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3705                StateUpdateKind::IntrospectionSourceIndex(kind)
3706            }
3707            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3708            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3709                StateUpdateKind::SystemObjectMapping(kind)
3710            }
3711            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3712            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3713            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3714            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3715                StateUpdateKind::StorageCollectionMetadata(kind)
3716            }
3717            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3718                StateUpdateKind::UnfinalizedShard(kind)
3719            }
3720        }
3721    }
3722}
3723
3724impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3725    type Error = TemporaryItem;
3726
3727    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3728        match value {
3729            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3730            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3731            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3732            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3733            StateUpdateKind::DefaultPrivilege(kind) => {
3734                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3735            }
3736            StateUpdateKind::SystemPrivilege(kind) => {
3737                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3738            }
3739            StateUpdateKind::SystemConfiguration(kind) => {
3740                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3741            }
3742            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3743            StateUpdateKind::NetworkPolicy(kind) => {
3744                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3745            }
3746            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3747                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3748            }
3749            StateUpdateKind::ClusterReplica(kind) => {
3750                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3751            }
3752            StateUpdateKind::SourceReferences(kind) => {
3753                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3754            }
3755            StateUpdateKind::SystemObjectMapping(kind) => {
3756                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3757            }
3758            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3759            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3760            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3761            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3762            StateUpdateKind::StorageCollectionMetadata(kind) => {
3763                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3764            }
3765            StateUpdateKind::UnfinalizedShard(kind) => {
3766                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3767            }
3768        }
3769    }
3770}