Skip to main content

mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! The current types used by the in-memory Catalog. Many of the objects in this module are
//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
//! more easily consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
25use mz_controller_types::{ClusterId, ReplicaId};
26use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
27use mz_ore::collections::CollectionExt;
28use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
29use mz_repr::network_policy_id::NetworkPolicyId;
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::refresh_schedule::RefreshSchedule;
32use mz_repr::role_id::RoleId;
33use mz_repr::{
34    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
35    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
36};
37use mz_sql::ast::display::AstDisplay;
38use mz_sql::ast::{
39    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
40    UnresolvedItemName, Value, WithOptionValue,
41};
42use mz_sql::catalog::{
43    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
44    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
45    CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
46    RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
47};
48use mz_sql::names::{
49    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
50    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
51};
52use mz_sql::plan::{
53    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
54    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
55    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
56    WebhookValidation,
57};
58use mz_sql::rbac;
59use mz_sql::session::vars::OwnedVarInput;
60use mz_storage_client::controller::IntrospectionType;
61use mz_storage_types::connections::inline::ReferencedConnection;
62use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
63use mz_storage_types::sources::load_generator::LoadGenerator;
64use mz_storage_types::sources::{
65    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
66    SourceExportDetails, Timeline,
67};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use timely::progress::Antichain;
71use tracing::debug;
72
73use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
74use crate::durable;
75use crate::durable::objects::item_type;
76
/// Used to update `self` from the input value while consuming the input value.
///
/// Unlike [`From`], this reuses the existing allocation of `self` and is meant
/// for refreshing an in-memory object from its durable counterpart.
pub trait UpdateFrom<T>: From<T> {
    /// Overwrites the relevant fields of `self` with data from `from`.
    fn update_from(&mut self, from: T);
}
81
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Stable ID of the database.
    pub id: DatabaseId,
    /// PostgreSQL-compatible object identifier.
    pub oid: u32,
    /// Schemas in this database, by schema ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Name-based index into `schemas_by_id`.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns this database.
    pub owner_id: RoleId,
    /// Privileges granted on this database.
    pub privileges: PrivilegeMap,
}
93
94impl From<Database> for durable::Database {
95    fn from(database: Database) -> durable::Database {
96        durable::Database {
97            id: database.id,
98            oid: database.oid,
99            name: database.name,
100            owner_id: database.owner_id,
101            privileges: database.privileges.into_all_values().collect(),
102        }
103    }
104}
105
106impl From<durable::Database> for Database {
107    fn from(
108        durable::Database {
109            id,
110            oid,
111            name,
112            owner_id,
113            privileges,
114        }: durable::Database,
115    ) -> Database {
116        Database {
117            id,
118            oid,
119            schemas_by_id: BTreeMap::new(),
120            schemas_by_name: BTreeMap::new(),
121            name,
122            owner_id,
123            privileges: PrivilegeMap::from_mz_acl_items(privileges),
124        }
125    }
126}
127
128impl UpdateFrom<durable::Database> for Database {
129    fn update_from(
130        &mut self,
131        durable::Database {
132            id,
133            oid,
134            name,
135            owner_id,
136            privileges,
137        }: durable::Database,
138    ) {
139        self.id = id;
140        self.oid = oid;
141        self.name = name;
142        self.owner_id = owner_id;
143        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
144    }
145}
146
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Fully qualified name (database + schema) of this schema.
    pub name: QualifiedSchemaName,
    /// Specifier identifying this schema.
    pub id: SchemaSpecifier,
    /// PostgreSQL-compatible object identifier.
    pub oid: u32,
    /// Items in this schema, by name (functions and types are tracked separately).
    pub items: BTreeMap<String, CatalogItemId>,
    /// Functions in this schema, by name.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Types in this schema, by name.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns this schema.
    pub owner_id: RoleId,
    /// Privileges granted on this schema.
    pub privileges: PrivilegeMap,
}
158
159impl From<Schema> for durable::Schema {
160    fn from(schema: Schema) -> durable::Schema {
161        durable::Schema {
162            id: schema.id.into(),
163            oid: schema.oid,
164            name: schema.name.schema,
165            database_id: schema.name.database.id(),
166            owner_id: schema.owner_id,
167            privileges: schema.privileges.into_all_values().collect(),
168        }
169    }
170}
171
172impl From<durable::Schema> for Schema {
173    fn from(
174        durable::Schema {
175            id,
176            oid,
177            name,
178            database_id,
179            owner_id,
180            privileges,
181        }: durable::Schema,
182    ) -> Schema {
183        Schema {
184            name: QualifiedSchemaName {
185                database: database_id.into(),
186                schema: name,
187            },
188            id: id.into(),
189            oid,
190            items: BTreeMap::new(),
191            functions: BTreeMap::new(),
192            types: BTreeMap::new(),
193            owner_id,
194            privileges: PrivilegeMap::from_mz_acl_items(privileges),
195        }
196    }
197}
198
199impl UpdateFrom<durable::Schema> for Schema {
200    fn update_from(
201        &mut self,
202        durable::Schema {
203            id,
204            oid,
205            name,
206            database_id,
207            owner_id,
208            privileges,
209        }: durable::Schema,
210    ) {
211        self.name = QualifiedSchemaName {
212            database: database_id.into(),
213            schema: name,
214        };
215        self.id = id.into();
216        self.oid = oid;
217        self.owner_id = owner_id;
218        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
219    }
220}
221
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Stable ID of the role.
    pub id: RoleId,
    /// PostgreSQL-compatible object identifier.
    pub oid: u32,
    /// Attributes of this role.
    pub attributes: RoleAttributes,
    /// Roles this role is a member of.
    pub membership: RoleMembership,
    /// Session variable values configured for this role.
    pub vars: RoleVars,
}
231
232impl Role {
233    pub fn is_user(&self) -> bool {
234        self.id.is_user()
235    }
236
237    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
238        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
239    }
240}
241
242impl From<Role> for durable::Role {
243    fn from(role: Role) -> durable::Role {
244        durable::Role {
245            id: role.id,
246            oid: role.oid,
247            name: role.name,
248            attributes: role.attributes,
249            membership: role.membership,
250            vars: role.vars,
251        }
252    }
253}
254
255impl From<durable::Role> for Role {
256    fn from(
257        durable::Role {
258            id,
259            oid,
260            name,
261            attributes,
262            membership,
263            vars,
264        }: durable::Role,
265    ) -> Self {
266        Role {
267            name,
268            id,
269            oid,
270            attributes,
271            membership,
272            vars,
273        }
274    }
275}
276
277impl UpdateFrom<durable::Role> for Role {
278    fn update_from(
279        &mut self,
280        durable::Role {
281            id,
282            oid,
283            name,
284            attributes,
285            membership,
286            vars,
287        }: durable::Role,
288    ) {
289        self.id = id;
290        self.oid = oid;
291        self.name = name;
292        self.attributes = attributes;
293        self.membership = membership;
294        self.vars = vars;
295    }
296}
297
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// The role this authentication record belongs to.
    pub role_id: RoleId,
    /// Hashed password, if one is set.
    pub password_hash: Option<String>,
    /// Time of the last update. NOTE(review): presumably an epoch-based
    /// timestamp — confirm units against the durable layer.
    pub updated_at: u64,
}
304
305impl From<RoleAuth> for durable::RoleAuth {
306    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
307        durable::RoleAuth {
308            role_id: role_auth.role_id,
309            password_hash: role_auth.password_hash,
310            updated_at: role_auth.updated_at,
311        }
312    }
313}
314
315impl From<durable::RoleAuth> for RoleAuth {
316    fn from(
317        durable::RoleAuth {
318            role_id,
319            password_hash,
320            updated_at,
321        }: durable::RoleAuth,
322    ) -> RoleAuth {
323        RoleAuth {
324            role_id,
325            password_hash,
326            updated_at,
327        }
328    }
329}
330
331impl UpdateFrom<durable::RoleAuth> for RoleAuth {
332    fn update_from(&mut self, from: durable::RoleAuth) {
333        self.role_id = from.role_id;
334        self.password_hash = from.password_hash;
335    }
336}
337
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Stable ID of the cluster.
    pub id: ClusterId,
    /// Managed/unmanaged variant plus cluster-level settings.
    pub config: ClusterConfig,
    /// Introspection log indexes on this cluster, keyed by log variant.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Name-based index into `replicas_by_id_`.
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas of this cluster, by replica ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns this cluster.
    pub owner_id: RoleId,
    /// Privileges granted on this cluster.
    pub privileges: PrivilegeMap,
}
354
impl Cluster {
    /// The role of the cluster. Currently used to set alert severity.
    pub fn role(&self) -> ClusterRole {
        // NOTE - These roles power monitoring systems. Do not change
        // them without talking to the cloud or observability groups.
        if self.name == MZ_SYSTEM_CLUSTER.name {
            ClusterRole::SystemCritical
        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
            ClusterRole::System
        } else {
            ClusterRole::User
        }
    }

    /// Returns `true` if the cluster is a managed cluster.
    pub fn is_managed(&self) -> bool {
        matches!(self.config.variant, ClusterVariant::Managed { .. })
    }

