mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
//! more easily consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33    CatalogItemId, ColumnName, ColumnType, Diff, GlobalId, RelationDesc, RelationVersion,
34    RelationVersionSelector, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39    UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44    DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45    RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54    HirRelationExpr, Ingestion as PlanIngestion, NetworkPolicyRule, PlanError, WebhookBodyFormat,
55    WebhookHeaders, WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::{
63    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
64    SourceExportDetails, Timeline,
65};
66use serde::ser::SerializeSeq;
67use serde::{Deserialize, Serialize};
68use timely::progress::Antichain;
69use tracing::debug;
70
71use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
72use crate::durable;
73
/// Used to update `self` from the input value while consuming the input value.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` in place from `from`, consuming `from`.
    fn update_from(&mut self, from: T);
}
78
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Stable ID of the database.
    pub id: DatabaseId,
    /// OID of the database (PostgreSQL-style object identifier).
    pub oid: u32,
    /// Schemas in this database, keyed by schema ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Index from schema name to schema ID, kept in sync with `schemas_by_id`.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns this database.
    pub owner_id: RoleId,
    /// Privileges granted on this database.
    pub privileges: PrivilegeMap,
}
90
91impl From<Database> for durable::Database {
92    fn from(database: Database) -> durable::Database {
93        durable::Database {
94            id: database.id,
95            oid: database.oid,
96            name: database.name,
97            owner_id: database.owner_id,
98            privileges: database.privileges.into_all_values().collect(),
99        }
100    }
101}
102
103impl From<durable::Database> for Database {
104    fn from(
105        durable::Database {
106            id,
107            oid,
108            name,
109            owner_id,
110            privileges,
111        }: durable::Database,
112    ) -> Database {
113        Database {
114            id,
115            oid,
116            schemas_by_id: BTreeMap::new(),
117            schemas_by_name: BTreeMap::new(),
118            name,
119            owner_id,
120            privileges: PrivilegeMap::from_mz_acl_items(privileges),
121        }
122    }
123}
124
125impl UpdateFrom<durable::Database> for Database {
126    fn update_from(
127        &mut self,
128        durable::Database {
129            id,
130            oid,
131            name,
132            owner_id,
133            privileges,
134        }: durable::Database,
135    ) {
136        self.id = id;
137        self.oid = oid;
138        self.name = name;
139        self.owner_id = owner_id;
140        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
141    }
142}
143
144#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
145pub struct Schema {
146    pub name: QualifiedSchemaName,
147    pub id: SchemaSpecifier,
148    pub oid: u32,
149    pub items: BTreeMap<String, CatalogItemId>,
150    pub functions: BTreeMap<String, CatalogItemId>,
151    pub types: BTreeMap<String, CatalogItemId>,
152    pub owner_id: RoleId,
153    pub privileges: PrivilegeMap,
154}
155
156impl From<Schema> for durable::Schema {
157    fn from(schema: Schema) -> durable::Schema {
158        durable::Schema {
159            id: schema.id.into(),
160            oid: schema.oid,
161            name: schema.name.schema,
162            database_id: schema.name.database.id(),
163            owner_id: schema.owner_id,
164            privileges: schema.privileges.into_all_values().collect(),
165        }
166    }
167}
168
169impl From<durable::Schema> for Schema {
170    fn from(
171        durable::Schema {
172            id,
173            oid,
174            name,
175            database_id,
176            owner_id,
177            privileges,
178        }: durable::Schema,
179    ) -> Schema {
180        Schema {
181            name: QualifiedSchemaName {
182                database: database_id.into(),
183                schema: name,
184            },
185            id: id.into(),
186            oid,
187            items: BTreeMap::new(),
188            functions: BTreeMap::new(),
189            types: BTreeMap::new(),
190            owner_id,
191            privileges: PrivilegeMap::from_mz_acl_items(privileges),
192        }
193    }
194}
195
196impl UpdateFrom<durable::Schema> for Schema {
197    fn update_from(
198        &mut self,
199        durable::Schema {
200            id,
201            oid,
202            name,
203            database_id,
204            owner_id,
205            privileges,
206        }: durable::Schema,
207    ) {
208        self.name = QualifiedSchemaName {
209            database: database_id.into(),
210            schema: name,
211        };
212        self.id = id.into();
213        self.oid = oid;
214        self.owner_id = owner_id;
215        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
216    }
217}
218
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Stable ID of the role.
    pub id: RoleId,
    /// OID of the role (PostgreSQL-style object identifier).
    pub oid: u32,
    /// Attributes of the role (e.g. whether it can log in).
    pub attributes: RoleAttributes,
    /// Other roles this role is a member of.
    pub membership: RoleMembership,
    /// Session variable defaults configured for this role.
    pub vars: RoleVars,
}
228
229impl Role {
230    pub fn is_user(&self) -> bool {
231        self.id.is_user()
232    }
233
234    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
235        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
236    }
237}
238
239impl From<Role> for durable::Role {
240    fn from(role: Role) -> durable::Role {
241        durable::Role {
242            id: role.id,
243            oid: role.oid,
244            name: role.name,
245            attributes: role.attributes,
246            membership: role.membership,
247            vars: role.vars,
248        }
249    }
250}
251
252impl From<durable::Role> for Role {
253    fn from(
254        durable::Role {
255            id,
256            oid,
257            name,
258            attributes,
259            membership,
260            vars,
261        }: durable::Role,
262    ) -> Self {
263        Role {
264            name,
265            id,
266            oid,
267            attributes,
268            membership,
269            vars,
270        }
271    }
272}
273
274impl UpdateFrom<durable::Role> for Role {
275    fn update_from(
276        &mut self,
277        durable::Role {
278            id,
279            oid,
280            name,
281            attributes,
282            membership,
283            vars,
284        }: durable::Role,
285    ) {
286        self.id = id;
287        self.oid = oid;
288        self.name = name;
289        self.attributes = attributes;
290        self.membership = membership;
291        self.vars = vars;
292    }
293}
294
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// ID of the role this authentication info belongs to.
    pub role_id: RoleId,
    /// Hashed password, if one is set.
    pub password_hash: Option<String>,
    /// Timestamp of the last update to this entry.
    pub updated_at: u64,
}
301
302impl From<RoleAuth> for durable::RoleAuth {
303    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
304        durable::RoleAuth {
305            role_id: role_auth.role_id,
306            password_hash: role_auth.password_hash,
307            updated_at: role_auth.updated_at,
308        }
309    }
310}
311
312impl From<durable::RoleAuth> for RoleAuth {
313    fn from(
314        durable::RoleAuth {
315            role_id,
316            password_hash,
317            updated_at,
318        }: durable::RoleAuth,
319    ) -> RoleAuth {
320        RoleAuth {
321            role_id,
322            password_hash,
323            updated_at,
324        }
325    }
326}
327
328impl UpdateFrom<durable::RoleAuth> for RoleAuth {
329    fn update_from(&mut self, from: durable::RoleAuth) {
330        self.role_id = from.role_id;
331        self.password_hash = from.password_hash;
332    }
333}
334
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Stable ID of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster (managed/unmanaged variant, etc.).
    pub config: ClusterConfig,
    /// Introspection source indexes hosted on this cluster, by log variant.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Index from replica name to replica ID, kept in sync with `replicas_by_id_`.
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas of this cluster, keyed by replica ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns this cluster.
    pub owner_id: RoleId,
    /// Privileges granted on this cluster.
    pub privileges: PrivilegeMap,
}
351
impl Cluster {
    /// The role of the cluster. Currently used to set alert severity.
    pub fn role(&self) -> ClusterRole {
        // NOTE - These roles power monitoring systems. Do not change
        // them without talking to the cloud or observability groups.
        if self.name == MZ_SYSTEM_CLUSTER.name {
            ClusterRole::SystemCritical
        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
            ClusterRole::System
        } else {
            ClusterRole::User
        }
    }

    /// Returns `true` if the cluster is a managed cluster.
    pub fn is_managed(&self) -> bool {
        matches!(self.config.variant, ClusterVariant::Managed { .. })
    }

