mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39    UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44    DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45    RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54    HirRelationExpr, Ingestion as PlanIngestion, NetworkPolicyRule, PlanError, WebhookBodyFormat,
55    WebhookHeaders, WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::{
63    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
64    SourceExportDetails, Timeline,
65};
66use serde::ser::SerializeSeq;
67use serde::{Deserialize, Serialize};
68use timely::progress::Antichain;
69use tracing::debug;
70
71use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
72use crate::durable;
73
74/// Used to update `self` from the input value while consuming the input value.
75pub trait UpdateFrom<T>: From<T> {
76    fn update_from(&mut self, from: T);
77}
78
79#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
80pub struct Database {
81    pub name: String,
82    pub id: DatabaseId,
83    pub oid: u32,
84    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
85    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
86    pub schemas_by_name: BTreeMap<String, SchemaId>,
87    pub owner_id: RoleId,
88    pub privileges: PrivilegeMap,
89}
90
91impl From<Database> for durable::Database {
92    fn from(database: Database) -> durable::Database {
93        durable::Database {
94            id: database.id,
95            oid: database.oid,
96            name: database.name,
97            owner_id: database.owner_id,
98            privileges: database.privileges.into_all_values().collect(),
99        }
100    }
101}
102
103impl From<durable::Database> for Database {
104    fn from(
105        durable::Database {
106            id,
107            oid,
108            name,
109            owner_id,
110            privileges,
111        }: durable::Database,
112    ) -> Database {
113        Database {
114            id,
115            oid,
116            schemas_by_id: BTreeMap::new(),
117            schemas_by_name: BTreeMap::new(),
118            name,
119            owner_id,
120            privileges: PrivilegeMap::from_mz_acl_items(privileges),
121        }
122    }
123}
124
125impl UpdateFrom<durable::Database> for Database {
126    fn update_from(
127        &mut self,
128        durable::Database {
129            id,
130            oid,
131            name,
132            owner_id,
133            privileges,
134        }: durable::Database,
135    ) {
136        self.id = id;
137        self.oid = oid;
138        self.name = name;
139        self.owner_id = owner_id;
140        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
141    }
142}
143
144#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
145pub struct Schema {
146    pub name: QualifiedSchemaName,
147    pub id: SchemaSpecifier,
148    pub oid: u32,
149    pub items: BTreeMap<String, CatalogItemId>,
150    pub functions: BTreeMap<String, CatalogItemId>,
151    pub types: BTreeMap<String, CatalogItemId>,
152    pub owner_id: RoleId,
153    pub privileges: PrivilegeMap,
154}
155
156impl From<Schema> for durable::Schema {
157    fn from(schema: Schema) -> durable::Schema {
158        durable::Schema {
159            id: schema.id.into(),
160            oid: schema.oid,
161            name: schema.name.schema,
162            database_id: schema.name.database.id(),
163            owner_id: schema.owner_id,
164            privileges: schema.privileges.into_all_values().collect(),
165        }
166    }
167}
168
169impl From<durable::Schema> for Schema {
170    fn from(
171        durable::Schema {
172            id,
173            oid,
174            name,
175            database_id,
176            owner_id,
177            privileges,
178        }: durable::Schema,
179    ) -> Schema {
180        Schema {
181            name: QualifiedSchemaName {
182                database: database_id.into(),
183                schema: name,
184            },
185            id: id.into(),
186            oid,
187            items: BTreeMap::new(),
188            functions: BTreeMap::new(),
189            types: BTreeMap::new(),
190            owner_id,
191            privileges: PrivilegeMap::from_mz_acl_items(privileges),
192        }
193    }
194}
195
196impl UpdateFrom<durable::Schema> for Schema {
197    fn update_from(
198        &mut self,
199        durable::Schema {
200            id,
201            oid,
202            name,
203            database_id,
204            owner_id,
205            privileges,
206        }: durable::Schema,
207    ) {
208        self.name = QualifiedSchemaName {
209            database: database_id.into(),
210            schema: name,
211        };
212        self.id = id.into();
213        self.oid = oid;
214        self.owner_id = owner_id;
215        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
216    }
217}
218
219#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
220pub struct Role {
221    pub name: String,
222    pub id: RoleId,
223    pub oid: u32,
224    pub attributes: RoleAttributes,
225    pub membership: RoleMembership,
226    pub vars: RoleVars,
227}
228
229impl Role {
230    pub fn is_user(&self) -> bool {
231        self.id.is_user()
232    }
233
234    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
235        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
236    }
237}
238
239impl From<Role> for durable::Role {
240    fn from(role: Role) -> durable::Role {
241        durable::Role {
242            id: role.id,
243            oid: role.oid,
244            name: role.name,
245            attributes: role.attributes,
246            membership: role.membership,
247            vars: role.vars,
248        }
249    }
250}
251
252impl From<durable::Role> for Role {
253    fn from(
254        durable::Role {
255            id,
256            oid,
257            name,
258            attributes,
259            membership,
260            vars,
261        }: durable::Role,
262    ) -> Self {
263        Role {
264            name,
265            id,
266            oid,
267            attributes,
268            membership,
269            vars,
270        }
271    }
272}
273
274impl UpdateFrom<durable::Role> for Role {
275    fn update_from(
276        &mut self,
277        durable::Role {
278            id,
279            oid,
280            name,
281            attributes,
282            membership,
283            vars,
284        }: durable::Role,
285    ) {
286        self.id = id;
287        self.oid = oid;
288        self.name = name;
289        self.attributes = attributes;
290        self.membership = membership;
291        self.vars = vars;
292    }
293}
294
295#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
296pub struct RoleAuth {
297    pub role_id: RoleId,
298    pub password_hash: Option<String>,
299    pub updated_at: u64,
300}
301
302impl From<RoleAuth> for durable::RoleAuth {
303    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
304        durable::RoleAuth {
305            role_id: role_auth.role_id,
306            password_hash: role_auth.password_hash,
307            updated_at: role_auth.updated_at,
308        }
309    }
310}
311
312impl From<durable::RoleAuth> for RoleAuth {
313    fn from(
314        durable::RoleAuth {
315            role_id,
316            password_hash,
317            updated_at,
318        }: durable::RoleAuth,
319    ) -> RoleAuth {
320        RoleAuth {
321            role_id,
322            password_hash,
323            updated_at,
324        }
325    }
326}
327
328impl UpdateFrom<durable::RoleAuth> for RoleAuth {
329    fn update_from(&mut self, from: durable::RoleAuth) {
330        self.role_id = from.role_id;
331        self.password_hash = from.password_hash;
332    }
333}
334
335#[derive(Debug, Serialize, Clone, PartialEq)]
336pub struct Cluster {
337    pub name: String,
338    pub id: ClusterId,
339    pub config: ClusterConfig,
340    #[serde(skip)]
341    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
342    /// Objects bound to this cluster. Does not include introspection source
343    /// indexes.
344    pub bound_objects: BTreeSet<CatalogItemId>,
345    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
346    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
347    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
348    pub owner_id: RoleId,
349    pub privileges: PrivilegeMap,
350}
351
352impl Cluster {
353    /// The role of the cluster. Currently used to set alert severity.
354    pub fn role(&self) -> ClusterRole {
355        // NOTE - These roles power monitoring systems. Do not change
356        // them without talking to the cloud or observability groups.
357        if self.name == MZ_SYSTEM_CLUSTER.name {
358            ClusterRole::SystemCritical
359        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
360            ClusterRole::System
361        } else {
362            ClusterRole::User
363        }
364    }
365
366    /// Returns `true` if the cluster is a managed cluster.
367    pub fn is_managed(&self) -> bool {
368        matches!(self.config.variant, ClusterVariant::Managed { .. })
369    }
370
371    /// Lists the user replicas, which are those that do not have the internal flag set.
372    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
373        self.replicas().filter(|r| !r.config.location.internal())
374    }
375
376    /// Lists all replicas in the cluster
377    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
378        self.replicas_by_id_.values()
379    }
380
381    /// Lookup a replica by ID.
382    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
383        self.replicas_by_id_.get(&replica_id)
384    }
385
386    /// Lookup a replica ID by name.
387    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
388        self.replica_id_by_name_.get(name).copied()
389    }
390
391    /// Returns the availability zones of this cluster, if they exist.
392    pub fn availability_zones(&self) -> Option<&[String]> {
393        match &self.config.variant {
394            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
395            ClusterVariant::Unmanaged => None,
396        }
397    }
398
399    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
400        let name = self.name.clone();
401        let variant = match &self.config.variant {
402            ClusterVariant::Managed(ClusterVariantManaged {
403                size,
404                availability_zones,
405                logging,
406                replication_factor,
407                optimizer_feature_overrides,
408                schedule,
409            }) => {
410                let introspection = match logging {
411                    ReplicaLogging {
412                        log_logging,
413                        interval: Some(interval),
414                    } => Some(ComputeReplicaIntrospectionConfig {
415                        debugging: *log_logging,
416                        interval: interval.clone(),
417                    }),
418                    ReplicaLogging {
419                        log_logging: _,
420                        interval: None,
421                    } => None,
422                };
423                let compute = ComputeReplicaConfig { introspection };
424                CreateClusterVariant::Managed(CreateClusterManagedPlan {
425                    replication_factor: replication_factor.clone(),
426                    size: size.clone(),
427                    availability_zones: availability_zones.clone(),
428                    compute,
429                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
430                    schedule: schedule.clone(),
431                })
432            }
433            ClusterVariant::Unmanaged => {
434                // Unmanaged clusters are deprecated, so hopefully we can remove
435                // them before we have to implement this.
436                return Err(PlanError::Unsupported {
437                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
438                    discussion_no: None,
439                });
440            }
441        };
442        let workload_class = self.config.workload_class.clone();
443        Ok(CreateClusterPlan {
444            name,
445            variant,
446            workload_class,
447        })
448    }
449}
450
451impl From<Cluster> for durable::Cluster {
452    fn from(cluster: Cluster) -> durable::Cluster {
453        durable::Cluster {
454            id: cluster.id,
455            name: cluster.name,
456            owner_id: cluster.owner_id,
457            privileges: cluster.privileges.into_all_values().collect(),
458            config: cluster.config.into(),
459        }
460    }
461}
462
463impl From<durable::Cluster> for Cluster {
464    fn from(
465        durable::Cluster {
466            id,
467            name,
468            owner_id,
469            privileges,
470            config,
471        }: durable::Cluster,
472    ) -> Self {
473        Cluster {
474            name: name.clone(),
475            id,
476            bound_objects: BTreeSet::new(),
477            log_indexes: BTreeMap::new(),
478            replica_id_by_name_: BTreeMap::new(),
479            replicas_by_id_: BTreeMap::new(),
480            owner_id,
481            privileges: PrivilegeMap::from_mz_acl_items(privileges),
482            config: config.into(),
483        }
484    }
485}
486
487impl UpdateFrom<durable::Cluster> for Cluster {
488    fn update_from(
489        &mut self,
490        durable::Cluster {
491            id,
492            name,
493            owner_id,
494            privileges,
495            config,
496        }: durable::Cluster,
497    ) {
498        self.id = id;
499        self.name = name;
500        self.owner_id = owner_id;
501        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
502        self.config = config.into();
503    }
504}
505
506#[derive(Debug, Serialize, Clone, PartialEq)]
507pub struct ClusterReplica {
508    pub name: String,
509    pub cluster_id: ClusterId,
510    pub replica_id: ReplicaId,
511    pub config: ReplicaConfig,
512    pub owner_id: RoleId,
513}
514
515impl From<ClusterReplica> for durable::ClusterReplica {
516    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
517        durable::ClusterReplica {
518            cluster_id: replica.cluster_id,
519            replica_id: replica.replica_id,
520            name: replica.name,
521            config: replica.config.into(),
522            owner_id: replica.owner_id,
523        }
524    }
525}
526
527#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
528pub struct ClusterReplicaProcessStatus {
529    pub status: ClusterStatus,
530    pub time: DateTime<Utc>,
531}
532
533#[derive(Debug, Serialize, Clone, PartialEq)]
534pub struct SourceReferences {
535    pub updated_at: u64,
536    pub references: Vec<SourceReference>,
537}
538
539#[derive(Debug, Serialize, Clone, PartialEq)]
540pub struct SourceReference {
541    pub name: String,
542    pub namespace: Option<String>,
543    pub columns: Vec<String>,
544}
545
546impl From<SourceReference> for durable::SourceReference {
547    fn from(source_reference: SourceReference) -> durable::SourceReference {
548        durable::SourceReference {
549            name: source_reference.name,
550            namespace: source_reference.namespace,
551            columns: source_reference.columns,
552        }
553    }
554}
555
556impl SourceReferences {
557    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
558        durable::SourceReferences {
559            source_id,
560            updated_at: self.updated_at,
561            references: self.references.into_iter().map(Into::into).collect(),
562        }
563    }
564}
565
566impl From<durable::SourceReference> for SourceReference {
567    fn from(source_reference: durable::SourceReference) -> SourceReference {
568        SourceReference {
569            name: source_reference.name,
570            namespace: source_reference.namespace,
571            columns: source_reference.columns,
572        }
573    }
574}
575
576impl From<durable::SourceReferences> for SourceReferences {
577    fn from(source_references: durable::SourceReferences) -> SourceReferences {
578        SourceReferences {
579            updated_at: source_references.updated_at,
580            references: source_references
581                .references
582                .into_iter()
583                .map(|source_reference| source_reference.into())
584                .collect(),
585        }
586    }
587}
588
589impl From<mz_sql::plan::SourceReference> for SourceReference {
590    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
591        SourceReference {
592            name: source_reference.name,
593            namespace: source_reference.namespace,
594            columns: source_reference.columns,
595        }
596    }
597}
598
599impl From<mz_sql::plan::SourceReferences> for SourceReferences {
600    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
601        SourceReferences {
602            updated_at: source_references.updated_at,
603            references: source_references
604                .references
605                .into_iter()
606                .map(|source_reference| source_reference.into())
607                .collect(),
608        }
609    }
610}
611
612impl From<SourceReferences> for mz_sql::plan::SourceReferences {
613    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
614        mz_sql::plan::SourceReferences {
615            updated_at: source_references.updated_at,
616            references: source_references
617                .references
618                .into_iter()
619                .map(|source_reference| source_reference.into())
620                .collect(),
621        }
622    }
623}
624
625impl From<SourceReference> for mz_sql::plan::SourceReference {
626    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
627        mz_sql::plan::SourceReference {
628            name: source_reference.name,
629            namespace: source_reference.namespace,
630            columns: source_reference.columns,
631        }
632    }
633}
634
635#[derive(Clone, Debug, Serialize)]
636pub struct CatalogEntry {
637    pub item: CatalogItem,
638    #[serde(skip)]
639    pub referenced_by: Vec<CatalogItemId>,
640    // TODO(database-issues#7922)––this should have an invariant tied to it that all
641    // dependents (i.e. entries in this field) have IDs greater than this
642    // entry's ID.
643    #[serde(skip)]
644    pub used_by: Vec<CatalogItemId>,
645    pub id: CatalogItemId,
646    pub oid: u32,
647    pub name: QualifiedItemName,
648    pub owner_id: RoleId,
649    pub privileges: PrivilegeMap,
650}
651
652/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
653/// A single item in the catalog may be associated with multiple "collections".
654///
655/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
656/// currently running dataflow, etc.
657///
658/// Items in the Catalog have a stable name -> ID mapping, in other words for
659/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
660/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
661/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
662/// to a table. We can't just change the schema of the underlying Persist Shard
663/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
664/// allocate a new [`GlobalId`] to refer to the new version of the table, and
665/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
666///
667/// TODO(ct): Add a note here if we end up using this for associating continual
668/// tasks with a single catalog item.
669#[derive(Clone, Debug)]
670pub struct CatalogCollectionEntry {
671    pub entry: CatalogEntry,
672    pub version: RelationVersionSelector,
673}
674
675impl CatalogCollectionEntry {
676    pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
677        self.item().desc(name, self.version)
678    }
679
680    pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
681        self.item().desc_opt(self.version)
682    }
683}
684
685impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
686    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
687        CatalogCollectionEntry::desc(self, name)
688    }
689
690    fn global_id(&self) -> GlobalId {
691        match self.entry.item() {
692            CatalogItem::Source(source) => source.global_id,
693            CatalogItem::Log(log) => log.global_id,
694            CatalogItem::View(view) => view.global_id,
695            CatalogItem::MaterializedView(mv) => mv.global_id,
696            CatalogItem::Sink(sink) => sink.global_id,
697            CatalogItem::Index(index) => index.global_id,
698            CatalogItem::Type(ty) => ty.global_id,
699            CatalogItem::Func(func) => func.global_id,
700            CatalogItem::Secret(secret) => secret.global_id,
701            CatalogItem::Connection(conn) => conn.global_id,
702            CatalogItem::ContinualTask(ct) => ct.global_id,
703            CatalogItem::Table(table) => match self.version {
704                RelationVersionSelector::Latest => {
705                    let (_version, gid) = table
706                        .collections
707                        .last_key_value()
708                        .expect("at least one version");
709                    *gid
710                }
711                RelationVersionSelector::Specific(version) => table
712                    .collections
713                    .get(&version)
714                    .expect("catalog corruption, missing version!")
715                    .clone(),
716            },
717        }
718    }
719}
720
721impl Deref for CatalogCollectionEntry {
722    type Target = CatalogEntry;
723
724    fn deref(&self) -> &CatalogEntry {
725        &self.entry
726    }
727}
728
729impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
730    fn name(&self) -> &QualifiedItemName {
731        self.entry.name()
732    }
733
734    fn id(&self) -> CatalogItemId {
735        self.entry.id()
736    }
737
738    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
739        Box::new(self.entry.global_ids())
740    }
741
742    fn oid(&self) -> u32 {
743        self.entry.oid()
744    }
745
746    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
747        self.entry.func()
748    }
749
750    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
751        self.entry.source_desc()
752    }
753
754    fn connection(
755        &self,
756    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
757    {
758        mz_sql::catalog::CatalogItem::connection(&self.entry)
759    }
760
761    fn create_sql(&self) -> &str {
762        self.entry.create_sql()
763    }
764
765    fn item_type(&self) -> SqlCatalogItemType {
766        self.entry.item_type()
767    }
768
769    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
770        self.entry.index_details()
771    }
772
773    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
774        self.entry.writable_table_details()
775    }
776
777    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
778        self.entry.type_details()
779    }
780
781    fn references(&self) -> &ResolvedIds {
782        self.entry.references()
783    }
784
785    fn uses(&self) -> BTreeSet<CatalogItemId> {
786        self.entry.uses()
787    }
788
789    fn referenced_by(&self) -> &[CatalogItemId] {
790        self.entry.referenced_by()
791    }
792
793    fn used_by(&self) -> &[CatalogItemId] {
794        self.entry.used_by()
795    }
796
797    fn subsource_details(
798        &self,
799    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
800        self.entry.subsource_details()
801    }
802
803    fn source_export_details(
804        &self,
805    ) -> Option<(
806        CatalogItemId,
807        &UnresolvedItemName,
808        &SourceExportDetails,
809        &SourceExportDataConfig<ReferencedConnection>,
810    )> {
811        self.entry.source_export_details()
812    }
813
814    fn is_progress_source(&self) -> bool {
815        self.entry.is_progress_source()
816    }
817
818    fn progress_id(&self) -> Option<CatalogItemId> {
819        self.entry.progress_id()
820    }
821
822    fn owner_id(&self) -> RoleId {
823        *self.entry.owner_id()
824    }
825
826    fn privileges(&self) -> &PrivilegeMap {
827        self.entry.privileges()
828    }
829
830    fn cluster_id(&self) -> Option<ClusterId> {
831        self.entry.item().cluster_id()
832    }
833
834    fn at_version(
835        &self,
836        version: RelationVersionSelector,
837    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
838        Box::new(CatalogCollectionEntry {
839            entry: self.entry.clone(),
840            version,
841        })
842    }
843
844    fn latest_version(&self) -> Option<RelationVersion> {
845        self.entry.latest_version()
846    }
847}
848
849#[derive(Debug, Clone, Serialize)]
850pub enum CatalogItem {
851    Table(Table),
852    Source(Source),
853    Log(Log),
854    View(View),
855    MaterializedView(MaterializedView),
856    Sink(Sink),
857    Index(Index),
858    Type(Type),
859    Func(Func),
860    Secret(Secret),
861    Connection(Connection),
862    ContinualTask(ContinualTask),
863}
864
865impl From<CatalogEntry> for durable::Item {
866    fn from(entry: CatalogEntry) -> durable::Item {
867        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
868        durable::Item {
869            id: entry.id,
870            oid: entry.oid,
871            global_id,
872            schema_id: entry.name.qualifiers.schema_spec.into(),
873            name: entry.name.item,
874            create_sql,
875            owner_id: entry.owner_id,
876            privileges: entry.privileges.into_all_values().collect(),
877            extra_versions,
878        }
879    }
880}
881
882#[derive(Debug, Clone, Serialize)]
883pub struct Table {
884    /// Parse-able SQL that defines this table.
885    pub create_sql: Option<String>,
886    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
887    pub desc: VersionedRelationDesc,
888    /// Versions of this table, and the [`GlobalId`]s that refer to them.
889    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
890    pub collections: BTreeMap<RelationVersion, GlobalId>,
891    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
892    #[serde(skip)]
893    pub conn_id: Option<ConnectionId>,
894    /// Other catalog objects referenced by this table, e.g. custom types.
895    pub resolved_ids: ResolvedIds,
896    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
897    pub custom_logical_compaction_window: Option<CompactionWindow>,
898    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
899    /// session variable.
900    ///
901    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
902    pub is_retained_metrics_object: bool,
903    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
904    pub data_source: TableDataSource,
905}
906
907impl Table {
908    pub fn timeline(&self) -> Timeline {
909        match &self.data_source {
910            // The Coordinator controls insertions for writable tables
911            // (including system tables), so they are realtime.
912            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
913            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
914        }
915    }
916
917    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
918    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
919        self.collections.values().copied()
920    }
921
922    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
923    pub fn global_id_writes(&self) -> GlobalId {
924        self.collections
925            .last_key_value()
926            .expect("at least one version of a table")
927            .1
928            .clone()
929    }
930
931    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
932    pub fn collection_descs(
933        &self,
934    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
935        self.collections.iter().map(|(version, gid)| {
936            let desc = self
937                .desc
938                .at_version(RelationVersionSelector::Specific(*version));
939            (*gid, *version, desc)
940        })
941    }
942
943    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
944    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
945        let (version, _gid) = self
946            .collections
947            .iter()
948            .find(|(_version, gid)| *gid == id)
949            .expect("GlobalId to exist");
950        self.desc
951            .at_version(RelationVersionSelector::Specific(*version))
952    }
953}
954
955#[derive(Clone, Debug, Serialize)]
956pub enum TableDataSource {
957    /// The table owns data created via INSERT/UPDATE/DELETE statements.
958    TableWrites {
959        #[serde(skip)]
960        defaults: Vec<Expr<Aug>>,
961    },
962
963    /// The table receives its data from the identified `DataSourceDesc`.
964    /// This table type does not support INSERT/UPDATE/DELETE statements.
965    DataSource {
966        desc: DataSourceDesc,
967        timeline: Timeline,
968    },
969}
970
971#[derive(Debug, Clone, Serialize)]
972pub enum DataSourceDesc {
973    /// Receives data from an external system
974    Ingestion {
975        ingestion_desc: PlanIngestion,
976        cluster_id: ClusterId,
977    },
978    /// This source receives its data from the identified ingestion,
979    /// specifically the output identified by `external_reference`.
980    /// N.B. that `external_reference` should not be used to identify
981    /// anything downstream of purification, as the purification process
982    /// encodes source-specific identifiers into the `details` struct.
983    /// The `external_reference` field is only used here for displaying
984    /// human-readable names in system tables.
985    IngestionExport {
986        ingestion_id: CatalogItemId,
987        external_reference: UnresolvedItemName,
988        details: SourceExportDetails,
989        data_config: SourceExportDataConfig<ReferencedConnection>,
990    },
991    /// Receives introspection data from an internal system
992    Introspection(IntrospectionType),
993    /// Receives data from the source's reclocking/remapping operations.
994    Progress,
995    /// Receives data from HTTP requests.
996    Webhook {
997        /// Optional components used to validation a webhook request.
998        validate_using: Option<WebhookValidation>,
999        /// Describes how we deserialize the body of a webhook request.
1000        body_format: WebhookBodyFormat,
1001        /// Describes whether or not to include headers and how to map them.
1002        headers: WebhookHeaders,
1003        /// The cluster which this source is associated with.
1004        cluster_id: ClusterId,
1005    },
1006}
1007
1008impl DataSourceDesc {
1009    /// The key and value formats of the data source.
1010    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1011        match &self {
1012            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1013                match &ingestion_desc.desc.primary_export.encoding.as_ref() {
1014                    Some(encoding) => match &encoding.key {
1015                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1016                        None => (None, Some(encoding.value.type_())),
1017                    },
1018                    None => (None, None),
1019                }
1020            }
1021            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1022                Some(encoding) => match &encoding.key {
1023                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1024                    None => (None, Some(encoding.value.type_())),
1025                },
1026                None => (None, None),
1027            },
1028            DataSourceDesc::Introspection(_)
1029            | DataSourceDesc::Webhook { .. }
1030            | DataSourceDesc::Progress => (None, None),
1031        }
1032    }
1033
1034    /// Envelope of the data source.
1035    pub fn envelope(&self) -> Option<&str> {
1036        // Note how "none"/"append-only" is different from `None`. Source
1037        // sources don't have an envelope (internal logs, for example), while
1038        // other sources have an envelope that we call the "NONE"-envelope.
1039
1040        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1041            match envelope {
1042                SourceEnvelope::None(_) => "none",
1043                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1044                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1045                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1046                        // NOTE(aljoscha): Should we somehow mark that this is
1047                        // using upsert internally? See note above about
1048                        // DEBEZIUM.
1049                        "debezium"
1050                    }
1051                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1052                        "upsert-value-err-inline"
1053                    }
1054                },
1055                SourceEnvelope::CdcV2 => {
1056                    // TODO(aljoscha): Should we even report this? It's
1057                    // currently not exposed.
1058                    "materialize"
1059                }
1060            }
1061        }
1062
1063        match self {
1064            // NOTE(aljoscha): We could move the block for ingestions into
1065            // `SourceEnvelope` itself, but that one feels more like an internal
1066            // thing and adapter should own how we represent envelopes as a
1067            // string? It would not be hard to convince me otherwise, though.
1068            DataSourceDesc::Ingestion { ingestion_desc, .. } => Some(envelope_string(
1069                &ingestion_desc.desc.primary_export.envelope,
1070            )),
1071            DataSourceDesc::IngestionExport { data_config, .. } => {
1072                Some(envelope_string(&data_config.envelope))
1073            }
1074            DataSourceDesc::Introspection(_)
1075            | DataSourceDesc::Webhook { .. }
1076            | DataSourceDesc::Progress => None,
1077        }
1078    }
1079}
1080
1081#[derive(Debug, Clone, Serialize)]
1082pub struct Source {
1083    /// Parse-able SQL that defines this table.
1084    pub create_sql: Option<String>,
1085    /// [`GlobalId`] used to reference this source from outside the catalog.
1086    pub global_id: GlobalId,
1087    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
1088    #[serde(skip)]
1089    pub data_source: DataSourceDesc,
1090    /// [`RelationDesc`] of this source, derived from the `create_sql`.
1091    pub desc: RelationDesc,
1092    /// The timeline this source exists on.
1093    pub timeline: Timeline,
1094    /// Other catalog objects referenced by this table, e.g. custom types.
1095    pub resolved_ids: ResolvedIds,
1096    /// This value is ignored for subsources, i.e. for
1097    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
1098    /// sources logical compaction window.
1099    pub custom_logical_compaction_window: Option<CompactionWindow>,
1100    /// Whether the source's logical compaction window is controlled by
1101    /// METRICS_RETENTION
1102    pub is_retained_metrics_object: bool,
1103}
1104
1105impl Source {
1106    /// Creates a new `Source`.
1107    ///
1108    /// # Panics
1109    /// - If an ingestion-based plan is not given a cluster_id.
1110    /// - If a non-ingestion-based source has a defined cluster config in its plan.
1111    /// - If a non-ingestion-based source is given a cluster_id.
1112    pub fn new(
1113        plan: CreateSourcePlan,
1114        global_id: GlobalId,
1115        resolved_ids: ResolvedIds,
1116        custom_logical_compaction_window: Option<CompactionWindow>,
1117        is_retained_metrics_object: bool,
1118    ) -> Source {
1119        Source {
1120            create_sql: Some(plan.source.create_sql),
1121            data_source: match plan.source.data_source {
1122                mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
1123                    DataSourceDesc::Ingestion {
1124                        ingestion_desc,
1125                        cluster_id: plan
1126                            .in_cluster
1127                            .expect("ingestion-based sources must be given a cluster ID"),
1128                    }
1129                }
1130                mz_sql::plan::DataSourceDesc::Progress => {
1131                    assert!(
1132                        plan.in_cluster.is_none(),
1133                        "subsources must not have a host config or cluster_id defined"
1134                    );
1135                    DataSourceDesc::Progress
1136                }
1137                mz_sql::plan::DataSourceDesc::IngestionExport {
1138                    ingestion_id,
1139                    external_reference,
1140                    details,
1141                    data_config,
1142                } => {
1143                    assert!(
1144                        plan.in_cluster.is_none(),
1145                        "subsources must not have a host config or cluster_id defined"
1146                    );
1147                    DataSourceDesc::IngestionExport {
1148                        ingestion_id,
1149                        external_reference,
1150                        details,
1151                        data_config,
1152                    }
1153                }
1154                mz_sql::plan::DataSourceDesc::Webhook {
1155                    validate_using,
1156                    body_format,
1157                    headers,
1158                    cluster_id,
1159                } => {
1160                    mz_ore::soft_assert_or_log!(
1161                        cluster_id.is_none(),
1162                        "cluster_id set at Source level for Webhooks"
1163                    );
1164                    DataSourceDesc::Webhook {
1165                        validate_using,
1166                        body_format,
1167                        headers,
1168                        cluster_id: plan
1169                            .in_cluster
1170                            .expect("webhook sources must be given a cluster ID"),
1171                    }
1172                }
1173            },
1174            desc: plan.source.desc,
1175            global_id,
1176            timeline: plan.timeline,
1177            resolved_ids,
1178            custom_logical_compaction_window: plan
1179                .source
1180                .compaction_window
1181                .or(custom_logical_compaction_window),
1182            is_retained_metrics_object,
1183        }
1184    }
1185
1186    /// Type of the source.
1187    pub fn source_type(&self) -> &str {
1188        match &self.data_source {
1189            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1190                ingestion_desc.desc.connection.name()
1191            }
1192            DataSourceDesc::Progress => "progress",
1193            DataSourceDesc::IngestionExport { .. } => "subsource",
1194            DataSourceDesc::Introspection(_) => "source",
1195            DataSourceDesc::Webhook { .. } => "webhook",
1196        }
1197    }
1198
1199    /// Connection ID of the source, if one exists.
1200    pub fn connection_id(&self) -> Option<CatalogItemId> {
1201        match &self.data_source {
1202            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1203                ingestion_desc.desc.connection.connection_id()
1204            }
1205            DataSourceDesc::IngestionExport { .. }
1206            | DataSourceDesc::Introspection(_)
1207            | DataSourceDesc::Webhook { .. }
1208            | DataSourceDesc::Progress => None,
1209        }
1210    }
1211
1212    /// The single [`GlobalId`] that refers to this Source.
1213    pub fn global_id(&self) -> GlobalId {
1214        self.global_id
1215    }
1216
1217    /// The expensive resource that each source consumes is persist shards. To
1218    /// prevent abuse, we want to prevent users from creating sources that use an
1219    /// unbounded number of persist shards. But we also don't want to count
1220    /// persist shards that are mandated by teh system (e.g., the progress
1221    /// shard) so that future versions of Materialize can introduce additional
1222    /// per-source shards (e.g., a per-source status shard) without impacting
1223    /// the limit calculation.
1224    pub fn user_controllable_persist_shard_count(&self) -> i64 {
1225        match &self.data_source {
1226            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1227                match &ingestion_desc.desc.connection {
1228                    // These multi-output sources do not use their primary
1229                    // source's data shard, so we don't include it in accounting
1230                    // for users.
1231                    GenericSourceConnection::Postgres(_)
1232                    | GenericSourceConnection::MySql(_)
1233                    | GenericSourceConnection::SqlServer(_) => 0,
1234                    GenericSourceConnection::LoadGenerator(lg) => {
1235                        // TODO: make this a method on the load generator.
1236                        if lg.load_generator.views().is_empty() {
1237                            // Load generator writes directly to its persist shard
1238                            1
1239                        } else {
1240                            // Load generator has 1 persist shard per output,
1241                            // which will be accounted for by `SourceExport`.
1242                            0
1243                        }
1244                    }
1245                    GenericSourceConnection::Kafka(_) => 1,
1246                }
1247            }
1248            //  DataSourceDesc::IngestionExport represents a subsource, which
1249            //  use a data shard.
1250            DataSourceDesc::IngestionExport { .. } => 1,
1251            DataSourceDesc::Webhook { .. } => 1,
1252            // Introspection and progress subsources are not under the user's control, so shouldn't
1253            // count toward their quota.
1254            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1255        }
1256    }
1257}
1258
1259#[derive(Debug, Clone, Serialize)]
1260pub struct Log {
1261    /// The category of data this log stores.
1262    pub variant: LogVariant,
1263    /// [`GlobalId`] used to reference this log from outside the catalog.
1264    pub global_id: GlobalId,
1265}
1266
1267impl Log {
1268    /// The single [`GlobalId`] that refers to this Log.
1269    pub fn global_id(&self) -> GlobalId {
1270        self.global_id
1271    }
1272}
1273
1274#[derive(Debug, Clone, Serialize)]
1275pub struct Sink {
1276    /// Parse-able SQL that defines this sink.
1277    pub create_sql: String,
1278    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g storage.
1279    pub global_id: GlobalId,
1280    /// Collection we read into this sink.
1281    pub from: GlobalId,
1282    /// Connection to the external service we're sinking into, e.g. Kafka.
1283    pub connection: StorageSinkConnection<ReferencedConnection>,
1284    /// Envelope we use to sink into the external system.
1285    ///
1286    /// TODO(guswynn): this probably should just be in the `connection`.
1287    pub envelope: SinkEnvelope,
1288    /// Emit an initial snapshot into the sink.
1289    pub with_snapshot: bool,
1290    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
1291    pub version: u64,
1292    /// Other catalog objects this sink references.
1293    pub resolved_ids: ResolvedIds,
1294    /// Cluster this sink runs on.
1295    pub cluster_id: ClusterId,
1296}
1297
1298impl Sink {
1299    pub fn sink_type(&self) -> &str {
1300        self.connection.name()
1301    }
1302
1303    /// Envelope of the sink.
1304    pub fn envelope(&self) -> Option<&str> {
1305        match &self.envelope {
1306            SinkEnvelope::Debezium => Some("debezium"),
1307            SinkEnvelope::Upsert => Some("upsert"),
1308        }
1309    }
1310
1311    /// Output a combined format string of the sink. For legacy reasons
1312    /// if the key-format is none or the key & value formats are
1313    /// both the same (either avro or json), we return the value format name,
1314    /// otherwise we return a composite name.
1315    pub fn combined_format(&self) -> Cow<'_, str> {
1316        let StorageSinkConnection::Kafka(connection) = &self.connection;
1317        connection.format.get_format_name()
1318    }
1319
1320    /// Output distinct key_format and value_format of the sink.
1321    pub fn formats(&self) -> (Option<&str>, &str) {
1322        let StorageSinkConnection::Kafka(connection) = &self.connection;
1323        let key_format = connection
1324            .format
1325            .key_format
1326            .as_ref()
1327            .map(|format| format.get_format_name());
1328        let value_format = connection.format.value_format.get_format_name();
1329        (key_format, value_format)
1330    }
1331
1332    pub fn connection_id(&self) -> Option<CatalogItemId> {
1333        self.connection.connection_id()
1334    }
1335
1336    /// The single [`GlobalId`] that this Sink can be referenced by.
1337    pub fn global_id(&self) -> GlobalId {
1338        self.global_id
1339    }
1340}
1341
1342#[derive(Debug, Clone, Serialize)]
1343pub struct View {
1344    /// Parse-able SQL that defines this view.
1345    pub create_sql: String,
1346    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
1347    pub global_id: GlobalId,
1348    /// Unoptimized high-level expression from parsing the `create_sql`.
1349    pub raw_expr: Arc<HirRelationExpr>,
1350    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
1351    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1352    /// Columns of this view.
1353    pub desc: RelationDesc,
1354    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
1355    pub conn_id: Option<ConnectionId>,
1356    /// Other catalog objects that are referenced by this view, determined at name resolution.
1357    pub resolved_ids: ResolvedIds,
1358    /// All of the catalog objects that are referenced by this view.
1359    pub dependencies: DependencyIds,
1360}
1361
1362impl View {
1363    /// The single [`GlobalId`] this [`View`] can be referenced by.
1364    pub fn global_id(&self) -> GlobalId {
1365        self.global_id
1366    }
1367}
1368
1369#[derive(Debug, Clone, Serialize)]
1370pub struct MaterializedView {
1371    /// Parse-able SQL that defines this materialized view.
1372    pub create_sql: String,
1373    /// [`GlobalId`] used to reference this materialized view from outside the catalog.
1374    pub global_id: GlobalId,
1375    /// Raw high-level expression from planning, derived from the `create_sql`.
1376    pub raw_expr: Arc<HirRelationExpr>,
1377    /// Optimized mid-level expression, derived from the `raw_expr`.
1378    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1379    /// Columns for this materialized view.
1380    pub desc: RelationDesc,
1381    /// Other catalog items that this materialized view references, determined at name resolution.
1382    pub resolved_ids: ResolvedIds,
1383    /// All of the catalog objects that are referenced by this view.
1384    pub dependencies: DependencyIds,
1385    /// Cluster that this materialized view runs on.
1386    pub cluster_id: ClusterId,
1387    /// Column indexes that we assert are not `NULL`.
1388    ///
1389    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
1390    pub non_null_assertions: Vec<usize>,
1391    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
1392    pub custom_logical_compaction_window: Option<CompactionWindow>,
1393    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
1394    pub refresh_schedule: Option<RefreshSchedule>,
1395    /// The initial `as_of` of the storage collection associated with the materialized view.
1396    ///
1397    /// Note: This doesn't change upon restarts.
1398    /// (The dataflow's initial `as_of` can be different.)
1399    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1400}
1401
1402impl MaterializedView {
1403    /// The single [`GlobalId`] this [`MaterializedView`] can be referenced by.
1404    pub fn global_id(&self) -> GlobalId {
1405        self.global_id
1406    }
1407}
1408
1409#[derive(Debug, Clone, Serialize)]
1410pub struct Index {
1411    /// Parse-able SQL that defines this table.
1412    pub create_sql: String,
1413    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
1414    pub global_id: GlobalId,
1415    /// The [`GlobalId`] this Index is on.
1416    pub on: GlobalId,
1417    /// Keys of the index.
1418    pub keys: Arc<[MirScalarExpr]>,
1419    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
1420    pub conn_id: Option<ConnectionId>,
1421    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
1422    pub resolved_ids: ResolvedIds,
1423    /// Cluster this index is installed on.
1424    pub cluster_id: ClusterId,
1425    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
1426    pub custom_logical_compaction_window: Option<CompactionWindow>,
1427    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
1428    /// session variable.
1429    ///
1430    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
1431    pub is_retained_metrics_object: bool,
1432}
1433
1434impl Index {
1435    /// The [`GlobalId`] that refers to this Index.
1436    pub fn global_id(&self) -> GlobalId {
1437        self.global_id
1438    }
1439}
1440
1441#[derive(Debug, Clone, Serialize)]
1442pub struct Type {
1443    /// Parse-able SQL that defines this type.
1444    pub create_sql: Option<String>,
1445    /// [`GlobalId`] used to reference this type from outside the catalog.
1446    pub global_id: GlobalId,
1447    #[serde(skip)]
1448    pub details: CatalogTypeDetails<IdReference>,
1449    pub desc: Option<RelationDesc>,
1450    /// Other catalog objects referenced by this type.
1451    pub resolved_ids: ResolvedIds,
1452}
1453
1454#[derive(Debug, Clone, Serialize)]
1455pub struct Func {
1456    /// Static definition of the function.
1457    #[serde(skip)]
1458    pub inner: &'static mz_sql::func::Func,
1459    /// [`GlobalId`] used to reference this function from outside the catalog.
1460    pub global_id: GlobalId,
1461}
1462
1463#[derive(Debug, Clone, Serialize)]
1464pub struct Secret {
1465    /// Parse-able SQL that defines this secret.
1466    pub create_sql: String,
1467    /// [`GlobalId`] used to reference this secret from outside the catalog.
1468    pub global_id: GlobalId,
1469}
1470
1471#[derive(Debug, Clone, Serialize)]
1472pub struct Connection {
1473    /// Parse-able SQL that defines this connection.
1474    pub create_sql: String,
1475    /// [`GlobalId`] used to reference this connection from the storage layer.
1476    pub global_id: GlobalId,
1477    /// The kind of connection.
1478    pub details: ConnectionDetails,
1479    /// Other objects this connection depends on.
1480    pub resolved_ids: ResolvedIds,
1481}
1482
1483impl Connection {
1484    /// The single [`GlobalId`] used to reference this connection.
1485    pub fn global_id(&self) -> GlobalId {
1486        self.global_id
1487    }
1488}
1489
1490#[derive(Debug, Clone, Serialize)]
1491pub struct ContinualTask {
1492    /// Parse-able SQL that defines this continual task.
1493    pub create_sql: String,
1494    /// [`GlobalId`] used to reference this continual task from outside the catalog.
1495    pub global_id: GlobalId,
1496    /// [`GlobalId`] of the collection that we read into this continual task.
1497    pub input_id: GlobalId,
1498    pub with_snapshot: bool,
1499    /// ContinualTasks are self-referential. We make this work by using a
1500    /// placeholder `LocalId` for the CT itself through name resolution and
1501    /// planning. Then we fill in the real `GlobalId` before constructing this
1502    /// catalog item.
1503    pub raw_expr: Arc<HirRelationExpr>,
1504    /// Columns for this continual task.
1505    pub desc: RelationDesc,
1506    /// Other catalog items that this continual task references, determined at name resolution.
1507    pub resolved_ids: ResolvedIds,
1508    /// All of the catalog objects that are referenced by this continual task.
1509    pub dependencies: DependencyIds,
1510    /// Cluster that this continual task runs on.
1511    pub cluster_id: ClusterId,
1512    /// See the comment on [MaterializedView::initial_as_of].
1513    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1514}
1515
1516impl ContinualTask {
1517    /// The single [`GlobalId`] used to reference this continual task.
1518    pub fn global_id(&self) -> GlobalId {
1519        self.global_id
1520    }
1521}
1522
1523#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
1524pub struct NetworkPolicy {
1525    pub name: String,
1526    pub id: NetworkPolicyId,
1527    pub oid: u32,
1528    pub rules: Vec<NetworkPolicyRule>,
1529    pub owner_id: RoleId,
1530    pub privileges: PrivilegeMap,
1531}
1532
1533impl From<NetworkPolicy> for durable::NetworkPolicy {
1534    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1535        durable::NetworkPolicy {
1536            id: policy.id,
1537            oid: policy.oid,
1538            name: policy.name,
1539            rules: policy.rules,
1540            owner_id: policy.owner_id,
1541            privileges: policy.privileges.into_all_values().collect(),
1542        }
1543    }
1544}
1545
1546impl From<durable::NetworkPolicy> for NetworkPolicy {
1547    fn from(
1548        durable::NetworkPolicy {
1549            id,
1550            oid,
1551            name,
1552            rules,
1553            owner_id,
1554            privileges,
1555        }: durable::NetworkPolicy,
1556    ) -> Self {
1557        NetworkPolicy {
1558            id,
1559            oid,
1560            name,
1561            rules,
1562            owner_id,
1563            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1564        }
1565    }
1566}
1567
1568impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1569    fn update_from(
1570        &mut self,
1571        durable::NetworkPolicy {
1572            id,
1573            oid,
1574            name,
1575            rules,
1576            owner_id,
1577            privileges,
1578        }: durable::NetworkPolicy,
1579    ) {
1580        self.id = id;
1581        self.oid = oid;
1582        self.name = name;
1583        self.rules = rules;
1584        self.owner_id = owner_id;
1585        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1586    }
1587}
1588
1589impl CatalogItem {
1590    /// Returns a string indicating the type of this catalog entry.
1591    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1592        match self {
1593            CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
1594            CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
1595            CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
1596            CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
1597            CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
1598            CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
1599            CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
1600            CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
1601            CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
1602            CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
1603            CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
1604            CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
1605        }
1606    }
1607
1608    /// Returns the [`GlobalId`]s that reference this item, if any.
1609    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1610        let gid = match self {
1611            CatalogItem::Source(source) => source.global_id,
1612            CatalogItem::Log(log) => log.global_id,
1613            CatalogItem::Sink(sink) => sink.global_id,
1614            CatalogItem::View(view) => view.global_id,
1615            CatalogItem::MaterializedView(mv) => mv.global_id,
1616            CatalogItem::ContinualTask(ct) => ct.global_id,
1617            CatalogItem::Index(index) => index.global_id,
1618            CatalogItem::Func(func) => func.global_id,
1619            CatalogItem::Type(ty) => ty.global_id,
1620            CatalogItem::Secret(secret) => secret.global_id,
1621            CatalogItem::Connection(conn) => conn.global_id,
1622            CatalogItem::Table(table) => {
1623                return itertools::Either::Left(table.collections.values().copied());
1624            }
1625        };
1626        itertools::Either::Right(std::iter::once(gid))
1627    }
1628
1629    /// Returns the most up-to-date [`GlobalId`] for this item.
1630    ///
1631    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1632    pub fn latest_global_id(&self) -> GlobalId {
1633        match self {
1634            CatalogItem::Source(source) => source.global_id,
1635            CatalogItem::Log(log) => log.global_id,
1636            CatalogItem::Sink(sink) => sink.global_id,
1637            CatalogItem::View(view) => view.global_id,
1638            CatalogItem::MaterializedView(mv) => mv.global_id,
1639            CatalogItem::ContinualTask(ct) => ct.global_id,
1640            CatalogItem::Index(index) => index.global_id,
1641            CatalogItem::Func(func) => func.global_id,
1642            CatalogItem::Type(ty) => ty.global_id,
1643            CatalogItem::Secret(secret) => secret.global_id,
1644            CatalogItem::Connection(conn) => conn.global_id,
1645            CatalogItem::Table(table) => table.global_id_writes(),
1646        }
1647    }
1648
1649    /// Whether this item represents a storage collection.
1650    pub fn is_storage_collection(&self) -> bool {
1651        match self {
1652            CatalogItem::Table(_)
1653            | CatalogItem::Source(_)
1654            | CatalogItem::MaterializedView(_)
1655            | CatalogItem::Sink(_)
1656            | CatalogItem::ContinualTask(_) => true,
1657            CatalogItem::Log(_)
1658            | CatalogItem::View(_)
1659            | CatalogItem::Index(_)
1660            | CatalogItem::Type(_)
1661            | CatalogItem::Func(_)
1662            | CatalogItem::Secret(_)
1663            | CatalogItem::Connection(_) => false,
1664        }
1665    }
1666
1667    pub fn desc(
1668        &self,
1669        name: &FullItemName,
1670        version: RelationVersionSelector,
1671    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1672        self.desc_opt(version)
1673            .ok_or_else(|| SqlCatalogError::InvalidDependency {
1674                name: name.to_string(),
1675                typ: self.typ(),
1676            })
1677    }
1678
1679    pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1680        match &self {
1681            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1682            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1683            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1684            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1685            CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
1686            CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1687            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1688            CatalogItem::Func(_)
1689            | CatalogItem::Index(_)
1690            | CatalogItem::Sink(_)
1691            | CatalogItem::Secret(_)
1692            | CatalogItem::Connection(_) => None,
1693        }
1694    }
1695
1696    pub fn func(
1697        &self,
1698        entry: &CatalogEntry,
1699    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1700        match &self {
1701            CatalogItem::Func(func) => Ok(func.inner),
1702            _ => Err(SqlCatalogError::UnexpectedType {
1703                name: entry.name().item.to_string(),
1704                actual_type: entry.item_type(),
1705                expected_type: CatalogItemType::Func,
1706            }),
1707        }
1708    }
1709
1710    pub fn source_desc(
1711        &self,
1712        entry: &CatalogEntry,
1713    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1714        match &self {
1715            CatalogItem::Source(source) => match &source.data_source {
1716                DataSourceDesc::Ingestion { ingestion_desc, .. } => Ok(Some(&ingestion_desc.desc)),
1717                DataSourceDesc::IngestionExport { .. }
1718                | DataSourceDesc::Introspection(_)
1719                | DataSourceDesc::Webhook { .. }
1720                | DataSourceDesc::Progress => Ok(None),
1721            },
1722            _ => Err(SqlCatalogError::UnexpectedType {
1723                name: entry.name().item.to_string(),
1724                actual_type: entry.item_type(),
1725                expected_type: CatalogItemType::Source,
1726            }),
1727        }
1728    }
1729
1730    /// Reports whether this catalog entry is a progress source.
1731    pub fn is_progress_source(&self) -> bool {
1732        matches!(
1733            self,
1734            CatalogItem::Source(Source {
1735                data_source: DataSourceDesc::Progress,
1736                ..
1737            })
1738        )
1739    }
1740
1741    /// Collects the identifiers of the objects that were encountered when resolving names in the
1742    /// item's DDL statement.
1743    pub fn references(&self) -> &ResolvedIds {
1744        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1745        match self {
1746            CatalogItem::Func(_) => &*EMPTY,
1747            CatalogItem::Index(idx) => &idx.resolved_ids,
1748            CatalogItem::Sink(sink) => &sink.resolved_ids,
1749            CatalogItem::Source(source) => &source.resolved_ids,
1750            CatalogItem::Log(_) => &*EMPTY,
1751            CatalogItem::Table(table) => &table.resolved_ids,
1752            CatalogItem::Type(typ) => &typ.resolved_ids,
1753            CatalogItem::View(view) => &view.resolved_ids,
1754            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1755            CatalogItem::Secret(_) => &*EMPTY,
1756            CatalogItem::Connection(connection) => &connection.resolved_ids,
1757            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1758        }
1759    }
1760
1761    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1762    ///
1763    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1764    /// referenced. For example this will include any catalog objects used to implement functions
1765    /// and casts in the item.
1766    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1767        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1768        match self {
1769            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1770            // their implementation. However, currently there's no way to get that information.
1771            CatalogItem::Func(_) => {}
1772            CatalogItem::Index(_) => {}
1773            CatalogItem::Sink(_) => {}
1774            CatalogItem::Source(_) => {}
1775            CatalogItem::Log(_) => {}
1776            CatalogItem::Table(_) => {}
1777            CatalogItem::Type(_) => {}
1778            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1779            CatalogItem::MaterializedView(mview) => {
1780                uses.extend(mview.dependencies.0.iter().copied())
1781            }
1782            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1783            CatalogItem::Secret(_) => {}
1784            CatalogItem::Connection(_) => {}
1785        }
1786        uses
1787    }
1788
1789    /// Returns the connection ID that this item belongs to, if this item is
1790    /// temporary.
1791    pub fn conn_id(&self) -> Option<&ConnectionId> {
1792        match self {
1793            CatalogItem::View(view) => view.conn_id.as_ref(),
1794            CatalogItem::Index(index) => index.conn_id.as_ref(),
1795            CatalogItem::Table(table) => table.conn_id.as_ref(),
1796            CatalogItem::Log(_)
1797            | CatalogItem::Source(_)
1798            | CatalogItem::Sink(_)
1799            | CatalogItem::MaterializedView(_)
1800            | CatalogItem::Secret(_)
1801            | CatalogItem::Type(_)
1802            | CatalogItem::Func(_)
1803            | CatalogItem::Connection(_)
1804            | CatalogItem::ContinualTask(_) => None,
1805        }
1806    }
1807
1808    /// Indicates whether this item is temporary or not.
1809    pub fn is_temporary(&self) -> bool {
1810        self.conn_id().is_some()
1811    }
1812
1813    pub fn rename_schema_refs(
1814        &self,
1815        database_name: &str,
1816        cur_schema_name: &str,
1817        new_schema_name: &str,
1818    ) -> Result<CatalogItem, (String, String)> {
1819        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1820            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1821                .expect("invalid create sql persisted to catalog")
1822                .into_element()
1823                .ast;
1824
1825            // Rename all references to cur_schema_name.
1826            mz_sql::ast::transform::create_stmt_rename_schema_refs(
1827                &mut create_stmt,
1828                database_name,
1829                cur_schema_name,
1830                new_schema_name,
1831            )?;
1832
1833            Ok(create_stmt.to_ast_string_stable())
1834        };
1835
1836        match self {
1837            CatalogItem::Table(i) => {
1838                let mut i = i.clone();
1839                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1840                Ok(CatalogItem::Table(i))
1841            }
1842            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1843            CatalogItem::Source(i) => {
1844                let mut i = i.clone();
1845                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1846                Ok(CatalogItem::Source(i))
1847            }
1848            CatalogItem::Sink(i) => {
1849                let mut i = i.clone();
1850                i.create_sql = do_rewrite(i.create_sql)?;
1851                Ok(CatalogItem::Sink(i))
1852            }
1853            CatalogItem::View(i) => {
1854                let mut i = i.clone();
1855                i.create_sql = do_rewrite(i.create_sql)?;
1856                Ok(CatalogItem::View(i))
1857            }
1858            CatalogItem::MaterializedView(i) => {
1859                let mut i = i.clone();
1860                i.create_sql = do_rewrite(i.create_sql)?;
1861                Ok(CatalogItem::MaterializedView(i))
1862            }
1863            CatalogItem::Index(i) => {
1864                let mut i = i.clone();
1865                i.create_sql = do_rewrite(i.create_sql)?;
1866                Ok(CatalogItem::Index(i))
1867            }
1868            CatalogItem::Secret(i) => {
1869                let mut i = i.clone();
1870                i.create_sql = do_rewrite(i.create_sql)?;
1871                Ok(CatalogItem::Secret(i))
1872            }
1873            CatalogItem::Connection(i) => {
1874                let mut i = i.clone();
1875                i.create_sql = do_rewrite(i.create_sql)?;
1876                Ok(CatalogItem::Connection(i))
1877            }
1878            CatalogItem::Type(i) => {
1879                let mut i = i.clone();
1880                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1881                Ok(CatalogItem::Type(i))
1882            }
1883            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1884            CatalogItem::ContinualTask(i) => {
1885                let mut i = i.clone();
1886                i.create_sql = do_rewrite(i.create_sql)?;
1887                Ok(CatalogItem::ContinualTask(i))
1888            }
1889        }
1890    }
1891
1892    /// Returns a clone of `self` with all instances of `from` renamed to `to`
1893    /// (with the option of including the item's own name) or errors if request
1894    /// is ambiguous.
1895    pub fn rename_item_refs(
1896        &self,
1897        from: FullItemName,
1898        to_item_name: String,
1899        rename_self: bool,
1900    ) -> Result<CatalogItem, String> {
1901        let do_rewrite = |create_sql: String| -> Result<String, String> {
1902            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1903                .expect("invalid create sql persisted to catalog")
1904                .into_element()
1905                .ast;
1906            if rename_self {
1907                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1908            }
1909            // Determination of what constitutes an ambiguous request is done here.
1910            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1911            Ok(create_stmt.to_ast_string_stable())
1912        };
1913
1914        match self {
1915            CatalogItem::Table(i) => {
1916                let mut i = i.clone();
1917                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1918                Ok(CatalogItem::Table(i))
1919            }
1920            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1921            CatalogItem::Source(i) => {
1922                let mut i = i.clone();
1923                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1924                Ok(CatalogItem::Source(i))
1925            }
1926            CatalogItem::Sink(i) => {
1927                let mut i = i.clone();
1928                i.create_sql = do_rewrite(i.create_sql)?;
1929                Ok(CatalogItem::Sink(i))
1930            }
1931            CatalogItem::View(i) => {
1932                let mut i = i.clone();
1933                i.create_sql = do_rewrite(i.create_sql)?;
1934                Ok(CatalogItem::View(i))
1935            }
1936            CatalogItem::MaterializedView(i) => {
1937                let mut i = i.clone();
1938                i.create_sql = do_rewrite(i.create_sql)?;
1939                Ok(CatalogItem::MaterializedView(i))
1940            }
1941            CatalogItem::Index(i) => {
1942                let mut i = i.clone();
1943                i.create_sql = do_rewrite(i.create_sql)?;
1944                Ok(CatalogItem::Index(i))
1945            }
1946            CatalogItem::Secret(i) => {
1947                let mut i = i.clone();
1948                i.create_sql = do_rewrite(i.create_sql)?;
1949                Ok(CatalogItem::Secret(i))
1950            }
1951            CatalogItem::Func(_) | CatalogItem::Type(_) => {
1952                unreachable!("{}s cannot be renamed", self.typ())
1953            }
1954            CatalogItem::Connection(i) => {
1955                let mut i = i.clone();
1956                i.create_sql = do_rewrite(i.create_sql)?;
1957                Ok(CatalogItem::Connection(i))
1958            }
1959            CatalogItem::ContinualTask(i) => {
1960                let mut i = i.clone();
1961                i.create_sql = do_rewrite(i.create_sql)?;
1962                Ok(CatalogItem::ContinualTask(i))
1963            }
1964        }
1965    }
1966
1967    /// Updates the retain history for an item. Returns the previous retain history value. Returns
1968    /// an error if this item does not support retain history.
1969    pub fn update_retain_history(
1970        &mut self,
1971        value: Option<Value>,
1972        window: CompactionWindow,
1973    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
1974        let update = |mut ast: &mut Statement<Raw>| {
1975            // Each statement type has unique option types. This macro handles them commonly.
1976            macro_rules! update_retain_history {
1977                ( $stmt:ident, $opt:ident, $name:ident ) => {{
1978                    // Replace or add the option.
1979                    let pos = $stmt
1980                        .with_options
1981                        .iter()
1982                        // In case there are ever multiple, look for the last one.
1983                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
1984                    if let Some(value) = value {
1985                        let next = mz_sql_parser::ast::$opt {
1986                            name: mz_sql_parser::ast::$name::RetainHistory,
1987                            value: Some(WithOptionValue::RetainHistoryFor(value)),
1988                        };
1989                        if let Some(idx) = pos {
1990                            let previous = $stmt.with_options[idx].clone();
1991                            $stmt.with_options[idx] = next;
1992                            previous.value
1993                        } else {
1994                            $stmt.with_options.push(next);
1995                            None
1996                        }
1997                    } else {
1998                        if let Some(idx) = pos {
1999                            $stmt.with_options.swap_remove(idx).value
2000                        } else {
2001                            None
2002                        }
2003                    }
2004                }};
2005            }
2006            let previous = match &mut ast {
2007                Statement::CreateTable(stmt) => {
2008                    update_retain_history!(stmt, TableOption, TableOptionName)
2009                }
2010                Statement::CreateIndex(stmt) => {
2011                    update_retain_history!(stmt, IndexOption, IndexOptionName)
2012                }
2013                Statement::CreateSource(stmt) => {
2014                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2015                }
2016                Statement::CreateMaterializedView(stmt) => {
2017                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2018                }
2019                _ => {
2020                    return Err(());
2021                }
2022            };
2023            Ok(previous)
2024        };
2025
2026        let res = self.update_sql(update)?;
2027        let cw = self
2028            .custom_logical_compaction_window_mut()
2029            .expect("item must have compaction window");
2030        *cw = Some(window);
2031        Ok(res)
2032    }
2033
2034    pub fn add_column(
2035        &mut self,
2036        name: ColumnName,
2037        typ: SqlColumnType,
2038        sql: RawDataType,
2039    ) -> Result<RelationVersion, PlanError> {
2040        let CatalogItem::Table(table) = self else {
2041            return Err(PlanError::Unsupported {
2042                feature: "adding columns to a non-Table".to_string(),
2043                discussion_no: None,
2044            });
2045        };
2046        let next_version = table.desc.add_column(name.clone(), typ);
2047
2048        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2049            Statement::CreateTable(stmt) => {
2050                let version = ColumnOptionDef {
2051                    name: None,
2052                    option: ColumnOption::Versioned {
2053                        action: ColumnVersioned::Added,
2054                        version: next_version.into(),
2055                    },
2056                };
2057                let column = ColumnDef {
2058                    name: name.into(),
2059                    data_type: sql,
2060                    collation: None,
2061                    options: vec![version],
2062                };
2063                stmt.columns.push(column);
2064                Ok(())
2065            }
2066            _ => Err(()),
2067        };
2068
2069        self.update_sql(update)
2070            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2071        Ok(next_version)
2072    }
2073
2074    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
2075    /// otherwise returns f's result.
2076    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2077    where
2078        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2079    {
2080        let create_sql = match self {
2081            CatalogItem::Table(Table { create_sql, .. })
2082            | CatalogItem::Type(Type { create_sql, .. })
2083            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2084            CatalogItem::Sink(Sink { create_sql, .. })
2085            | CatalogItem::View(View { create_sql, .. })
2086            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2087            | CatalogItem::Index(Index { create_sql, .. })
2088            | CatalogItem::Secret(Secret { create_sql, .. })
2089            | CatalogItem::Connection(Connection { create_sql, .. })
2090            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2091            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2092        };
2093        let Some(create_sql) = create_sql else {
2094            return Err(());
2095        };
2096        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2097            .expect("non-system items must be parseable")
2098            .into_element()
2099            .ast;
2100        debug!("rewrite: {}", ast.to_ast_string_redacted());
2101        let t = f(&mut ast)?;
2102        *create_sql = ast.to_ast_string_stable();
2103        debug!("rewrote: {}", ast.to_ast_string_redacted());
2104        Ok(t)
2105    }
2106
2107    /// If the object is considered a "compute object"
2108    /// (i.e., it is managed by the compute controller),
2109    /// this function returns its cluster ID. Otherwise, it returns nothing.
2110    ///
2111    /// This function differs from `cluster_id` because while all
2112    /// compute objects run on a cluster, the converse is not true.
2113    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2114        match self {
2115            CatalogItem::Index(index) => Some(index.cluster_id),
2116            CatalogItem::Table(_)
2117            | CatalogItem::Source(_)
2118            | CatalogItem::Log(_)
2119            | CatalogItem::View(_)
2120            | CatalogItem::MaterializedView(_)
2121            | CatalogItem::Sink(_)
2122            | CatalogItem::Type(_)
2123            | CatalogItem::Func(_)
2124            | CatalogItem::Secret(_)
2125            | CatalogItem::Connection(_)
2126            | CatalogItem::ContinualTask(_) => None,
2127        }
2128    }
2129
2130    pub fn cluster_id(&self) -> Option<ClusterId> {
2131        match self {
2132            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2133            CatalogItem::Index(index) => Some(index.cluster_id),
2134            CatalogItem::Source(source) => match &source.data_source {
2135                DataSourceDesc::Ingestion { cluster_id, .. } => Some(*cluster_id),
2136                // This is somewhat of a lie because the export runs on the same
2137                // cluster as its ingestion but we don't yet have a way of
2138                // cross-referencing the items
2139                DataSourceDesc::IngestionExport { .. } => None,
2140                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2141                DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2142            },
2143            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2144            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2145            CatalogItem::Table(_)
2146            | CatalogItem::Log(_)
2147            | CatalogItem::View(_)
2148            | CatalogItem::Type(_)
2149            | CatalogItem::Func(_)
2150            | CatalogItem::Secret(_)
2151            | CatalogItem::Connection(_) => None,
2152        }
2153    }
2154
2155    /// The custom compaction window, if any has been set. This does not reflect any propagated
2156    /// compaction window (i.e., source -> subsource).
2157    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2158        match self {
2159            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2160            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2161            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2162            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2163            CatalogItem::Log(_)
2164            | CatalogItem::View(_)
2165            | CatalogItem::Sink(_)
2166            | CatalogItem::Type(_)
2167            | CatalogItem::Func(_)
2168            | CatalogItem::Secret(_)
2169            | CatalogItem::Connection(_)
2170            | CatalogItem::ContinualTask(_) => None,
2171        }
2172    }
2173
2174    /// Mutable access to the custom compaction window, or None if this type does not support custom
2175    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2176    /// subsource).
2177    pub fn custom_logical_compaction_window_mut(
2178        &mut self,
2179    ) -> Option<&mut Option<CompactionWindow>> {
2180        let cw = match self {
2181            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2182            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2183            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2184            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2185            CatalogItem::Log(_)
2186            | CatalogItem::View(_)
2187            | CatalogItem::Sink(_)
2188            | CatalogItem::Type(_)
2189            | CatalogItem::Func(_)
2190            | CatalogItem::Secret(_)
2191            | CatalogItem::Connection(_)
2192            | CatalogItem::ContinualTask(_) => return None,
2193        };
2194        Some(cw)
2195    }
2196
2197    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2198    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2199    ///
2200    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2201    /// sensible default (currently 1s).
2202    ///
2203    /// For objects that do not have the concept of compaction window, return None.
2204    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2205        let custom_logical_compaction_window = match self {
2206            CatalogItem::Table(_)
2207            | CatalogItem::Source(_)
2208            | CatalogItem::Index(_)
2209            | CatalogItem::MaterializedView(_)
2210            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2211            CatalogItem::Log(_)
2212            | CatalogItem::View(_)
2213            | CatalogItem::Sink(_)
2214            | CatalogItem::Type(_)
2215            | CatalogItem::Func(_)
2216            | CatalogItem::Secret(_)
2217            | CatalogItem::Connection(_) => return None,
2218        };
2219        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2220    }
2221
2222    /// Whether the item's logical compaction window
2223    /// is controlled by the METRICS_RETENTION
2224    /// system var.
2225    pub fn is_retained_metrics_object(&self) -> bool {
2226        match self {
2227            CatalogItem::Table(table) => table.is_retained_metrics_object,
2228            CatalogItem::Source(source) => source.is_retained_metrics_object,
2229            CatalogItem::Index(index) => index.is_retained_metrics_object,
2230            CatalogItem::Log(_)
2231            | CatalogItem::View(_)
2232            | CatalogItem::MaterializedView(_)
2233            | CatalogItem::Sink(_)
2234            | CatalogItem::Type(_)
2235            | CatalogItem::Func(_)
2236            | CatalogItem::Secret(_)
2237            | CatalogItem::Connection(_)
2238            | CatalogItem::ContinualTask(_) => false,
2239        }
2240    }
2241
2242    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2243        match self {
2244            CatalogItem::Table(table) => {
2245                let create_sql = table
2246                    .create_sql
2247                    .clone()
2248                    .expect("builtin tables cannot be serialized");
2249                let mut collections = table.collections.clone();
2250                let global_id = collections
2251                    .remove(&RelationVersion::root())
2252                    .expect("at least one version");
2253                (create_sql, global_id, collections)
2254            }
2255            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2256            CatalogItem::Source(source) => {
2257                assert!(
2258                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2259                    "cannot serialize introspection/builtin sources",
2260                );
2261                let create_sql = source
2262                    .create_sql
2263                    .clone()
2264                    .expect("builtin sources cannot be serialized");
2265                (create_sql, source.global_id, BTreeMap::new())
2266            }
2267            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2268            CatalogItem::MaterializedView(mview) => {
2269                (mview.create_sql.clone(), mview.global_id, BTreeMap::new())
2270            }
2271            CatalogItem::Index(index) => {
2272                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2273            }
2274            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2275            CatalogItem::Type(typ) => {
2276                let create_sql = typ
2277                    .create_sql
2278                    .clone()
2279                    .expect("builtin types cannot be serialized");
2280                (create_sql, typ.global_id, BTreeMap::new())
2281            }
2282            CatalogItem::Secret(secret) => {
2283                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2284            }
2285            CatalogItem::Connection(connection) => (
2286                connection.create_sql.clone(),
2287                connection.global_id,
2288                BTreeMap::new(),
2289            ),
2290            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2291            CatalogItem::ContinualTask(ct) => {
2292                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2293            }
2294        }
2295    }
2296
2297    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2298        match self {
2299            CatalogItem::Table(mut table) => {
2300                let create_sql = table
2301                    .create_sql
2302                    .expect("builtin tables cannot be serialized");
2303                let global_id = table
2304                    .collections
2305                    .remove(&RelationVersion::root())
2306                    .expect("at least one version");
2307                (create_sql, global_id, table.collections)
2308            }
2309            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2310            CatalogItem::Source(source) => {
2311                assert!(
2312                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2313                    "cannot serialize introspection/builtin sources",
2314                );
2315                let create_sql = source
2316                    .create_sql
2317                    .expect("builtin sources cannot be serialized");
2318                (create_sql, source.global_id, BTreeMap::new())
2319            }
2320            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2321            CatalogItem::MaterializedView(mview) => {
2322                (mview.create_sql, mview.global_id, BTreeMap::new())
2323            }
2324            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2325            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2326            CatalogItem::Type(typ) => {
2327                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2328                (create_sql, typ.global_id, BTreeMap::new())
2329            }
2330            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2331            CatalogItem::Connection(connection) => {
2332                (connection.create_sql, connection.global_id, BTreeMap::new())
2333            }
2334            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2335            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2336        }
2337    }
2338}
2339
2340impl CatalogEntry {
2341    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`],
2342    /// returning an error if this [`CatalogEntry`] does not produce rows.
2343    ///
2344    /// If you need to get the [`RelationDesc`] for a specific version, see [`CatalogItem::desc`].
2345    pub fn desc_latest(
2346        &self,
2347        name: &FullItemName,
2348    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2349        self.item.desc(name, RelationVersionSelector::Latest)
2350    }
2351
2352    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
2353    /// produces rows.
2354    pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2355        self.item.desc_opt(RelationVersionSelector::Latest)
2356    }
2357
2358    /// Reports if the item has columns.
2359    pub fn has_columns(&self) -> bool {
2360        self.desc_opt_latest().is_some()
2361    }
2362
2363    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
2364    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2365        self.item.func(self)
2366    }
2367
2368    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2369    pub fn index(&self) -> Option<&Index> {
2370        match self.item() {
2371            CatalogItem::Index(idx) => Some(idx),
2372            _ => None,
2373        }
2374    }
2375
2376    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2377    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2378        match self.item() {
2379            CatalogItem::MaterializedView(mv) => Some(mv),
2380            _ => None,
2381        }
2382    }
2383
2384    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2385    pub fn table(&self) -> Option<&Table> {
2386        match self.item() {
2387            CatalogItem::Table(tbl) => Some(tbl),
2388            _ => None,
2389        }
2390    }
2391
2392    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2393    pub fn source(&self) -> Option<&Source> {
2394        match self.item() {
2395            CatalogItem::Source(src) => Some(src),
2396            _ => None,
2397        }
2398    }
2399
2400    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2401    pub fn sink(&self) -> Option<&Sink> {
2402        match self.item() {
2403            CatalogItem::Sink(sink) => Some(sink),
2404            _ => None,
2405        }
2406    }
2407
2408    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2409    pub fn secret(&self) -> Option<&Secret> {
2410        match self.item() {
2411            CatalogItem::Secret(secret) => Some(secret),
2412            _ => None,
2413        }
2414    }
2415
2416    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2417        match self.item() {
2418            CatalogItem::Connection(connection) => Ok(connection),
2419            _ => {
2420                let db_name = match self.name().qualifiers.database_spec {
2421                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2422                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2423                };
2424                Err(SqlCatalogError::UnknownConnection(format!(
2425                    "{}{}.{}",
2426                    db_name,
2427                    self.name().qualifiers.schema_spec,
2428                    self.name().item
2429                )))
2430            }
2431        }
2432    }
2433
2434    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2435    /// this `CatalogEntry`, if any.
2436    pub fn source_desc(
2437        &self,
2438    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2439        self.item.source_desc(self)
2440    }
2441
2442    /// Reports whether this catalog entry is a connection.
2443    pub fn is_connection(&self) -> bool {
2444        matches!(self.item(), CatalogItem::Connection(_))
2445    }
2446
2447    /// Reports whether this catalog entry is a table.
2448    pub fn is_table(&self) -> bool {
2449        matches!(self.item(), CatalogItem::Table(_))
2450    }
2451
2452    /// Reports whether this catalog entry is a source. Note that this includes
2453    /// subsources.
2454    pub fn is_source(&self) -> bool {
2455        matches!(self.item(), CatalogItem::Source(_))
2456    }
2457
2458    /// Reports whether this catalog entry is a subsource and, if it is, the
2459    /// ingestion it is an export of, as well as the item it exports.
2460    pub fn subsource_details(
2461        &self,
2462    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2463        match &self.item() {
2464            CatalogItem::Source(source) => match &source.data_source {
2465                DataSourceDesc::IngestionExport {
2466                    ingestion_id,
2467                    external_reference,
2468                    details,
2469                    data_config: _,
2470                } => Some((*ingestion_id, external_reference, details)),
2471                _ => None,
2472            },
2473            _ => None,
2474        }
2475    }
2476
2477    /// Reports whether this catalog entry is a source export and, if it is, the
2478    /// ingestion it is an export of, as well as the item it exports.
2479    pub fn source_export_details(
2480        &self,
2481    ) -> Option<(
2482        CatalogItemId,
2483        &UnresolvedItemName,
2484        &SourceExportDetails,
2485        &SourceExportDataConfig<ReferencedConnection>,
2486    )> {
2487        match &self.item() {
2488            CatalogItem::Source(source) => match &source.data_source {
2489                DataSourceDesc::IngestionExport {
2490                    ingestion_id,
2491                    external_reference,
2492                    details,
2493                    data_config,
2494                } => Some((*ingestion_id, external_reference, details, data_config)),
2495                _ => None,
2496            },
2497            CatalogItem::Table(table) => match &table.data_source {
2498                TableDataSource::DataSource {
2499                    desc:
2500                        DataSourceDesc::IngestionExport {
2501                            ingestion_id,
2502                            external_reference,
2503                            details,
2504                            data_config,
2505                        },
2506                    timeline: _,
2507                } => Some((*ingestion_id, external_reference, details, data_config)),
2508                _ => None,
2509            },
2510            _ => None,
2511        }
2512    }
2513
2514    /// Reports whether this catalog entry is a progress source.
2515    pub fn is_progress_source(&self) -> bool {
2516        self.item().is_progress_source()
2517    }
2518
2519    /// Returns the `GlobalId` of all of this entry's progress ID.
2520    pub fn progress_id(&self) -> Option<CatalogItemId> {
2521        match &self.item() {
2522            CatalogItem::Source(source) => match &source.data_source {
2523                DataSourceDesc::Ingestion { ingestion_desc, .. } => {
2524                    Some(ingestion_desc.progress_subsource)
2525                }
2526                DataSourceDesc::IngestionExport { .. }
2527                | DataSourceDesc::Introspection(_)
2528                | DataSourceDesc::Progress
2529                | DataSourceDesc::Webhook { .. } => None,
2530            },
2531            CatalogItem::Table(_)
2532            | CatalogItem::Log(_)
2533            | CatalogItem::View(_)
2534            | CatalogItem::MaterializedView(_)
2535            | CatalogItem::Sink(_)
2536            | CatalogItem::Index(_)
2537            | CatalogItem::Type(_)
2538            | CatalogItem::Func(_)
2539            | CatalogItem::Secret(_)
2540            | CatalogItem::Connection(_)
2541            | CatalogItem::ContinualTask(_) => None,
2542        }
2543    }
2544
2545    /// Reports whether this catalog entry is a sink.
2546    pub fn is_sink(&self) -> bool {
2547        matches!(self.item(), CatalogItem::Sink(_))
2548    }
2549
2550    /// Reports whether this catalog entry is a materialized view.
2551    pub fn is_materialized_view(&self) -> bool {
2552        matches!(self.item(), CatalogItem::MaterializedView(_))
2553    }
2554
2555    /// Reports whether this catalog entry is a view.
2556    pub fn is_view(&self) -> bool {
2557        matches!(self.item(), CatalogItem::View(_))
2558    }
2559
2560    /// Reports whether this catalog entry is a secret.
2561    pub fn is_secret(&self) -> bool {
2562        matches!(self.item(), CatalogItem::Secret(_))
2563    }
2564
2565    /// Reports whether this catalog entry is an introspection source.
2566    pub fn is_introspection_source(&self) -> bool {
2567        matches!(self.item(), CatalogItem::Log(_))
2568    }
2569
2570    /// Reports whether this catalog entry is an index.
2571    pub fn is_index(&self) -> bool {
2572        matches!(self.item(), CatalogItem::Index(_))
2573    }
2574
2575    /// Reports whether this catalog entry is a continual task.
2576    pub fn is_continual_task(&self) -> bool {
2577        matches!(self.item(), CatalogItem::ContinualTask(_))
2578    }
2579
2580    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
2581    pub fn is_relation(&self) -> bool {
2582        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2583    }
2584
2585    /// Collects the identifiers of the objects that were encountered when
2586    /// resolving names in the item's DDL statement.
2587    pub fn references(&self) -> &ResolvedIds {
2588        self.item.references()
2589    }
2590
2591    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2592    ///
2593    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2594    /// referenced. For example this will include any catalog objects used to implement functions
2595    /// and casts in the item.
2596    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2597        self.item.uses()
2598    }
2599
2600    /// Returns the `CatalogItem` associated with this catalog entry.
2601    pub fn item(&self) -> &CatalogItem {
2602        &self.item
2603    }
2604
2605    /// Returns the [`CatalogItemId`] of this catalog entry.
2606    pub fn id(&self) -> CatalogItemId {
2607        self.id
2608    }
2609
2610    /// Returns all of the [`GlobalId`]s associated with this item.
2611    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2612        self.item().global_ids()
2613    }
2614
2615    pub fn latest_global_id(&self) -> GlobalId {
2616        self.item().latest_global_id()
2617    }
2618
2619    /// Returns the OID of this catalog entry.
2620    pub fn oid(&self) -> u32 {
2621        self.oid
2622    }
2623
2624    /// Returns the fully qualified name of this catalog entry.
2625    pub fn name(&self) -> &QualifiedItemName {
2626        &self.name
2627    }
2628
2629    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
2630    pub fn referenced_by(&self) -> &[CatalogItemId] {
2631        &self.referenced_by
2632    }
2633
2634    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2635    pub fn used_by(&self) -> &[CatalogItemId] {
2636        &self.used_by
2637    }
2638
2639    /// Returns the connection ID that this item belongs to, if this item is
2640    /// temporary.
2641    pub fn conn_id(&self) -> Option<&ConnectionId> {
2642        self.item.conn_id()
2643    }
2644
2645    /// Returns the role ID of the entry owner.
2646    pub fn owner_id(&self) -> &RoleId {
2647        &self.owner_id
2648    }
2649
2650    /// Returns the privileges of the entry.
2651    pub fn privileges(&self) -> &PrivilegeMap {
2652        &self.privileges
2653    }
2654}
2655
2656#[derive(Debug, Clone, Default)]
2657pub struct CommentsMap {
2658    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2659}
2660
2661impl CommentsMap {
2662    pub fn update_comment(
2663        &mut self,
2664        object_id: CommentObjectId,
2665        sub_component: Option<usize>,
2666        comment: Option<String>,
2667    ) -> Option<String> {
2668        let object_comments = self.map.entry(object_id).or_default();
2669
2670        // Either replace the existing comment, or remove it if comment is None/NULL.
2671        let (empty, prev) = if let Some(comment) = comment {
2672            let prev = object_comments.insert(sub_component, comment);
2673            (false, prev)
2674        } else {
2675            let prev = object_comments.remove(&sub_component);
2676            (object_comments.is_empty(), prev)
2677        };
2678
2679        // Cleanup entries that are now empty.
2680        if empty {
2681            self.map.remove(&object_id);
2682        }
2683
2684        // Return the previous comment, if there was one, for easy removal.
2685        prev
2686    }
2687
2688    /// Remove all comments for `object_id` from the map.
2689    ///
2690    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
2691    /// relations you can also have comments on the individual columns. Dropping the comments for a
2692    /// relation will also drop all of the comments on any columns.
2693    pub fn drop_comments(
2694        &mut self,
2695        object_ids: &BTreeSet<CommentObjectId>,
2696    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2697        let mut removed_comments = Vec::new();
2698
2699        for object_id in object_ids {
2700            if let Some(comments) = self.map.remove(object_id) {
2701                let removed = comments
2702                    .into_iter()
2703                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2704                removed_comments.extend(removed);
2705            }
2706        }
2707
2708        removed_comments
2709    }
2710
2711    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2712        self.map
2713            .iter()
2714            .map(|(id, comments)| {
2715                comments
2716                    .iter()
2717                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2718            })
2719            .flatten()
2720    }
2721
2722    pub fn get_object_comments(
2723        &self,
2724        object_id: CommentObjectId,
2725    ) -> Option<&BTreeMap<Option<usize>, String>> {
2726        self.map.get(&object_id)
2727    }
2728}
2729
2730impl Serialize for CommentsMap {
2731    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2732    where
2733        S: serde::Serializer,
2734    {
2735        let comment_count = self
2736            .map
2737            .iter()
2738            .map(|(_object_id, comments)| comments.len())
2739            .sum();
2740
2741        let mut seq = serializer.serialize_seq(Some(comment_count))?;
2742        for (object_id, sub) in &self.map {
2743            for (sub_component, comment) in sub {
2744                seq.serialize_element(&(
2745                    format!("{object_id:?}"),
2746                    format!("{sub_component:?}"),
2747                    comment,
2748                ))?;
2749            }
2750        }
2751        seq.end()
2752    }
2753}
2754
2755#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2756pub struct DefaultPrivileges {
2757    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2758    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
2759}
2760
2761// Use a new type here because otherwise we have two levels of BTreeMap, both needing
2762// map_key_to_string.
2763#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2764struct RoleDefaultPrivileges(
2765    /// Denormalized, the key is the grantee Role.
2766    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2767    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
2768);
2769
2770impl Deref for RoleDefaultPrivileges {
2771    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2772
2773    fn deref(&self) -> &Self::Target {
2774        &self.0
2775    }
2776}
2777
2778impl DerefMut for RoleDefaultPrivileges {
2779    fn deref_mut(&mut self) -> &mut Self::Target {
2780        &mut self.0
2781    }
2782}
2783
2784impl DefaultPrivileges {
2785    /// Add a new default privilege into the set of all default privileges.
2786    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2787        if privilege.acl_mode.is_empty() {
2788            return;
2789        }
2790
2791        let privileges = self.privileges.entry(object).or_default();
2792        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2793            default_privilege.acl_mode |= privilege.acl_mode;
2794        } else {
2795            privileges.insert(privilege.grantee, privilege);
2796        }
2797    }
2798
2799    /// Revoke a default privilege from the set of all default privileges.
2800    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2801        if let Some(privileges) = self.privileges.get_mut(object) {
2802            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2803                default_privilege.acl_mode =
2804                    default_privilege.acl_mode.difference(privilege.acl_mode);
2805                if default_privilege.acl_mode.is_empty() {
2806                    privileges.remove(&privilege.grantee);
2807                }
2808            }
2809            if privileges.is_empty() {
2810                self.privileges.remove(object);
2811            }
2812        }
2813    }
2814
2815    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
2816    /// any exist.
2817    pub fn get_privileges_for_grantee(
2818        &self,
2819        object: &DefaultPrivilegeObject,
2820        grantee: &RoleId,
2821    ) -> Option<&AclMode> {
2822        self.privileges
2823            .get(object)
2824            .and_then(|privileges| privileges.get(grantee))
2825            .map(|privilege| &privilege.acl_mode)
2826    }
2827
2828    /// Get all default privileges that apply to the provided object details.
2829    pub fn get_applicable_privileges(
2830        &self,
2831        role_id: RoleId,
2832        database_id: Option<DatabaseId>,
2833        schema_id: Option<SchemaId>,
2834        object_type: mz_sql::catalog::ObjectType,
2835    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2836        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
2837        // don't require the caller to worry about that and we will map their `object_type` to the
2838        // correct type for privileges.
2839        let privilege_object_type = if object_type.is_relation() {
2840            mz_sql::catalog::ObjectType::Table
2841        } else {
2842            object_type
2843        };
2844        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2845
2846        // Collect all entries that apply to the provided object details.
2847        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
2848        // entries in the vec below. That's OK because we consolidate the results after.
2849        [
2850            DefaultPrivilegeObject {
2851                role_id,
2852                database_id,
2853                schema_id,
2854                object_type: privilege_object_type,
2855            },
2856            DefaultPrivilegeObject {
2857                role_id,
2858                database_id,
2859                schema_id: None,
2860                object_type: privilege_object_type,
2861            },
2862            DefaultPrivilegeObject {
2863                role_id,
2864                database_id: None,
2865                schema_id: None,
2866                object_type: privilege_object_type,
2867            },
2868            DefaultPrivilegeObject {
2869                role_id: RoleId::Public,
2870                database_id,
2871                schema_id,
2872                object_type: privilege_object_type,
2873            },
2874            DefaultPrivilegeObject {
2875                role_id: RoleId::Public,
2876                database_id,
2877                schema_id: None,
2878                object_type: privilege_object_type,
2879            },
2880            DefaultPrivilegeObject {
2881                role_id: RoleId::Public,
2882                database_id: None,
2883                schema_id: None,
2884                object_type: privilege_object_type,
2885            },
2886        ]
2887        .into_iter()
2888        .filter_map(|object| self.privileges.get(&object))
2889        .flat_map(|acl_map| acl_map.values())
2890        // Consolidate privileges with a common grantee.
2891        .fold(
2892            BTreeMap::new(),
2893            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2894                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2895                *accum_acl_mode |= *acl_mode;
2896                accum
2897            },
2898        )
2899        .into_iter()
2900        // Restrict the acl_mode to only privileges valid for the provided object type. If the
2901        // default privilege has an object type of Table, then it may contain privileges valid for
2902        // tables but not other relations. If the passed in object type is another relation, then
2903        // we need to remove any privilege that is not valid for the specified relation.
2904        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2905        // Filter out empty privileges.
2906        .filter(|(_, acl_mode)| !acl_mode.is_empty())
2907        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2908            grantee: *grantee,
2909            acl_mode,
2910        })
2911    }
2912
2913    pub fn iter(
2914        &self,
2915    ) -> impl Iterator<
2916        Item = (
2917            &DefaultPrivilegeObject,
2918            impl Iterator<Item = &DefaultPrivilegeAclItem>,
2919        ),
2920    > {
2921        self.privileges
2922            .iter()
2923            .map(|(object, acl_map)| (object, acl_map.values()))
2924    }
2925}
2926
2927#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2928pub struct ClusterConfig {
2929    pub variant: ClusterVariant,
2930    pub workload_class: Option<String>,
2931}
2932
2933impl ClusterConfig {
2934    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
2935        match &self.variant {
2936            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
2937            ClusterVariant::Unmanaged => None,
2938        }
2939    }
2940}
2941
2942impl From<ClusterConfig> for durable::ClusterConfig {
2943    fn from(config: ClusterConfig) -> Self {
2944        Self {
2945            variant: config.variant.into(),
2946            workload_class: config.workload_class,
2947        }
2948    }
2949}
2950
2951impl From<durable::ClusterConfig> for ClusterConfig {
2952    fn from(config: durable::ClusterConfig) -> Self {
2953        Self {
2954            variant: config.variant.into(),
2955            workload_class: config.workload_class,
2956        }
2957    }
2958}
2959
2960#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2961pub struct ClusterVariantManaged {
2962    pub size: String,
2963    pub availability_zones: Vec<String>,
2964    pub logging: ReplicaLogging,
2965    pub replication_factor: u32,
2966    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
2967    pub schedule: ClusterSchedule,
2968}
2969
2970impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
2971    fn from(managed: ClusterVariantManaged) -> Self {
2972        Self {
2973            size: managed.size,
2974            availability_zones: managed.availability_zones,
2975            logging: managed.logging,
2976            replication_factor: managed.replication_factor,
2977            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2978            schedule: managed.schedule,
2979        }
2980    }
2981}
2982
2983impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
2984    fn from(managed: durable::ClusterVariantManaged) -> Self {
2985        Self {
2986            size: managed.size,
2987            availability_zones: managed.availability_zones,
2988            logging: managed.logging,
2989            replication_factor: managed.replication_factor,
2990            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2991            schedule: managed.schedule,
2992        }
2993    }
2994}
2995
2996#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2997pub enum ClusterVariant {
2998    Managed(ClusterVariantManaged),
2999    Unmanaged,
3000}
3001
3002impl From<ClusterVariant> for durable::ClusterVariant {
3003    fn from(variant: ClusterVariant) -> Self {
3004        match variant {
3005            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3006            ClusterVariant::Unmanaged => Self::Unmanaged,
3007        }
3008    }
3009}
3010
3011impl From<durable::ClusterVariant> for ClusterVariant {
3012    fn from(variant: durable::ClusterVariant) -> Self {
3013        match variant {
3014            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3015            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3016        }
3017    }
3018}
3019
3020impl mz_sql::catalog::CatalogDatabase for Database {
3021    fn name(&self) -> &str {
3022        &self.name
3023    }
3024
3025    fn id(&self) -> DatabaseId {
3026        self.id
3027    }
3028
3029    fn has_schemas(&self) -> bool {
3030        !self.schemas_by_name.is_empty()
3031    }
3032
3033    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3034        &self.schemas_by_name
3035    }
3036
3037    // `as` is ok to use to cast to a trait object.
3038    #[allow(clippy::as_conversions)]
3039    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3040        self.schemas_by_id
3041            .values()
3042            .map(|schema| schema as &dyn CatalogSchema)
3043            .collect()
3044    }
3045
3046    fn owner_id(&self) -> RoleId {
3047        self.owner_id
3048    }
3049
3050    fn privileges(&self) -> &PrivilegeMap {
3051        &self.privileges
3052    }
3053}
3054
3055impl mz_sql::catalog::CatalogSchema for Schema {
3056    fn database(&self) -> &ResolvedDatabaseSpecifier {
3057        &self.name.database
3058    }
3059
3060    fn name(&self) -> &QualifiedSchemaName {
3061        &self.name
3062    }
3063
3064    fn id(&self) -> &SchemaSpecifier {
3065        &self.id
3066    }
3067
3068    fn has_items(&self) -> bool {
3069        !self.items.is_empty()
3070    }
3071
3072    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3073        Box::new(
3074            self.items
3075                .values()
3076                .chain(self.functions.values())
3077                .chain(self.types.values())
3078                .copied(),
3079        )
3080    }
3081
3082    fn owner_id(&self) -> RoleId {
3083        self.owner_id
3084    }
3085
3086    fn privileges(&self) -> &PrivilegeMap {
3087        &self.privileges
3088    }
3089}
3090
3091impl mz_sql::catalog::CatalogRole for Role {
3092    fn name(&self) -> &str {
3093        &self.name
3094    }
3095
3096    fn id(&self) -> RoleId {
3097        self.id
3098    }
3099
3100    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3101        &self.membership.map
3102    }
3103
3104    fn attributes(&self) -> &RoleAttributes {
3105        &self.attributes
3106    }
3107
3108    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3109        &self.vars.map
3110    }
3111}
3112
3113impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3114    fn name(&self) -> &str {
3115        &self.name
3116    }
3117
3118    fn id(&self) -> NetworkPolicyId {
3119        self.id
3120    }
3121
3122    fn owner_id(&self) -> RoleId {
3123        self.owner_id
3124    }
3125
3126    fn privileges(&self) -> &PrivilegeMap {
3127        &self.privileges
3128    }
3129}
3130
3131impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3132    fn name(&self) -> &str {
3133        &self.name
3134    }
3135
3136    fn id(&self) -> ClusterId {
3137        self.id
3138    }
3139
3140    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3141        &self.bound_objects
3142    }
3143
3144    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3145        &self.replica_id_by_name_
3146    }
3147
3148    // `as` is ok to use to cast to a trait object.
3149    #[allow(clippy::as_conversions)]
3150    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3151        self.replicas()
3152            .map(|replica| replica as &dyn CatalogClusterReplica)
3153            .collect()
3154    }
3155
3156    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3157        self.replica(id).expect("catalog out of sync")
3158    }
3159
3160    fn owner_id(&self) -> RoleId {
3161        self.owner_id
3162    }
3163
3164    fn privileges(&self) -> &PrivilegeMap {
3165        &self.privileges
3166    }
3167
3168    fn is_managed(&self) -> bool {
3169        self.is_managed()
3170    }
3171
3172    fn managed_size(&self) -> Option<&str> {
3173        match &self.config.variant {
3174            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3175            _ => None,
3176        }
3177    }
3178
3179    fn schedule(&self) -> Option<&ClusterSchedule> {
3180        match &self.config.variant {
3181            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3182            _ => None,
3183        }
3184    }
3185
3186    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3187        self.try_to_plan()
3188    }
3189}
3190
3191impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3192    fn name(&self) -> &str {
3193        &self.name
3194    }
3195
3196    fn cluster_id(&self) -> ClusterId {
3197        self.cluster_id
3198    }
3199
3200    fn replica_id(&self) -> ReplicaId {
3201        self.replica_id
3202    }
3203
3204    fn owner_id(&self) -> RoleId {
3205        self.owner_id
3206    }
3207
3208    fn internal(&self) -> bool {
3209        self.config.location.internal()
3210    }
3211}
3212
3213impl mz_sql::catalog::CatalogItem for CatalogEntry {
3214    fn name(&self) -> &QualifiedItemName {
3215        self.name()
3216    }
3217
3218    fn id(&self) -> CatalogItemId {
3219        self.id()
3220    }
3221
3222    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3223        Box::new(self.global_ids())
3224    }
3225
3226    fn oid(&self) -> u32 {
3227        self.oid()
3228    }
3229
3230    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3231        self.func()
3232    }
3233
3234    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3235        self.source_desc()
3236    }
3237
3238    fn connection(
3239        &self,
3240    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3241    {
3242        Ok(self.connection()?.details.to_connection())
3243    }
3244
3245    fn create_sql(&self) -> &str {
3246        match self.item() {
3247            CatalogItem::Table(Table { create_sql, .. }) => {
3248                create_sql.as_deref().unwrap_or("<builtin>")
3249            }
3250            CatalogItem::Source(Source { create_sql, .. }) => {
3251                create_sql.as_deref().unwrap_or("<builtin>")
3252            }
3253            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3254            CatalogItem::View(View { create_sql, .. }) => create_sql,
3255            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3256            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3257            CatalogItem::Type(Type { create_sql, .. }) => {
3258                create_sql.as_deref().unwrap_or("<builtin>")
3259            }
3260            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3261            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3262            CatalogItem::Func(_) => "<builtin>",
3263            CatalogItem::Log(_) => "<builtin>",
3264            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3265        }
3266    }
3267
3268    fn item_type(&self) -> SqlCatalogItemType {
3269        self.item().typ()
3270    }
3271
3272    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3273        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3274            Some((keys, *on))
3275        } else {
3276            None
3277        }
3278    }
3279
3280    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3281        if let CatalogItem::Table(Table {
3282            data_source: TableDataSource::TableWrites { defaults },
3283            ..
3284        }) = self.item()
3285        {
3286            Some(defaults.as_slice())
3287        } else {
3288            None
3289        }
3290    }
3291
3292    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3293        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3294            Some(details)
3295        } else {
3296            None
3297        }
3298    }
3299
3300    fn references(&self) -> &ResolvedIds {
3301        self.references()
3302    }
3303
3304    fn uses(&self) -> BTreeSet<CatalogItemId> {
3305        self.uses()
3306    }
3307
3308    fn referenced_by(&self) -> &[CatalogItemId] {
3309        self.referenced_by()
3310    }
3311
3312    fn used_by(&self) -> &[CatalogItemId] {
3313        self.used_by()
3314    }
3315
3316    fn subsource_details(
3317        &self,
3318    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3319        self.subsource_details()
3320    }
3321
3322    fn source_export_details(
3323        &self,
3324    ) -> Option<(
3325        CatalogItemId,
3326        &UnresolvedItemName,
3327        &SourceExportDetails,
3328        &SourceExportDataConfig<ReferencedConnection>,
3329    )> {
3330        self.source_export_details()
3331    }
3332
3333    fn is_progress_source(&self) -> bool {
3334        self.is_progress_source()
3335    }
3336
3337    fn progress_id(&self) -> Option<CatalogItemId> {
3338        self.progress_id()
3339    }
3340
3341    fn owner_id(&self) -> RoleId {
3342        self.owner_id
3343    }
3344
3345    fn privileges(&self) -> &PrivilegeMap {
3346        &self.privileges
3347    }
3348
3349    fn cluster_id(&self) -> Option<ClusterId> {
3350        self.item().cluster_id()
3351    }
3352
3353    fn at_version(
3354        &self,
3355        version: RelationVersionSelector,
3356    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3357        Box::new(CatalogCollectionEntry {
3358            entry: self.clone(),
3359            version,
3360        })
3361    }
3362
3363    fn latest_version(&self) -> Option<RelationVersion> {
3364        self.table().map(|t| t.desc.latest_version())
3365    }
3366}
3367
3368/// A single update to the catalog state.
3369#[derive(Debug)]
3370pub struct StateUpdate {
3371    pub kind: StateUpdateKind,
3372    pub ts: Timestamp,
3373    pub diff: StateDiff,
3374}
3375
3376/// The contents of a single state update.
3377///
3378/// Variants are listed in dependency order.
3379#[derive(Debug, Clone)]
3380pub enum StateUpdateKind {
3381    Role(durable::objects::Role),
3382    RoleAuth(durable::objects::RoleAuth),
3383    Database(durable::objects::Database),
3384    Schema(durable::objects::Schema),
3385    DefaultPrivilege(durable::objects::DefaultPrivilege),
3386    SystemPrivilege(MzAclItem),
3387    SystemConfiguration(durable::objects::SystemConfiguration),
3388    Cluster(durable::objects::Cluster),
3389    NetworkPolicy(durable::objects::NetworkPolicy),
3390    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3391    ClusterReplica(durable::objects::ClusterReplica),
3392    SourceReferences(durable::objects::SourceReferences),
3393    SystemObjectMapping(durable::objects::SystemObjectMapping),
3394    // Temporary items are not actually updated via the durable catalog, but this allows us to
3395    // model them the same way as all other items.
3396    TemporaryItem(TemporaryItem),
3397    Item(durable::objects::Item),
3398    Comment(durable::objects::Comment),
3399    AuditLog(durable::objects::AuditLog),
3400    // Storage updates.
3401    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3402    UnfinalizedShard(durable::objects::UnfinalizedShard),
3403}
3404
3405/// Valid diffs for catalog state updates.
3406#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
3407pub enum StateDiff {
3408    Retraction,
3409    Addition,
3410}
3411
3412impl From<StateDiff> for Diff {
3413    fn from(diff: StateDiff) -> Self {
3414        match diff {
3415            StateDiff::Retraction => Diff::MINUS_ONE,
3416            StateDiff::Addition => Diff::ONE,
3417        }
3418    }
3419}
3420impl TryFrom<Diff> for StateDiff {
3421    type Error = String;
3422
3423    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3424        match diff {
3425            Diff::MINUS_ONE => Ok(Self::Retraction),
3426            Diff::ONE => Ok(Self::Addition),
3427            diff => Err(format!("invalid diff {diff}")),
3428        }
3429    }
3430}
3431
3432/// Information needed to process an update to a temporary item.
3433#[derive(Debug, Clone)]
3434pub struct TemporaryItem {
3435    pub id: CatalogItemId,
3436    pub oid: u32,
3437    pub name: QualifiedItemName,
3438    pub item: CatalogItem,
3439    pub owner_id: RoleId,
3440    pub privileges: PrivilegeMap,
3441}
3442
3443impl From<CatalogEntry> for TemporaryItem {
3444    fn from(entry: CatalogEntry) -> Self {
3445        TemporaryItem {
3446            id: entry.id,
3447            oid: entry.oid,
3448            name: entry.name,
3449            item: entry.item,
3450            owner_id: entry.owner_id,
3451            privileges: entry.privileges,
3452        }
3453    }
3454}
3455
3456/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
3457#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3458pub enum BootstrapStateUpdateKind {
3459    Role(durable::objects::Role),
3460    RoleAuth(durable::objects::RoleAuth),
3461    Database(durable::objects::Database),
3462    Schema(durable::objects::Schema),
3463    DefaultPrivilege(durable::objects::DefaultPrivilege),
3464    SystemPrivilege(MzAclItem),
3465    SystemConfiguration(durable::objects::SystemConfiguration),
3466    Cluster(durable::objects::Cluster),
3467    NetworkPolicy(durable::objects::NetworkPolicy),
3468    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3469    ClusterReplica(durable::objects::ClusterReplica),
3470    SourceReferences(durable::objects::SourceReferences),
3471    SystemObjectMapping(durable::objects::SystemObjectMapping),
3472    Item(durable::objects::Item),
3473    Comment(durable::objects::Comment),
3474    AuditLog(durable::objects::AuditLog),
3475    // Storage updates.
3476    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3477    UnfinalizedShard(durable::objects::UnfinalizedShard),
3478}
3479
3480impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3481    fn from(value: BootstrapStateUpdateKind) -> Self {
3482        match value {
3483            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3484            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3485            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3486            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3487            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3488                StateUpdateKind::DefaultPrivilege(kind)
3489            }
3490            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3491                StateUpdateKind::SystemPrivilege(kind)
3492            }
3493            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3494                StateUpdateKind::SystemConfiguration(kind)
3495            }
3496            BootstrapStateUpdateKind::SourceReferences(kind) => {
3497                StateUpdateKind::SourceReferences(kind)
3498            }
3499            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3500            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3501            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3502                StateUpdateKind::IntrospectionSourceIndex(kind)
3503            }
3504            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3505            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3506                StateUpdateKind::SystemObjectMapping(kind)
3507            }
3508            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3509            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3510            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3511            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3512                StateUpdateKind::StorageCollectionMetadata(kind)
3513            }
3514            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3515                StateUpdateKind::UnfinalizedShard(kind)
3516            }
3517        }
3518    }
3519}
3520
3521impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3522    type Error = TemporaryItem;
3523
3524    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3525        match value {
3526            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3527            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3528            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3529            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3530            StateUpdateKind::DefaultPrivilege(kind) => {
3531                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3532            }
3533            StateUpdateKind::SystemPrivilege(kind) => {
3534                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3535            }
3536            StateUpdateKind::SystemConfiguration(kind) => {
3537                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3538            }
3539            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3540            StateUpdateKind::NetworkPolicy(kind) => {
3541                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3542            }
3543            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3544                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3545            }
3546            StateUpdateKind::ClusterReplica(kind) => {
3547                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3548            }
3549            StateUpdateKind::SourceReferences(kind) => {
3550                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3551            }
3552            StateUpdateKind::SystemObjectMapping(kind) => {
3553                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3554            }
3555            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3556            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3557            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3558            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3559            StateUpdateKind::StorageCollectionMetadata(kind) => {
3560                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3561            }
3562            StateUpdateKind::UnfinalizedShard(kind) => {
3563                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3564            }
3565        }
3566    }
3567}