    /// Lists the user replicas, which are those that do not have the internal flag set.
    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas().filter(|r| !r.config.location.internal())
    }

    /// Lists all replicas in the cluster
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas_by_id_.values()
    }

    /// Lookup a replica by ID.
    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
        self.replicas_by_id_.get(&replica_id)
    }

    /// Lookup a replica ID by name.
    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
        self.replica_id_by_name_.get(name).copied()
    }

    /// Returns the availability zones of this cluster, if they exist.
    pub fn availability_zones(&self) -> Option<&[String]> {
        match &self.config.variant {
            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Reconstructs a [`CreateClusterPlan`] from this cluster's in-memory
    /// state (used for `SHOW CREATE`, per the error message below).
    ///
    /// # Errors
    ///
    /// Returns [`PlanError::Unsupported`] for unmanaged clusters, which this
    /// conversion does not support.
    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        let name = self.name.clone();
        let variant = match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides,
                schedule,
            }) => {
                // Introspection is only configured when a logging interval is
                // present; `log_logging` alone does not enable it.
                let introspection = match logging {
                    ReplicaLogging {
                        log_logging,
                        interval: Some(interval),
                    } => Some(ComputeReplicaIntrospectionConfig {
                        debugging: *log_logging,
                        interval: interval.clone(),
                    }),
                    ReplicaLogging {
                        log_logging: _,
                        interval: None,
                    } => None,
                };
                let compute = ComputeReplicaConfig { introspection };
                CreateClusterVariant::Managed(CreateClusterManagedPlan {
                    replication_factor: replication_factor.clone(),
                    size: size.clone(),
                    availability_zones: availability_zones.clone(),
                    compute,
                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
                    schedule: schedule.clone(),
                })
            }
            ClusterVariant::Unmanaged => {
                // Unmanaged clusters are deprecated, so hopefully we can remove
                // them before we have to implement this.
                return Err(PlanError::Unsupported {
                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
                    discussion_no: None,
                });
            }
        };
        let workload_class = self.config.workload_class.clone();
        Ok(CreateClusterPlan {
            name,
            variant,
            workload_class,
        })
    }
}
453
454impl From<Cluster> for durable::Cluster {
455    fn from(cluster: Cluster) -> durable::Cluster {
456        durable::Cluster {
457            id: cluster.id,
458            name: cluster.name,
459            owner_id: cluster.owner_id,
460            privileges: cluster.privileges.into_all_values().collect(),
461            config: cluster.config.into(),
462        }
463    }
464}
465
466impl From<durable::Cluster> for Cluster {
467    fn from(
468        durable::Cluster {
469            id,
470            name,
471            owner_id,
472            privileges,
473            config,
474        }: durable::Cluster,
475    ) -> Self {
476        Cluster {
477            name: name.clone(),
478            id,
479            bound_objects: BTreeSet::new(),
480            log_indexes: BTreeMap::new(),
481            replica_id_by_name_: BTreeMap::new(),
482            replicas_by_id_: BTreeMap::new(),
483            owner_id,
484            privileges: PrivilegeMap::from_mz_acl_items(privileges),
485            config: config.into(),
486        }
487    }
488}
489
490impl UpdateFrom<durable::Cluster> for Cluster {
491    fn update_from(
492        &mut self,
493        durable::Cluster {
494            id,
495            name,
496            owner_id,
497            privileges,
498            config,
499        }: durable::Cluster,
500    ) {
501        self.id = id;
502        self.name = name;
503        self.owner_id = owner_id;
504        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
505        self.config = config.into();
506    }
507}
508
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// ID of the cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Stable ID of this replica.
    pub replica_id: ReplicaId,
    /// Location and logging configuration of this replica.
    pub config: ReplicaConfig,
    /// Role that owns this replica.
    pub owner_id: RoleId,
}
517
518impl From<ClusterReplica> for durable::ClusterReplica {
519    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
520        durable::ClusterReplica {
521            cluster_id: replica.cluster_id,
522            replica_id: replica.replica_id,
523            name: replica.name,
524            config: replica.config.into(),
525            owner_id: replica.owner_id,
526        }
527    }
528}
529
/// The status of a single cluster replica process, as of `time`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// When `status` was recorded.
    pub time: DateTime<Utc>,
}
535
/// The set of external objects a source can reference, as of `updated_at`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// When the references were last fetched. NOTE(review): presumably an
    /// epoch-based timestamp — confirm units against the durable layer.
    pub updated_at: u64,
    /// The available upstream references.
    pub references: Vec<SourceReference>,
}
541
/// A single external object that a source can reference.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the upstream object.
    pub name: String,
    /// Namespace (e.g. schema) of the upstream object, if it has one.
    pub namespace: Option<String>,
    /// Column names of the upstream object.
    pub columns: Vec<String>,
}
548
549impl From<SourceReference> for durable::SourceReference {
550    fn from(source_reference: SourceReference) -> durable::SourceReference {
551        durable::SourceReference {
552            name: source_reference.name,
553            namespace: source_reference.namespace,
554            columns: source_reference.columns,
555        }
556    }
557}
558
559impl SourceReferences {
560    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
561        durable::SourceReferences {
562            source_id,
563            updated_at: self.updated_at,
564            references: self.references.into_iter().map(Into::into).collect(),
565        }
566    }
567}
568
569impl From<durable::SourceReference> for SourceReference {
570    fn from(source_reference: durable::SourceReference) -> SourceReference {
571        SourceReference {
572            name: source_reference.name,
573            namespace: source_reference.namespace,
574            columns: source_reference.columns,
575        }
576    }
577}
578
579impl From<durable::SourceReferences> for SourceReferences {
580    fn from(source_references: durable::SourceReferences) -> SourceReferences {
581        SourceReferences {
582            updated_at: source_references.updated_at,
583            references: source_references
584                .references
585                .into_iter()
586                .map(|source_reference| source_reference.into())
587                .collect(),
588        }
589    }
590}
591
592impl From<mz_sql::plan::SourceReference> for SourceReference {
593    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
594        SourceReference {
595            name: source_reference.name,
596            namespace: source_reference.namespace,
597            columns: source_reference.columns,
598        }
599    }
600}
601
602impl From<mz_sql::plan::SourceReferences> for SourceReferences {
603    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
604        SourceReferences {
605            updated_at: source_references.updated_at,
606            references: source_references
607                .references
608                .into_iter()
609                .map(|source_reference| source_reference.into())
610                .collect(),
611        }
612    }
613}
614
615impl From<SourceReferences> for mz_sql::plan::SourceReferences {
616    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
617        mz_sql::plan::SourceReferences {
618            updated_at: source_references.updated_at,
619            references: source_references
620                .references
621                .into_iter()
622                .map(|source_reference| source_reference.into())
623                .collect(),
624        }
625    }
626}
627
628impl From<SourceReference> for mz_sql::plan::SourceReference {
629    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
630        mz_sql::plan::SourceReference {
631            name: source_reference.name,
632            namespace: source_reference.namespace,
633            columns: source_reference.columns,
634        }
635    }
636}
637
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The parsed catalog item, e.g. a table, source, or view.
    pub item: CatalogItem,
    /// Items that reference this entry (see also the `referenced_by` trait method).
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// Items that transitively use this entry.
    // TODO(database-issues#7922)––this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Stable ID of this catalog item.
    pub id: CatalogItemId,
    /// PostgreSQL-compatible object identifier.
    pub oid: u32,
    /// Fully qualified name of this item.
    pub name: QualifiedItemName,
    /// Role that owns this item.
    pub owner_id: RoleId,
    /// Privileges granted on this item.
    pub privileges: PrivilegeMap,
}
654
/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
///
/// TODO(ct): Add a note here if we end up using this for associating continual
/// tasks with a single catalog item.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry.
    pub entry: CatalogEntry,
    /// Which version of the entry this collection refers to.
    pub version: RelationVersionSelector,
}
677
678impl CatalogCollectionEntry {
679    pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
680        self.item().relation_desc(self.version)
681    }
682}
683
684impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
685    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
686        self.item().relation_desc(self.version)
687    }
688
689    fn global_id(&self) -> GlobalId {
690        self.entry
691            .item()
692            .global_id_for_version(self.version)
693            .expect("catalog corruption, missing version!")
694    }
695}
696
697impl Deref for CatalogCollectionEntry {
698    type Target = CatalogEntry;
699
700    fn deref(&self) -> &CatalogEntry {
701        &self.entry
702    }
703}
704
// Pure delegation: every method forwards to the wrapped `CatalogEntry`, with
// the version-sensitive methods (`relation_desc` via the inherent impl,
// `global_id`, `at_version`) threading `self.version` through.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully-qualified call — NOTE(review): presumably to disambiguate from
        // another `connection` method on `CatalogEntry`; confirm.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    // Re-wraps the same `CatalogEntry` with a different version selector.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