    /// Lists the user replicas, which are those that do not have the internal flag set.
    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas().filter(|r| !r.config.location.internal())
    }

    /// Lists all replicas in the cluster
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas_by_id_.values()
    }

    /// Lookup a replica by ID.
    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
        self.replicas_by_id_.get(&replica_id)
    }

    /// Lookup a replica ID by name.
    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
        self.replica_id_by_name_.get(name).copied()
    }

    /// Returns the availability zones of this cluster, if they exist.
    pub fn availability_zones(&self) -> Option<&[String]> {
        match &self.config.variant {
            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Attempts to convert this cluster back into the [`CreateClusterPlan`]
    /// that would produce it.
    ///
    /// Replica logging config is mapped back to an optional
    /// [`ComputeReplicaIntrospectionConfig`]: present only when a logging
    /// interval is set.
    ///
    /// # Errors
    ///
    /// Returns [`PlanError::Unsupported`] for unmanaged clusters, which this
    /// conversion does not support.
    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        let name = self.name.clone();
        let variant = match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                disk,
                optimizer_feature_overrides,
                schedule,
            }) => {
                let introspection = match logging {
                    ReplicaLogging {
                        log_logging,
                        interval: Some(interval),
                    } => Some(ComputeReplicaIntrospectionConfig {
                        debugging: *log_logging,
                        interval: interval.clone(),
                    }),
                    ReplicaLogging {
                        log_logging: _,
                        interval: None,
                    } => None,
                };
                let compute = ComputeReplicaConfig { introspection };
                CreateClusterVariant::Managed(CreateClusterManagedPlan {
                    replication_factor: replication_factor.clone(),
                    size: size.clone(),
                    availability_zones: availability_zones.clone(),
                    compute,
                    disk: disk.clone(),
                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
                    schedule: schedule.clone(),
                })
            }
            ClusterVariant::Unmanaged => {
                // Unmanaged clusters are deprecated, so hopefully we can remove
                // them before we have to implement this.
                return Err(PlanError::Unsupported {
                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
                    discussion_no: None,
                });
            }
        };
        let workload_class = self.config.workload_class.clone();
        Ok(CreateClusterPlan {
            name,
            variant,
            workload_class,
        })
    }
}
452
453impl From<Cluster> for durable::Cluster {
454    fn from(cluster: Cluster) -> durable::Cluster {
455        durable::Cluster {
456            id: cluster.id,
457            name: cluster.name,
458            owner_id: cluster.owner_id,
459            privileges: cluster.privileges.into_all_values().collect(),
460            config: cluster.config.into(),
461        }
462    }
463}
464
465impl From<durable::Cluster> for Cluster {
466    fn from(
467        durable::Cluster {
468            id,
469            name,
470            owner_id,
471            privileges,
472            config,
473        }: durable::Cluster,
474    ) -> Self {
475        Cluster {
476            name: name.clone(),
477            id,
478            bound_objects: BTreeSet::new(),
479            log_indexes: BTreeMap::new(),
480            replica_id_by_name_: BTreeMap::new(),
481            replicas_by_id_: BTreeMap::new(),
482            owner_id,
483            privileges: PrivilegeMap::from_mz_acl_items(privileges),
484            config: config.into(),
485        }
486    }
487}
488
489impl UpdateFrom<durable::Cluster> for Cluster {
490    fn update_from(
491        &mut self,
492        durable::Cluster {
493            id,
494            name,
495            owner_id,
496            privileges,
497            config,
498        }: durable::Cluster,
499    ) {
500        self.id = id;
501        self.name = name;
502        self.owner_id = owner_id;
503        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
504        self.config = config.into();
505    }
506}
507
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// ID of the cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Stable ID of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica (location, logging, etc.).
    pub config: ReplicaConfig,
    /// Role that owns this replica.
    pub owner_id: RoleId,
}
516
517impl From<ClusterReplica> for durable::ClusterReplica {
518    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
519        durable::ClusterReplica {
520            cluster_id: replica.cluster_id,
521            replica_id: replica.replica_id,
522            name: replica.name,
523            config: replica.config.into(),
524            owner_id: replica.owner_id,
525        }
526    }
527}
528
/// Status of a single process of a cluster replica at a point in time.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status of the process.
    pub status: ClusterStatus,
    /// When the status was recorded.
    pub time: DateTime<Utc>,
}
534
/// A set of references available in an upstream source, with the time they
/// were last fetched.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// When these references were last refreshed.
    pub updated_at: u64,
    /// The individual upstream references.
    pub references: Vec<SourceReference>,
}
540
/// A single named reference available in an upstream source.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the reference.
    pub name: String,
    /// Namespace of the reference, if the upstream system has one.
    pub namespace: Option<String>,
    /// Column names of the reference, if known.
    pub columns: Vec<String>,
}
547
548impl From<SourceReference> for durable::SourceReference {
549    fn from(source_reference: SourceReference) -> durable::SourceReference {
550        durable::SourceReference {
551            name: source_reference.name,
552            namespace: source_reference.namespace,
553            columns: source_reference.columns,
554        }
555    }
556}
557
558impl SourceReferences {
559    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
560        durable::SourceReferences {
561            source_id,
562            updated_at: self.updated_at,
563            references: self.references.into_iter().map(Into::into).collect(),
564        }
565    }
566}
567
568impl From<durable::SourceReference> for SourceReference {
569    fn from(source_reference: durable::SourceReference) -> SourceReference {
570        SourceReference {
571            name: source_reference.name,
572            namespace: source_reference.namespace,
573            columns: source_reference.columns,
574        }
575    }
576}
577
578impl From<durable::SourceReferences> for SourceReferences {
579    fn from(source_references: durable::SourceReferences) -> SourceReferences {
580        SourceReferences {
581            updated_at: source_references.updated_at,
582            references: source_references
583                .references
584                .into_iter()
585                .map(|source_reference| source_reference.into())
586                .collect(),
587        }
588    }
589}
590
591impl From<mz_sql::plan::SourceReference> for SourceReference {
592    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
593        SourceReference {
594            name: source_reference.name,
595            namespace: source_reference.namespace,
596            columns: source_reference.columns,
597        }
598    }
599}
600
601impl From<mz_sql::plan::SourceReferences> for SourceReferences {
602    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
603        SourceReferences {
604            updated_at: source_references.updated_at,
605            references: source_references
606                .references
607                .into_iter()
608                .map(|source_reference| source_reference.into())
609                .collect(),
610        }
611    }
612}
613
614impl From<SourceReferences> for mz_sql::plan::SourceReferences {
615    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
616        mz_sql::plan::SourceReferences {
617            updated_at: source_references.updated_at,
618            references: source_references
619                .references
620                .into_iter()
621                .map(|source_reference| source_reference.into())
622                .collect(),
623        }
624    }
625}
626
627impl From<SourceReference> for mz_sql::plan::SourceReference {
628    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
629        mz_sql::plan::SourceReference {
630            name: source_reference.name,
631            namespace: source_reference.namespace,
632            columns: source_reference.columns,
633        }
634    }
635}
636
/// An entry in the in-memory catalog: an item plus its identity, ownership,
/// and dependency bookkeeping.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item this entry describes.
    pub item: CatalogItem,
    /// Items that reference this entry by name in their definitions.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    // TODO(database-issues#7922)––this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Stable ID of this entry.
    pub id: CatalogItemId,
    /// OID of this entry (PostgreSQL-style object identifier).
    pub oid: u32,
    /// Fully qualified name of this entry.
    pub name: QualifiedItemName,
    /// Role that owns this entry.
    pub owner_id: RoleId,
    /// Privileges granted on this entry.
    pub privileges: PrivilegeMap,
}
653
654/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
655/// A single item in the catalog may be associated with multiple "collections".
656///
657/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
658/// currently running dataflow, etc.
659///
660/// Items in the Catalog have a stable name -> ID mapping, in other words for
661/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
662/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
663/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
664/// to a table. We can't just change the schema of the underlying Persist Shard
665/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
666/// allocate a new [`GlobalId`] to refer to the new version of the table, and
667/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
668///
669/// TODO(ct): Add a note here if we end up using this for associating continual
670/// tasks with a single catalog item.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry.
    pub entry: CatalogEntry,
    /// Which relation version of the entry this collection refers to.
    pub version: RelationVersionSelector,
}
676
677impl CatalogCollectionEntry {
678    pub fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
679        self.item().desc(name, self.version)
680    }
681
682    pub fn desc_opt(&self) -> Option<Cow<RelationDesc>> {
683        self.item().desc_opt(self.version)
684    }
685}
686
687impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
688    fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
689        CatalogCollectionEntry::desc(self, name)
690    }
691
692    fn global_id(&self) -> GlobalId {
693        match self.entry.item() {
694            CatalogItem::Source(source) => source.global_id,
695            CatalogItem::Log(log) => log.global_id,
696            CatalogItem::View(view) => view.global_id,
697            CatalogItem::MaterializedView(mv) => mv.global_id,
698            CatalogItem::Sink(sink) => sink.global_id,
699            CatalogItem::Index(index) => index.global_id,
700            CatalogItem::Type(ty) => ty.global_id,
701            CatalogItem::Func(func) => func.global_id,
702            CatalogItem::Secret(secret) => secret.global_id,
703            CatalogItem::Connection(conn) => conn.global_id,
704            CatalogItem::ContinualTask(ct) => ct.global_id,
705            CatalogItem::Table(table) => match self.version {
706                RelationVersionSelector::Latest => {
707                    let (_version, gid) = table
708                        .collections
709                        .last_key_value()
710                        .expect("at least one version");
711                    *gid
712                }
713                RelationVersionSelector::Specific(version) => table
714                    .collections
715                    .get(&version)
716                    .expect("catalog corruption, missing version!")
717                    .clone(),
718            },
719        }
720    }
721}
722
// Dereferences to the wrapped `CatalogEntry`, so all of its methods are
// available on `CatalogCollectionEntry` directly.
impl Deref for CatalogCollectionEntry {
    type Target = CatalogEntry;