828
/// The concrete, parsed form of a catalog item — one variant per kind of
/// object the catalog can hold.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}
844
845impl From<CatalogEntry> for durable::Item {
846    fn from(entry: CatalogEntry) -> durable::Item {
847        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
848        durable::Item {
849            id: entry.id,
850            oid: entry.oid,
851            global_id,
852            schema_id: entry.name.qualifiers.schema_spec.into(),
853            name: entry.name.item,
854            create_sql,
855            owner_id: entry.owner_id,
856            privileges: entry.privileges.into_all_values().collect(),
857            extra_versions,
858        }
859    }
860}
861
/// An in-memory table, with one [`GlobalId`] per [`RelationVersion`] (see
/// `collections`).
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    /// Methods below assume this is non-empty for any live table.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
886
887impl Table {
888    pub fn timeline(&self) -> Timeline {
889        match &self.data_source {
890            // The Coordinator controls insertions for writable tables
891            // (including system tables), so they are realtime.
892            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
893            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
894        }
895    }
896
897    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
898    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
899        self.collections.values().copied()
900    }
901
902    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
903    pub fn global_id_writes(&self) -> GlobalId {
904        *self
905            .collections
906            .last_key_value()
907            .expect("at least one version of a table")
908            .1
909    }
910
911    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
912    pub fn collection_descs(
913        &self,
914    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
915        self.collections.iter().map(|(version, gid)| {
916            let desc = self
917                .desc
918                .at_version(RelationVersionSelector::Specific(*version));
919            (*gid, *version, desc)
920        })
921    }
922
923    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
924    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
925        let (version, _gid) = self
926            .collections
927            .iter()
928            .find(|(_version, gid)| *gid == id)
929            .expect("GlobalId to exist");
930        self.desc
931            .at_version(RelationVersionSelector::Specific(*version))
932    }
933}
934
/// Where a [`Table`]'s data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Default expressions for the table — NOTE(review): presumably one
        /// per column, applied when an INSERT omits a value; confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of the upstream data source.
        desc: DataSourceDesc,
        /// Timeline the upstream data lives on.
        timeline: Timeline,
    },
}
950
/// Describes where a source's data comes from and how it is configured.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system
    Ingestion {
        /// Description of the ingestion, including its connection.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion runs on.
        cluster_id: ClusterId,
    },
    /// Receives data from an external system.
    ///
    /// Like [`DataSourceDesc::Ingestion`], but for sources created with the
    /// old `CREATE SOURCE` syntax; see the field comments for the differences.
    OldSyntaxIngestion {
        /// Description of the ingestion, including its connection.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion runs on.
        cluster_id: ClusterId,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from an external reference
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
    /// Exposes the contents of the catalog shard.
    Catalog,
}
999
1000impl From<IntrospectionType> for DataSourceDesc {
1001    fn from(typ: IntrospectionType) -> Self {
1002        Self::Introspection(typ)
1003    }
1004}
1005
1006impl DataSourceDesc {
1007    /// The key and value formats of the data source.
1008    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1009        match &self {
1010            DataSourceDesc::Ingestion { .. } => (None, None),
1011            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1012                match &data_config.encoding.as_ref() {
1013                    Some(encoding) => match &encoding.key {
1014                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1015                        None => (None, Some(encoding.value.type_())),
1016                    },
1017                    None => (None, None),
1018                }
1019            }
1020            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1021                Some(encoding) => match &encoding.key {
1022                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1023                    None => (None, Some(encoding.value.type_())),
1024                },
1025                None => (None, None),
1026            },
1027            DataSourceDesc::Introspection(_)
1028            | DataSourceDesc::Webhook { .. }
1029            | DataSourceDesc::Progress
1030            | DataSourceDesc::Catalog => (None, None),
1031        }
1032    }
1033
1034    /// Envelope of the data source.
1035    pub fn envelope(&self) -> Option<&str> {
1036        // Note how "none"/"append-only" is different from `None`. Source
1037        // sources don't have an envelope (internal logs, for example), while
1038        // other sources have an envelope that we call the "NONE"-envelope.
1039
1040        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1041            match envelope {
1042                SourceEnvelope::None(_) => "none",
1043                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1044                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1045                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1046                        // NOTE(aljoscha): Should we somehow mark that this is
1047                        // using upsert internally? See note above about
1048                        // DEBEZIUM.
1049                        "debezium"
1050                    }
1051                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1052                        "upsert-value-err-inline"
1053                    }
1054                },
1055                SourceEnvelope::CdcV2 => {
1056                    // TODO(aljoscha): Should we even report this? It's
1057                    // currently not exposed.
1058                    "materialize"
1059                }
1060            }
1061        }
1062
1063        match self {
1064            // NOTE(aljoscha): We could move the block for ingestions into
1065            // `SourceEnvelope` itself, but that one feels more like an internal
1066            // thing and adapter should own how we represent envelopes as a
1067            // string? It would not be hard to convince me otherwise, though.
1068            DataSourceDesc::Ingestion { .. } => None,
1069            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1070                Some(envelope_string(&data_config.envelope))
1071            }
1072            DataSourceDesc::IngestionExport { data_config, .. } => {
1073                Some(envelope_string(&data_config.envelope))
1074            }
1075            DataSourceDesc::Introspection(_)
1076            | DataSourceDesc::Webhook { .. }
1077            | DataSourceDesc::Progress
1078            | DataSourceDesc::Catalog => None,
1079        }
1080    }
1081}
1082
/// An in-memory catalog entry for a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Where this source's data comes from and how it is configured.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window for this source, if any.
    ///
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// sources logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1106
impl Source {
    /// Creates a new `Source`.
    ///
    /// The `custom_logical_compaction_window` argument is only a fallback: a
    /// compaction window carried by the plan itself takes precedence.
    ///
    /// # Panics
    /// - If an ingestion-based plan is not given a cluster_id.
    /// - If a non-ingestion-based source has a defined cluster config in its plan.
    /// - If a non-ingestion-based source is given a cluster_id.
    pub fn new(
        plan: CreateSourcePlan,
        global_id: GlobalId,
        resolved_ids: ResolvedIds,
        custom_logical_compaction_window: Option<CompactionWindow>,
        is_retained_metrics_object: bool,
    ) -> Source {
        Source {
            create_sql: Some(plan.source.create_sql),
            // Translate the planner's `DataSourceDesc` into the catalog's
            // representation, attaching `plan.in_cluster` where the source
            // kind requires a cluster and asserting its absence where it
            // must not have one.
            data_source: match plan.source.data_source {
                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                },
                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                } => DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                    progress_subsource,
                    data_config,
                    details,
                },
                mz_sql::plan::DataSourceDesc::Progress => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Progress
                }
                mz_sql::plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    }
                }
                mz_sql::plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => {
                    // Webhook sources take their cluster from `plan.in_cluster`;
                    // a cluster ID on the planned data source itself is
                    // unexpected (logged, but not fatal).
                    mz_ore::soft_assert_or_log!(
                        cluster_id.is_none(),
                        "cluster_id set at Source level for Webhooks"
                    );
                    DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: plan
                            .in_cluster
                            .expect("webhook sources must be given a cluster ID"),
                    }
                }
            },
            desc: plan.source.desc,
            global_id,
            timeline: plan.timeline,
            resolved_ids,
            // The window from the `CREATE SOURCE` statement wins; the
            // caller-provided value is only a default.
            custom_logical_compaction_window: plan
                .source
                .compaction_window
                .or(custom_logical_compaction_window),
            is_retained_metrics_object,
        }
    }

    /// Type of the source, as displayed in system tables, e.g. the connection
    /// name for ingestions or "webhook"/"subsource"/"progress" otherwise.
    pub fn source_type(&self) -> &str {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
            DataSourceDesc::Progress => "progress",
            DataSourceDesc::IngestionExport { .. } => "subsource",
            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
            DataSourceDesc::Webhook { .. } => "webhook",
        }
    }

    /// Connection ID of the source, if one exists.
    ///
    /// Only ingestion-based sources carry a connection; all other kinds
    /// return `None`.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
            DataSourceDesc::IngestionExport { .. }
            | DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress
            | DataSourceDesc::Catalog => None,
        }
    }

    /// The single [`GlobalId`] that refers to this Source.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }

    /// The expensive resource that each source consumes is persist shards. To
    /// prevent abuse, we want to prevent users from creating sources that use an
    /// unbounded number of persist shards. But we also don't want to count
    /// persist shards that are mandated by the system (e.g., the progress
    /// shard) so that future versions of Materialize can introduce additional
    /// per-source shards (e.g., a per-source status shard) without impacting
    /// the limit calculation.
    pub fn user_controllable_persist_shard_count(&self) -> i64 {
        match &self.data_source {
            DataSourceDesc::Ingestion { .. } => 0,
            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                match &desc.connection {
                    // These multi-output sources do not use their primary
                    // source's data shard, so we don't include it in accounting
                    // for users.
                    GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => 0,
                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
                        // Load generators that output data in their primary shard
                        LoadGenerator::Clock
                        | LoadGenerator::Counter { .. }
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue(_) => 1,
                        // Multi-output load generators: presumably their data
                        // lands in subsource shards, counted via
                        // `IngestionExport` below — confirm.
                        LoadGenerator::Auction
                        | LoadGenerator::Marketing
                        | LoadGenerator::Tpch { .. } => 0,
                    },
                    GenericSourceConnection::Kafka(_) => 1,
                }
            }
            //  DataSourceDesc::IngestionExport represents a subsource, which
            //  use a data shard.
            DataSourceDesc::IngestionExport { .. } => 1,
            DataSourceDesc::Webhook { .. } => 1,
            // Introspection, catalog, and progress subsources are not under the user's control, so
            // shouldn't count toward their quota.
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Progress
            | DataSourceDesc::Catalog => 0,
        }
    }
}
1273
/// An in-memory catalog entry for a built-in log source.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}
1281
impl Log {
    /// The single [`GlobalId`] that refers to this Log.
    ///
    /// Logs keep a single ID rather than a versioned collection map.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1288
/// An in-memory catalog entry for a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
    /// Commit interval for the sink.
    pub commit_interval: Option<Duration>,
}
1314
1315impl Sink {
1316    pub fn sink_type(&self) -> &str {
1317        self.connection.name()
1318    }
1319
1320    /// Envelope of the sink.
1321    pub fn envelope(&self) -> Option<&str> {
1322        match &self.envelope {
1323            SinkEnvelope::Debezium => Some("debezium"),
1324            SinkEnvelope::Upsert => Some("upsert"),
1325            SinkEnvelope::Append => Some("append"),
1326        }
1327    }
1328
1329    /// Output a combined format string of the sink. For legacy reasons
1330    /// if the key-format is none or the key & value formats are
1331    /// both the same (either avro or json), we return the value format name,
1332    /// otherwise we return a composite name.
1333    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1334        match &self.connection {
1335            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1336            StorageSinkConnection::Iceberg(_) => None,
1337        }
1338    }
1339
1340    /// Output distinct key_format and value_format of the sink.
1341    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1342        match &self.connection {
1343            StorageSinkConnection::Kafka(connection) => {
1344                let key_format = connection
1345                    .format
1346                    .key_format
1347                    .as_ref()
1348                    .map(|f| f.get_format_name());
1349                let value_format = connection.format.value_format.get_format_name();
1350                Some((key_format, value_format))
1351            }
1352            StorageSinkConnection::Iceberg(_) => None,
1353        }
1354    }
1355
1356    pub fn connection_id(&self) -> Option<CatalogItemId> {
1357        self.connection.connection_id()
1358    }
1359
1360    /// The single [`GlobalId`] that this Sink can be referenced by.
1361    pub fn global_id(&self) -> GlobalId {
1362        self.global_id
1363    }
1364}
1365
/// An in-memory catalog entry for a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}
1385
impl View {
    /// The single [`GlobalId`] this [`View`] can be referenced by.
    ///
    /// Views keep a single ID rather than a versioned collection map like
    /// [`MaterializedView`]s.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1392
/// An in-memory catalog entry for a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
    ///
    /// Invariant: contains at least one entry.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
    /// ID of the materialized view this materialized view is intended to replace.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// If set, only install this materialized view's dataflow on the specified replica.
    pub target_replica: Option<ReplicaId>,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1430
impl MaterializedView {
    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.collections.values().copied()
    }

    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
    /// version.
    pub fn global_id_writes(&self) -> GlobalId {
        // `BTreeMap` iterates in key order, so the last entry holds the
        // greatest `RelationVersion`.
        *self
            .collections
            .last_key_value()
            .expect("at least one version of a materialized view")
            .1
    }

    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
    pub fn collection_descs(
        &self,
    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
        self.collections.iter().map(|(version, gid)| {
            let desc = self
                .desc
                .at_version(RelationVersionSelector::Specific(*version));
            (*gid, *version, desc)
        })
    }

    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
    ///
    /// # Panics
    /// Panics if `id` is not one of this materialized view's collections.
    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
        let (version, _gid) = self
            .collections
            .iter()
            .find(|(_version, gid)| *gid == id)
            .expect("GlobalId to exist");
        self.desc
            .at_version(RelationVersionSelector::Specific(*version))
    }

    /// Apply the given replacement materialized view to this [`MaterializedView`].
    ///
    /// The result keeps this view's name but adopts the replacement's
    /// definition, recorded under a newly bumped [`RelationVersion`].
    ///
    /// # Panics
    /// Panics if `replacement` has no `replacement_target`, or if either
    /// `create_sql` does not re-parse as a `CREATE MATERIALIZED VIEW`.
    pub fn apply_replacement(&mut self, replacement: Self) {
        let target_id = replacement
            .replacement_target
            .expect("replacement has target");

        // Catalog-persisted SQL must always re-parse; anything else is a bug.
        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
            });
            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
                cmvs
            } else {
                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
            }
        }

        // Merge the two statements: keep the original's name, take everything
        // else from the replacement, and drop the `replacement_for` clause
        // since the merged item is a regular materialized view.
        let old_stmt = parse(&self.create_sql);
        let rpl_stmt = parse(&replacement.create_sql);
        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
            if_exists: old_stmt.if_exists,
            name: old_stmt.name,
            columns: rpl_stmt.columns,
            replacement_for: None,
            in_cluster: rpl_stmt.in_cluster,
            in_cluster_replica: rpl_stmt.in_cluster_replica,
            query: rpl_stmt.query,
            as_of: rpl_stmt.as_of,
            with_options: rpl_stmt.with_options,
        };
        let create_sql = new_stmt.to_ast_string_stable();

        let mut collections = std::mem::take(&mut self.collections);
        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
        // necessary evolve the relation schema, so that version might be lower than the actual
        // latest version.
        let latest_version = collections.keys().max().expect("at least one version");
        let new_version = latest_version.bump();
        collections.insert(new_version, replacement.global_id_writes());

        // Drop references to the replacement target so the merged item does
        // not end up referencing itself.
        let mut resolved_ids = replacement.resolved_ids;
        resolved_ids.remove_item(&target_id);
        let mut dependencies = replacement.dependencies;
        dependencies.0.remove(&target_id);

        *self = Self {
            create_sql,
            collections,
            raw_expr: replacement.raw_expr,
            optimized_expr: replacement.optimized_expr,
            desc: replacement.desc,
            resolved_ids,
            dependencies,
            replacement_target: None,
            cluster_id: replacement.cluster_id,
            target_replica: replacement.target_replica,
            non_null_assertions: replacement.non_null_assertions,
            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
            refresh_schedule: replacement.refresh_schedule,
            initial_as_of: replacement.initial_as_of,
        };
    }
}
1533
/// An in-memory catalog entry for an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1558
impl Index {
    /// The [`GlobalId`] that refers to this Index.
    ///
    /// Indexes keep a single ID rather than a versioned collection map.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1565
/// An in-memory catalog entry for a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Implementation details of the type; skipped during serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1577
/// An in-memory catalog entry for a built-in function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function; functions are compiled into the
    /// binary, hence the `&'static` reference. Not serialized.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1586
/// An in-memory catalog entry for a secret.
///
/// Note: the secret's value itself is not part of this representation; only
/// the defining SQL and the ID are stored here.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1594
/// An in-memory catalog entry for a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}
1606
impl Connection {
    /// The single [`GlobalId`] used to reference this connection.
    ///
    /// Connections keep a single ID rather than a versioned collection map.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1613
/// An in-memory catalog entry for a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// Parse-able SQL that defines this continual task.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this continual task from outside the catalog.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection that we read into this continual task.
    pub input_id: GlobalId,
    /// Whether to process an initial snapshot of the input — assumed analogous
    /// to [`Sink::with_snapshot`]; confirm against the planner.
    pub with_snapshot: bool,
    /// ContinualTasks are self-referential. We make this work by using a
    /// placeholder `LocalId` for the CT itself through name resolution and
    /// planning. Then we fill in the real `GlobalId` before constructing this
    /// catalog item.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Columns for this continual task.
    pub desc: RelationDesc,
    /// Other catalog items that this continual task references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this continual task.
    pub dependencies: DependencyIds,
    /// Cluster that this continual task runs on.
    pub cluster_id: ClusterId,
    /// See the comment on [MaterializedView::initial_as_of].
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1639
impl ContinualTask {
    /// The single [`GlobalId`] used to reference this continual task.
    ///
    /// Continual tasks keep a single ID rather than a versioned collection map.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1646
/// An in-memory catalog entry for a network policy: a named set of
/// [`NetworkPolicyRule`]s together with ownership and privilege metadata.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the policy.
    pub name: String,
    /// ID of the policy.
    pub id: NetworkPolicyId,
    /// OID of the policy.
    pub oid: u32,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the policy.
    pub owner_id: RoleId,
    /// Privileges granted on the policy.
    pub privileges: PrivilegeMap,
}
1656
1657impl From<NetworkPolicy> for durable::NetworkPolicy {
1658    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1659        durable::NetworkPolicy {
1660            id: policy.id,
1661            oid: policy.oid,
1662            name: policy.name,
1663            rules: policy.rules,
1664            owner_id: policy.owner_id,
1665            privileges: policy.privileges.into_all_values().collect(),
1666        }
1667    }
1668}
1669
1670impl From<durable::NetworkPolicy> for NetworkPolicy {
1671    fn from(
1672        durable::NetworkPolicy {
1673            id,
1674            oid,
1675            name,
1676            rules,
1677            owner_id,
1678            privileges,
1679        }: durable::NetworkPolicy,
1680    ) -> Self {
1681        NetworkPolicy {
1682            id,
1683            oid,
1684            name,
1685            rules,
1686            owner_id,
1687            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1688        }
1689    }
1690}
1691
1692impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1693    fn update_from(
1694        &mut self,
1695        durable::NetworkPolicy {
1696            id,
1697            oid,
1698            name,
1699            rules,
1700            owner_id,
1701            privileges,
1702        }: durable::NetworkPolicy,
1703    ) {
1704        self.id = id;
1705        self.oid = oid;
1706        self.name = name;
1707        self.rules = rules;
1708        self.owner_id = owner_id;
1709        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1710    }
1711}
1712
1713impl CatalogItem {
1714    /// Returns a string indicating the type of this catalog entry.
1715    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1716        match self {
1717            CatalogItem::Table(_) => CatalogItemType::Table,
1718            CatalogItem::Source(_) => CatalogItemType::Source,
1719            CatalogItem::Log(_) => CatalogItemType::Source,
1720            CatalogItem::Sink(_) => CatalogItemType::Sink,
1721            CatalogItem::View(_) => CatalogItemType::View,
1722            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1723            CatalogItem::Index(_) => CatalogItemType::Index,
1724            CatalogItem::Type(_) => CatalogItemType::Type,
1725            CatalogItem::Func(_) => CatalogItemType::Func,
1726            CatalogItem::Secret(_) => CatalogItemType::Secret,
1727            CatalogItem::Connection(_) => CatalogItemType::Connection,
1728            CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1729        }
1730    }
1731
1732    /// Returns the [`GlobalId`]s that reference this item, if any.
1733    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1734        let gid = match self {
1735            CatalogItem::Source(source) => source.global_id,
1736            CatalogItem::Log(log) => log.global_id,
1737            CatalogItem::Sink(sink) => sink.global_id,
1738            CatalogItem::View(view) => view.global_id,
1739            CatalogItem::MaterializedView(mv) => {
1740                return itertools::Either::Left(mv.collections.values().copied());
1741            }
1742            CatalogItem::ContinualTask(ct) => ct.global_id,
1743            CatalogItem::Index(index) => index.global_id,
1744            CatalogItem::Func(func) => func.global_id,
1745            CatalogItem::Type(ty) => ty.global_id,
1746            CatalogItem::Secret(secret) => secret.global_id,
1747            CatalogItem::Connection(conn) => conn.global_id,
1748            CatalogItem::Table(table) => {
1749                return itertools::Either::Left(table.collections.values().copied());
1750            }
1751        };
1752        itertools::Either::Right(std::iter::once(gid))
1753    }
1754
1755    /// Returns the most up-to-date [`GlobalId`] for this item.
1756    ///
1757    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1758    pub fn latest_global_id(&self) -> GlobalId {
1759        match self {
1760            CatalogItem::Source(source) => source.global_id,
1761            CatalogItem::Log(log) => log.global_id,
1762            CatalogItem::Sink(sink) => sink.global_id,
1763            CatalogItem::View(view) => view.global_id,
1764            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1765            CatalogItem::ContinualTask(ct) => ct.global_id,
1766            CatalogItem::Index(index) => index.global_id,
1767            CatalogItem::Func(func) => func.global_id,
1768            CatalogItem::Type(ty) => ty.global_id,
1769            CatalogItem::Secret(secret) => secret.global_id,
1770            CatalogItem::Connection(conn) => conn.global_id,
1771            CatalogItem::Table(table) => table.global_id_writes(),
1772        }
1773    }
1774
1775    /// Whether this item represents a storage collection.
1776    pub fn is_storage_collection(&self) -> bool {
1777        match self {
1778            CatalogItem::Table(_)
1779            | CatalogItem::Source(_)
1780            | CatalogItem::MaterializedView(_)
1781            | CatalogItem::Sink(_)
1782            | CatalogItem::ContinualTask(_) => true,
1783            CatalogItem::Log(_)
1784            | CatalogItem::View(_)
1785            | CatalogItem::Index(_)
1786            | CatalogItem::Type(_)
1787            | CatalogItem::Func(_)
1788            | CatalogItem::Secret(_)
1789            | CatalogItem::Connection(_) => false,
1790        }
1791    }
1792
1793    /// Returns the [`RelationDesc`] for items that yield rows, at the requested
1794    /// version.
1795    ///
1796    /// Some item types honor `version` so callers can ask for the schema that
1797    /// matches a specific [`GlobalId`] or historical definition. Other relation
1798    /// types ignore `version` because they have a single shape. Non-relational
1799    /// items ( for example functions, indexes, sinks, secrets, and connections)
1800    /// return `None`.
1801    pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1802        match &self {
1803            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1804            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1805            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1806            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1807            CatalogItem::MaterializedView(mview) => {
1808                Some(Cow::Owned(mview.desc.at_version(version)))
1809            }
1810            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1811            CatalogItem::Func(_)
1812            | CatalogItem::Index(_)
1813            | CatalogItem::Sink(_)
1814            | CatalogItem::Secret(_)
1815            | CatalogItem::Connection(_)
1816            | CatalogItem::Type(_) => None,
1817        }
1818    }
1819
1820    pub fn func(
1821        &self,
1822        entry: &CatalogEntry,
1823    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1824        match &self {
1825            CatalogItem::Func(func) => Ok(func.inner),
1826            _ => Err(SqlCatalogError::UnexpectedType {
1827                name: entry.name().item.to_string(),
1828                actual_type: entry.item_type(),
1829                expected_type: CatalogItemType::Func,
1830            }),
1831        }
1832    }
1833
1834    pub fn source_desc(
1835        &self,
1836        entry: &CatalogEntry,
1837    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1838        match &self {
1839            CatalogItem::Source(source) => match &source.data_source {
1840                DataSourceDesc::Ingestion { desc, .. }
1841                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1842                DataSourceDesc::IngestionExport { .. }
1843                | DataSourceDesc::Introspection(_)
1844                | DataSourceDesc::Webhook { .. }
1845                | DataSourceDesc::Progress
1846                | DataSourceDesc::Catalog => Ok(None),
1847            },
1848            _ => Err(SqlCatalogError::UnexpectedType {
1849                name: entry.name().item.to_string(),
1850                actual_type: entry.item_type(),
1851                expected_type: CatalogItemType::Source,
1852            }),
1853        }
1854    }
1855
1856    /// Reports whether this catalog entry is a progress source.
1857    pub fn is_progress_source(&self) -> bool {
1858        matches!(
1859            self,
1860            CatalogItem::Source(Source {
1861                data_source: DataSourceDesc::Progress,
1862                ..
1863            })
1864        )
1865    }
1866
1867    /// Collects the identifiers of the objects that were encountered when resolving names in the
1868    /// item's DDL statement.
1869    pub fn references(&self) -> &ResolvedIds {
1870        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1871        match self {
1872            CatalogItem::Func(_) => &*EMPTY,
1873            CatalogItem::Index(idx) => &idx.resolved_ids,
1874            CatalogItem::Sink(sink) => &sink.resolved_ids,
1875            CatalogItem::Source(source) => &source.resolved_ids,
1876            CatalogItem::Log(_) => &*EMPTY,
1877            CatalogItem::Table(table) => &table.resolved_ids,
1878            CatalogItem::Type(typ) => &typ.resolved_ids,
1879            CatalogItem::View(view) => &view.resolved_ids,
1880            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1881            CatalogItem::Secret(_) => &*EMPTY,
1882            CatalogItem::Connection(connection) => &connection.resolved_ids,
1883            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1884        }
1885    }
1886
1887    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1888    ///
1889    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1890    /// referenced. For example this will include any catalog objects used to implement functions
1891    /// and casts in the item.
1892    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1893        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1894        match self {
1895            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1896            // their implementation. However, currently there's no way to get that information.
1897            CatalogItem::Func(_) => {}
1898            CatalogItem::Index(_) => {}
1899            CatalogItem::Sink(_) => {}
1900            CatalogItem::Source(_) => {}
1901            CatalogItem::Log(_) => {}
1902            CatalogItem::Table(_) => {}
1903            CatalogItem::Type(_) => {}
1904            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1905            CatalogItem::MaterializedView(mview) => {
1906                uses.extend(mview.dependencies.0.iter().copied())
1907            }
1908            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1909            CatalogItem::Secret(_) => {}
1910            CatalogItem::Connection(_) => {}
1911        }
1912        uses
1913    }
1914
1915    /// Returns the connection ID that this item belongs to, if this item is
1916    /// temporary.
1917    pub fn conn_id(&self) -> Option<&ConnectionId> {
1918        match self {
1919            CatalogItem::View(view) => view.conn_id.as_ref(),
1920            CatalogItem::Index(index) => index.conn_id.as_ref(),
1921            CatalogItem::Table(table) => table.conn_id.as_ref(),
1922            CatalogItem::Log(_)
1923            | CatalogItem::Source(_)
1924            | CatalogItem::Sink(_)
1925            | CatalogItem::MaterializedView(_)
1926            | CatalogItem::Secret(_)
1927            | CatalogItem::Type(_)
1928            | CatalogItem::Func(_)
1929            | CatalogItem::Connection(_)
1930            | CatalogItem::ContinualTask(_) => None,
1931        }
1932    }
1933
1934    /// Sets the connection ID that this item belongs to, which makes it a
1935    /// temporary item.
1936    pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1937        match self {
1938            CatalogItem::View(view) => view.conn_id = conn_id,
1939            CatalogItem::Index(index) => index.conn_id = conn_id,
1940            CatalogItem::Table(table) => table.conn_id = conn_id,
1941            CatalogItem::Log(_)
1942            | CatalogItem::Source(_)
1943            | CatalogItem::Sink(_)
1944            | CatalogItem::MaterializedView(_)
1945            | CatalogItem::Secret(_)
1946            | CatalogItem::Type(_)
1947            | CatalogItem::Func(_)
1948            | CatalogItem::Connection(_)
1949            | CatalogItem::ContinualTask(_) => (),
1950        }
1951    }
1952
    /// Indicates whether this item is temporary or not.
    ///
    /// An item is temporary exactly when it is bound to a connection; see
    /// [`CatalogItem::conn_id`].
    pub fn is_temporary(&self) -> bool {
        self.conn_id().is_some()
    }
1957
    /// Returns a copy of `self` whose stored `create_sql` has all references
    /// to `database_name.cur_schema_name` rewritten to use `new_schema_name`.
    ///
    /// Items without stored SQL (logs, funcs, and builtin variants of tables,
    /// sources, and types) are returned as plain clones. On failure, the pair
    /// of strings produced by the AST transform is returned.
    pub fn rename_schema_refs(
        &self,
        database_name: &str,
        cur_schema_name: &str,
        new_schema_name: &str,
    ) -> Result<CatalogItem, (String, String)> {
        // Re-parse the stored SQL, rewrite schema references in the AST, and
        // re-serialize in the stable format.
        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;

            // Rename all references to cur_schema_name.
            mz_sql::ast::transform::create_stmt_rename_schema_refs(
                &mut create_stmt,
                database_name,
                cur_schema_name,
                new_schema_name,
            )?;

            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            // Tables, sources, and types store *optional* SQL (builtins have
            // none), hence the `map(..).transpose()` pattern.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::Type(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Type(i))
            }
            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::ContinualTask(i))
            }
        }
    }
2036
    /// Returns a clone of `self` with all instances of `from` renamed to `to`
    /// (with the option of including the item's own name) or errors if request
    /// is ambiguous.
    ///
    /// Note: funcs and types can never be renamed, so reaching those arms is a
    /// logic error upstream.
    pub fn rename_item_refs(
        &self,
        from: FullItemName,
        to_item_name: String,
        rename_self: bool,
    ) -> Result<CatalogItem, String> {
        // Parse the stored SQL, rewrite names in the AST, and re-serialize in
        // the stable format.
        let do_rewrite = |create_sql: String| -> Result<String, String> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            if rename_self {
                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
            }
            // Determination of what constitutes an ambiguous request is done here.
            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            // Tables and sources store *optional* SQL (builtins have none),
            // hence the `map(..).transpose()` pattern.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("{}s cannot be renamed", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::ContinualTask(i))
            }
        }
    }