    fn deref(&self) -> &CatalogEntry {
        &self.entry
    }
}
730
// Pure delegation: every method forwards to the wrapped `CatalogEntry`,
// except `at_version`, which re-wraps the entry with a new version selector.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully qualified call to disambiguate from the inherent method.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
850
/// The item payload of a [`CatalogEntry`], one variant per item type.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}
866
867impl From<CatalogEntry> for durable::Item {
868    fn from(entry: CatalogEntry) -> durable::Item {
869        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
870        durable::Item {
871            id: entry.id,
872            oid: entry.oid,
873            global_id,
874            schema_id: entry.name.qualifiers.schema_spec.into(),
875            name: entry.name.item,
876            create_sql,
877            owner_id: entry.owner_id,
878            privileges: entry.privileges.into_all_values().collect(),
879            extra_versions,
880        }
881    }
882}
883
/// In-memory representation of a table in the catalog.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
908
909impl Table {
910    pub fn timeline(&self) -> Timeline {
911        match &self.data_source {
912            // The Coordinator controls insertions for writable tables
913            // (including system tables), so they are realtime.
914            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
915            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
916        }
917    }
918
919    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
920    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
921        self.collections.values().copied()
922    }
923
924    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
925    pub fn global_id_writes(&self) -> GlobalId {
926        self.collections
927            .last_key_value()
928            .expect("at least one version of a table")
929            .1
930            .clone()
931    }
932
933    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
934    pub fn collection_descs(
935        &self,
936    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
937        self.collections.iter().map(|(version, gid)| {
938            let desc = self
939                .desc
940                .at_version(RelationVersionSelector::Specific(*version));
941            (*gid, *version, desc)
942        })
943    }
944
945    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
946    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
947        let (version, _gid) = self
948            .collections
949            .iter()
950            .find(|(_version, gid)| *gid == id)
951            .expect("GlobalId to exist");
952        self.desc
953            .at_version(RelationVersionSelector::Specific(*version))
954    }
955}
956
/// Where the data in a [`Table`] comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Default expressions for the table's columns, if any.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of the upstream data source feeding this table.
        desc: DataSourceDesc,
        /// The timeline the upstream data source exists on.
        timeline: Timeline,
    },
}
972
/// The in-memory catalog's description of where a source's data comes from.
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system
    Ingestion {
        /// The planned description of the ingestion.
        ingestion_desc: PlanIngestion,
        /// The cluster the ingestion runs on.
        cluster_id: ClusterId,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        /// The catalog item of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Upstream name; for display purposes only (see note above).
        external_reference: UnresolvedItemName,
        /// Source-specific details produced by purification.
        details: SourceExportDetails,
        /// Encoding/envelope configuration for this export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
}
1009
1010impl DataSourceDesc {
1011    /// The key and value formats of the data source.
1012    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1013        match &self {
1014            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1015                match &ingestion_desc.desc.primary_export.encoding.as_ref() {
1016                    Some(encoding) => match &encoding.key {
1017                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1018                        None => (None, Some(encoding.value.type_())),
1019                    },
1020                    None => (None, None),
1021                }
1022            }
1023            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1024                Some(encoding) => match &encoding.key {
1025                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1026                    None => (None, Some(encoding.value.type_())),
1027                },
1028                None => (None, None),
1029            },
1030            DataSourceDesc::Introspection(_)
1031            | DataSourceDesc::Webhook { .. }
1032            | DataSourceDesc::Progress => (None, None),
1033        }
1034    }
1035
1036    /// Envelope of the data source.
1037    pub fn envelope(&self) -> Option<&str> {
1038        // Note how "none"/"append-only" is different from `None`. Source
1039        // sources don't have an envelope (internal logs, for example), while
1040        // other sources have an envelope that we call the "NONE"-envelope.
1041
1042        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1043            match envelope {
1044                SourceEnvelope::None(_) => "none",
1045                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1046                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1047                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1048                        // NOTE(aljoscha): Should we somehow mark that this is
1049                        // using upsert internally? See note above about
1050                        // DEBEZIUM.
1051                        "debezium"
1052                    }
1053                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1054                        "upsert-value-err-inline"
1055                    }
1056                },
1057                SourceEnvelope::CdcV2 => {
1058                    // TODO(aljoscha): Should we even report this? It's
1059                    // currently not exposed.
1060                    "materialize"
1061                }
1062            }
1063        }
1064
1065        match self {
1066            // NOTE(aljoscha): We could move the block for ingestions into
1067            // `SourceEnvelope` itself, but that one feels more like an internal
1068            // thing and adapter should own how we represent envelopes as a
1069            // string? It would not be hard to convince me otherwise, though.
1070            DataSourceDesc::Ingestion { ingestion_desc, .. } => Some(envelope_string(
1071                &ingestion_desc.desc.primary_export.envelope,
1072            )),
1073            DataSourceDesc::IngestionExport { data_config, .. } => {
1074                Some(envelope_string(&data_config.envelope))
1075            }
1076            DataSourceDesc::Introspection(_)
1077            | DataSourceDesc::Webhook { .. }
1078            | DataSourceDesc::Progress => None,
1079        }
1080    }
1081}
1082
/// In-memory representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Where the data for this source comes from.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    ///
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1106
impl Source {
    /// Creates a new `Source`.
    ///
    /// The passed `custom_logical_compaction_window` is only used if the plan
    /// does not carry its own compaction window.
    ///
    /// # Panics
    /// - If an ingestion-based plan is not given a cluster_id.
    /// - If a non-ingestion-based source has a defined cluster config in its plan.
    /// - If a non-ingestion-based source is given a cluster_id.
    pub fn new(
        plan: CreateSourcePlan,
        global_id: GlobalId,
        resolved_ids: ResolvedIds,
        custom_logical_compaction_window: Option<CompactionWindow>,
        is_retained_metrics_object: bool,
    ) -> Source {
        Source {
            create_sql: Some(plan.source.create_sql),
            // Translate the planner's notion of a data source into the
            // catalog's, enforcing the cluster invariants documented above.
            data_source: match plan.source.data_source {
                mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
                    DataSourceDesc::Ingestion {
                        ingestion_desc,
                        cluster_id: plan
                            .in_cluster
                            .expect("ingestion-based sources must be given a cluster ID"),
                    }
                }
                mz_sql::plan::DataSourceDesc::Progress => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Progress
                }
                mz_sql::plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    }
                }
                mz_sql::plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => {
                    // The webhook plan itself should not carry a cluster ID;
                    // the cluster comes from the enclosing plan (`in_cluster`).
                    mz_ore::soft_assert_or_log!(
                        cluster_id.is_none(),
                        "cluster_id set at Source level for Webhooks"
                    );
                    DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: plan
                            .in_cluster
                            .expect("webhook sources must be given a cluster ID"),
                    }
                }
            },
            desc: plan.source.desc,
            global_id,
            timeline: plan.timeline,
            resolved_ids,
            // Prefer the plan's compaction window over the caller-supplied one.
            custom_logical_compaction_window: plan
                .source
                .compaction_window
                .or(custom_logical_compaction_window),
            is_retained_metrics_object,
        }
    }

    /// Type of the source, e.g. the name of the external system for
    /// ingestions, or `"webhook"`/`"progress"`/`"subsource"`.
    pub fn source_type(&self) -> &str {
        match &self.data_source {
            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                ingestion_desc.desc.connection.name()
            }
            DataSourceDesc::Progress => "progress",
            DataSourceDesc::IngestionExport { .. } => "subsource",
            DataSourceDesc::Introspection(_) => "source",
            DataSourceDesc::Webhook { .. } => "webhook",
        }
    }

    /// Connection ID of the source, if one exists.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        match &self.data_source {
            // Only ingestions read from an external connection.
            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                ingestion_desc.desc.connection.connection_id()
            }
            DataSourceDesc::IngestionExport { .. }
            | DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress => None,
        }
    }

    /// The single [`GlobalId`] that refers to this Source.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }

    /// The expensive resource that each source consumes is persist shards. To
    /// prevent abuse, we want to prevent users from creating sources that use an
    /// unbounded number of persist shards. But we also don't want to count
    /// persist shards that are mandated by the system (e.g., the progress
    /// shard) so that future versions of Materialize can introduce additional
    /// per-source shards (e.g., a per-source status shard) without impacting
    /// the limit calculation.
    pub fn user_controllable_persist_shard_count(&self) -> i64 {
        match &self.data_source {
            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                match &ingestion_desc.desc.connection {
                    // These multi-output sources do not use their primary
                    // source's data shard, so we don't include it in accounting
                    // for users.
                    GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => 0,
                    GenericSourceConnection::LoadGenerator(lg) => {
                        // TODO: make this a method on the load generator.
                        if lg.load_generator.views().is_empty() {
                            // Load generator writes directly to its persist shard
                            1
                        } else {
                            // Load generator has 1 persist shard per output,
                            // which will be accounted for by `SourceExport`.
                            0
                        }
                    }
                    GenericSourceConnection::Kafka(_) => 1,
                }
            }
            // DataSourceDesc::IngestionExport represents a subsource, which
            // uses a data shard.
            DataSourceDesc::IngestionExport { .. } => 1,
            DataSourceDesc::Webhook { .. } => 1,
            // Introspection and progress subsources are not under the user's control, so shouldn't
            // count toward their quota.
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
        }
    }
}
1260
/// In-memory representation of a built-in compute introspection log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}
1268
impl Log {
    /// The single [`GlobalId`] that refers to this Log.
    /// (Logs are not versioned, so there is exactly one.)
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1275
/// In-memory representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g. storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
}
1299
1300impl Sink {
1301    pub fn sink_type(&self) -> &str {
1302        self.connection.name()
1303    }
1304
1305    /// Envelope of the sink.
1306    pub fn envelope(&self) -> Option<&str> {
1307        match &self.envelope {
1308            SinkEnvelope::Debezium => Some("debezium"),
1309            SinkEnvelope::Upsert => Some("upsert"),
1310        }
1311    }
1312
1313    /// Output a combined format string of the sink. For legacy reasons
1314    /// if the key-format is none or the key & value formats are
1315    /// both the same (either avro or json), we return the value format name,
1316    /// otherwise we return a composite name.
1317    pub fn combined_format(&self) -> Cow<'_, str> {
1318        let StorageSinkConnection::Kafka(connection) = &self.connection;
1319        connection.format.get_format_name()
1320    }
1321
1322    /// Output distinct key_format and value_format of the sink.
1323    pub fn formats(&self) -> (Option<&str>, &str) {
1324        let StorageSinkConnection::Kafka(connection) = &self.connection;
1325        let key_format = connection
1326            .format
1327            .key_format
1328            .as_ref()
1329            .map(|format| format.get_format_name());
1330        let value_format = connection.format.value_format.get_format_name();
1331        (key_format, value_format)
1332    }
1333
1334    pub fn connection_id(&self) -> Option<CatalogItemId> {
1335        self.connection.connection_id()
1336    }
1337
1338    /// The single [`GlobalId`] that this Sink can be referenced by.
1339    pub fn global_id(&self) -> GlobalId {
1340        self.global_id
1341    }
1342}
1343
/// In-memory representation of a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}
1363
impl View {
    /// The single [`GlobalId`] this [`View`] can be referenced by.
    /// (Views are not versioned, so there is exactly one.)
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1370
/// In-memory representation of a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this materialized view from outside the catalog.
    pub global_id: GlobalId,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns for this materialized view.
    pub desc: RelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1403
impl MaterializedView {
    /// The single [`GlobalId`] this [`MaterializedView`] can be referenced by.
    /// (Materialized views are not versioned, so there is exactly one.)
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1410
/// In-memory representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1435
impl Index {
    /// The [`GlobalId`] that refers to this Index.
    /// (Indexes are not versioned, so there is exactly one.)
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1442
/// In-memory representation of a SQL type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Structural details of the type; skipped during serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// [`RelationDesc`] associated with this type, if any.
    /// NOTE(review): presence semantics aren't visible here — confirm.
    pub desc: Option<RelationDesc>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1455
/// In-memory representation of a SQL function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function; skipped during serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1464
/// In-memory representation of a secret. The secret's value itself is
/// not stored here.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1472
/// In-memory representation of a connection to an external system.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}
1484
impl Connection {
    /// The single [`GlobalId`] used to reference this connection.
    /// (Connections are not versioned, so there is exactly one.)
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1491
/// In-memory representation of a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// Parse-able SQL that defines this continual task.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this continual task from outside the catalog.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection that we read into this continual task.
    pub input_id: GlobalId,
    /// Whether to process an initial snapshot of the input
    /// (presumably analogous to [`Sink::with_snapshot`] — confirm).
    pub with_snapshot: bool,
    /// ContinualTasks are self-referential. We make this work by using a
    /// placeholder `LocalId` for the CT itself through name resolution and
    /// planning. Then we fill in the real `GlobalId` before constructing this
    /// catalog item.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Columns for this continual task.
    pub desc: RelationDesc,
    /// Other catalog items that this continual task references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this continual task.
    pub dependencies: DependencyIds,
    /// Cluster that this continual task runs on.
    pub cluster_id: ClusterId,
    /// See the comment on [MaterializedView::initial_as_of].
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1517
impl ContinualTask {
    /// The single [`GlobalId`] used to reference this continual task.
    /// (Continual tasks are not versioned, so there is exactly one.)
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1524
/// In-memory representation of a network policy and its rules.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the policy.
    pub name: String,
    /// Stable ID of the policy.
    pub id: NetworkPolicyId,
    /// OID of the policy.
    pub oid: u32,
    /// The rules that make up this policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns this policy.
    pub owner_id: RoleId,
    /// Privileges granted on this policy.
    pub privileges: PrivilegeMap,
}
1534
1535impl From<NetworkPolicy> for durable::NetworkPolicy {
1536    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1537        durable::NetworkPolicy {
1538            id: policy.id,
1539            oid: policy.oid,
1540            name: policy.name,
1541            rules: policy.rules,
1542            owner_id: policy.owner_id,
1543            privileges: policy.privileges.into_all_values().collect(),
1544        }
1545    }
1546}
1547
1548impl From<durable::NetworkPolicy> for NetworkPolicy {
1549    fn from(
1550        durable::NetworkPolicy {
1551            id,
1552            oid,
1553            name,
1554            rules,
1555            owner_id,
1556            privileges,
1557        }: durable::NetworkPolicy,
1558    ) -> Self {
1559        NetworkPolicy {
1560            id,
1561            oid,
1562            name,
1563            rules,
1564            owner_id,
1565            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1566        }
1567    }
1568}
1569
1570impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1571    fn update_from(
1572        &mut self,
1573        durable::NetworkPolicy {
1574            id,
1575            oid,
1576            name,
1577            rules,
1578            owner_id,
1579            privileges,
1580        }: durable::NetworkPolicy,
1581    ) {
1582        self.id = id;
1583        self.oid = oid;
1584        self.name = name;
1585        self.rules = rules;
1586        self.owner_id = owner_id;
1587        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1588    }
1589}
1590
1591impl CatalogItem {
1592    /// Returns a string indicating the type of this catalog entry.
1593    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1594        match self {
1595            CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
1596            CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
1597            CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
1598            CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
1599            CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
1600            CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
1601            CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
1602            CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
1603            CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
1604            CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
1605            CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
1606            CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
1607        }
1608    }
1609
1610    /// Returns the [`GlobalId`]s that reference this item, if any.
1611    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1612        let gid = match self {
1613            CatalogItem::Source(source) => source.global_id,
1614            CatalogItem::Log(log) => log.global_id,
1615            CatalogItem::Sink(sink) => sink.global_id,
1616            CatalogItem::View(view) => view.global_id,
1617            CatalogItem::MaterializedView(mv) => mv.global_id,
1618            CatalogItem::ContinualTask(ct) => ct.global_id,
1619            CatalogItem::Index(index) => index.global_id,
1620            CatalogItem::Func(func) => func.global_id,
1621            CatalogItem::Type(ty) => ty.global_id,
1622            CatalogItem::Secret(secret) => secret.global_id,
1623            CatalogItem::Connection(conn) => conn.global_id,
1624            CatalogItem::Table(table) => {
1625                return itertools::Either::Left(table.collections.values().copied());
1626            }
1627        };
1628        itertools::Either::Right(std::iter::once(gid))
1629    }
1630
1631    /// Returns the most up-to-date [`GlobalId`] for this item.
1632    ///
1633    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1634    pub fn latest_global_id(&self) -> GlobalId {
1635        match self {
1636            CatalogItem::Source(source) => source.global_id,
1637            CatalogItem::Log(log) => log.global_id,
1638            CatalogItem::Sink(sink) => sink.global_id,
1639            CatalogItem::View(view) => view.global_id,
1640            CatalogItem::MaterializedView(mv) => mv.global_id,
1641            CatalogItem::ContinualTask(ct) => ct.global_id,
1642            CatalogItem::Index(index) => index.global_id,
1643            CatalogItem::Func(func) => func.global_id,
1644            CatalogItem::Type(ty) => ty.global_id,
1645            CatalogItem::Secret(secret) => secret.global_id,
1646            CatalogItem::Connection(conn) => conn.global_id,
1647            CatalogItem::Table(table) => table.global_id_writes(),
1648        }
1649    }
1650
1651    /// Whether this item represents a storage collection.
1652    pub fn is_storage_collection(&self) -> bool {
1653        match self {
1654            CatalogItem::Table(_)
1655            | CatalogItem::Source(_)
1656            | CatalogItem::MaterializedView(_)
1657            | CatalogItem::Sink(_)
1658            | CatalogItem::ContinualTask(_) => true,
1659            CatalogItem::Log(_)
1660            | CatalogItem::View(_)
1661            | CatalogItem::Index(_)
1662            | CatalogItem::Type(_)
1663            | CatalogItem::Func(_)
1664            | CatalogItem::Secret(_)
1665            | CatalogItem::Connection(_) => false,
1666        }
1667    }
1668
1669    pub fn desc(
1670        &self,
1671        name: &FullItemName,
1672        version: RelationVersionSelector,
1673    ) -> Result<Cow<RelationDesc>, SqlCatalogError> {
1674        self.desc_opt(version)
1675            .ok_or_else(|| SqlCatalogError::InvalidDependency {
1676                name: name.to_string(),
1677                typ: self.typ(),
1678            })
1679    }
1680
1681    pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<RelationDesc>> {
1682        match &self {
1683            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1684            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1685            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1686            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1687            CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
1688            CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1689            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1690            CatalogItem::Func(_)
1691            | CatalogItem::Index(_)
1692            | CatalogItem::Sink(_)
1693            | CatalogItem::Secret(_)
1694            | CatalogItem::Connection(_) => None,
1695        }
1696    }
1697
1698    pub fn func(
1699        &self,
1700        entry: &CatalogEntry,
1701    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1702        match &self {
1703            CatalogItem::Func(func) => Ok(func.inner),
1704            _ => Err(SqlCatalogError::UnexpectedType {
1705                name: entry.name().item.to_string(),
1706                actual_type: entry.item_type(),
1707                expected_type: CatalogItemType::Func,
1708            }),
1709        }
1710    }
1711
1712    pub fn source_desc(
1713        &self,
1714        entry: &CatalogEntry,
1715    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1716        match &self {
1717            CatalogItem::Source(source) => match &source.data_source {
1718                DataSourceDesc::Ingestion { ingestion_desc, .. } => Ok(Some(&ingestion_desc.desc)),
1719                DataSourceDesc::IngestionExport { .. }
1720                | DataSourceDesc::Introspection(_)
1721                | DataSourceDesc::Webhook { .. }
1722                | DataSourceDesc::Progress => Ok(None),
1723            },
1724            _ => Err(SqlCatalogError::UnexpectedType {
1725                name: entry.name().item.to_string(),
1726                actual_type: entry.item_type(),
1727                expected_type: CatalogItemType::Source,
1728            }),
1729        }
1730    }
1731
1732    /// Reports whether this catalog entry is a progress source.
1733    pub fn is_progress_source(&self) -> bool {
1734        matches!(
1735            self,
1736            CatalogItem::Source(Source {
1737                data_source: DataSourceDesc::Progress,
1738                ..
1739            })
1740        )
1741    }
1742
1743    /// Collects the identifiers of the objects that were encountered when resolving names in the
1744    /// item's DDL statement.
1745    pub fn references(&self) -> &ResolvedIds {
1746        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1747        match self {
1748            CatalogItem::Func(_) => &*EMPTY,
1749            CatalogItem::Index(idx) => &idx.resolved_ids,
1750            CatalogItem::Sink(sink) => &sink.resolved_ids,
1751            CatalogItem::Source(source) => &source.resolved_ids,
1752            CatalogItem::Log(_) => &*EMPTY,
1753            CatalogItem::Table(table) => &table.resolved_ids,
1754            CatalogItem::Type(typ) => &typ.resolved_ids,
1755            CatalogItem::View(view) => &view.resolved_ids,
1756            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1757            CatalogItem::Secret(_) => &*EMPTY,
1758            CatalogItem::Connection(connection) => &connection.resolved_ids,
1759            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1760        }
1761    }
1762
1763    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1764    ///
1765    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1766    /// referenced. For example this will include any catalog objects used to implement functions
1767    /// and casts in the item.
1768    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1769        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1770        match self {
1771            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1772            // their implementation. However, currently there's no way to get that information.
1773            CatalogItem::Func(_) => {}
1774            CatalogItem::Index(_) => {}
1775            CatalogItem::Sink(_) => {}
1776            CatalogItem::Source(_) => {}
1777            CatalogItem::Log(_) => {}
1778            CatalogItem::Table(_) => {}
1779            CatalogItem::Type(_) => {}
1780            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1781            CatalogItem::MaterializedView(mview) => {
1782                uses.extend(mview.dependencies.0.iter().copied())
1783            }
1784            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1785            CatalogItem::Secret(_) => {}
1786            CatalogItem::Connection(_) => {}
1787        }
1788        uses
1789    }
1790
1791    /// Returns the connection ID that this item belongs to, if this item is
1792    /// temporary.
1793    pub fn conn_id(&self) -> Option<&ConnectionId> {
1794        match self {
1795            CatalogItem::View(view) => view.conn_id.as_ref(),
1796            CatalogItem::Index(index) => index.conn_id.as_ref(),
1797            CatalogItem::Table(table) => table.conn_id.as_ref(),
1798            CatalogItem::Log(_)
1799            | CatalogItem::Source(_)
1800            | CatalogItem::Sink(_)
1801            | CatalogItem::MaterializedView(_)
1802            | CatalogItem::Secret(_)
1803            | CatalogItem::Type(_)
1804            | CatalogItem::Func(_)
1805            | CatalogItem::Connection(_)
1806            | CatalogItem::ContinualTask(_) => None,
1807        }
1808    }
1809
    /// Indicates whether this item is temporary or not.
    ///
    /// An item is temporary exactly when it is associated with a connection ID
    /// (see [`CatalogItem::conn_id`]); only tables, views, and indexes can be
    /// temporary.
    pub fn is_temporary(&self) -> bool {
        self.conn_id().is_some()
    }
1814
    /// Returns a clone of `self` whose `create_sql` has every reference to the schema
    /// `database_name.cur_schema_name` rewritten to `new_schema_name`.
    ///
    /// Items without stored SQL (logs, funcs, and builtin tables/sources/types whose
    /// `create_sql` is `None`) are returned unchanged. Errors are propagated from the
    /// underlying AST transform.
    pub fn rename_schema_refs(
        &self,
        database_name: &str,
        cur_schema_name: &str,
        new_schema_name: &str,
    ) -> Result<CatalogItem, (String, String)> {
        // Parse the stored SQL, rewrite schema references, and re-serialize it.
        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;

            // Rename all references to cur_schema_name.
            mz_sql::ast::transform::create_stmt_rename_schema_refs(
                &mut create_stmt,
                database_name,
                cur_schema_name,
                new_schema_name,
            )?;

            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            // Table/Source/Type store `Option<String>`; rewrite only when SQL is present.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            // Logs and funcs are builtin and carry no SQL to rewrite.
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::Type(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Type(i))
            }
            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::ContinualTask(i))
            }
        }
    }
1893
    /// Returns a clone of `self` with all instances of `from` renamed to `to`
    /// (with the option of including the item's own name) or errors if request
    /// is ambiguous.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when called on funcs or types, which cannot
    /// be renamed.
    pub fn rename_item_refs(
        &self,
        from: FullItemName,
        to_item_name: String,
        rename_self: bool,
    ) -> Result<CatalogItem, String> {
        // Parse the stored SQL, rewrite item references (and optionally the
        // item's own name), and re-serialize it.
        let do_rewrite = |create_sql: String| -> Result<String, String> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            if rename_self {
                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
            }
            // Determination of what constitutes an ambiguous request is done here.
            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            // Table/Source store `Option<String>`; rewrite only when SQL is present.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            // Logs are builtin and carry no SQL to rewrite.
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("{}s cannot be renamed", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::ContinualTask(i))
            }
        }
    }