2111
    /// Returns a clone of `self` with all instances of `old_id` replaced with `new_id`.
    ///
    /// Unlike the rename helpers this rewrite is infallible: id replacement in
    /// the AST cannot produce an ambiguity.
    pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
        // Parse the stored SQL, swap the id in the AST, and re-serialize in
        // the stable format.
        let do_rewrite = |create_sql: String| -> String {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            mz_sql::ast::transform::create_stmt_replace_ids(
                &mut create_stmt,
                &[(old_id, new_id)].into(),
            );
            create_stmt.to_ast_string_stable()
        };

        match self {
            // Tables and sources store *optional* SQL (builtins have none),
            // hence the `map`.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite);
                CatalogItem::Table(i)
            }
            CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite);
                CatalogItem::Source(i)
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Sink(i)
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::View(i)
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::MaterializedView(i)
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Index(i)
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Secret(i)
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("references of {}s cannot be replaced", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Connection(i)
            }
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::ContinualTask(i)
            }
        }
    }
    /// Updates the retain history for an item. Returns the previous retain history value. Returns
    /// an error if this item does not support retain history.
    ///
    /// A `value` of `None` removes the `RETAIN HISTORY` option from the item's
    /// stored `create_sql`; `Some(value)` replaces (or appends) it. In either
    /// case the in-memory custom compaction window is then set to `window`.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        let update = |mut ast: &mut Statement<Raw>| {
            // Each statement type has unique option types. This macro handles them commonly.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // Replace or add the option.
                    let pos = $stmt
                        .with_options
                        .iter()
                        // In case there are ever multiple, look for the last one.
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        if let Some(idx) = pos {
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            // Only these statement types support RETAIN HISTORY; anything else
            // is an error.
            let previous = match &mut ast {
                Statement::CreateTable(stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };

        // Rewrite the stored SQL first; only then update the in-memory window
        // so the two stay consistent.
        let res = self.update_sql(update)?;
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
2244
    /// Updates the timestamp interval for a source. Returns an error if this item is not a source.
    ///
    /// A `value` of `None` removes the `TIMESTAMP INTERVAL` option from the
    /// stored `create_sql`; `Some(value)` replaces (or appends) it. The
    /// in-memory `timestamp_interval` of the ingestion's `SourceDesc` is then
    /// set to `interval`; non-ingestion sources also produce an error.
    pub fn update_timestamp_interval(
        &mut self,
        value: Option<Value>,
        interval: Duration,
    ) -> Result<(), ()> {
        let update = |ast: &mut Statement<Raw>| {
            match ast {
                Statement::CreateSource(stmt) => {
                    // In case there are ever multiple, look for the last one.
                    let pos = stmt.with_options.iter().rposition(|o| {
                        o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
                    });
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::CreateSourceOption {
                            name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
                            value: Some(WithOptionValue::Value(value)),
                        };
                        if let Some(idx) = pos {
                            stmt.with_options[idx] = next;
                        } else {
                            stmt.with_options.push(next);
                        }
                    } else {
                        if let Some(idx) = pos {
                            stmt.with_options.swap_remove(idx);
                        }
                    }
                }
                _ => return Err(()),
            };
            Ok(())
        };

        self.update_sql(update)?;

        // Update the in-memory SourceDesc timestamp_interval.
        match self {
            CatalogItem::Source(source) => {
                match &mut source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.timestamp_interval = interval;
                    }
                    _ => return Err(()),
                }
                Ok(())
            }
            _ => Err(()),
        }
    }
2295
    /// Adds a column to this item, returning the new [`RelationVersion`].
    ///
    /// Only tables support adding columns; any other item type yields
    /// `PlanError::Unsupported`. The table's versioned [`RelationDesc`] is
    /// bumped first, then a matching version-tagged column definition is
    /// appended to the stored `CREATE TABLE` statement.
    pub fn add_column(
        &mut self,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    ) -> Result<RelationVersion, PlanError> {
        let CatalogItem::Table(table) = self else {
            return Err(PlanError::Unsupported {
                feature: "adding columns to a non-Table".to_string(),
                discussion_no: None,
            });
        };
        // Record the new column in the in-memory versioned description.
        let next_version = table.desc.add_column(name.clone(), typ);

        // Mirror the change into the stored `CREATE TABLE` SQL, tagging the
        // column with the version at which it was added.
        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
            Statement::CreateTable(stmt) => {
                let version = ColumnOptionDef {
                    name: None,
                    option: ColumnOption::Versioned {
                        action: ColumnVersioned::Added,
                        version: next_version.into(),
                    },
                };
                let column = ColumnDef {
                    name: name.into(),
                    data_type: sql,
                    collation: None,
                    options: vec![version],
                };
                stmt.columns.push(column);
                Ok(())
            }
            _ => Err(()),
        };

        self.update_sql(update)
            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
        Ok(next_version)
    }
2335
    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
    /// otherwise returns f's result.
    ///
    /// The stored SQL is parsed, passed to `f` as a mutable AST, and (if `f`
    /// succeeds) re-serialized in the stable format. Items without stored SQL
    /// (funcs, logs, and builtin tables/types/sources) yield `Err(())`.
    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
    where
        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
    {
        // Tables, types, and sources store *optional* SQL (builtins have
        // none); the remaining SQL-bearing items always have it.
        let create_sql = match self {
            CatalogItem::Table(Table { create_sql, .. })
            | CatalogItem::Type(Type { create_sql, .. })
            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
            CatalogItem::Sink(Sink { create_sql, .. })
            | CatalogItem::View(View { create_sql, .. })
            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
            | CatalogItem::Index(Index { create_sql, .. })
            | CatalogItem::Secret(Secret { create_sql, .. })
            | CatalogItem::Connection(Connection { create_sql, .. })
            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
        };
        let Some(create_sql) = create_sql else {
            return Err(());
        };
        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
            .expect("non-system items must be parseable")
            .into_element()
            .ast;
        debug!("rewrite: {}", ast.to_ast_string_redacted());
        let t = f(&mut ast)?;
        // Persist the rewritten statement back into the item.
        *create_sql = ast.to_ast_string_stable();
        debug!("rewrote: {}", ast.to_ast_string_redacted());
        Ok(t)
    }
2368
    /// If the object is considered a "compute object"
    /// (i.e., it is managed by the compute controller),
    /// this function returns its cluster ID. Otherwise, it returns nothing.
    ///
    /// This function differs from `cluster_id` because while all
    /// compute objects run on a cluster, the converse is not true.
    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
        match self {
            // Indexes are the only compute-managed items handled here.
            CatalogItem::Index(index) => Some(index.cluster_id),
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => None,
        }
    }
2391
    /// Returns the ID of the cluster this item runs on, if any.
    ///
    /// Materialized views, indexes, sinks, continual tasks, and certain source
    /// flavors (ingestions, webhooks) run on a cluster; all other items do
    /// not.
    pub fn cluster_id(&self) -> Option<ClusterId> {
        match self {
            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
            CatalogItem::Index(index) => Some(index.cluster_id),
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::Ingestion { cluster_id, .. }
                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
                // This is somewhat of a lie because the export runs on the same
                // cluster as its ingestion but we don't yet have a way of
                // cross-referencing the items
                DataSourceDesc::IngestionExport { .. } => None,
                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
                DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Catalog => None,
            },
            CatalogItem::Sink(sink) => Some(sink.cluster_id),
            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => None,
        }
    }
2419
2420    /// The custom compaction window, if any has been set. This does not reflect any propagated
2421    /// compaction window (i.e., source -> subsource).
2422    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2423        match self {
2424            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2425            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2426            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2427            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2428            CatalogItem::Log(_)
2429            | CatalogItem::View(_)
2430            | CatalogItem::Sink(_)
2431            | CatalogItem::Type(_)
2432            | CatalogItem::Func(_)
2433            | CatalogItem::Secret(_)
2434            | CatalogItem::Connection(_)
2435            | CatalogItem::ContinualTask(_) => None,
2436        }
2437    }
2438
2439    /// Mutable access to the custom compaction window, or None if this type does not support custom
2440    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2441    /// subsource).
2442    pub fn custom_logical_compaction_window_mut(
2443        &mut self,
2444    ) -> Option<&mut Option<CompactionWindow>> {
2445        let cw = match self {
2446            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2447            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2448            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2449            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2450            CatalogItem::Log(_)
2451            | CatalogItem::View(_)
2452            | CatalogItem::Sink(_)
2453            | CatalogItem::Type(_)
2454            | CatalogItem::Func(_)
2455            | CatalogItem::Secret(_)
2456            | CatalogItem::Connection(_)
2457            | CatalogItem::ContinualTask(_) => return None,
2458        };
2459        Some(cw)
2460    }
2461
2462    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2463    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2464    ///
2465    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2466    /// sensible default (currently 1s).
2467    ///
2468    /// For objects that do not have the concept of compaction window, return None.
2469    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2470        let custom_logical_compaction_window = match self {
2471            CatalogItem::Table(_)
2472            | CatalogItem::Source(_)
2473            | CatalogItem::Index(_)
2474            | CatalogItem::MaterializedView(_)
2475            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2476            CatalogItem::Log(_)
2477            | CatalogItem::View(_)
2478            | CatalogItem::Sink(_)
2479            | CatalogItem::Type(_)
2480            | CatalogItem::Func(_)
2481            | CatalogItem::Secret(_)
2482            | CatalogItem::Connection(_) => return None,
2483        };
2484        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2485    }
2486
2487    /// Whether the item's logical compaction window
2488    /// is controlled by the METRICS_RETENTION
2489    /// system var.
2490    pub fn is_retained_metrics_object(&self) -> bool {
2491        match self {
2492            CatalogItem::Table(table) => table.is_retained_metrics_object,
2493            CatalogItem::Source(source) => source.is_retained_metrics_object,
2494            CatalogItem::Index(index) => index.is_retained_metrics_object,
2495            CatalogItem::Log(_)
2496            | CatalogItem::View(_)
2497            | CatalogItem::MaterializedView(_)
2498            | CatalogItem::Sink(_)
2499            | CatalogItem::Type(_)
2500            | CatalogItem::Func(_)
2501            | CatalogItem::Secret(_)
2502            | CatalogItem::Connection(_)
2503            | CatalogItem::ContinualTask(_) => false,
2504        }
2505    }
2506
2507    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2508        match self {
2509            CatalogItem::Table(table) => {
2510                let create_sql = table
2511                    .create_sql
2512                    .clone()
2513                    .expect("builtin tables cannot be serialized");
2514                let mut collections = table.collections.clone();
2515                let global_id = collections
2516                    .remove(&RelationVersion::root())
2517                    .expect("at least one version");
2518                (create_sql, global_id, collections)
2519            }
2520            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2521            CatalogItem::Source(source) => {
2522                assert!(
2523                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2524                    "cannot serialize introspection/builtin sources",
2525                );
2526                let create_sql = source
2527                    .create_sql
2528                    .clone()
2529                    .expect("builtin sources cannot be serialized");
2530                (create_sql, source.global_id, BTreeMap::new())
2531            }
2532            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2533            CatalogItem::MaterializedView(mview) => {
2534                let mut collections = mview.collections.clone();
2535                let global_id = collections
2536                    .remove(&RelationVersion::root())
2537                    .expect("at least one version");
2538                (mview.create_sql.clone(), global_id, collections)
2539            }
2540            CatalogItem::Index(index) => {
2541                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2542            }
2543            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2544            CatalogItem::Type(typ) => {
2545                let create_sql = typ
2546                    .create_sql
2547                    .clone()
2548                    .expect("builtin types cannot be serialized");
2549                (create_sql, typ.global_id, BTreeMap::new())
2550            }
2551            CatalogItem::Secret(secret) => {
2552                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2553            }
2554            CatalogItem::Connection(connection) => (
2555                connection.create_sql.clone(),
2556                connection.global_id,
2557                BTreeMap::new(),
2558            ),
2559            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2560            CatalogItem::ContinualTask(ct) => {
2561                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2562            }
2563        }
2564    }
2565
2566    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2567        match self {
2568            CatalogItem::Table(mut table) => {
2569                let create_sql = table
2570                    .create_sql
2571                    .expect("builtin tables cannot be serialized");
2572                let global_id = table
2573                    .collections
2574                    .remove(&RelationVersion::root())
2575                    .expect("at least one version");
2576                (create_sql, global_id, table.collections)
2577            }
2578            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2579            CatalogItem::Source(source) => {
2580                assert!(
2581                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2582                    "cannot serialize introspection/builtin sources",
2583                );
2584                let create_sql = source
2585                    .create_sql
2586                    .expect("builtin sources cannot be serialized");
2587                (create_sql, source.global_id, BTreeMap::new())
2588            }
2589            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2590            CatalogItem::MaterializedView(mut mview) => {
2591                let global_id = mview
2592                    .collections
2593                    .remove(&RelationVersion::root())
2594                    .expect("at least one version");
2595                (mview.create_sql, global_id, mview.collections)
2596            }
2597            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2598            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2599            CatalogItem::Type(typ) => {
2600                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2601                (create_sql, typ.global_id, BTreeMap::new())
2602            }
2603            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2604            CatalogItem::Connection(connection) => {
2605                (connection.create_sql, connection.global_id, BTreeMap::new())
2606            }
2607            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2608            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2609        }
2610    }
2611
2612    /// Returns a global ID for a specific version selector. Returns `None` if the item does
2613    /// not have versions or if the version does not exist.
2614    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2615        let collections = match self {
2616            CatalogItem::MaterializedView(mv) => &mv.collections,
2617            CatalogItem::Table(table) => &table.collections,
2618            CatalogItem::Source(source) => return Some(source.global_id),
2619            CatalogItem::Log(log) => return Some(log.global_id),
2620            CatalogItem::View(view) => return Some(view.global_id),
2621            CatalogItem::Sink(sink) => return Some(sink.global_id),
2622            CatalogItem::Index(index) => return Some(index.global_id),
2623            CatalogItem::Type(ty) => return Some(ty.global_id),
2624            CatalogItem::Func(func) => return Some(func.global_id),
2625            CatalogItem::Secret(secret) => return Some(secret.global_id),
2626            CatalogItem::Connection(conn) => return Some(conn.global_id),
2627            CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2628        };
2629        match version {
2630            RelationVersionSelector::Latest => collections.values().last().copied(),
2631            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2632        }
2633    }
2634}
2635
impl CatalogEntry {
    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
    /// produces rows.
    pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
        self.item.relation_desc(RelationVersionSelector::Latest)
    }

    /// Reports if the item has columns.
    pub fn has_columns(&self) -> bool {
        match self.item() {
            // Types only have columns when they are record types.
            CatalogItem::Type(Type { details, .. }) => {
                matches!(details.typ, CatalogType::Record { .. })
            }
            // Everything else has columns iff it has a relation description.
            _ => self.relation_desc_latest().is_some(),
        }
    }

    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.item.func(self)
    }

    /// Returns the inner [`Index`] if this entry is an index, else `None`.
    pub fn index(&self) -> Option<&Index> {
        match self.item() {
            CatalogItem::Index(idx) => Some(idx),
            _ => None,
        }
    }

    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
    pub fn materialized_view(&self) -> Option<&MaterializedView> {
        match self.item() {
            CatalogItem::MaterializedView(mv) => Some(mv),
            _ => None,
        }
    }

    /// Returns the inner [`Table`] if this entry is a table, else `None`.
    pub fn table(&self) -> Option<&Table> {
        match self.item() {
            CatalogItem::Table(tbl) => Some(tbl),
            _ => None,
        }
    }

    /// Returns the inner [`Source`] if this entry is a source, else `None`.
    pub fn source(&self) -> Option<&Source> {
        match self.item() {
            CatalogItem::Source(src) => Some(src),
            _ => None,
        }
    }

    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
    pub fn sink(&self) -> Option<&Sink> {
        match self.item() {
            CatalogItem::Sink(sink) => Some(sink),
            _ => None,
        }
    }

    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
    pub fn secret(&self) -> Option<&Secret> {
        match self.item() {
            CatalogItem::Secret(secret) => Some(secret),
            _ => None,
        }
    }

    /// Returns the inner [`Connection`] if this entry is a connection, else an
    /// `UnknownConnection` error carrying this entry's fully qualified name.
    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
        match self.item() {
            CatalogItem::Connection(connection) => Ok(connection),
            _ => {
                // Reconstruct a `database.schema.item` name for the error;
                // ambient (databaseless) objects get no database prefix.
                let db_name = match self.name().qualifiers.database_spec {
                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                };
                Err(SqlCatalogError::UnknownConnection(format!(
                    "{}{}.{}",
                    db_name,
                    self.name().qualifiers.schema_spec,
                    self.name().item
                )))
            }
        }
    }

    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
    /// this `CatalogEntry`, if any.
    pub fn source_desc(
        &self,
    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.item.source_desc(self)
    }

    /// Reports whether this catalog entry is a connection.
    pub fn is_connection(&self) -> bool {
        matches!(self.item(), CatalogItem::Connection(_))
    }

    /// Reports whether this catalog entry is a table.
    pub fn is_table(&self) -> bool {
        matches!(self.item(), CatalogItem::Table(_))
    }

    /// Reports whether this catalog entry is a source. Note that this includes
    /// subsources.
    pub fn is_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Source(_))
    }

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    pub fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config: _,
                } => Some((*ingestion_id, external_reference, details)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    ///
    /// Unlike `subsource_details`, this also matches tables whose data source
    /// is an ingestion export, and additionally returns the export's data
    /// config.
    pub fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            CatalogItem::Table(table) => match &table.data_source {
                TableDataSource::DataSource {
                    desc:
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference,
                            details,
                            data_config,
                        },
                    timeline: _,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a progress source.
    pub fn is_progress_source(&self) -> bool {
        self.item().is_progress_source()
    }

    /// Returns the [`CatalogItemId`] of this entry's progress collection, if
    /// it has one.
    pub fn progress_id(&self) -> Option<CatalogItemId> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                // For the new ingestion syntax, the progress ID is the item's own ID.
                DataSourceDesc::Ingestion { .. } => Some(self.id),
                // Old-syntax ingestions track progress in a dedicated subsource.
                DataSourceDesc::OldSyntaxIngestion {
                    progress_subsource, ..
                } => Some(*progress_subsource),
                DataSourceDesc::IngestionExport { .. }
                | DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Webhook { .. }
                | DataSourceDesc::Catalog => None,
            },
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => None,
        }
    }

    /// Reports whether this catalog entry is a sink.
    pub fn is_sink(&self) -> bool {
        matches!(self.item(), CatalogItem::Sink(_))
    }

    /// Reports whether this catalog entry is a materialized view.
    pub fn is_materialized_view(&self) -> bool {
        matches!(self.item(), CatalogItem::MaterializedView(_))
    }

    /// Reports whether this catalog entry is a view.
    pub fn is_view(&self) -> bool {
        matches!(self.item(), CatalogItem::View(_))
    }

    /// Reports whether this catalog entry is a secret.
    pub fn is_secret(&self) -> bool {
        matches!(self.item(), CatalogItem::Secret(_))
    }

    /// Reports whether this catalog entry is an introspection source.
    pub fn is_introspection_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Log(_))
    }

    /// Reports whether this catalog entry is an index.
    pub fn is_index(&self) -> bool {
        matches!(self.item(), CatalogItem::Index(_))
    }

    /// Reports whether this catalog entry is a continual task.
    pub fn is_continual_task(&self) -> bool {
        matches!(self.item(), CatalogItem::ContinualTask(_))
    }

    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
    pub fn is_relation(&self) -> bool {
        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
    }

    /// Collects the identifiers of the objects that were encountered when
    /// resolving names in the item's DDL statement.
    pub fn references(&self) -> &ResolvedIds {
        self.item.references()
    }

    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
    ///
    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
    /// referenced. For example this will include any catalog objects used to implement functions
    /// and casts in the item.
    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.item.uses()
    }

    /// Returns the `CatalogItem` associated with this catalog entry.
    pub fn item(&self) -> &CatalogItem {
        &self.item
    }

    /// Returns the [`CatalogItemId`] of this catalog entry.
    pub fn id(&self) -> CatalogItemId {
        self.id
    }

    /// Returns all of the [`GlobalId`]s associated with this item.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.item().global_ids()
    }

    /// Returns the [`GlobalId`] of the latest version of this item.
    pub fn latest_global_id(&self) -> GlobalId {
        self.item().latest_global_id()
    }

    /// Returns the OID of this catalog entry.
    pub fn oid(&self) -> u32 {
        self.oid
    }

    /// Returns the fully qualified name of this catalog entry.
    pub fn name(&self) -> &QualifiedItemName {
        &self.name
    }

    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
    pub fn referenced_by(&self) -> &[CatalogItemId] {
        &self.referenced_by
    }

    /// Returns the identifiers of the dataflows that depend upon this dataflow.
    pub fn used_by(&self) -> &[CatalogItemId] {
        &self.used_by
    }

    /// Returns the connection ID that this item belongs to, if this item is
    /// temporary.
    pub fn conn_id(&self) -> Option<&ConnectionId> {
        self.item.conn_id()
    }

    /// Returns the role ID of the entry owner.
    pub fn owner_id(&self) -> &RoleId {
        &self.owner_id
    }

    /// Returns the privileges of the entry.
    pub fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    /// Returns the comment object ID for this entry.
    pub fn comment_object_id(&self) -> CommentObjectId {
        use CatalogItemType::*;
        match self.item_type() {
            Table => CommentObjectId::Table(self.id),
            Source => CommentObjectId::Source(self.id),
            Sink => CommentObjectId::Sink(self.id),
            View => CommentObjectId::View(self.id),
            MaterializedView => CommentObjectId::MaterializedView(self.id),
            Index => CommentObjectId::Index(self.id),
            Func => CommentObjectId::Func(self.id),
            Connection => CommentObjectId::Connection(self.id),
            Type => CommentObjectId::Type(self.id),
            Secret => CommentObjectId::Secret(self.id),
            ContinualTask => CommentObjectId::ContinualTask(self.id),
        }
    }
}
2965
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    /// Comments for each object, keyed by an optional sub-component (e.g., a column
    /// position); a `None` key holds the comment on the object itself.
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2970
2971impl CommentsMap {
2972    pub fn update_comment(
2973        &mut self,
2974        object_id: CommentObjectId,
2975        sub_component: Option<usize>,
2976        comment: Option<String>,
2977    ) -> Option<String> {
2978        let object_comments = self.map.entry(object_id).or_default();
2979
2980        // Either replace the existing comment, or remove it if comment is None/NULL.
2981        let (empty, prev) = if let Some(comment) = comment {
2982            let prev = object_comments.insert(sub_component, comment);
2983            (false, prev)
2984        } else {
2985            let prev = object_comments.remove(&sub_component);
2986            (object_comments.is_empty(), prev)
2987        };
2988
2989        // Cleanup entries that are now empty.
2990        if empty {
2991            self.map.remove(&object_id);
2992        }
2993
2994        // Return the previous comment, if there was one, for easy removal.
2995        prev
2996    }
2997
2998    /// Remove all comments for `object_id` from the map.
2999    ///
3000    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
3001    /// relations you can also have comments on the individual columns. Dropping the comments for a
3002    /// relation will also drop all of the comments on any columns.
3003    pub fn drop_comments(
3004        &mut self,
3005        object_ids: &BTreeSet<CommentObjectId>,
3006    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3007        let mut removed_comments = Vec::new();
3008
3009        for object_id in object_ids {
3010            if let Some(comments) = self.map.remove(object_id) {
3011                let removed = comments
3012                    .into_iter()
3013                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3014                removed_comments.extend(removed);
3015            }
3016        }
3017
3018        removed_comments
3019    }
3020
3021    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3022        self.map
3023            .iter()
3024            .map(|(id, comments)| {
3025                comments
3026                    .iter()
3027                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3028            })
3029            .flatten()
3030    }
3031
3032    pub fn get_object_comments(
3033        &self,
3034        object_id: CommentObjectId,
3035    ) -> Option<&BTreeMap<Option<usize>, String>> {
3036        self.map.get(&object_id)
3037    }
3038}
3039
3040impl Serialize for CommentsMap {
3041    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3042    where
3043        S: serde::Serializer,
3044    {
3045        let comment_count = self
3046            .map
3047            .iter()
3048            .map(|(_object_id, comments)| comments.len())
3049            .sum();
3050
3051        let mut seq = serializer.serialize_seq(Some(comment_count))?;
3052        for (object_id, sub) in &self.map {
3053            for (sub_component, comment) in sub {
3054                seq.serialize_element(&(
3055                    format!("{object_id:?}"),
3056                    format!("{sub_component:?}"),
3057                    comment,
3058                ))?;
3059            }
3060        }
3061        seq.end()
3062    }
3063}
3064
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    /// Default privileges, keyed by the object specification they apply to.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
3070
// Use a new type here because otherwise we have two levels of BTreeMap, both needing
// map_key_to_string for serialization.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized, the key is the grantee Role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
3079
3080impl Deref for RoleDefaultPrivileges {
3081    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
3082
3083    fn deref(&self) -> &Self::Target {
3084        &self.0
3085    }
3086}
3087
3088impl DerefMut for RoleDefaultPrivileges {
3089    fn deref_mut(&mut self) -> &mut Self::Target {
3090        &mut self.0
3091    }
3092}
3093
3094impl DefaultPrivileges {
3095    /// Add a new default privilege into the set of all default privileges.
3096    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3097        if privilege.acl_mode.is_empty() {
3098            return;
3099        }
3100
3101        let privileges = self.privileges.entry(object).or_default();
3102        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3103            default_privilege.acl_mode |= privilege.acl_mode;
3104        } else {
3105            privileges.insert(privilege.grantee, privilege);
3106        }
3107    }
3108
3109    /// Revoke a default privilege from the set of all default privileges.
3110    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3111        if let Some(privileges) = self.privileges.get_mut(object) {
3112            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3113                default_privilege.acl_mode =
3114                    default_privilege.acl_mode.difference(privilege.acl_mode);
3115                if default_privilege.acl_mode.is_empty() {
3116                    privileges.remove(&privilege.grantee);
3117                }
3118            }
3119            if privileges.is_empty() {
3120                self.privileges.remove(object);
3121            }
3122        }
3123    }
3124
3125    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
3126    /// any exist.
3127    pub fn get_privileges_for_grantee(
3128        &self,
3129        object: &DefaultPrivilegeObject,
3130        grantee: &RoleId,
3131    ) -> Option<&AclMode> {
3132        self.privileges
3133            .get(object)
3134            .and_then(|privileges| privileges.get(grantee))
3135            .map(|privilege| &privilege.acl_mode)
3136    }
3137
    /// Get all default privileges that apply to the provided object details.
    ///
    /// Matching entries for both the given `role_id` and `RoleId::Public` are
    /// looked up at every level of specificity (schema-level, database-level,
    /// and global) and consolidated into at most one
    /// `DefaultPrivilegeAclItem` per grantee.
    pub fn get_applicable_privileges(
        &self,
        role_id: RoleId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        object_type: mz_sql::catalog::ObjectType,
    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
        // don't require the caller to worry about that and we will map their `object_type` to the
        // correct type for privileges.
        let privilege_object_type = if object_type.is_relation() {
            mz_sql::catalog::ObjectType::Table
        } else {
            object_type
        };
        // Deliberately computed from the caller's original `object_type` (not
        // `privilege_object_type`) so that table-only privileges can be
        // stripped for non-table relations at the end of the chain below.
        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));

        // Collect all entries that apply to the provided object details.
        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
        // entries in the vec below. That's OK because we consolidate the results after.
        [
            // Candidate lookup keys: for the given role and for PUBLIC, from
            // most specific (schema) to least specific (global).
            DefaultPrivilegeObject {
                role_id,
                database_id,
                schema_id,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id,
                database_id,
                schema_id: None,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id,
                database_id: None,
                schema_id: None,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id,
                schema_id,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id,
                schema_id: None,
                object_type: privilege_object_type,
            },
            DefaultPrivilegeObject {
                role_id: RoleId::Public,
                database_id: None,
                schema_id: None,
                object_type: privilege_object_type,
            },
        ]
        .into_iter()
        .filter_map(|object| self.privileges.get(&object))
        .flat_map(|acl_map| acl_map.values())
        // Consolidate privileges with a common grantee.
        .fold(
            BTreeMap::new(),
            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
                *accum_acl_mode |= *acl_mode;
                accum
            },
        )
        .into_iter()
        // Restrict the acl_mode to only privileges valid for the provided object type. If the
        // default privilege has an object type of Table, then it may contain privileges valid for
        // tables but not other relations. If the passed in object type is another relation, then
        // we need to remove any privilege that is not valid for the specified relation.
        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
        // Filter out empty privileges.
        .filter(|(_, acl_mode)| !acl_mode.is_empty())
        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
            grantee: *grantee,
            acl_mode,
        })
    }