1968
1969    /// Updates the retain history for an item. Returns the previous retain history value. Returns
1970    /// an error if this item does not support retain history.
1971    pub fn update_retain_history(
1972        &mut self,
1973        value: Option<Value>,
1974        window: CompactionWindow,
1975    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
1976        let update = |mut ast: &mut Statement<Raw>| {
1977            // Each statement type has unique option types. This macro handles them commonly.
1978            macro_rules! update_retain_history {
1979                ( $stmt:ident, $opt:ident, $name:ident ) => {{
1980                    // Replace or add the option.
1981                    let pos = $stmt
1982                        .with_options
1983                        .iter()
1984                        // In case there are ever multiple, look for the last one.
1985                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
1986                    if let Some(value) = value {
1987                        let next = mz_sql_parser::ast::$opt {
1988                            name: mz_sql_parser::ast::$name::RetainHistory,
1989                            value: Some(WithOptionValue::RetainHistoryFor(value)),
1990                        };
1991                        if let Some(idx) = pos {
1992                            let previous = $stmt.with_options[idx].clone();
1993                            $stmt.with_options[idx] = next;
1994                            previous.value
1995                        } else {
1996                            $stmt.with_options.push(next);
1997                            None
1998                        }
1999                    } else {
2000                        if let Some(idx) = pos {
2001                            $stmt.with_options.swap_remove(idx).value
2002                        } else {
2003                            None
2004                        }
2005                    }
2006                }};
2007            }
2008            let previous = match &mut ast {
2009                Statement::CreateTable(stmt) => {
2010                    update_retain_history!(stmt, TableOption, TableOptionName)
2011                }
2012                Statement::CreateIndex(stmt) => {
2013                    update_retain_history!(stmt, IndexOption, IndexOptionName)
2014                }
2015                Statement::CreateSource(stmt) => {
2016                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2017                }
2018                Statement::CreateMaterializedView(stmt) => {
2019                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2020                }
2021                _ => {
2022                    return Err(());
2023                }
2024            };
2025            Ok(previous)
2026        };
2027
2028        let res = self.update_sql(update)?;
2029        let cw = self
2030            .custom_logical_compaction_window_mut()
2031            .expect("item must have compaction window");
2032        *cw = Some(window);
2033        Ok(res)
2034    }
2035
2036    pub fn add_column(
2037        &mut self,
2038        name: ColumnName,
2039        typ: ColumnType,
2040        sql: RawDataType,
2041    ) -> Result<RelationVersion, PlanError> {
2042        let CatalogItem::Table(table) = self else {
2043            return Err(PlanError::Unsupported {
2044                feature: "adding columns to a non-Table".to_string(),
2045                discussion_no: None,
2046            });
2047        };
2048        let next_version = table.desc.add_column(name.clone(), typ);
2049
2050        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2051            Statement::CreateTable(stmt) => {
2052                let version = ColumnOptionDef {
2053                    name: None,
2054                    option: ColumnOption::Versioned {
2055                        action: ColumnVersioned::Added,
2056                        version: next_version.into(),
2057                    },
2058                };
2059                let column = ColumnDef {
2060                    name: name.into(),
2061                    data_type: sql,
2062                    collation: None,
2063                    options: vec![version],
2064                };
2065                stmt.columns.push(column);
2066                Ok(())
2067            }
2068            _ => Err(()),
2069        };
2070
2071        self.update_sql(update)
2072            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2073        Ok(next_version)
2074    }
2075
    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
    /// otherwise returns f's result.
    ///
    /// Items with no stored SQL — logs, funcs, and tables/sources/types whose
    /// `create_sql` is `None` — yield `Err(())`. Otherwise the SQL is parsed, `f` is
    /// applied to the AST, and the (stably formatted) result is stored back.
    ///
    /// # Panics
    ///
    /// Panics if the stored `create_sql` fails to parse; persisted SQL for non-system
    /// items is expected to always round-trip through the parser.
    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
    where
        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
    {
        // Locate the stored SQL; `None` marks builtin items that cannot be rewritten.
        let create_sql = match self {
            CatalogItem::Table(Table { create_sql, .. })
            | CatalogItem::Type(Type { create_sql, .. })
            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
            CatalogItem::Sink(Sink { create_sql, .. })
            | CatalogItem::View(View { create_sql, .. })
            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
            | CatalogItem::Index(Index { create_sql, .. })
            | CatalogItem::Secret(Secret { create_sql, .. })
            | CatalogItem::Connection(Connection { create_sql, .. })
            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
        };
        let Some(create_sql) = create_sql else {
            return Err(());
        };
        // Parse, let the caller rewrite the AST, then write it back in stable form.
        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
            .expect("non-system items must be parseable")
            .into_element()
            .ast;
        debug!("rewrite: {}", ast.to_ast_string_redacted());
        let t = f(&mut ast)?;
        *create_sql = ast.to_ast_string_stable();
        debug!("rewrote: {}", ast.to_ast_string_redacted());
        Ok(t)
    }
2108
2109    /// If the object is considered a "compute object"
2110    /// (i.e., it is managed by the compute controller),
2111    /// this function returns its cluster ID. Otherwise, it returns nothing.
2112    ///
2113    /// This function differs from `cluster_id` because while all
2114    /// compute objects run on a cluster, the converse is not true.
2115    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2116        match self {
2117            CatalogItem::Index(index) => Some(index.cluster_id),
2118            CatalogItem::Table(_)
2119            | CatalogItem::Source(_)
2120            | CatalogItem::Log(_)
2121            | CatalogItem::View(_)
2122            | CatalogItem::MaterializedView(_)
2123            | CatalogItem::Sink(_)
2124            | CatalogItem::Type(_)
2125            | CatalogItem::Func(_)
2126            | CatalogItem::Secret(_)
2127            | CatalogItem::Connection(_)
2128            | CatalogItem::ContinualTask(_) => None,
2129        }
2130    }
2131
2132    pub fn cluster_id(&self) -> Option<ClusterId> {
2133        match self {
2134            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2135            CatalogItem::Index(index) => Some(index.cluster_id),
2136            CatalogItem::Source(source) => match &source.data_source {
2137                DataSourceDesc::Ingestion { cluster_id, .. } => Some(*cluster_id),
2138                // This is somewhat of a lie because the export runs on the same
2139                // cluster as its ingestion but we don't yet have a way of
2140                // cross-referencing the items
2141                DataSourceDesc::IngestionExport { .. } => None,
2142                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2143                DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2144            },
2145            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2146            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2147            CatalogItem::Table(_)
2148            | CatalogItem::Log(_)
2149            | CatalogItem::View(_)
2150            | CatalogItem::Type(_)
2151            | CatalogItem::Func(_)
2152            | CatalogItem::Secret(_)
2153            | CatalogItem::Connection(_) => None,
2154        }
2155    }
2156
2157    /// The custom compaction window, if any has been set. This does not reflect any propagated
2158    /// compaction window (i.e., source -> subsource).
2159    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2160        match self {
2161            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2162            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2163            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2164            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2165            CatalogItem::Log(_)
2166            | CatalogItem::View(_)
2167            | CatalogItem::Sink(_)
2168            | CatalogItem::Type(_)
2169            | CatalogItem::Func(_)
2170            | CatalogItem::Secret(_)
2171            | CatalogItem::Connection(_)
2172            | CatalogItem::ContinualTask(_) => None,
2173        }
2174    }
2175
2176    /// Mutable access to the custom compaction window, or None if this type does not support custom
2177    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2178    /// subsource).
2179    pub fn custom_logical_compaction_window_mut(
2180        &mut self,
2181    ) -> Option<&mut Option<CompactionWindow>> {
2182        let cw = match self {
2183            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2184            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2185            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2186            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2187            CatalogItem::Log(_)
2188            | CatalogItem::View(_)
2189            | CatalogItem::Sink(_)
2190            | CatalogItem::Type(_)
2191            | CatalogItem::Func(_)
2192            | CatalogItem::Secret(_)
2193            | CatalogItem::Connection(_)
2194            | CatalogItem::ContinualTask(_) => return None,
2195        };
2196        Some(cw)
2197    }
2198
2199    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2200    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2201    ///
2202    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2203    /// sensible default (currently 1s).
2204    ///
2205    /// For objects that do not have the concept of compaction window, return None.
2206    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2207        let custom_logical_compaction_window = match self {
2208            CatalogItem::Table(_)
2209            | CatalogItem::Source(_)
2210            | CatalogItem::Index(_)
2211            | CatalogItem::MaterializedView(_)
2212            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2213            CatalogItem::Log(_)
2214            | CatalogItem::View(_)
2215            | CatalogItem::Sink(_)
2216            | CatalogItem::Type(_)
2217            | CatalogItem::Func(_)
2218            | CatalogItem::Secret(_)
2219            | CatalogItem::Connection(_) => return None,
2220        };
2221        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2222    }
2223
2224    /// Whether the item's logical compaction window
2225    /// is controlled by the METRICS_RETENTION
2226    /// system var.
2227    pub fn is_retained_metrics_object(&self) -> bool {
2228        match self {
2229            CatalogItem::Table(table) => table.is_retained_metrics_object,
2230            CatalogItem::Source(source) => source.is_retained_metrics_object,
2231            CatalogItem::Index(index) => index.is_retained_metrics_object,
2232            CatalogItem::Log(_)
2233            | CatalogItem::View(_)
2234            | CatalogItem::MaterializedView(_)
2235            | CatalogItem::Sink(_)
2236            | CatalogItem::Type(_)
2237            | CatalogItem::Func(_)
2238            | CatalogItem::Secret(_)
2239            | CatalogItem::Connection(_)
2240            | CatalogItem::ContinualTask(_) => false,
2241        }
2242    }
2243
    /// Clones this item into its durable representation: the `create_sql` string, the
    /// item's [`GlobalId`], and — for tables only — the [`GlobalId`] of every non-root
    /// [`RelationVersion`].
    ///
    /// # Panics
    ///
    /// Panics for items with no SQL to serialize: logs, funcs, introspection sources,
    /// and builtin tables/sources/types (those whose `create_sql` is `None`).
    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
        match self {
            CatalogItem::Table(table) => {
                let create_sql = table
                    .create_sql
                    .clone()
                    .expect("builtin tables cannot be serialized");
                // The root version's GlobalId travels separately from the other versions.
                let mut collections = table.collections.clone();
                let global_id = collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (create_sql, global_id, collections)
            }
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                let create_sql = source
                    .create_sql
                    .clone()
                    .expect("builtin sources cannot be serialized");
                (create_sql, source.global_id, BTreeMap::new())
            }
            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
            CatalogItem::MaterializedView(mview) => {
                (mview.create_sql.clone(), mview.global_id, BTreeMap::new())
            }
            CatalogItem::Index(index) => {
                (index.create_sql.clone(), index.global_id, BTreeMap::new())
            }
            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
            CatalogItem::Type(typ) => {
                let create_sql = typ
                    .create_sql
                    .clone()
                    .expect("builtin types cannot be serialized");
                (create_sql, typ.global_id, BTreeMap::new())
            }
            CatalogItem::Secret(secret) => {
                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
            }
            CatalogItem::Connection(connection) => (
                connection.create_sql.clone(),
                connection.global_id,
                BTreeMap::new(),
            ),
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
            CatalogItem::ContinualTask(ct) => {
                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
            }
        }
    }
2298
    /// Consuming variant of [`CatalogItem::to_serialized`]: converts this item into its
    /// durable representation without cloning the `create_sql` or table collections.
    ///
    /// # Panics
    ///
    /// Panics for items with no SQL to serialize: logs, funcs, introspection sources,
    /// and builtin tables/sources/types (those whose `create_sql` is `None`).
    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
        match self {
            CatalogItem::Table(mut table) => {
                let create_sql = table
                    .create_sql
                    .expect("builtin tables cannot be serialized");
                // The root version's GlobalId travels separately from the other versions.
                let global_id = table
                    .collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (create_sql, global_id, table.collections)
            }
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                let create_sql = source
                    .create_sql
                    .expect("builtin sources cannot be serialized");
                (create_sql, source.global_id, BTreeMap::new())
            }
            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
            CatalogItem::MaterializedView(mview) => {
                (mview.create_sql, mview.global_id, BTreeMap::new())
            }
            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
            CatalogItem::Type(typ) => {
                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
                (create_sql, typ.global_id, BTreeMap::new())
            }
            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
            CatalogItem::Connection(connection) => {
                (connection.create_sql, connection.global_id, BTreeMap::new())
            }
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
        }
    }