3222
3223    pub fn iter(
3224        &self,
3225    ) -> impl Iterator<
3226        Item = (
3227            &DefaultPrivilegeObject,
3228            impl Iterator<Item = &DefaultPrivilegeAclItem>,
3229        ),
3230    > {
3231        self.privileges
3232            .iter()
3233            .map(|(object, acl_map)| (object, acl_map.values()))
3234    }
3235}
3236
/// In-memory configuration of a cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Managed/unmanaged variant, including any variant-specific settings.
    pub variant: ClusterVariant,
    /// Optional workload class associated with the cluster.
    pub workload_class: Option<String>,
}
3242
3243impl ClusterConfig {
3244    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3245        match &self.variant {
3246            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3247            ClusterVariant::Unmanaged => None,
3248        }
3249    }
3250}
3251
3252impl From<ClusterConfig> for durable::ClusterConfig {
3253    fn from(config: ClusterConfig) -> Self {
3254        Self {
3255            variant: config.variant.into(),
3256            workload_class: config.workload_class,
3257        }
3258    }
3259}
3260
3261impl From<durable::ClusterConfig> for ClusterConfig {
3262    fn from(config: durable::ClusterConfig) -> Self {
3263        Self {
3264            variant: config.variant.into(),
3265            workload_class: config.workload_class,
3266        }
3267    }
3268}
3269
/// In-memory configuration specific to a managed cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// The size of the cluster's replicas.
    pub size: String,
    /// Availability zones for the cluster's replicas.
    pub availability_zones: Vec<String>,
    /// Logging configuration for the cluster's replicas.
    pub logging: ReplicaLogging,
    /// The number of replicas to maintain.
    pub replication_factor: u32,
    /// Optimizer feature flags overridden for this cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// The cluster's schedule.
    pub schedule: ClusterSchedule,
}
3279
3280impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3281    fn from(managed: ClusterVariantManaged) -> Self {
3282        Self {
3283            size: managed.size,
3284            availability_zones: managed.availability_zones,
3285            logging: managed.logging,
3286            replication_factor: managed.replication_factor,
3287            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3288            schedule: managed.schedule,
3289        }
3290    }
3291}
3292
3293impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3294    fn from(managed: durable::ClusterVariantManaged) -> Self {
3295        Self {
3296            size: managed.size,
3297            availability_zones: managed.availability_zones,
3298            logging: managed.logging,
3299            replication_factor: managed.replication_factor,
3300            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3301            schedule: managed.schedule,
3302        }
3303    }
3304}
3305
/// A cluster's variant: managed, carrying [`ClusterVariantManaged`]
/// configuration, or unmanaged.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster with its variant-specific configuration.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; no additional configuration is stored.
    Unmanaged,
}
3311
3312impl From<ClusterVariant> for durable::ClusterVariant {
3313    fn from(variant: ClusterVariant) -> Self {
3314        match variant {
3315            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3316            ClusterVariant::Unmanaged => Self::Unmanaged,
3317        }
3318    }
3319}
3320
3321impl From<durable::ClusterVariant> for ClusterVariant {
3322    fn from(variant: durable::ClusterVariant) -> Self {
3323        match variant {
3324            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3325            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3326        }
3327    }
3328}
3329
// Adapts the in-memory [`Database`] to the SQL layer's catalog trait. All
// methods are thin accessors over the struct's fields.
impl mz_sql::catalog::CatalogDatabase for Database {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> DatabaseId {
        self.id
    }

    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3364
// Adapts the in-memory [`Schema`] to the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        // Items, functions, and types live in separate maps; expose them as
        // one combined stream of IDs.
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3400
// Adapts the in-memory [`Role`] to the SQL layer's catalog trait. All
// methods are thin accessors over the struct's fields.
impl mz_sql::catalog::CatalogRole for Role {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> RoleId {
        self.id
    }

    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        // Per-role variable settings, keyed by variable name.
        &self.vars.map
    }
}
3422
// Adapts the in-memory [`NetworkPolicy`] to the SQL layer's catalog trait.
// All methods are thin accessors over the struct's fields.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3440
// Adapts the in-memory [`Cluster`] to the SQL layer's catalog trait. Several
// trait methods share a name with an inherent method on `Cluster` and forward
// to it (inherent methods take precedence over trait methods during method
// resolution, so these calls do not recurse).
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> ClusterId {
        self.id
    }

    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
        // Calls the inherent `Cluster::replicas` iterator, not this method.
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
        // Callers are expected to pass a replica ID belonging to this
        // cluster; a missing ID indicates catalog inconsistency.
        self.replica(id).expect("catalog out of sync")
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    fn managed_size(&self) -> Option<&str> {
        // Only managed clusters have a size.
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            ClusterVariant::Unmanaged => None,
        }
    }

    fn schedule(&self) -> Option<&ClusterSchedule> {
        // Only managed clusters have a schedule.
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            ClusterVariant::Unmanaged => None,
        }
    }

    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        self.try_to_plan()
    }
}
3500
// Adapts the in-memory [`ClusterReplica`] to the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    fn name(&self) -> &str {
        &self.name
    }

    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn internal(&self) -> bool {
        // Whether the replica's configured location marks it as internal.
        self.config.location.internal()
    }
}
3522
// Adapts the in-memory [`CatalogEntry`] to the SQL layer's catalog trait.
// Most methods forward to the inherent method of the same name (inherent
// methods take precedence over trait methods during method resolution, so
// these calls do not recurse).
impl mz_sql::catalog::CatalogItem for CatalogEntry {
    fn name(&self) -> &QualifiedItemName {
        self.name()
    }

    fn id(&self) -> CatalogItemId {
        self.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.global_ids())
    }

    fn oid(&self) -> u32 {
        self.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Propagates any error from the inherent accessor, then converts the
        // stored connection details into a `Connection`.
        Ok(self.connection()?.details.to_connection())
    }

    fn create_sql(&self) -> &str {
        // Items that may be builtin (tables, sources, types) store their SQL
        // as an `Option`; the `<builtin>` placeholder stands in when absent.
        // Funcs and logs are always builtin and never carry SQL.
        match self.item() {
            CatalogItem::Table(Table { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Source(Source { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
            CatalogItem::View(View { create_sql, .. }) => create_sql,
            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
            CatalogItem::Type(Type { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
            CatalogItem::Func(_) => "<builtin>",
            CatalogItem::Log(_) => "<builtin>",
            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
        }
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.item().typ()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        // Only indexes have key expressions and an `on` collection.
        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
            Some((keys, *on))
        } else {
            None
        }
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        // Only tables backed by direct table writes (as opposed to some other
        // data source) expose column default expressions.
        if let CatalogItem::Table(Table {
            data_source: TableDataSource::TableWrites { defaults },
            ..
        }) = self.item()
        {
            Some(defaults.as_slice())
        } else {
            None
        }
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        // Only materialized views can have a replacement target.
        if let CatalogItem::MaterializedView(mv) = self.item() {
            mv.replacement_target
        } else {
            None
        }
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        // Only type items carry type details.
        if let CatalogItem::Type(Type { details, .. }) = self.item() {
            Some(details)
        } else {
            None
        }
    }

    fn references(&self) -> &ResolvedIds {
        self.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.item().cluster_id()
    }

    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        // Wraps a clone of this entry pinned to the requested version.
        Box::new(CatalogCollectionEntry {
            entry: self.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        // Versions are only tracked for tables; other items return `None`.
        self.table().map(|t| t.desc.latest_version())
    }
}
3685
/// A single update to the catalog state.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    /// What changed.
    pub kind: StateUpdateKind,
    /// The timestamp at which the change occurred.
    pub ts: Timestamp,
    /// Whether the change is an addition or a retraction.
    pub diff: StateDiff,
}
3693
/// The contents of a single state update.
///
/// Variants are listed in dependency order.
///
/// Note: [`BootstrapStateUpdateKind`] mirrors this enum (minus
/// `TemporaryItem`); keep the two in sync when adding variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // Temporary items are not actually updated via the durable catalog, but
    // this allows us to model them the same way as all other items in parts of
    // the pipeline.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3723
/// Valid diffs for catalog state updates.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The update removes state; corresponds to a diff of `-1`.
    Retraction,
    /// The update adds state; corresponds to a diff of `+1`.
    Addition,
}
3730
3731impl From<StateDiff> for Diff {
3732    fn from(diff: StateDiff) -> Self {
3733        match diff {
3734            StateDiff::Retraction => Diff::MINUS_ONE,
3735            StateDiff::Addition => Diff::ONE,
3736        }
3737    }
3738}
3739impl TryFrom<Diff> for StateDiff {
3740    type Error = String;
3741
3742    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3743        match diff {
3744            Diff::MINUS_ONE => Ok(Self::Retraction),
3745            Diff::ONE => Ok(Self::Addition),
3746            diff => Err(format!("invalid diff {diff}")),
3747        }
3748    }
3749}
3750
/// Information needed to process an update to a temporary item.
///
/// Temporary items are not persisted in the durable catalog (see
/// [`StateUpdateKind::TemporaryItem`]), so this struct carries the state a
/// durable item update would otherwise provide.
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    /// The catalog ID of the item.
    pub id: CatalogItemId,
    /// The item's OID.
    pub oid: u32,
    /// The item's global ID.
    pub global_id: GlobalId,
    /// The schema containing the item.
    pub schema_id: SchemaId,
    /// The item's name within its schema.
    pub name: String,
    /// The connection the temporary item is associated with, if any.
    pub conn_id: Option<ConnectionId>,
    /// The SQL text defining the item.
    pub create_sql: String,
    /// The role that owns the item.
    pub owner_id: RoleId,
    /// Privileges granted on the item.
    pub privileges: Vec<MzAclItem>,
    /// Global IDs for additional versions of the item's relation.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3765
impl From<CatalogEntry> for TemporaryItem {
    /// Flattens a full [`CatalogEntry`] into the reduced [`TemporaryItem`]
    /// representation.
    fn from(entry: CatalogEntry) -> Self {
        // Clone the connection ID up front, before `entry` is moved below.
        let conn_id = entry.conn_id().cloned();
        // `to_serialized` yields the item's SQL text, its global ID, and the
        // global IDs of any extra relation versions.
        let (create_sql, global_id, extra_versions) = entry.item.to_serialized();

        TemporaryItem {
            id: entry.id,
            oid: entry.oid,
            global_id,
            schema_id: entry.name.qualifiers.schema_spec.into(),
            name: entry.name.item,
            conn_id,
            create_sql,
            owner_id: entry.owner_id,
            privileges: entry.privileges.into_all_values().collect(),
            extra_versions,
        }
    }
}
3785
impl TemporaryItem {
    /// Returns the catalog item type of this temporary item, derived from its
    /// `create_sql` text via the `item_type` helper.
    pub fn item_type(&self) -> CatalogItemType {
        item_type(&self.create_sql)
    }
}
3791
/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
///
/// NOTE(review): `StateUpdateKind` and `TemporaryItem` both derive `Ord` in
/// this file today, so the rationale above may be stale — confirm before
/// relying on it. Keep the variants in sync with [`StateUpdateKind`].
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3815
3816impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3817    fn from(value: BootstrapStateUpdateKind) -> Self {
3818        match value {
3819            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3820            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3821            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3822            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3823            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3824                StateUpdateKind::DefaultPrivilege(kind)
3825            }
3826            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3827                StateUpdateKind::SystemPrivilege(kind)
3828            }
3829            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3830                StateUpdateKind::SystemConfiguration(kind)
3831            }
3832            BootstrapStateUpdateKind::SourceReferences(kind) => {
3833                StateUpdateKind::SourceReferences(kind)
3834            }
3835            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3836            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3837            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3838                StateUpdateKind::IntrospectionSourceIndex(kind)
3839            }
3840            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3841            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3842                StateUpdateKind::SystemObjectMapping(kind)
3843            }
3844            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3845            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3846            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3847            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3848                StateUpdateKind::StorageCollectionMetadata(kind)
3849            }
3850            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3851                StateUpdateKind::UnfinalizedShard(kind)
3852            }
3853        }
3854    }
3855}
3856
3857impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3858    type Error = TemporaryItem;
3859
3860    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3861        match value {
3862            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3863            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3864            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3865            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3866            StateUpdateKind::DefaultPrivilege(kind) => {
3867                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3868            }
3869            StateUpdateKind::SystemPrivilege(kind) => {
3870                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3871            }
3872            StateUpdateKind::SystemConfiguration(kind) => {
3873                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3874            }
3875            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3876            StateUpdateKind::NetworkPolicy(kind) => {
3877                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3878            }
3879            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3880                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3881            }
3882            StateUpdateKind::ClusterReplica(kind) => {
3883                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3884            }
3885            StateUpdateKind::SourceReferences(kind) => {
3886                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3887            }
3888            StateUpdateKind::SystemObjectMapping(kind) => {
3889                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3890            }
3891            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3892            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3893            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3894            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3895            StateUpdateKind::StorageCollectionMetadata(kind) => {
3896                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3897            }
3898            StateUpdateKind::UnfinalizedShard(kind) => {
3899                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3900            }
3901        }
3902    }
3903}