2340}
2341
2342impl CatalogEntry {
    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`],
    /// returning an error if this [`CatalogEntry`] does not produce rows.
    ///
    /// If you need to get the [`RelationDesc`] for a specific version, see [`CatalogItem::desc`].
    ///
    /// # Errors
    ///
    /// Returns [`SqlCatalogError::InvalidDependency`], tagged with `name`, for item types
    /// that have no relation description.
    pub fn desc_latest(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
        self.item.desc(name, RelationVersionSelector::Latest)
    }
2350
    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
    /// produces rows.
    ///
    /// Non-panicking counterpart to [`CatalogEntry::desc_latest`].
    pub fn desc_opt_latest(&self) -> Option<Cow<RelationDesc>> {
        self.item.desc_opt(RelationVersionSelector::Latest)
    }
2356
    /// Reports if the item has columns.
    ///
    /// True exactly when the latest version of the item has a [`RelationDesc`].
    pub fn has_columns(&self) -> bool {
        self.desc_opt_latest().is_some()
    }
2361
    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
    ///
    /// # Errors
    ///
    /// Returns [`SqlCatalogError::UnexpectedType`] if this entry is not a function.
    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.item.func(self)
    }
2366
2367    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2368    pub fn index(&self) -> Option<&Index> {
2369        match self.item() {
2370            CatalogItem::Index(idx) => Some(idx),
2371            _ => None,
2372        }
2373    }
2374
2375    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2376    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2377        match self.item() {
2378            CatalogItem::MaterializedView(mv) => Some(mv),
2379            _ => None,
2380        }
2381    }
2382
2383    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2384    pub fn table(&self) -> Option<&Table> {
2385        match self.item() {
2386            CatalogItem::Table(tbl) => Some(tbl),
2387            _ => None,
2388        }
2389    }
2390
2391    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2392    pub fn source(&self) -> Option<&Source> {
2393        match self.item() {
2394            CatalogItem::Source(src) => Some(src),
2395            _ => None,
2396        }
2397    }
2398
2399    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2400    pub fn sink(&self) -> Option<&Sink> {
2401        match self.item() {
2402            CatalogItem::Sink(sink) => Some(sink),
2403            _ => None,
2404        }
2405    }
2406
2407    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2408    pub fn secret(&self) -> Option<&Secret> {
2409        match self.item() {
2410            CatalogItem::Secret(secret) => Some(secret),
2411            _ => None,
2412        }
2413    }
2414
2415    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2416        match self.item() {
2417            CatalogItem::Connection(connection) => Ok(connection),
2418            _ => {
2419                let db_name = match self.name().qualifiers.database_spec {
2420                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2421                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2422                };
2423                Err(SqlCatalogError::UnknownConnection(format!(
2424                    "{}{}.{}",
2425                    db_name,
2426                    self.name().qualifiers.schema_spec,
2427                    self.name().item
2428                )))
2429            }
2430        }
2431    }
2432
2433    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2434    /// this `CatalogEntry`, if any.
2435    pub fn source_desc(
2436        &self,
2437    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2438        self.item.source_desc(self)
2439    }
2440
2441    /// Reports whether this catalog entry is a connection.
2442    pub fn is_connection(&self) -> bool {
2443        matches!(self.item(), CatalogItem::Connection(_))
2444    }
2445
2446    /// Reports whether this catalog entry is a table.
2447    pub fn is_table(&self) -> bool {
2448        matches!(self.item(), CatalogItem::Table(_))
2449    }
2450
2451    /// Reports whether this catalog entry is a source. Note that this includes
2452    /// subsources.
2453    pub fn is_source(&self) -> bool {
2454        matches!(self.item(), CatalogItem::Source(_))
2455    }
2456
2457    /// Reports whether this catalog entry is a subsource and, if it is, the
2458    /// ingestion it is an export of, as well as the item it exports.
2459    pub fn subsource_details(
2460        &self,
2461    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2462        match &self.item() {
2463            CatalogItem::Source(source) => match &source.data_source {
2464                DataSourceDesc::IngestionExport {
2465                    ingestion_id,
2466                    external_reference,
2467                    details,
2468                    data_config: _,
2469                } => Some((*ingestion_id, external_reference, details)),
2470                _ => None,
2471            },
2472            _ => None,
2473        }
2474    }
2475
2476    /// Reports whether this catalog entry is a source export and, if it is, the
2477    /// ingestion it is an export of, as well as the item it exports.
2478    pub fn source_export_details(
2479        &self,
2480    ) -> Option<(
2481        CatalogItemId,
2482        &UnresolvedItemName,
2483        &SourceExportDetails,
2484        &SourceExportDataConfig<ReferencedConnection>,
2485    )> {
2486        match &self.item() {
2487            CatalogItem::Source(source) => match &source.data_source {
2488                DataSourceDesc::IngestionExport {
2489                    ingestion_id,
2490                    external_reference,
2491                    details,
2492                    data_config,
2493                } => Some((*ingestion_id, external_reference, details, data_config)),
2494                _ => None,
2495            },
2496            CatalogItem::Table(table) => match &table.data_source {
2497                TableDataSource::DataSource {
2498                    desc:
2499                        DataSourceDesc::IngestionExport {
2500                            ingestion_id,
2501                            external_reference,
2502                            details,
2503                            data_config,
2504                        },
2505                    timeline: _,
2506                } => Some((*ingestion_id, external_reference, details, data_config)),
2507                _ => None,
2508            },
2509            _ => None,
2510        }
2511    }
2512
2513    /// Reports whether this catalog entry is a progress source.
2514    pub fn is_progress_source(&self) -> bool {
2515        self.item().is_progress_source()
2516    }
2517
2518    /// Returns the `GlobalId` of all of this entry's progress ID.
2519    pub fn progress_id(&self) -> Option<CatalogItemId> {
2520        match &self.item() {
2521            CatalogItem::Source(source) => match &source.data_source {
2522                DataSourceDesc::Ingestion { ingestion_desc, .. } => {
2523                    Some(ingestion_desc.progress_subsource)
2524                }
2525                DataSourceDesc::IngestionExport { .. }
2526                | DataSourceDesc::Introspection(_)
2527                | DataSourceDesc::Progress
2528                | DataSourceDesc::Webhook { .. } => None,
2529            },
2530            CatalogItem::Table(_)
2531            | CatalogItem::Log(_)
2532            | CatalogItem::View(_)
2533            | CatalogItem::MaterializedView(_)
2534            | CatalogItem::Sink(_)
2535            | CatalogItem::Index(_)
2536            | CatalogItem::Type(_)
2537            | CatalogItem::Func(_)
2538            | CatalogItem::Secret(_)
2539            | CatalogItem::Connection(_)
2540            | CatalogItem::ContinualTask(_) => None,
2541        }
2542    }
2543
2544    /// Reports whether this catalog entry is a sink.
2545    pub fn is_sink(&self) -> bool {
2546        matches!(self.item(), CatalogItem::Sink(_))
2547    }
2548
2549    /// Reports whether this catalog entry is a materialized view.
2550    pub fn is_materialized_view(&self) -> bool {
2551        matches!(self.item(), CatalogItem::MaterializedView(_))
2552    }
2553
2554    /// Reports whether this catalog entry is a view.
2555    pub fn is_view(&self) -> bool {
2556        matches!(self.item(), CatalogItem::View(_))
2557    }
2558
2559    /// Reports whether this catalog entry is a secret.
2560    pub fn is_secret(&self) -> bool {
2561        matches!(self.item(), CatalogItem::Secret(_))
2562    }
2563
2564    /// Reports whether this catalog entry is an introspection source.
2565    pub fn is_introspection_source(&self) -> bool {
2566        matches!(self.item(), CatalogItem::Log(_))
2567    }
2568
2569    /// Reports whether this catalog entry is an index.
2570    pub fn is_index(&self) -> bool {
2571        matches!(self.item(), CatalogItem::Index(_))
2572    }
2573
2574    /// Reports whether this catalog entry is a continual task.
2575    pub fn is_continual_task(&self) -> bool {
2576        matches!(self.item(), CatalogItem::ContinualTask(_))
2577    }
2578
2579    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
2580    pub fn is_relation(&self) -> bool {
2581        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2582    }
2583
2584    /// Collects the identifiers of the objects that were encountered when
2585    /// resolving names in the item's DDL statement.
2586    pub fn references(&self) -> &ResolvedIds {
2587        self.item.references()
2588    }
2589
2590    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2591    ///
2592    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2593    /// referenced. For example this will include any catalog objects used to implement functions
2594    /// and casts in the item.
2595    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2596        self.item.uses()
2597    }
2598
2599    /// Returns the `CatalogItem` associated with this catalog entry.
2600    pub fn item(&self) -> &CatalogItem {
2601        &self.item
2602    }
2603
2604    /// Returns the [`CatalogItemId`] of this catalog entry.
2605    pub fn id(&self) -> CatalogItemId {
2606        self.id
2607    }
2608
2609    /// Returns all of the [`GlobalId`]s associated with this item.
2610    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2611        self.item().global_ids()
2612    }
2613
2614    pub fn latest_global_id(&self) -> GlobalId {
2615        self.item().latest_global_id()
2616    }
2617
2618    /// Returns the OID of this catalog entry.
2619    pub fn oid(&self) -> u32 {
2620        self.oid
2621    }
2622
2623    /// Returns the fully qualified name of this catalog entry.
2624    pub fn name(&self) -> &QualifiedItemName {
2625        &self.name
2626    }
2627
2628    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
2629    pub fn referenced_by(&self) -> &[CatalogItemId] {
2630        &self.referenced_by
2631    }
2632
2633    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2634    pub fn used_by(&self) -> &[CatalogItemId] {
2635        &self.used_by
2636    }
2637
2638    /// Returns the connection ID that this item belongs to, if this item is
2639    /// temporary.
2640    pub fn conn_id(&self) -> Option<&ConnectionId> {
2641        self.item.conn_id()
2642    }
2643
2644    /// Returns the role ID of the entry owner.
2645    pub fn owner_id(&self) -> &RoleId {
2646        &self.owner_id
2647    }
2648
2649    /// Returns the privileges of the entry.
2650    pub fn privileges(&self) -> &PrivilegeMap {
2651        &self.privileges
2652    }
2653}
2654
/// In-memory store of comments attached to catalog objects.
///
/// Each [`CommentObjectId`] maps to its comments keyed by an optional sub-component
/// (e.g. an individual column of a relation — see [`CommentsMap::drop_comments`]);
/// `None` addresses the object itself.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    // Outer key: the commented object. Inner key: optional sub-component
    // (`None` = the object itself); value: the comment text.
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2659
2660impl CommentsMap {
2661    pub fn update_comment(
2662        &mut self,
2663        object_id: CommentObjectId,
2664        sub_component: Option<usize>,
2665        comment: Option<String>,
2666    ) -> Option<String> {
2667        let object_comments = self.map.entry(object_id).or_default();
2668
2669        // Either replace the existing comment, or remove it if comment is None/NULL.
2670        let (empty, prev) = if let Some(comment) = comment {
2671            let prev = object_comments.insert(sub_component, comment);
2672            (false, prev)
2673        } else {
2674            let prev = object_comments.remove(&sub_component);
2675            (object_comments.is_empty(), prev)
2676        };
2677
2678        // Cleanup entries that are now empty.
2679        if empty {
2680            self.map.remove(&object_id);
2681        }
2682
2683        // Return the previous comment, if there was one, for easy removal.
2684        prev
2685    }
2686
2687    /// Remove all comments for `object_id` from the map.
2688    ///
2689    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
2690    /// relations you can also have comments on the individual columns. Dropping the comments for a
2691    /// relation will also drop all of the comments on any columns.
2692    pub fn drop_comments(
2693        &mut self,
2694        object_ids: &BTreeSet<CommentObjectId>,
2695    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2696        let mut removed_comments = Vec::new();
2697
2698        for object_id in object_ids {
2699            if let Some(comments) = self.map.remove(object_id) {
2700                let removed = comments
2701                    .into_iter()
2702                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2703                removed_comments.extend(removed);
2704            }
2705        }
2706
2707        removed_comments
2708    }
2709
2710    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2711        self.map
2712            .iter()
2713            .map(|(id, comments)| {
2714                comments
2715                    .iter()
2716                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2717            })
2718            .flatten()
2719    }
2720
2721    pub fn get_object_comments(
2722        &self,
2723        object_id: CommentObjectId,
2724    ) -> Option<&BTreeMap<Option<usize>, String>> {
2725        self.map.get(&object_id)
2726    }
2727}
2728
2729impl Serialize for CommentsMap {
2730    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2731    where
2732        S: serde::Serializer,
2733    {
2734        let comment_count = self
2735            .map
2736            .iter()
2737            .map(|(_object_id, comments)| comments.len())
2738            .sum();
2739
2740        let mut seq = serializer.serialize_seq(Some(comment_count))?;
2741        for (object_id, sub) in &self.map {
2742            for (sub_component, comment) in sub {
2743                seq.serialize_element(&(
2744                    format!("{object_id:?}"),
2745                    format!("{sub_component:?}"),
2746                    comment,
2747                ))?;
2748            }
2749        }
2750        seq.end()
2751    }
2752}
2753
/// The set of default privileges: for each [`DefaultPrivilegeObject`], the privileges
/// that will be granted on new objects matching it (see `DefaultPrivileges::grant`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
2759
// Use a new type here because otherwise we have two levels of BTreeMap, both needing
// map_key_to_string.
/// Newtype over the per-grantee default-privilege map; dereferences to the inner
/// [`BTreeMap`] (see the `Deref`/`DerefMut` impls below).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized, the key is the grantee Role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
2768
2769impl Deref for RoleDefaultPrivileges {
2770    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2771
2772    fn deref(&self) -> &Self::Target {
2773        &self.0
2774    }
2775}
2776
2777impl DerefMut for RoleDefaultPrivileges {
2778    fn deref_mut(&mut self) -> &mut Self::Target {
2779        &mut self.0
2780    }
2781}
2782
2783impl DefaultPrivileges {
2784    /// Add a new default privilege into the set of all default privileges.
2785    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2786        if privilege.acl_mode.is_empty() {
2787            return;
2788        }
2789
2790        let privileges = self.privileges.entry(object).or_default();
2791        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2792            default_privilege.acl_mode |= privilege.acl_mode;
2793        } else {
2794            privileges.insert(privilege.grantee, privilege);
2795        }
2796    }
2797
2798    /// Revoke a default privilege from the set of all default privileges.
2799    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2800        if let Some(privileges) = self.privileges.get_mut(object) {
2801            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2802                default_privilege.acl_mode =
2803                    default_privilege.acl_mode.difference(privilege.acl_mode);
2804                if default_privilege.acl_mode.is_empty() {
2805                    privileges.remove(&privilege.grantee);
2806                }
2807            }
2808            if privileges.is_empty() {
2809                self.privileges.remove(object);
2810            }
2811        }
2812    }
2813
2814    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
2815    /// any exist.
2816    pub fn get_privileges_for_grantee(
2817        &self,
2818        object: &DefaultPrivilegeObject,
2819        grantee: &RoleId,
2820    ) -> Option<&AclMode> {
2821        self.privileges
2822            .get(object)
2823            .and_then(|privileges| privileges.get(grantee))
2824            .map(|privilege| &privilege.acl_mode)
2825    }
2826
2827    /// Get all default privileges that apply to the provided object details.
2828    pub fn get_applicable_privileges(
2829        &self,
2830        role_id: RoleId,
2831        database_id: Option<DatabaseId>,
2832        schema_id: Option<SchemaId>,
2833        object_type: mz_sql::catalog::ObjectType,
2834    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2835        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
2836        // don't require the caller to worry about that and we will map their `object_type` to the
2837        // correct type for privileges.
2838        let privilege_object_type = if object_type.is_relation() {
2839            mz_sql::catalog::ObjectType::Table
2840        } else {
2841            object_type
2842        };
2843        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2844
2845        // Collect all entries that apply to the provided object details.
2846        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
2847        // entries in the vec below. That's OK because we consolidate the results after.
2848        [
2849            DefaultPrivilegeObject {
2850                role_id,
2851                database_id,
2852                schema_id,
2853                object_type: privilege_object_type,
2854            },
2855            DefaultPrivilegeObject {
2856                role_id,
2857                database_id,
2858                schema_id: None,
2859                object_type: privilege_object_type,
2860            },
2861            DefaultPrivilegeObject {
2862                role_id,
2863                database_id: None,
2864                schema_id: None,
2865                object_type: privilege_object_type,
2866            },
2867            DefaultPrivilegeObject {
2868                role_id: RoleId::Public,
2869                database_id,
2870                schema_id,
2871                object_type: privilege_object_type,
2872            },
2873            DefaultPrivilegeObject {
2874                role_id: RoleId::Public,
2875                database_id,
2876                schema_id: None,
2877                object_type: privilege_object_type,
2878            },
2879            DefaultPrivilegeObject {
2880                role_id: RoleId::Public,
2881                database_id: None,
2882                schema_id: None,
2883                object_type: privilege_object_type,
2884            },
2885        ]
2886        .into_iter()
2887        .filter_map(|object| self.privileges.get(&object))
2888        .flat_map(|acl_map| acl_map.values())
2889        // Consolidate privileges with a common grantee.
2890        .fold(
2891            BTreeMap::new(),
2892            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2893                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2894                *accum_acl_mode |= *acl_mode;
2895                accum
2896            },
2897        )
2898        .into_iter()
2899        // Restrict the acl_mode to only privileges valid for the provided object type. If the
2900        // default privilege has an object type of Table, then it may contain privileges valid for
2901        // tables but not other relations. If the passed in object type is another relation, then
2902        // we need to remove any privilege that is not valid for the specified relation.
2903        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2904        // Filter out empty privileges.
2905        .filter(|(_, acl_mode)| !acl_mode.is_empty())
2906        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2907            grantee: *grantee,
2908            acl_mode,
2909        })
2910    }
2911
2912    pub fn iter(
2913        &self,
2914    ) -> impl Iterator<
2915        Item = (
2916            &DefaultPrivilegeObject,
2917            impl Iterator<Item = &DefaultPrivilegeAclItem>,
2918        ),
2919    > {
2920        self.privileges
2921            .iter()
2922            .map(|(object, acl_map)| (object, acl_map.values()))
2923    }
2924}
2925
/// In-memory configuration of a cluster; converts to/from `durable::ClusterConfig`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed (sized/replicated by Materialize) or unmanaged.
    pub variant: ClusterVariant,
    /// Optional workload class label — NOTE(review): semantics not visible here; confirm.
    pub workload_class: Option<String>,
}
2931
2932impl ClusterConfig {
2933    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
2934        match &self.variant {
2935            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
2936            ClusterVariant::Unmanaged => None,
2937        }
2938    }
2939}
2940
2941impl From<ClusterConfig> for durable::ClusterConfig {
2942    fn from(config: ClusterConfig) -> Self {
2943        Self {
2944            variant: config.variant.into(),
2945            workload_class: config.workload_class,
2946        }
2947    }
2948}
2949
2950impl From<durable::ClusterConfig> for ClusterConfig {
2951    fn from(config: durable::ClusterConfig) -> Self {
2952        Self {
2953            variant: config.variant.into(),
2954            workload_class: config.workload_class,
2955        }
2956    }
2957}
2958
/// Configuration of a managed cluster; mirrors `durable::ClusterVariantManaged`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    // Replica size name for the cluster's replicas.
    pub size: String,
    // Availability zones the replicas may be placed in.
    pub availability_zones: Vec<String>,
    // Introspection/logging configuration applied to replicas.
    pub logging: ReplicaLogging,
    // Number of replicas to maintain.
    pub replication_factor: u32,
    // Whether replicas are provisioned with disk.
    pub disk: bool,
    // Optimizer feature overrides scoped to this cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    // Schedule controlling when the cluster runs.
    pub schedule: ClusterSchedule,
}
2969
2970impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
2971    fn from(managed: ClusterVariantManaged) -> Self {
2972        Self {
2973            size: managed.size,
2974            availability_zones: managed.availability_zones,
2975            logging: managed.logging,
2976            replication_factor: managed.replication_factor,
2977            disk: managed.disk,
2978            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2979            schedule: managed.schedule,
2980        }
2981    }
2982}
2983
2984impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
2985    fn from(managed: durable::ClusterVariantManaged) -> Self {
2986        Self {
2987            size: managed.size,
2988            availability_zones: managed.availability_zones,
2989            logging: managed.logging,
2990            replication_factor: managed.replication_factor,
2991            disk: managed.disk,
2992            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2993            schedule: managed.schedule,
2994        }
2995    }
2996}
2997
/// Whether a cluster's replicas are managed by Materialize or created manually.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// Materialize maintains replicas according to the embedded configuration.
    Managed(ClusterVariantManaged),
    /// Replicas are created and dropped explicitly by the user.
    Unmanaged,
}
3003
3004impl From<ClusterVariant> for durable::ClusterVariant {
3005    fn from(variant: ClusterVariant) -> Self {
3006        match variant {
3007            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3008            ClusterVariant::Unmanaged => Self::Unmanaged,
3009        }
3010    }
3011}
3012
3013impl From<durable::ClusterVariant> for ClusterVariant {
3014    fn from(variant: durable::ClusterVariant) -> Self {
3015        match variant {
3016            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3017            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3018        }
3019    }
3020}
3021
// Trait plumbing: expose `Database` through the SQL layer's `CatalogDatabase`
// interface. All methods delegate to fields of `Database`.
impl mz_sql::catalog::CatalogDatabase for Database {
    /// Returns the database's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the database's stable ID.
    fn id(&self) -> DatabaseId {
        self.id
    }

    /// Reports whether the database contains any schemas.
    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    /// Returns the database's schema IDs, keyed by schema name.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    /// Returns the role that owns the database.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the database.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3056
// Trait plumbing: expose `Schema` through the SQL layer's `CatalogSchema` interface.
impl mz_sql::catalog::CatalogSchema for Schema {
    /// Returns the database the schema belongs to (or the ambient specifier).
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    /// Returns the schema's qualified name.
    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    /// Returns the schema's specifier.
    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    /// Reports whether the schema contains any items.
    ///
    /// NOTE(review): only `items` is consulted here, not `functions` or `types`
    /// (which `item_ids` does include) — confirm this asymmetry is intended.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    /// Returns the IDs of all items in the schema: regular items, functions, and types.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    /// Returns the role that owns the schema.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the schema.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3092
// Trait plumbing: expose `Role` through the SQL layer's `CatalogRole` interface.
impl mz_sql::catalog::CatalogRole for Role {
    /// Returns the role's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the role's stable ID.
    fn id(&self) -> RoleId {
        self.id
    }

    /// Returns the roles this role is a member of
    /// (keys are the member-of roles; value semantics live in `RoleMembership`).
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    /// Returns the role's attributes (e.g. login/superuser flags).
    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    /// Returns the session variable defaults configured for the role.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3114
// Trait plumbing: expose `NetworkPolicy` through the SQL layer's
// `CatalogNetworkPolicy` interface.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    /// Returns the network policy's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the network policy's stable ID.
    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    /// Returns the role that owns the network policy.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the network policy.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3132
// Trait plumbing: expose `Cluster` through the SQL layer's `CatalogCluster`
// interface. Several methods delegate to inherent methods of the same name.
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    /// Returns the cluster's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the cluster's stable ID.
    fn id(&self) -> ClusterId {
        self.id
    }

    /// Returns the catalog items bound to (running on) this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    /// Returns the cluster's replica IDs, keyed by replica name.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica> {
        // Delegates to the inherent `replicas()` iterator.
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    /// Returns the replica with the given ID.
    ///
    /// Panics if the replica does not exist, which indicates the catalog is out
    /// of sync — an invariant violation, not a recoverable error.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica {
        self.replica(id).expect("catalog out of sync")
    }

    /// Returns the role that owns the cluster.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the cluster.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    /// Reports whether the cluster is managed (delegates to the inherent method).
    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    /// Returns the replica size of a managed cluster, or `None` if unmanaged.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            _ => None,
        }
    }

    /// Returns the schedule of a managed cluster, or `None` if unmanaged.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            _ => None,
        }
    }

    /// Converts the cluster back into a `CREATE CLUSTER` plan
    /// (delegates to the inherent method).
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        self.try_to_plan()
    }
}
3192
// Trait plumbing: expose `ClusterReplica` through the SQL layer's
// `CatalogClusterReplica` interface.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    /// Returns the replica's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the ID of the cluster this replica belongs to.
    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    /// Returns the replica's stable ID.
    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    /// Returns the role that owns the replica.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Reports whether the replica's location is internal.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3214
3215impl mz_sql::catalog::CatalogItem for CatalogEntry {
3216    fn name(&self) -> &QualifiedItemName {
3217        self.name()
3218    }
3219
3220    fn id(&self) -> CatalogItemId {
3221        self.id()
3222    }
3223
3224    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3225        Box::new(self.global_ids())
3226    }
3227
3228    fn oid(&self) -> u32 {
3229        self.oid()
3230    }
3231
3232    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3233        self.func()
3234    }
3235
3236    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3237        self.source_desc()
3238    }
3239
3240    fn connection(
3241        &self,
3242    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3243    {
3244        Ok(self.connection()?.details.to_connection())
3245    }
3246
3247    fn create_sql(&self) -> &str {
3248        match self.item() {
3249            CatalogItem::Table(Table { create_sql, .. }) => {
3250                create_sql.as_deref().unwrap_or("<builtin>")
3251            }
3252            CatalogItem::Source(Source { create_sql, .. }) => {
3253                create_sql.as_deref().unwrap_or("<builtin>")
3254            }
3255            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3256            CatalogItem::View(View { create_sql, .. }) => create_sql,
3257            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3258            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3259            CatalogItem::Type(Type { create_sql, .. }) => {
3260                create_sql.as_deref().unwrap_or("<builtin>")
3261            }
3262            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3263            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3264            CatalogItem::Func(_) => "<builtin>",
3265            CatalogItem::Log(_) => "<builtin>",
3266            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3267        }
3268    }
3269
3270    fn item_type(&self) -> SqlCatalogItemType {
3271        self.item().typ()
3272    }
3273
3274    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3275        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3276            Some((keys, *on))
3277        } else {
3278            None
3279        }
3280    }
3281
3282    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3283        if let CatalogItem::Table(Table {
3284            data_source: TableDataSource::TableWrites { defaults },
3285            ..
3286        }) = self.item()
3287        {
3288            Some(defaults.as_slice())
3289        } else {
3290            None
3291        }
3292    }
3293
3294    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3295        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3296            Some(details)
3297        } else {
3298            None
3299        }
3300    }
3301
3302    fn references(&self) -> &ResolvedIds {
3303        self.references()
3304    }
3305
3306    fn uses(&self) -> BTreeSet<CatalogItemId> {
3307        self.uses()
3308    }
3309
3310    fn referenced_by(&self) -> &[CatalogItemId] {
3311        self.referenced_by()
3312    }
3313
3314    fn used_by(&self) -> &[CatalogItemId] {
3315        self.used_by()
3316    }
3317
3318    fn subsource_details(
3319        &self,
3320    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3321        self.subsource_details()
3322    }
3323
3324    fn source_export_details(
3325        &self,
3326    ) -> Option<(
3327        CatalogItemId,
3328        &UnresolvedItemName,
3329        &SourceExportDetails,
3330        &SourceExportDataConfig<ReferencedConnection>,
3331    )> {
3332        self.source_export_details()
3333    }
3334
3335    fn is_progress_source(&self) -> bool {
3336        self.is_progress_source()
3337    }
3338
3339    fn progress_id(&self) -> Option<CatalogItemId> {
3340        self.progress_id()
3341    }
3342
3343    fn owner_id(&self) -> RoleId {
3344        self.owner_id
3345    }
3346
3347    fn privileges(&self) -> &PrivilegeMap {
3348        &self.privileges
3349    }
3350
3351    fn cluster_id(&self) -> Option<ClusterId> {
3352        self.item().cluster_id()
3353    }
3354
3355    fn at_version(
3356        &self,
3357        version: RelationVersionSelector,
3358    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3359        Box::new(CatalogCollectionEntry {
3360            entry: self.clone(),
3361            version,
3362        })
3363    }
3364
3365    fn latest_version(&self) -> Option<RelationVersion> {
3366        self.table().map(|t| t.desc.latest_version())
3367    }
3368}
3369
/// A single update to the catalog state.
#[derive(Debug)]
pub struct StateUpdate {
    /// What changed.
    pub kind: StateUpdateKind,
    /// The timestamp associated with this update.
    pub ts: Timestamp,
    /// Whether the object described by `kind` is being added or retracted.
    pub diff: StateDiff,
}
3377
/// The contents of a single state update.
///
/// Variants are listed in dependency order.
///
/// NOTE: because the order is meaningful, do not reorder variants; see also
/// [`BootstrapStateUpdateKind`], which mirrors this enum and must stay in sync.
#[derive(Debug, Clone)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // Temporary items are not actually updated via the durable catalog, but this allows us to
    // model them the same way as all other items.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3406
/// Valid diffs for catalog state updates.
///
/// Catalog updates only ever add or remove a single copy of an object, so the
/// only valid [`Diff`] values are -1 and +1 (see the `From`/`TryFrom` impls
/// below).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StateDiff {
    /// The object is being removed; corresponds to a diff of -1.
    Retraction,
    /// The object is being added; corresponds to a diff of +1.
    Addition,
}
3413
3414impl From<StateDiff> for Diff {
3415    fn from(diff: StateDiff) -> Self {
3416        match diff {
3417            StateDiff::Retraction => Diff::MINUS_ONE,
3418            StateDiff::Addition => Diff::ONE,
3419        }
3420    }
3421}
3422impl TryFrom<Diff> for StateDiff {
3423    type Error = String;
3424
3425    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3426        match diff {
3427            Diff::MINUS_ONE => Ok(Self::Retraction),
3428            Diff::ONE => Ok(Self::Addition),
3429            diff => Err(format!("invalid diff {diff}")),
3430        }
3431    }
3432}
3433
/// Information needed to process an update to a temporary item.
#[derive(Debug, Clone)]
pub struct TemporaryItem {
    /// Stable identifier of the item.
    pub id: CatalogItemId,
    /// The item's OID.
    pub oid: u32,
    /// Fully qualified name of the item.
    pub name: QualifiedItemName,
    /// The in-memory catalog item itself.
    pub item: CatalogItem,
    /// Role that owns the item.
    pub owner_id: RoleId,
    /// Privileges granted on the item.
    pub privileges: PrivilegeMap,
}
3444
3445impl From<CatalogEntry> for TemporaryItem {
3446    fn from(entry: CatalogEntry) -> Self {
3447        TemporaryItem {
3448            id: entry.id,
3449            oid: entry.oid,
3450            name: entry.name,
3451            item: entry.item,
3452            owner_id: entry.owner_id,
3453            privileges: entry.privileges,
3454        }
3455    }
3456}
3457
/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
///
/// NOTE: the derived [`Ord`] compares by variant declaration order, so do not
/// reorder variants; the order here mirrors [`StateUpdateKind`].
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3481
3482impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3483    fn from(value: BootstrapStateUpdateKind) -> Self {
3484        match value {
3485            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3486            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3487            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3488            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3489            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3490                StateUpdateKind::DefaultPrivilege(kind)
3491            }
3492            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3493                StateUpdateKind::SystemPrivilege(kind)
3494            }
3495            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3496                StateUpdateKind::SystemConfiguration(kind)
3497            }
3498            BootstrapStateUpdateKind::SourceReferences(kind) => {
3499                StateUpdateKind::SourceReferences(kind)
3500            }
3501            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3502            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3503            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3504                StateUpdateKind::IntrospectionSourceIndex(kind)
3505            }
3506            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3507            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3508                StateUpdateKind::SystemObjectMapping(kind)
3509            }
3510            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3511            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3512            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3513            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3514                StateUpdateKind::StorageCollectionMetadata(kind)
3515            }
3516            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3517                StateUpdateKind::UnfinalizedShard(kind)
3518            }
3519        }
3520    }
3521}
3522
3523impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3524    type Error = TemporaryItem;
3525
3526    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3527        match value {
3528            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3529            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3530            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3531            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3532            StateUpdateKind::DefaultPrivilege(kind) => {
3533                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3534            }
3535            StateUpdateKind::SystemPrivilege(kind) => {
3536                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3537            }
3538            StateUpdateKind::SystemConfiguration(kind) => {
3539                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3540            }
3541            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3542            StateUpdateKind::NetworkPolicy(kind) => {
3543                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3544            }
3545            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3546                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3547            }
3548            StateUpdateKind::ClusterReplica(kind) => {
3549                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3550            }
3551            StateUpdateKind::SourceReferences(kind) => {
3552                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3553            }
3554            StateUpdateKind::SystemObjectMapping(kind) => {
3555                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3556            }
3557            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3558            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3559            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3560            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3561            StateUpdateKind::StorageCollectionMetadata(kind) => {
3562                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3563            }
3564            StateUpdateKind::UnfinalizedShard(kind) => {
3565                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3566            }
3567        }
3568    }
3569}