Skip to main content

mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42    UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47    CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48    RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
56    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
57    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
58    WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68    SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
/// Used to update `self` from the input value while consuming the input value.
///
/// The `From<T>` supertrait bound means every implementor can also be built
/// from scratch out of a `T`, not only refreshed from one.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` in place using the contents of `from`, taking ownership of `from`.
    fn update_from(&mut self, from: T);
}
85
/// In-memory representation of a database, together with lookup maps for its schemas.
///
/// The schema maps are not carried by [`durable::Database`]: they start out empty when
/// converting from the durable form and are dropped when converting back.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Identifier of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas keyed by their [`SchemaId`].
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Maps a schema name to the [`SchemaId`] of that schema.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges on the database.
    pub privileges: PrivilegeMap,
}
97
98impl From<Database> for durable::Database {
99    fn from(database: Database) -> durable::Database {
100        durable::Database {
101            id: database.id,
102            oid: database.oid,
103            name: database.name,
104            owner_id: database.owner_id,
105            privileges: database.privileges.into_all_values().collect(),
106        }
107    }
108}
109
110impl From<durable::Database> for Database {
111    fn from(
112        durable::Database {
113            id,
114            oid,
115            name,
116            owner_id,
117            privileges,
118        }: durable::Database,
119    ) -> Database {
120        Database {
121            id,
122            oid,
123            schemas_by_id: BTreeMap::new(),
124            schemas_by_name: BTreeMap::new(),
125            name,
126            owner_id,
127            privileges: PrivilegeMap::from_mz_acl_items(privileges),
128        }
129    }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133    fn update_from(
134        &mut self,
135        durable::Database {
136            id,
137            oid,
138            name,
139            owner_id,
140            privileges,
141        }: durable::Database,
142    ) {
143        self.id = id;
144        self.oid = oid;
145        self.name = name;
146        self.owner_id = owner_id;
147        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148    }
149}
150
/// In-memory representation of a schema, together with name lookup maps for its contents.
///
/// The `items`, `functions` and `types` maps are not carried by [`durable::Schema`]: they
/// start out empty when converting from the durable form and are dropped when converting back.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Qualified name: the database specifier plus the schema's own name.
    pub name: QualifiedSchemaName,
    /// Identifier of the schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Name to [`CatalogItemId`] lookup for items.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Name to [`CatalogItemId`] lookup for functions.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Name to [`CatalogItemId`] lookup for types.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges on the schema.
    pub privileges: PrivilegeMap,
}
162
163impl From<Schema> for durable::Schema {
164    fn from(schema: Schema) -> durable::Schema {
165        durable::Schema {
166            id: schema.id.into(),
167            oid: schema.oid,
168            name: schema.name.schema,
169            database_id: schema.name.database.id(),
170            owner_id: schema.owner_id,
171            privileges: schema.privileges.into_all_values().collect(),
172        }
173    }
174}
175
176impl From<durable::Schema> for Schema {
177    fn from(
178        durable::Schema {
179            id,
180            oid,
181            name,
182            database_id,
183            owner_id,
184            privileges,
185        }: durable::Schema,
186    ) -> Schema {
187        Schema {
188            name: QualifiedSchemaName {
189                database: database_id.into(),
190                schema: name,
191            },
192            id: id.into(),
193            oid,
194            items: BTreeMap::new(),
195            functions: BTreeMap::new(),
196            types: BTreeMap::new(),
197            owner_id,
198            privileges: PrivilegeMap::from_mz_acl_items(privileges),
199        }
200    }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204    fn update_from(
205        &mut self,
206        durable::Schema {
207            id,
208            oid,
209            name,
210            database_id,
211            owner_id,
212            privileges,
213        }: durable::Schema,
214    ) {
215        self.name = QualifiedSchemaName {
216            database: database_id.into(),
217            schema: name,
218        };
219        self.id = id.into();
220        self.oid = oid;
221        self.owner_id = owner_id;
222        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223    }
224}
225
/// In-memory representation of a role.
///
/// Carries the same fields as [`durable::Role`]; the conversions in both directions are
/// field-for-field moves.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Identifier of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// Membership information for the role.
    pub membership: RoleMembership,
    /// Variables stored on the role; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
235
236impl Role {
237    pub fn is_user(&self) -> bool {
238        self.id.is_user()
239    }
240
241    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243    }
244}
245
246impl From<Role> for durable::Role {
247    fn from(role: Role) -> durable::Role {
248        durable::Role {
249            id: role.id,
250            oid: role.oid,
251            name: role.name,
252            attributes: role.attributes,
253            membership: role.membership,
254            vars: role.vars,
255        }
256    }
257}
258
259impl From<durable::Role> for Role {
260    fn from(
261        durable::Role {
262            id,
263            oid,
264            name,
265            attributes,
266            membership,
267            vars,
268        }: durable::Role,
269    ) -> Self {
270        Role {
271            name,
272            id,
273            oid,
274            attributes,
275            membership,
276            vars,
277        }
278    }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282    fn update_from(
283        &mut self,
284        durable::Role {
285            id,
286            oid,
287            name,
288            attributes,
289            membership,
290            vars,
291        }: durable::Role,
292    ) {
293        self.id = id;
294        self.oid = oid;
295        self.name = name;
296        self.attributes = attributes;
297        self.membership = membership;
298        self.vars = vars;
299    }
300}
301
/// In-memory representation of the authentication data stored for a role.
///
/// Carries the same fields as [`durable::RoleAuth`].
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// Role this authentication data belongs to.
    pub role_id: RoleId,
    /// Hash of the role's password, if one is present.
    // NOTE(review): presumably `None` means no password is set; confirm with the writers.
    pub password_hash: Option<String>,
    /// Time of the last update.
    // NOTE(review): the unit and epoch of this integer are not visible here; confirm.
    pub updated_at: u64,
}
308
309impl From<RoleAuth> for durable::RoleAuth {
310    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311        durable::RoleAuth {
312            role_id: role_auth.role_id,
313            password_hash: role_auth.password_hash,
314            updated_at: role_auth.updated_at,
315        }
316    }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320    fn from(
321        durable::RoleAuth {
322            role_id,
323            password_hash,
324            updated_at,
325        }: durable::RoleAuth,
326    ) -> RoleAuth {
327        RoleAuth {
328            role_id,
329            password_hash,
330            updated_at,
331        }
332    }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336    fn update_from(&mut self, from: durable::RoleAuth) {
337        self.role_id = from.role_id;
338        self.password_hash = from.password_hash;
339        self.updated_at = from.updated_at;
340    }
341}
342
/// In-memory representation of a cluster, together with its replicas and bound objects.
///
/// Only `id`, `name`, `owner_id`, `privileges` and `config` are carried by
/// [`durable::Cluster`]; the remaining collections start out empty when converting from the
/// durable form.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Identifier of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster, including whether it is managed or unmanaged.
    pub config: ClusterConfig,
    /// Maps each [`LogVariant`] to a [`GlobalId`]. Skipped during serialization.
    // NOTE(review): presumably the introspection source indexes that `bound_objects`
    // excludes; confirm against the code that populates this map.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Maps a replica name to its [`ReplicaId`]; see [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by [`ReplicaId`]; see [`Cluster::replica`] and [`Cluster::replicas`].
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges on the cluster.
    pub privileges: PrivilegeMap,
}
359
360impl Cluster {
361    /// The role of the cluster. Currently used to set alert severity.
362    pub fn role(&self) -> ClusterRole {
363        // NOTE - These roles power monitoring systems. Do not change
364        // them without talking to the cloud or observability groups.
365        if self.name == MZ_SYSTEM_CLUSTER.name {
366            ClusterRole::SystemCritical
367        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
368            ClusterRole::System
369        } else {
370            ClusterRole::User
371        }
372    }
373
374    /// Returns `true` if the cluster is a managed cluster.
375    pub fn is_managed(&self) -> bool {
376        matches!(self.config.variant, ClusterVariant::Managed { .. })
377    }
378
379    /// Lists the user replicas, which are those that do not have the internal flag set.
380    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
381        self.replicas().filter(|r| !r.config.location.internal())
382    }
383
384    /// Lists all replicas in the cluster
385    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
386        self.replicas_by_id_.values()
387    }
388
389    /// Lookup a replica by ID.
390    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
391        self.replicas_by_id_.get(&replica_id)
392    }
393
394    /// Lookup a replica ID by name.
395    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
396        self.replica_id_by_name_.get(name).copied()
397    }
398
399    /// Returns the availability zones of this cluster, if they exist.
400    pub fn availability_zones(&self) -> Option<&[String]> {
401        match &self.config.variant {
402            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
403            ClusterVariant::Unmanaged => None,
404        }
405    }
406
407    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
408        let name = self.name.clone();
409        let variant = match &self.config.variant {
410            ClusterVariant::Managed(ClusterVariantManaged {
411                size,
412                availability_zones,
413                logging,
414                replication_factor,
415                optimizer_feature_overrides,
416                schedule,
417            }) => {
418                let introspection = match logging {
419                    ReplicaLogging {
420                        log_logging,
421                        interval: Some(interval),
422                    } => Some(ComputeReplicaIntrospectionConfig {
423                        debugging: *log_logging,
424                        interval: interval.clone(),
425                    }),
426                    ReplicaLogging {
427                        log_logging: _,
428                        interval: None,
429                    } => None,
430                };
431                let compute = ComputeReplicaConfig { introspection };
432                CreateClusterVariant::Managed(CreateClusterManagedPlan {
433                    replication_factor: replication_factor.clone(),
434                    size: size.clone(),
435                    availability_zones: availability_zones.clone(),
436                    compute,
437                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
438                    schedule: schedule.clone(),
439                })
440            }
441            ClusterVariant::Unmanaged => {
442                // Unmanaged clusters are deprecated, so hopefully we can remove
443                // them before we have to implement this.
444                return Err(PlanError::Unsupported {
445                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
446                    discussion_no: None,
447                });
448            }
449        };
450        let workload_class = self.config.workload_class.clone();
451        Ok(CreateClusterPlan {
452            name,
453            variant,
454            workload_class,
455        })
456    }
457}
458
459impl From<Cluster> for durable::Cluster {
460    fn from(cluster: Cluster) -> durable::Cluster {
461        durable::Cluster {
462            id: cluster.id,
463            name: cluster.name,
464            owner_id: cluster.owner_id,
465            privileges: cluster.privileges.into_all_values().collect(),
466            config: cluster.config.into(),
467        }
468    }
469}
470
471impl From<durable::Cluster> for Cluster {
472    fn from(
473        durable::Cluster {
474            id,
475            name,
476            owner_id,
477            privileges,
478            config,
479        }: durable::Cluster,
480    ) -> Self {
481        Cluster {
482            name: name.clone(),
483            id,
484            bound_objects: BTreeSet::new(),
485            log_indexes: BTreeMap::new(),
486            replica_id_by_name_: BTreeMap::new(),
487            replicas_by_id_: BTreeMap::new(),
488            owner_id,
489            privileges: PrivilegeMap::from_mz_acl_items(privileges),
490            config: config.into(),
491        }
492    }
493}
494
495impl UpdateFrom<durable::Cluster> for Cluster {
496    fn update_from(
497        &mut self,
498        durable::Cluster {
499            id,
500            name,
501            owner_id,
502            privileges,
503            config,
504        }: durable::Cluster,
505    ) {
506        self.id = id;
507        self.name = name;
508        self.owner_id = owner_id;
509        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
510        self.config = config.into();
511    }
512}
513
/// In-memory representation of a single replica of a cluster.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// Cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Identifier of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
522
523impl From<ClusterReplica> for durable::ClusterReplica {
524    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
525        durable::ClusterReplica {
526            cluster_id: replica.cluster_id,
527            replica_id: replica.replica_id,
528            name: replica.name,
529            config: replica.config.into(),
530            owner_id: replica.owner_id,
531        }
532    }
533}
534
/// A [`ClusterStatus`] for a cluster replica process, paired with a timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// Status of the process.
    pub status: ClusterStatus,
    /// UTC timestamp associated with `status`.
    // NOTE(review): presumably the time the status was observed or last changed; confirm.
    pub time: DateTime<Utc>,
}
540
/// A set of [`SourceReference`]s together with the time they were last updated.
///
/// Unlike [`durable::SourceReferences`], this type does not carry the ID of the source it
/// belongs to; [`SourceReferences::to_durable`] takes that ID as an argument.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// Time of the last update.
    // NOTE(review): the unit and epoch of this integer are not visible here; confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
546
/// A single reference belonging to a source.
///
/// Has the same fields as `durable::SourceReference` and `mz_sql::plan::SourceReference`
/// and converts to and from both.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the reference.
    pub name: String,
    /// Namespace of the reference, if it has one.
    pub namespace: Option<String>,
    /// Column names of the reference.
    pub columns: Vec<String>,
}
553
554impl From<SourceReference> for durable::SourceReference {
555    fn from(source_reference: SourceReference) -> durable::SourceReference {
556        durable::SourceReference {
557            name: source_reference.name,
558            namespace: source_reference.namespace,
559            columns: source_reference.columns,
560        }
561    }
562}
563
564impl SourceReferences {
565    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
566        durable::SourceReferences {
567            source_id,
568            updated_at: self.updated_at,
569            references: self.references.into_iter().map(Into::into).collect(),
570        }
571    }
572}
573
574impl From<durable::SourceReference> for SourceReference {
575    fn from(source_reference: durable::SourceReference) -> SourceReference {
576        SourceReference {
577            name: source_reference.name,
578            namespace: source_reference.namespace,
579            columns: source_reference.columns,
580        }
581    }
582}
583
584impl From<durable::SourceReferences> for SourceReferences {
585    fn from(source_references: durable::SourceReferences) -> SourceReferences {
586        SourceReferences {
587            updated_at: source_references.updated_at,
588            references: source_references
589                .references
590                .into_iter()
591                .map(|source_reference| source_reference.into())
592                .collect(),
593        }
594    }
595}
596
597impl From<mz_sql::plan::SourceReference> for SourceReference {
598    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
599        SourceReference {
600            name: source_reference.name,
601            namespace: source_reference.namespace,
602            columns: source_reference.columns,
603        }
604    }
605}
606
607impl From<mz_sql::plan::SourceReferences> for SourceReferences {
608    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
609        SourceReferences {
610            updated_at: source_references.updated_at,
611            references: source_references
612                .references
613                .into_iter()
614                .map(|source_reference| source_reference.into())
615                .collect(),
616        }
617    }
618}
619
620impl From<SourceReferences> for mz_sql::plan::SourceReferences {
621    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
622        mz_sql::plan::SourceReferences {
623            updated_at: source_references.updated_at,
624            references: source_references
625                .references
626                .into_iter()
627                .map(|source_reference| source_reference.into())
628                .collect(),
629        }
630    }
631}
632
633impl From<SourceReference> for mz_sql::plan::SourceReference {
634    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
635        mz_sql::plan::SourceReference {
636            name: source_reference.name,
637            namespace: source_reference.namespace,
638            columns: source_reference.columns,
639        }
640    }
641}
642
/// An item in the in-memory catalog, together with its identity, ownership, privileges and
/// reverse-dependency lists.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The type-specific definition of the item.
    pub item: CatalogItem,
    /// IDs of other catalog items recorded against this entry as referencing it.
    /// Skipped during serialization.
    // NOTE(review): exact relationship to `used_by` is not visible in this chunk; confirm.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// IDs of the dependents of this entry. Skipped during serialization.
    // TODO(database-issues#7922)––this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Identifier of the item.
    pub id: CatalogItemId,
    /// OID of the item.
    pub oid: u32,
    /// Qualified name of the item.
    pub name: QualifiedItemName,
    /// Role that owns the item.
    pub owner_id: RoleId,
    /// Privileges on the item.
    pub privileges: PrivilegeMap,
}
659
/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry; also reachable through `Deref`.
    pub entry: CatalogEntry,
    /// Selects which version of the entry's relation this value refers to.
    pub version: RelationVersionSelector,
}
679
680impl CatalogCollectionEntry {
681    pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
682        self.item().relation_desc(self.version)
683    }
684}
685
686impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
687    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
688        self.item().relation_desc(self.version)
689    }
690
691    fn global_id(&self) -> GlobalId {
692        self.entry
693            .item()
694            .global_id_for_version(self.version)
695            .expect("catalog corruption, missing version!")
696    }
697}
698
699impl Deref for CatalogCollectionEntry {
700    type Target = CatalogEntry;
701
702    fn deref(&self) -> &CatalogEntry {
703        &self.entry
704    }
705}
706
/// Implements the SQL-layer catalog item interface by forwarding to the wrapped
/// [`CatalogEntry`]. Only [`at_version`](mz_sql::catalog::CatalogItem::at_version) uses the
/// wrapper itself, by building a new wrapper around a clone of the entry.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    /// Forwards to the wrapped entry's `name`.
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    /// Forwards to the wrapped entry's `id`.
    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    /// Returns a boxed iterator over the wrapped entry's `global_ids`.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    /// Forwards to the wrapped entry's `oid`.
    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    /// Forwards to the wrapped entry's `func`, including any error it returns.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    /// Forwards to the wrapped entry's `source_desc`, including any error it returns.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    /// Forwards to the wrapped entry's implementation of the trait's `connection` method.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully qualified call so the trait method is the one invoked.
        // NOTE(review): presumably `CatalogEntry` also has an inherent `connection` method
        // that plain method-call syntax would pick instead; confirm.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    /// Forwards to the wrapped entry's `create_sql`.
    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    /// Forwards to the wrapped entry's `item_type`.
    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    /// Forwards to the wrapped entry's `index_details`.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    /// Forwards to the wrapped entry's `writable_table_details`.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    /// Forwards to the wrapped entry's `replacement_target`.
    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    /// Forwards to the wrapped entry's `type_details`.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    /// Forwards to the wrapped entry's `references`.
    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    /// Forwards to the wrapped entry's `uses`.
    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    /// Forwards to the wrapped entry's `referenced_by`.
    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    /// Forwards to the wrapped entry's `used_by`.
    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    /// Forwards to the wrapped entry's `subsource_details`.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    /// Forwards to the wrapped entry's `source_export_details`.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    /// Forwards to the wrapped entry's `is_progress_source`.
    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    /// Forwards to the wrapped entry's `progress_id`.
    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    /// Returns a copy of the owner returned by the wrapped entry's `owner_id`.
    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    /// Forwards to the wrapped entry's `privileges`.
    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    /// Returns the cluster ID reported by the wrapped entry's item, if any.
    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Returns a new boxed [`CatalogCollectionEntry`] that pairs a clone of the wrapped
    /// entry with the requested `version`; this value's own `version` is not consulted.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    /// Forwards to the wrapped entry's `latest_version`.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
830
/// The kinds of item a [`CatalogEntry`] can hold. Each variant wraps the struct that
/// carries the kind-specific definition.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
}
845
846impl From<CatalogEntry> for durable::Item {
847    fn from(entry: CatalogEntry) -> durable::Item {
848        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
849        durable::Item {
850            id: entry.id,
851            oid: entry.oid,
852            global_id,
853            schema_id: entry.name.qualifiers.schema_spec.into(),
854            name: entry.name.item,
855            create_sql,
856            owner_id: entry.owner_id,
857            privileges: entry.privileges.into_all_values().collect(),
858            extra_versions,
859        }
860    }
861}
862
/// In-memory definition of a table, including every version of its relation and the
/// [`GlobalId`] bound to each version.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
887
888impl Table {
889    pub fn timeline(&self) -> Timeline {
890        match &self.data_source {
891            // The Coordinator controls insertions for writable tables
892            // (including system tables), so they are realtime.
893            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
894            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
895        }
896    }
897
898    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
899    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
900        self.collections.values().copied()
901    }
902
903    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
904    pub fn global_id_writes(&self) -> GlobalId {
905        *self
906            .collections
907            .last_key_value()
908            .expect("at least one version of a table")
909            .1
910    }
911
912    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
913    pub fn collection_descs(
914        &self,
915    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
916        self.collections.iter().map(|(version, gid)| {
917            let desc = self
918                .desc
919                .at_version(RelationVersionSelector::Specific(*version));
920            (*gid, *version, desc)
921        })
922    }
923
924    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
925    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
926        let (version, _gid) = self
927            .collections
928            .iter()
929            .find(|(_version, gid)| *gid == id)
930            .expect("GlobalId to exist");
931        self.desc
932            .at_version(RelationVersionSelector::Specific(*version))
933    }
934}
935
/// Describes where the data of a [`Table`] comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Expressions stored alongside the writable table. Skipped during serialization.
        // NOTE(review): presumably one default-value expression per column; confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// The source that feeds the table.
        desc: DataSourceDesc,
        /// The timeline of the table's data; returned as-is by [`Table::timeline`].
        timeline: Timeline,
    },
}
951
/// Describes the origin of the data held by a source (or by a table backed by a data source).
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system
    Ingestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
    },
    /// Receives data from an external system; variant used for old syntax ingestions, which
    /// additionally carry their own export configuration and a reference to a separate
    /// progress collection.
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from an external reference
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
    /// Exposes the contents of the catalog shard.
    Catalog,
}
1000
1001impl From<IntrospectionType> for DataSourceDesc {
1002    fn from(typ: IntrospectionType) -> Self {
1003        Self::Introspection(typ)
1004    }
1005}
1006
1007impl DataSourceDesc {
1008    /// The key and value formats of the data source.
1009    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1010        match &self {
1011            DataSourceDesc::Ingestion { .. } => (None, None),
1012            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1013                match &data_config.encoding.as_ref() {
1014                    Some(encoding) => match &encoding.key {
1015                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1016                        None => (None, Some(encoding.value.type_())),
1017                    },
1018                    None => (None, None),
1019                }
1020            }
1021            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1022                Some(encoding) => match &encoding.key {
1023                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1024                    None => (None, Some(encoding.value.type_())),
1025                },
1026                None => (None, None),
1027            },
1028            DataSourceDesc::Introspection(_)
1029            | DataSourceDesc::Webhook { .. }
1030            | DataSourceDesc::Progress
1031            | DataSourceDesc::Catalog => (None, None),
1032        }
1033    }
1034
1035    /// Envelope of the data source.
1036    pub fn envelope(&self) -> Option<&str> {
1037        // Note how "none"/"append-only" is different from `None`. Source
1038        // sources don't have an envelope (internal logs, for example), while
1039        // other sources have an envelope that we call the "NONE"-envelope.
1040
1041        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1042            match envelope {
1043                SourceEnvelope::None(_) => "none",
1044                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1045                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1046                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1047                        // NOTE(aljoscha): Should we somehow mark that this is
1048                        // using upsert internally? See note above about
1049                        // DEBEZIUM.
1050                        "debezium"
1051                    }
1052                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1053                        "upsert-value-err-inline"
1054                    }
1055                },
1056                SourceEnvelope::CdcV2 => {
1057                    // TODO(aljoscha): Should we even report this? It's
1058                    // currently not exposed.
1059                    "materialize"
1060                }
1061            }
1062        }
1063
1064        match self {
1065            // NOTE(aljoscha): We could move the block for ingestions into
1066            // `SourceEnvelope` itself, but that one feels more like an internal
1067            // thing and adapter should own how we represent envelopes as a
1068            // string? It would not be hard to convince me otherwise, though.
1069            DataSourceDesc::Ingestion { .. } => None,
1070            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1071                Some(envelope_string(&data_config.envelope))
1072            }
1073            DataSourceDesc::IngestionExport { data_config, .. } => {
1074                Some(envelope_string(&data_config.envelope))
1075            }
1076            DataSourceDesc::Introspection(_)
1077            | DataSourceDesc::Webhook { .. }
1078            | DataSourceDesc::Progress
1079            | DataSourceDesc::Catalog => None,
1080        }
1081    }
1082}
1083
/// In-memory catalog representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source, if there is any.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Where this source gets its data from. Not part of the serialized representation.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1107
1108impl Source {
1109    /// Creates a new `Source`.
1110    ///
1111    /// # Panics
1112    /// - If an ingestion-based plan is not given a cluster_id.
1113    /// - If a non-ingestion-based source has a defined cluster config in its plan.
1114    /// - If a non-ingestion-based source is given a cluster_id.
1115    pub fn new(
1116        plan: CreateSourcePlan,
1117        global_id: GlobalId,
1118        resolved_ids: ResolvedIds,
1119        custom_logical_compaction_window: Option<CompactionWindow>,
1120        is_retained_metrics_object: bool,
1121    ) -> Source {
1122        Source {
1123            create_sql: Some(plan.source.create_sql),
1124            data_source: match plan.source.data_source {
1125                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1126                    desc,
1127                    cluster_id: plan
1128                        .in_cluster
1129                        .expect("ingestion-based sources must be given a cluster ID"),
1130                },
1131                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1132                    desc,
1133                    progress_subsource,
1134                    data_config,
1135                    details,
1136                } => DataSourceDesc::OldSyntaxIngestion {
1137                    desc,
1138                    cluster_id: plan
1139                        .in_cluster
1140                        .expect("ingestion-based sources must be given a cluster ID"),
1141                    progress_subsource,
1142                    data_config,
1143                    details,
1144                },
1145                mz_sql::plan::DataSourceDesc::Progress => {
1146                    assert!(
1147                        plan.in_cluster.is_none(),
1148                        "subsources must not have a host config or cluster_id defined"
1149                    );
1150                    DataSourceDesc::Progress
1151                }
1152                mz_sql::plan::DataSourceDesc::IngestionExport {
1153                    ingestion_id,
1154                    external_reference,
1155                    details,
1156                    data_config,
1157                } => {
1158                    assert!(
1159                        plan.in_cluster.is_none(),
1160                        "subsources must not have a host config or cluster_id defined"
1161                    );
1162                    DataSourceDesc::IngestionExport {
1163                        ingestion_id,
1164                        external_reference,
1165                        details,
1166                        data_config,
1167                    }
1168                }
1169                mz_sql::plan::DataSourceDesc::Webhook {
1170                    validate_using,
1171                    body_format,
1172                    headers,
1173                    cluster_id,
1174                } => {
1175                    mz_ore::soft_assert_or_log!(
1176                        cluster_id.is_none(),
1177                        "cluster_id set at Source level for Webhooks"
1178                    );
1179                    DataSourceDesc::Webhook {
1180                        validate_using,
1181                        body_format,
1182                        headers,
1183                        cluster_id: plan
1184                            .in_cluster
1185                            .expect("webhook sources must be given a cluster ID"),
1186                    }
1187                }
1188            },
1189            desc: plan.source.desc,
1190            global_id,
1191            timeline: plan.timeline,
1192            resolved_ids,
1193            custom_logical_compaction_window: plan
1194                .source
1195                .compaction_window
1196                .or(custom_logical_compaction_window),
1197            is_retained_metrics_object,
1198        }
1199    }
1200
1201    /// Type of the source.
1202    pub fn source_type(&self) -> &str {
1203        match &self.data_source {
1204            DataSourceDesc::Ingestion { desc, .. }
1205            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1206            DataSourceDesc::Progress => "progress",
1207            DataSourceDesc::IngestionExport { .. } => "subsource",
1208            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
1209            DataSourceDesc::Webhook { .. } => "webhook",
1210        }
1211    }
1212
1213    /// Connection ID of the source, if one exists.
1214    pub fn connection_id(&self) -> Option<CatalogItemId> {
1215        match &self.data_source {
1216            DataSourceDesc::Ingestion { desc, .. }
1217            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1218            DataSourceDesc::IngestionExport { .. }
1219            | DataSourceDesc::Introspection(_)
1220            | DataSourceDesc::Webhook { .. }
1221            | DataSourceDesc::Progress
1222            | DataSourceDesc::Catalog => None,
1223        }
1224    }
1225
1226    /// The single [`GlobalId`] that refers to this Source.
1227    pub fn global_id(&self) -> GlobalId {
1228        self.global_id
1229    }
1230
1231    /// The expensive resource that each source consumes is persist shards. To
1232    /// prevent abuse, we want to prevent users from creating sources that use an
1233    /// unbounded number of persist shards. But we also don't want to count
1234    /// persist shards that are mandated by the system (e.g., the progress
1235    /// shard) so that future versions of Materialize can introduce additional
1236    /// per-source shards (e.g., a per-source status shard) without impacting
1237    /// the limit calculation.
1238    pub fn user_controllable_persist_shard_count(&self) -> i64 {
1239        match &self.data_source {
1240            DataSourceDesc::Ingestion { .. } => 0,
1241            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1242                match &desc.connection {
1243                    // These multi-output sources do not use their primary
1244                    // source's data shard, so we don't include it in accounting
1245                    // for users.
1246                    GenericSourceConnection::Postgres(_)
1247                    | GenericSourceConnection::MySql(_)
1248                    | GenericSourceConnection::SqlServer(_) => 0,
1249                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1250                        // Load generators that output data in their primary shard
1251                        LoadGenerator::Clock
1252                        | LoadGenerator::Counter { .. }
1253                        | LoadGenerator::Datums
1254                        | LoadGenerator::KeyValue(_) => 1,
1255                        LoadGenerator::Auction
1256                        | LoadGenerator::Marketing
1257                        | LoadGenerator::Tpch { .. } => 0,
1258                    },
1259                    GenericSourceConnection::Kafka(_) => 1,
1260                }
1261            }
1262            //  DataSourceDesc::IngestionExport represents a subsource, which
1263            //  use a data shard.
1264            DataSourceDesc::IngestionExport { .. } => 1,
1265            DataSourceDesc::Webhook { .. } => 1,
1266            // Introspection, catalog, and progress subsources are not under the user's control, so
1267            // shouldn't count toward their quota.
1268            DataSourceDesc::Introspection(_)
1269            | DataSourceDesc::Progress
1270            | DataSourceDesc::Catalog => 0,
1271        }
1272    }
1273}
1274
/// In-memory catalog representation of a log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}
1282
impl Log {
    /// The single [`GlobalId`] that refers to this Log, i.e. the value of its `global_id`
    /// field.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1289
/// In-memory catalog representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g. storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
    /// Commit interval for the sink, if one is set.
    pub commit_interval: Option<Duration>,
}
1315
1316impl Sink {
1317    pub fn sink_type(&self) -> &str {
1318        self.connection.name()
1319    }
1320
1321    /// Envelope of the sink.
1322    pub fn envelope(&self) -> Option<&str> {
1323        match &self.envelope {
1324            SinkEnvelope::Debezium => Some("debezium"),
1325            SinkEnvelope::Upsert => Some("upsert"),
1326            SinkEnvelope::Append => Some("append"),
1327        }
1328    }
1329
1330    /// Output a combined format string of the sink. For legacy reasons
1331    /// if the key-format is none or the key & value formats are
1332    /// both the same (either avro or json), we return the value format name,
1333    /// otherwise we return a composite name.
1334    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1335        match &self.connection {
1336            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1337            StorageSinkConnection::Iceberg(_) => None,
1338        }
1339    }
1340
1341    /// Output distinct key_format and value_format of the sink.
1342    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1343        match &self.connection {
1344            StorageSinkConnection::Kafka(connection) => {
1345                let key_format = connection
1346                    .format
1347                    .key_format
1348                    .as_ref()
1349                    .map(|f| f.get_format_name());
1350                let value_format = connection.format.value_format.get_format_name();
1351                Some((key_format, value_format))
1352            }
1353            StorageSinkConnection::Iceberg(_) => None,
1354        }
1355    }
1356
1357    pub fn connection_id(&self) -> Option<CatalogItemId> {
1358        self.connection.connection_id()
1359    }
1360
1361    /// The single [`GlobalId`] that this Sink can be referenced by.
1362    pub fn global_id(&self) -> GlobalId {
1363        self.global_id
1364    }
1365}
1366
/// In-memory catalog representation of a view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}
1386
impl View {
    /// The single [`GlobalId`] this [`View`] can be referenced by, i.e. the value of its
    /// `global_id` field.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1393
/// In-memory catalog representation of a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
    /// ID of the materialized view this materialized view is intended to replace.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// If set, only install this materialized view's dataflow on the specified replica.
    pub target_replica: Option<ReplicaId>,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
    // consistency checks do two dumps and compare them. One of these states comes from the durable
    // catalog, but the following fields are not restored when the consistency check loads the
    // durable catalog, hence we need `#[serde(skip)]`.
    /// Optimized global MIR plan, set after global optimization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) plan, set after physical optimization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1444
1445impl MaterializedView {
1446    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
1447    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1448        self.collections.values().copied()
1449    }
1450
1451    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
1452    /// version.
1453    pub fn global_id_writes(&self) -> GlobalId {
1454        *self
1455            .collections
1456            .last_key_value()
1457            .expect("at least one version of a materialized view")
1458            .1
1459    }
1460
1461    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
1462    pub fn collection_descs(
1463        &self,
1464    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1465        self.collections.iter().map(|(version, gid)| {
1466            let desc = self
1467                .desc
1468                .at_version(RelationVersionSelector::Specific(*version));
1469            (*gid, *version, desc)
1470        })
1471    }
1472
1473    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
1474    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1475        let (version, _gid) = self
1476            .collections
1477            .iter()
1478            .find(|(_version, gid)| *gid == id)
1479            .expect("GlobalId to exist");
1480        self.desc
1481            .at_version(RelationVersionSelector::Specific(*version))
1482    }
1483
1484    /// Apply the given replacement materialized view to this [`MaterializedView`].
1485    pub fn apply_replacement(&mut self, replacement: Self) {
1486        let target_id = replacement
1487            .replacement_target
1488            .expect("replacement has target");
1489
1490        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1491            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1492                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1493            });
1494            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1495                cmvs
1496            } else {
1497                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1498            }
1499        }
1500
1501        let old_stmt = parse(&self.create_sql);
1502        let rpl_stmt = parse(&replacement.create_sql);
1503        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1504            if_exists: old_stmt.if_exists,
1505            name: old_stmt.name,
1506            columns: rpl_stmt.columns,
1507            replacement_for: None,
1508            in_cluster: rpl_stmt.in_cluster,
1509            in_cluster_replica: rpl_stmt.in_cluster_replica,
1510            query: rpl_stmt.query,
1511            as_of: rpl_stmt.as_of,
1512            with_options: rpl_stmt.with_options,
1513        };
1514        let create_sql = new_stmt.to_ast_string_stable();
1515
1516        let mut collections = std::mem::take(&mut self.collections);
1517        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
1518        // necessary evolve the relation schema, so that version might be lower than the actual
1519        // latest version.
1520        let latest_version = collections.keys().max().expect("at least one version");
1521        let new_version = latest_version.bump();
1522        collections.insert(new_version, replacement.global_id_writes());
1523
1524        let mut resolved_ids = replacement.resolved_ids;
1525        resolved_ids.remove_item(&target_id);
1526        let mut dependencies = replacement.dependencies;
1527        dependencies.0.remove(&target_id);
1528
1529        *self = Self {
1530            create_sql,
1531            collections,
1532            raw_expr: replacement.raw_expr,
1533            locally_optimized_expr: replacement.locally_optimized_expr,
1534            desc: replacement.desc,
1535            resolved_ids,
1536            dependencies,
1537            replacement_target: None,
1538            cluster_id: replacement.cluster_id,
1539            target_replica: replacement.target_replica,
1540            non_null_assertions: replacement.non_null_assertions,
1541            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1542            refresh_schedule: replacement.refresh_schedule,
1543            initial_as_of: replacement.initial_as_of,
1544            optimized_plan: replacement.optimized_plan,
1545            physical_plan: replacement.physical_plan,
1546            dataflow_metainfo: replacement.dataflow_metainfo,
1547        };
1548    }
1549}
1550
/// In-memory catalog representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
    // consistency checks do two dumps and compare them. One of these states comes from the durable
    // catalog, but the following fields are not restored when the consistency check loads the
    // durable catalog, hence we need `#[serde(skip)]`.
    /// Optimized global MIR plan, set after global optimization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) plan, set after physical optimization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1588
impl Index {
    /// The [`GlobalId`] that refers to this Index, i.e. the value of its `global_id` field
    /// (not the ID of the object the index is on).
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1595
/// In-memory catalog representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type, if there is any.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Details of this type. Not part of the serialized representation.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1607
/// In-memory catalog representation of a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function. Not part of the serialized representation.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1616
/// In-memory catalog representation of a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1624
/// In-memory catalog representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}
1636
impl Connection {
    /// The single [`GlobalId`] used to reference this connection, i.e. the value of its
    /// `global_id` field.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1643
/// In-memory catalog representation of a network policy.
///
/// Converts to and from `durable::NetworkPolicy`, which has the same fields but holds the
/// privileges in a different representation.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the network policy.
    pub name: String,
    /// ID of the network policy.
    pub id: NetworkPolicyId,
    /// OID of the network policy.
    pub oid: u32,
    /// Rules that make up the network policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the network policy.
    pub owner_id: RoleId,
    /// Privileges on the network policy.
    pub privileges: PrivilegeMap,
}
1653
1654impl From<NetworkPolicy> for durable::NetworkPolicy {
1655    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1656        durable::NetworkPolicy {
1657            id: policy.id,
1658            oid: policy.oid,
1659            name: policy.name,
1660            rules: policy.rules,
1661            owner_id: policy.owner_id,
1662            privileges: policy.privileges.into_all_values().collect(),
1663        }
1664    }
1665}
1666
1667impl From<durable::NetworkPolicy> for NetworkPolicy {
1668    fn from(
1669        durable::NetworkPolicy {
1670            id,
1671            oid,
1672            name,
1673            rules,
1674            owner_id,
1675            privileges,
1676        }: durable::NetworkPolicy,
1677    ) -> Self {
1678        NetworkPolicy {
1679            id,
1680            oid,
1681            name,
1682            rules,
1683            owner_id,
1684            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1685        }
1686    }
1687}
1688
1689impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1690    fn update_from(
1691        &mut self,
1692        durable::NetworkPolicy {
1693            id,
1694            oid,
1695            name,
1696            rules,
1697            owner_id,
1698            privileges,
1699        }: durable::NetworkPolicy,
1700    ) {
1701        self.id = id;
1702        self.oid = oid;
1703        self.name = name;
1704        self.rules = rules;
1705        self.owner_id = owner_id;
1706        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1707    }
1708}
1709
1710impl CatalogItem {
1711    /// Returns a string indicating the type of this catalog entry.
1712    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1713        match self {
1714            CatalogItem::Table(_) => CatalogItemType::Table,
1715            CatalogItem::Source(_) => CatalogItemType::Source,
1716            CatalogItem::Log(_) => CatalogItemType::Source,
1717            CatalogItem::Sink(_) => CatalogItemType::Sink,
1718            CatalogItem::View(_) => CatalogItemType::View,
1719            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1720            CatalogItem::Index(_) => CatalogItemType::Index,
1721            CatalogItem::Type(_) => CatalogItemType::Type,
1722            CatalogItem::Func(_) => CatalogItemType::Func,
1723            CatalogItem::Secret(_) => CatalogItemType::Secret,
1724            CatalogItem::Connection(_) => CatalogItemType::Connection,
1725        }
1726    }
1727
1728    /// Returns the [`GlobalId`]s that reference this item, if any.
1729    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1730        let gid = match self {
1731            CatalogItem::Source(source) => source.global_id,
1732            CatalogItem::Log(log) => log.global_id,
1733            CatalogItem::Sink(sink) => sink.global_id,
1734            CatalogItem::View(view) => view.global_id,
1735            CatalogItem::MaterializedView(mv) => {
1736                return itertools::Either::Left(mv.collections.values().copied());
1737            }
1738            CatalogItem::Index(index) => index.global_id,
1739            CatalogItem::Func(func) => func.global_id,
1740            CatalogItem::Type(ty) => ty.global_id,
1741            CatalogItem::Secret(secret) => secret.global_id,
1742            CatalogItem::Connection(conn) => conn.global_id,
1743            CatalogItem::Table(table) => {
1744                return itertools::Either::Left(table.collections.values().copied());
1745            }
1746        };
1747        itertools::Either::Right(std::iter::once(gid))
1748    }
1749
1750    /// Returns the most up-to-date [`GlobalId`] for this item.
1751    ///
1752    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1753    pub fn latest_global_id(&self) -> GlobalId {
1754        match self {
1755            CatalogItem::Source(source) => source.global_id,
1756            CatalogItem::Log(log) => log.global_id,
1757            CatalogItem::Sink(sink) => sink.global_id,
1758            CatalogItem::View(view) => view.global_id,
1759            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1760            CatalogItem::Index(index) => index.global_id,
1761            CatalogItem::Func(func) => func.global_id,
1762            CatalogItem::Type(ty) => ty.global_id,
1763            CatalogItem::Secret(secret) => secret.global_id,
1764            CatalogItem::Connection(conn) => conn.global_id,
1765            CatalogItem::Table(table) => table.global_id_writes(),
1766        }
1767    }
1768
1769    /// Returns the optimized global MIR plan, if this item has one.
1770    pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1771        match self {
1772            CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1773            CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1774            _ => None,
1775        }
1776    }
1777
1778    /// Returns the physical (LIR) plan, if this item has one.
1779    pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1780        match self {
1781            CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1782            CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1783            _ => None,
1784        }
1785    }
1786
1787    /// Returns the dataflow metainfo, if this item has one.
1788    pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1789        match self {
1790            CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1791            CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1792            _ => None,
1793        }
1794    }
1795
1796    /// Returns mutable references to the plan fields (`optimized_plan`,
1797    /// `physical_plan`, `dataflow_metainfo`) on plan-bearing items
1798    /// (`Index`, `MaterializedView`), or `None` for
1799    /// other item kinds.
1800    pub fn plan_fields_mut(
1801        &mut self,
1802    ) -> Option<(
1803        &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1804        &mut Option<Arc<DataflowDescription<ComputePlan>>>,
1805        &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1806    )> {
1807        match self {
1808            CatalogItem::Index(idx) => Some((
1809                &mut idx.optimized_plan,
1810                &mut idx.physical_plan,
1811                &mut idx.dataflow_metainfo,
1812            )),
1813            CatalogItem::MaterializedView(mv) => Some((
1814                &mut mv.optimized_plan,
1815                &mut mv.physical_plan,
1816                &mut mv.dataflow_metainfo,
1817            )),
1818            _ => None,
1819        }
1820    }
1821
1822    /// Whether this item represents a storage collection.
1823    pub fn is_storage_collection(&self) -> bool {
1824        match self {
1825            CatalogItem::Table(_)
1826            | CatalogItem::Source(_)
1827            | CatalogItem::MaterializedView(_)
1828            | CatalogItem::Sink(_) => true,
1829            CatalogItem::Log(_)
1830            | CatalogItem::View(_)
1831            | CatalogItem::Index(_)
1832            | CatalogItem::Type(_)
1833            | CatalogItem::Func(_)
1834            | CatalogItem::Secret(_)
1835            | CatalogItem::Connection(_) => false,
1836        }
1837    }
1838
1839    /// Returns the [`RelationDesc`] for items that yield rows, at the requested
1840    /// version.
1841    ///
1842    /// Some item types honor `version` so callers can ask for the schema that
1843    /// matches a specific [`GlobalId`] or historical definition. Other relation
1844    /// types ignore `version` because they have a single shape. Non-relational
1845    /// items ( for example functions, indexes, sinks, secrets, and connections)
1846    /// return `None`.
1847    pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1848        match &self {
1849            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1850            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1851            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1852            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1853            CatalogItem::MaterializedView(mview) => {
1854                Some(Cow::Owned(mview.desc.at_version(version)))
1855            }
1856            CatalogItem::Func(_)
1857            | CatalogItem::Index(_)
1858            | CatalogItem::Sink(_)
1859            | CatalogItem::Secret(_)
1860            | CatalogItem::Connection(_)
1861            | CatalogItem::Type(_) => None,
1862        }
1863    }
1864
1865    pub fn func(
1866        &self,
1867        entry: &CatalogEntry,
1868    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1869        match &self {
1870            CatalogItem::Func(func) => Ok(func.inner),
1871            _ => Err(SqlCatalogError::UnexpectedType {
1872                name: entry.name().item.to_string(),
1873                actual_type: entry.item_type(),
1874                expected_type: CatalogItemType::Func,
1875            }),
1876        }
1877    }
1878
1879    pub fn source_desc(
1880        &self,
1881        entry: &CatalogEntry,
1882    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1883        match &self {
1884            CatalogItem::Source(source) => match &source.data_source {
1885                DataSourceDesc::Ingestion { desc, .. }
1886                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1887                DataSourceDesc::IngestionExport { .. }
1888                | DataSourceDesc::Introspection(_)
1889                | DataSourceDesc::Webhook { .. }
1890                | DataSourceDesc::Progress
1891                | DataSourceDesc::Catalog => Ok(None),
1892            },
1893            _ => Err(SqlCatalogError::UnexpectedType {
1894                name: entry.name().item.to_string(),
1895                actual_type: entry.item_type(),
1896                expected_type: CatalogItemType::Source,
1897            }),
1898        }
1899    }
1900
1901    /// Reports whether this catalog entry is a progress source.
1902    pub fn is_progress_source(&self) -> bool {
1903        matches!(
1904            self,
1905            CatalogItem::Source(Source {
1906                data_source: DataSourceDesc::Progress,
1907                ..
1908            })
1909        )
1910    }
1911
1912    /// Collects the identifiers of the objects that were encountered when resolving names in the
1913    /// item's DDL statement.
1914    pub fn references(&self) -> &ResolvedIds {
1915        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1916        match self {
1917            CatalogItem::Func(_) => &*EMPTY,
1918            CatalogItem::Index(idx) => &idx.resolved_ids,
1919            CatalogItem::Sink(sink) => &sink.resolved_ids,
1920            CatalogItem::Source(source) => &source.resolved_ids,
1921            CatalogItem::Log(_) => &*EMPTY,
1922            CatalogItem::Table(table) => &table.resolved_ids,
1923            CatalogItem::Type(typ) => &typ.resolved_ids,
1924            CatalogItem::View(view) => &view.resolved_ids,
1925            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1926            CatalogItem::Secret(_) => &*EMPTY,
1927            CatalogItem::Connection(connection) => &connection.resolved_ids,
1928        }
1929    }
1930
1931    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1932    ///
1933    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1934    /// referenced. For example this will include any catalog objects used to implement functions
1935    /// and casts in the item.
1936    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1937        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1938        match self {
1939            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1940            // their implementation. However, currently there's no way to get that information.
1941            CatalogItem::Func(_) => {}
1942            CatalogItem::Index(_) => {}
1943            CatalogItem::Sink(_) => {}
1944            CatalogItem::Source(_) => {}
1945            CatalogItem::Log(_) => {}
1946            CatalogItem::Table(_) => {}
1947            CatalogItem::Type(_) => {}
1948            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1949            CatalogItem::MaterializedView(mview) => {
1950                uses.extend(mview.dependencies.0.iter().copied())
1951            }
1952            CatalogItem::Secret(_) => {}
1953            CatalogItem::Connection(_) => {}
1954        }
1955        uses
1956    }
1957
1958    /// Returns the connection ID that this item belongs to, if this item is
1959    /// temporary.
1960    pub fn conn_id(&self) -> Option<&ConnectionId> {
1961        match self {
1962            CatalogItem::View(view) => view.conn_id.as_ref(),
1963            CatalogItem::Index(index) => index.conn_id.as_ref(),
1964            CatalogItem::Table(table) => table.conn_id.as_ref(),
1965            CatalogItem::Log(_)
1966            | CatalogItem::Source(_)
1967            | CatalogItem::Sink(_)
1968            | CatalogItem::MaterializedView(_)
1969            | CatalogItem::Secret(_)
1970            | CatalogItem::Type(_)
1971            | CatalogItem::Func(_)
1972            | CatalogItem::Connection(_) => None,
1973        }
1974    }
1975
1976    /// Sets the connection ID that this item belongs to, which makes it a
1977    /// temporary item.
1978    pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1979        match self {
1980            CatalogItem::View(view) => view.conn_id = conn_id,
1981            CatalogItem::Index(index) => index.conn_id = conn_id,
1982            CatalogItem::Table(table) => table.conn_id = conn_id,
1983            CatalogItem::Log(_)
1984            | CatalogItem::Source(_)
1985            | CatalogItem::Sink(_)
1986            | CatalogItem::MaterializedView(_)
1987            | CatalogItem::Secret(_)
1988            | CatalogItem::Type(_)
1989            | CatalogItem::Func(_)
1990            | CatalogItem::Connection(_) => (),
1991        }
1992    }
1993
1994    /// Indicates whether this item is temporary or not.
1995    pub fn is_temporary(&self) -> bool {
1996        self.conn_id().is_some()
1997    }
1998
1999    pub fn rename_schema_refs(
2000        &self,
2001        database_name: &str,
2002        cur_schema_name: &str,
2003        new_schema_name: &str,
2004    ) -> Result<CatalogItem, (String, String)> {
2005        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
2006            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2007                .expect("invalid create sql persisted to catalog")
2008                .into_element()
2009                .ast;
2010
2011            // Rename all references to cur_schema_name.
2012            mz_sql::ast::transform::create_stmt_rename_schema_refs(
2013                &mut create_stmt,
2014                database_name,
2015                cur_schema_name,
2016                new_schema_name,
2017            )?;
2018
2019            Ok(create_stmt.to_ast_string_stable())
2020        };
2021
2022        match self {
2023            CatalogItem::Table(i) => {
2024                let mut i = i.clone();
2025                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2026                Ok(CatalogItem::Table(i))
2027            }
2028            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2029            CatalogItem::Source(i) => {
2030                let mut i = i.clone();
2031                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2032                Ok(CatalogItem::Source(i))
2033            }
2034            CatalogItem::Sink(i) => {
2035                let mut i = i.clone();
2036                i.create_sql = do_rewrite(i.create_sql)?;
2037                Ok(CatalogItem::Sink(i))
2038            }
2039            CatalogItem::View(i) => {
2040                let mut i = i.clone();
2041                i.create_sql = do_rewrite(i.create_sql)?;
2042                Ok(CatalogItem::View(i))
2043            }
2044            CatalogItem::MaterializedView(i) => {
2045                let mut i = i.clone();
2046                i.create_sql = do_rewrite(i.create_sql)?;
2047                Ok(CatalogItem::MaterializedView(i))
2048            }
2049            CatalogItem::Index(i) => {
2050                let mut i = i.clone();
2051                i.create_sql = do_rewrite(i.create_sql)?;
2052                Ok(CatalogItem::Index(i))
2053            }
2054            CatalogItem::Secret(i) => {
2055                let mut i = i.clone();
2056                i.create_sql = do_rewrite(i.create_sql)?;
2057                Ok(CatalogItem::Secret(i))
2058            }
2059            CatalogItem::Connection(i) => {
2060                let mut i = i.clone();
2061                i.create_sql = do_rewrite(i.create_sql)?;
2062                Ok(CatalogItem::Connection(i))
2063            }
2064            CatalogItem::Type(i) => {
2065                let mut i = i.clone();
2066                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2067                Ok(CatalogItem::Type(i))
2068            }
2069            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2070        }
2071    }
2072
2073    /// Returns a clone of `self` with all instances of `from` renamed to `to`
2074    /// (with the option of including the item's own name) or errors if request
2075    /// is ambiguous.
2076    pub fn rename_item_refs(
2077        &self,
2078        from: FullItemName,
2079        to_item_name: String,
2080        rename_self: bool,
2081    ) -> Result<CatalogItem, String> {
2082        let do_rewrite = |create_sql: String| -> Result<String, String> {
2083            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2084                .expect("invalid create sql persisted to catalog")
2085                .into_element()
2086                .ast;
2087            if rename_self {
2088                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2089            }
2090            // Determination of what constitutes an ambiguous request is done here.
2091            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2092            Ok(create_stmt.to_ast_string_stable())
2093        };
2094
2095        match self {
2096            CatalogItem::Table(i) => {
2097                let mut i = i.clone();
2098                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2099                Ok(CatalogItem::Table(i))
2100            }
2101            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2102            CatalogItem::Source(i) => {
2103                let mut i = i.clone();
2104                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2105                Ok(CatalogItem::Source(i))
2106            }
2107            CatalogItem::Sink(i) => {
2108                let mut i = i.clone();
2109                i.create_sql = do_rewrite(i.create_sql)?;
2110                Ok(CatalogItem::Sink(i))
2111            }
2112            CatalogItem::View(i) => {
2113                let mut i = i.clone();
2114                i.create_sql = do_rewrite(i.create_sql)?;
2115                Ok(CatalogItem::View(i))
2116            }
2117            CatalogItem::MaterializedView(i) => {
2118                let mut i = i.clone();
2119                i.create_sql = do_rewrite(i.create_sql)?;
2120                Ok(CatalogItem::MaterializedView(i))
2121            }
2122            CatalogItem::Index(i) => {
2123                let mut i = i.clone();
2124                i.create_sql = do_rewrite(i.create_sql)?;
2125                Ok(CatalogItem::Index(i))
2126            }
2127            CatalogItem::Secret(i) => {
2128                let mut i = i.clone();
2129                i.create_sql = do_rewrite(i.create_sql)?;
2130                Ok(CatalogItem::Secret(i))
2131            }
2132            CatalogItem::Func(_) | CatalogItem::Type(_) => {
2133                unreachable!("{}s cannot be renamed", self.typ())
2134            }
2135            CatalogItem::Connection(i) => {
2136                let mut i = i.clone();
2137                i.create_sql = do_rewrite(i.create_sql)?;
2138                Ok(CatalogItem::Connection(i))
2139            }
2140        }
2141    }
2142
2143    /// Returns a clone of `self` with all instances of `old_id` replaced with `new_id`.
2144    pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2145        let do_rewrite = |create_sql: String| -> String {
2146            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2147                .expect("invalid create sql persisted to catalog")
2148                .into_element()
2149                .ast;
2150            mz_sql::ast::transform::create_stmt_replace_ids(
2151                &mut create_stmt,
2152                &[(old_id, new_id)].into(),
2153            );
2154            create_stmt.to_ast_string_stable()
2155        };
2156
2157        match self {
2158            CatalogItem::Table(i) => {
2159                let mut i = i.clone();
2160                i.create_sql = i.create_sql.map(do_rewrite);
2161                CatalogItem::Table(i)
2162            }
2163            CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2164            CatalogItem::Source(i) => {
2165                let mut i = i.clone();
2166                i.create_sql = i.create_sql.map(do_rewrite);
2167                CatalogItem::Source(i)
2168            }
2169            CatalogItem::Sink(i) => {
2170                let mut i = i.clone();
2171                i.create_sql = do_rewrite(i.create_sql);
2172                CatalogItem::Sink(i)
2173            }
2174            CatalogItem::View(i) => {
2175                let mut i = i.clone();
2176                i.create_sql = do_rewrite(i.create_sql);
2177                CatalogItem::View(i)
2178            }
2179            CatalogItem::MaterializedView(i) => {
2180                let mut i = i.clone();
2181                i.create_sql = do_rewrite(i.create_sql);
2182                CatalogItem::MaterializedView(i)
2183            }
2184            CatalogItem::Index(i) => {
2185                let mut i = i.clone();
2186                i.create_sql = do_rewrite(i.create_sql);
2187                CatalogItem::Index(i)
2188            }
2189            CatalogItem::Secret(i) => {
2190                let mut i = i.clone();
2191                i.create_sql = do_rewrite(i.create_sql);
2192                CatalogItem::Secret(i)
2193            }
2194            CatalogItem::Func(_) | CatalogItem::Type(_) => {
2195                unreachable!("references of {}s cannot be replaced", self.typ())
2196            }
2197            CatalogItem::Connection(i) => {
2198                let mut i = i.clone();
2199                i.create_sql = do_rewrite(i.create_sql);
2200                CatalogItem::Connection(i)
2201            }
2202        }
2203    }
2204    /// Updates the retain history for an item. Returns the previous retain history value. Returns
2205    /// an error if this item does not support retain history.
2206    pub fn update_retain_history(
2207        &mut self,
2208        value: Option<Value>,
2209        window: CompactionWindow,
2210    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2211        let update = |mut ast: &mut Statement<Raw>| {
2212            // Each statement type has unique option types. This macro handles them commonly.
2213            macro_rules! update_retain_history {
2214                ( $stmt:ident, $opt:ident, $name:ident ) => {{
2215                    // Replace or add the option.
2216                    let pos = $stmt
2217                        .with_options
2218                        .iter()
2219                        // In case there are ever multiple, look for the last one.
2220                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2221                    if let Some(value) = value {
2222                        let next = mz_sql_parser::ast::$opt {
2223                            name: mz_sql_parser::ast::$name::RetainHistory,
2224                            value: Some(WithOptionValue::RetainHistoryFor(value)),
2225                        };
2226                        if let Some(idx) = pos {
2227                            let previous = $stmt.with_options[idx].clone();
2228                            $stmt.with_options[idx] = next;
2229                            previous.value
2230                        } else {
2231                            $stmt.with_options.push(next);
2232                            None
2233                        }
2234                    } else {
2235                        if let Some(idx) = pos {
2236                            $stmt.with_options.swap_remove(idx).value
2237                        } else {
2238                            None
2239                        }
2240                    }
2241                }};
2242            }
2243            let previous = match &mut ast {
2244                Statement::CreateTable(stmt) => {
2245                    update_retain_history!(stmt, TableOption, TableOptionName)
2246                }
2247                Statement::CreateIndex(stmt) => {
2248                    update_retain_history!(stmt, IndexOption, IndexOptionName)
2249                }
2250                Statement::CreateSource(stmt) => {
2251                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2252                }
2253                Statement::CreateMaterializedView(stmt) => {
2254                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2255                }
2256                _ => {
2257                    return Err(());
2258                }
2259            };
2260            Ok(previous)
2261        };
2262
2263        let res = self.update_sql(update)?;
2264        let cw = self
2265            .custom_logical_compaction_window_mut()
2266            .expect("item must have compaction window");
2267        *cw = Some(window);
2268        Ok(res)
2269    }
2270
2271    /// Updates the timestamp interval for a source. Returns the previous timestamp interval
2272    /// value, if any. Returns an error if this item is not a source.
2273    pub fn update_timestamp_interval(
2274        &mut self,
2275        value: Option<Value>,
2276        interval: Duration,
2277    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2278        let update = |ast: &mut Statement<Raw>| match ast {
2279            Statement::CreateSource(stmt) => {
2280                let pos = stmt.with_options.iter().rposition(|o| {
2281                    o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2282                });
2283                let previous = if let Some(value) = value {
2284                    let next = mz_sql_parser::ast::CreateSourceOption {
2285                        name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2286                        value: Some(WithOptionValue::Value(value)),
2287                    };
2288                    if let Some(idx) = pos {
2289                        let previous = stmt.with_options[idx].clone();
2290                        stmt.with_options[idx] = next;
2291                        previous.value
2292                    } else {
2293                        stmt.with_options.push(next);
2294                        None
2295                    }
2296                } else if let Some(idx) = pos {
2297                    stmt.with_options.swap_remove(idx).value
2298                } else {
2299                    None
2300                };
2301                Ok(previous)
2302            }
2303            _ => Err(()),
2304        };
2305
2306        let previous = self.update_sql(update)?;
2307
2308        // Update the in-memory SourceDesc timestamp_interval.
2309        match self {
2310            CatalogItem::Source(source) => {
2311                match &mut source.data_source {
2312                    DataSourceDesc::Ingestion { desc, .. }
2313                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2314                        desc.timestamp_interval = interval;
2315                    }
2316                    _ => return Err(()),
2317                }
2318                Ok(previous)
2319            }
2320            _ => Err(()),
2321        }
2322    }
2323
2324    pub fn add_column(
2325        &mut self,
2326        name: ColumnName,
2327        typ: SqlColumnType,
2328        sql: RawDataType,
2329    ) -> Result<RelationVersion, PlanError> {
2330        let CatalogItem::Table(table) = self else {
2331            return Err(PlanError::Unsupported {
2332                feature: "adding columns to a non-Table".to_string(),
2333                discussion_no: None,
2334            });
2335        };
2336        let next_version = table.desc.add_column(name.clone(), typ);
2337
2338        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2339            Statement::CreateTable(stmt) => {
2340                let version = ColumnOptionDef {
2341                    name: None,
2342                    option: ColumnOption::Versioned {
2343                        action: ColumnVersioned::Added,
2344                        version: next_version.into(),
2345                    },
2346                };
2347                let column = ColumnDef {
2348                    name: name.into(),
2349                    data_type: sql,
2350                    collation: None,
2351                    options: vec![version],
2352                };
2353                stmt.columns.push(column);
2354                Ok(())
2355            }
2356            _ => Err(()),
2357        };
2358
2359        self.update_sql(update)
2360            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2361        Ok(next_version)
2362    }
2363
2364    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
2365    /// otherwise returns f's result.
2366    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2367    where
2368        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2369    {
2370        let create_sql = match self {
2371            CatalogItem::Table(Table { create_sql, .. })
2372            | CatalogItem::Type(Type { create_sql, .. })
2373            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2374            CatalogItem::Sink(Sink { create_sql, .. })
2375            | CatalogItem::View(View { create_sql, .. })
2376            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2377            | CatalogItem::Index(Index { create_sql, .. })
2378            | CatalogItem::Secret(Secret { create_sql, .. })
2379            | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql),
2380            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2381        };
2382        let Some(create_sql) = create_sql else {
2383            return Err(());
2384        };
2385        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2386            .expect("non-system items must be parseable")
2387            .into_element()
2388            .ast;
2389        debug!("rewrite: {}", ast.to_ast_string_redacted());
2390        let t = f(&mut ast)?;
2391        *create_sql = ast.to_ast_string_stable();
2392        debug!("rewrote: {}", ast.to_ast_string_redacted());
2393        Ok(t)
2394    }
2395
2396    /// If the object is considered a "compute object"
2397    /// (i.e., it is managed by the compute controller),
2398    /// this function returns its cluster ID. Otherwise, it returns nothing.
2399    ///
2400    /// This function differs from `cluster_id` because while all
2401    /// compute objects run on a cluster, the converse is not true.
2402    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2403        match self {
2404            CatalogItem::Index(index) => Some(index.cluster_id),
2405            CatalogItem::Table(_)
2406            | CatalogItem::Source(_)
2407            | CatalogItem::Log(_)
2408            | CatalogItem::View(_)
2409            | CatalogItem::MaterializedView(_)
2410            | CatalogItem::Sink(_)
2411            | CatalogItem::Type(_)
2412            | CatalogItem::Func(_)
2413            | CatalogItem::Secret(_)
2414            | CatalogItem::Connection(_) => None,
2415        }
2416    }
2417
2418    pub fn cluster_id(&self) -> Option<ClusterId> {
2419        match self {
2420            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2421            CatalogItem::Index(index) => Some(index.cluster_id),
2422            CatalogItem::Source(source) => match &source.data_source {
2423                DataSourceDesc::Ingestion { cluster_id, .. }
2424                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2425                // This is somewhat of a lie because the export runs on the same
2426                // cluster as its ingestion but we don't yet have a way of
2427                // cross-referencing the items
2428                DataSourceDesc::IngestionExport { .. } => None,
2429                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2430                DataSourceDesc::Introspection(_)
2431                | DataSourceDesc::Progress
2432                | DataSourceDesc::Catalog => None,
2433            },
2434            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2435            CatalogItem::Table(_)
2436            | CatalogItem::Log(_)
2437            | CatalogItem::View(_)
2438            | CatalogItem::Type(_)
2439            | CatalogItem::Func(_)
2440            | CatalogItem::Secret(_)
2441            | CatalogItem::Connection(_) => None,
2442        }
2443    }
2444
2445    /// The custom compaction window, if any has been set. This does not reflect any propagated
2446    /// compaction window (i.e., source -> subsource).
2447    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2448        match self {
2449            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2450            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2451            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2452            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2453            CatalogItem::Log(_)
2454            | CatalogItem::View(_)
2455            | CatalogItem::Sink(_)
2456            | CatalogItem::Type(_)
2457            | CatalogItem::Func(_)
2458            | CatalogItem::Secret(_)
2459            | CatalogItem::Connection(_) => None,
2460        }
2461    }
2462
2463    /// Mutable access to the custom compaction window, or None if this type does not support custom
2464    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2465    /// subsource).
2466    pub fn custom_logical_compaction_window_mut(
2467        &mut self,
2468    ) -> Option<&mut Option<CompactionWindow>> {
2469        let cw = match self {
2470            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2471            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2472            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2473            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2474            CatalogItem::Log(_)
2475            | CatalogItem::View(_)
2476            | CatalogItem::Sink(_)
2477            | CatalogItem::Type(_)
2478            | CatalogItem::Func(_)
2479            | CatalogItem::Secret(_)
2480            | CatalogItem::Connection(_) => return None,
2481        };
2482        Some(cw)
2483    }
2484
2485    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2486    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2487    ///
2488    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2489    /// sensible default (currently 1s).
2490    ///
2491    /// For objects that do not have the concept of compaction window, return None.
2492    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2493        let custom_logical_compaction_window = match self {
2494            CatalogItem::Table(_)
2495            | CatalogItem::Source(_)
2496            | CatalogItem::Index(_)
2497            | CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(),
2498            CatalogItem::Log(_)
2499            | CatalogItem::View(_)
2500            | CatalogItem::Sink(_)
2501            | CatalogItem::Type(_)
2502            | CatalogItem::Func(_)
2503            | CatalogItem::Secret(_)
2504            | CatalogItem::Connection(_) => return None,
2505        };
2506        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2507    }
2508
2509    /// Whether the item's logical compaction window
2510    /// is controlled by the METRICS_RETENTION
2511    /// system var.
2512    pub fn is_retained_metrics_object(&self) -> bool {
2513        match self {
2514            CatalogItem::Table(table) => table.is_retained_metrics_object,
2515            CatalogItem::Source(source) => source.is_retained_metrics_object,
2516            CatalogItem::Index(index) => index.is_retained_metrics_object,
2517            CatalogItem::Log(_)
2518            | CatalogItem::View(_)
2519            | CatalogItem::MaterializedView(_)
2520            | CatalogItem::Sink(_)
2521            | CatalogItem::Type(_)
2522            | CatalogItem::Func(_)
2523            | CatalogItem::Secret(_)
2524            | CatalogItem::Connection(_) => false,
2525        }
2526    }
2527
2528    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2529        match self {
2530            CatalogItem::Table(table) => {
2531                let create_sql = table
2532                    .create_sql
2533                    .clone()
2534                    .expect("builtin tables cannot be serialized");
2535                let mut collections = table.collections.clone();
2536                let global_id = collections
2537                    .remove(&RelationVersion::root())
2538                    .expect("at least one version");
2539                (create_sql, global_id, collections)
2540            }
2541            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2542            CatalogItem::Source(source) => {
2543                assert!(
2544                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2545                    "cannot serialize introspection/builtin sources",
2546                );
2547                let create_sql = source
2548                    .create_sql
2549                    .clone()
2550                    .expect("builtin sources cannot be serialized");
2551                (create_sql, source.global_id, BTreeMap::new())
2552            }
2553            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2554            CatalogItem::MaterializedView(mview) => {
2555                let mut collections = mview.collections.clone();
2556                let global_id = collections
2557                    .remove(&RelationVersion::root())
2558                    .expect("at least one version");
2559                (mview.create_sql.clone(), global_id, collections)
2560            }
2561            CatalogItem::Index(index) => {
2562                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2563            }
2564            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2565            CatalogItem::Type(typ) => {
2566                let create_sql = typ
2567                    .create_sql
2568                    .clone()
2569                    .expect("builtin types cannot be serialized");
2570                (create_sql, typ.global_id, BTreeMap::new())
2571            }
2572            CatalogItem::Secret(secret) => {
2573                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2574            }
2575            CatalogItem::Connection(connection) => (
2576                connection.create_sql.clone(),
2577                connection.global_id,
2578                BTreeMap::new(),
2579            ),
2580            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2581        }
2582    }
2583
2584    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2585        match self {
2586            CatalogItem::Table(mut table) => {
2587                let create_sql = table
2588                    .create_sql
2589                    .expect("builtin tables cannot be serialized");
2590                let global_id = table
2591                    .collections
2592                    .remove(&RelationVersion::root())
2593                    .expect("at least one version");
2594                (create_sql, global_id, table.collections)
2595            }
2596            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2597            CatalogItem::Source(source) => {
2598                assert!(
2599                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2600                    "cannot serialize introspection/builtin sources",
2601                );
2602                let create_sql = source
2603                    .create_sql
2604                    .expect("builtin sources cannot be serialized");
2605                (create_sql, source.global_id, BTreeMap::new())
2606            }
2607            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2608            CatalogItem::MaterializedView(mut mview) => {
2609                let global_id = mview
2610                    .collections
2611                    .remove(&RelationVersion::root())
2612                    .expect("at least one version");
2613                (mview.create_sql, global_id, mview.collections)
2614            }
2615            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2616            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2617            CatalogItem::Type(typ) => {
2618                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2619                (create_sql, typ.global_id, BTreeMap::new())
2620            }
2621            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2622            CatalogItem::Connection(connection) => {
2623                (connection.create_sql, connection.global_id, BTreeMap::new())
2624            }
2625            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2626        }
2627    }
2628
2629    /// Returns a global ID for a specific version selector. Returns `None` if the item does
2630    /// not have versions or if the version does not exist.
2631    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2632        let collections = match self {
2633            CatalogItem::MaterializedView(mv) => &mv.collections,
2634            CatalogItem::Table(table) => &table.collections,
2635            CatalogItem::Source(source) => return Some(source.global_id),
2636            CatalogItem::Log(log) => return Some(log.global_id),
2637            CatalogItem::View(view) => return Some(view.global_id),
2638            CatalogItem::Sink(sink) => return Some(sink.global_id),
2639            CatalogItem::Index(index) => return Some(index.global_id),
2640            CatalogItem::Type(ty) => return Some(ty.global_id),
2641            CatalogItem::Func(func) => return Some(func.global_id),
2642            CatalogItem::Secret(secret) => return Some(secret.global_id),
2643            CatalogItem::Connection(conn) => return Some(conn.global_id),
2644        };
2645        match version {
2646            RelationVersionSelector::Latest => collections.values().last().copied(),
2647            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2648        }
2649    }
2650}
2651
2652impl CatalogEntry {
2653    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
2654    /// produces rows.
2655    pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2656        self.item.relation_desc(RelationVersionSelector::Latest)
2657    }
2658
2659    /// Reports if the item has columns.
2660    pub fn has_columns(&self) -> bool {
2661        match self.item() {
2662            CatalogItem::Type(Type { details, .. }) => {
2663                matches!(details.typ, CatalogType::Record { .. })
2664            }
2665            _ => self.relation_desc_latest().is_some(),
2666        }
2667    }
2668
2669    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
2670    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2671        self.item.func(self)
2672    }
2673
2674    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2675    pub fn index(&self) -> Option<&Index> {
2676        match self.item() {
2677            CatalogItem::Index(idx) => Some(idx),
2678            _ => None,
2679        }
2680    }
2681
2682    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2683    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2684        match self.item() {
2685            CatalogItem::MaterializedView(mv) => Some(mv),
2686            _ => None,
2687        }
2688    }
2689
2690    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2691    pub fn table(&self) -> Option<&Table> {
2692        match self.item() {
2693            CatalogItem::Table(tbl) => Some(tbl),
2694            _ => None,
2695        }
2696    }
2697
2698    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2699    pub fn source(&self) -> Option<&Source> {
2700        match self.item() {
2701            CatalogItem::Source(src) => Some(src),
2702            _ => None,
2703        }
2704    }
2705
2706    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2707    pub fn sink(&self) -> Option<&Sink> {
2708        match self.item() {
2709            CatalogItem::Sink(sink) => Some(sink),
2710            _ => None,
2711        }
2712    }
2713
2714    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2715    pub fn secret(&self) -> Option<&Secret> {
2716        match self.item() {
2717            CatalogItem::Secret(secret) => Some(secret),
2718            _ => None,
2719        }
2720    }
2721
2722    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2723        match self.item() {
2724            CatalogItem::Connection(connection) => Ok(connection),
2725            _ => {
2726                let db_name = match self.name().qualifiers.database_spec {
2727                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2728                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2729                };
2730                Err(SqlCatalogError::UnknownConnection(format!(
2731                    "{}{}.{}",
2732                    db_name,
2733                    self.name().qualifiers.schema_spec,
2734                    self.name().item
2735                )))
2736            }
2737        }
2738    }
2739
2740    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2741    /// this `CatalogEntry`, if any.
2742    pub fn source_desc(
2743        &self,
2744    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2745        self.item.source_desc(self)
2746    }
2747
2748    /// Reports whether this catalog entry is a connection.
2749    pub fn is_connection(&self) -> bool {
2750        matches!(self.item(), CatalogItem::Connection(_))
2751    }
2752
2753    /// Reports whether this catalog entry is a table.
2754    pub fn is_table(&self) -> bool {
2755        matches!(self.item(), CatalogItem::Table(_))
2756    }
2757
2758    /// Reports whether this catalog entry is a source. Note that this includes
2759    /// subsources.
2760    pub fn is_source(&self) -> bool {
2761        matches!(self.item(), CatalogItem::Source(_))
2762    }
2763
2764    /// Reports whether this catalog entry is a subsource and, if it is, the
2765    /// ingestion it is an export of, as well as the item it exports.
2766    pub fn subsource_details(
2767        &self,
2768    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2769        match &self.item() {
2770            CatalogItem::Source(source) => match &source.data_source {
2771                DataSourceDesc::IngestionExport {
2772                    ingestion_id,
2773                    external_reference,
2774                    details,
2775                    data_config: _,
2776                } => Some((*ingestion_id, external_reference, details)),
2777                _ => None,
2778            },
2779            _ => None,
2780        }
2781    }
2782
2783    /// Reports whether this catalog entry is a source export and, if it is, the
2784    /// ingestion it is an export of, as well as the item it exports.
2785    pub fn source_export_details(
2786        &self,
2787    ) -> Option<(
2788        CatalogItemId,
2789        &UnresolvedItemName,
2790        &SourceExportDetails,
2791        &SourceExportDataConfig<ReferencedConnection>,
2792    )> {
2793        match &self.item() {
2794            CatalogItem::Source(source) => match &source.data_source {
2795                DataSourceDesc::IngestionExport {
2796                    ingestion_id,
2797                    external_reference,
2798                    details,
2799                    data_config,
2800                } => Some((*ingestion_id, external_reference, details, data_config)),
2801                _ => None,
2802            },
2803            CatalogItem::Table(table) => match &table.data_source {
2804                TableDataSource::DataSource {
2805                    desc:
2806                        DataSourceDesc::IngestionExport {
2807                            ingestion_id,
2808                            external_reference,
2809                            details,
2810                            data_config,
2811                        },
2812                    timeline: _,
2813                } => Some((*ingestion_id, external_reference, details, data_config)),
2814                _ => None,
2815            },
2816            _ => None,
2817        }
2818    }
2819
2820    /// Reports whether this catalog entry is a progress source.
2821    pub fn is_progress_source(&self) -> bool {
2822        self.item().is_progress_source()
2823    }
2824
2825    /// Returns the `GlobalId` of all of this entry's progress ID.
2826    pub fn progress_id(&self) -> Option<CatalogItemId> {
2827        match &self.item() {
2828            CatalogItem::Source(source) => match &source.data_source {
2829                DataSourceDesc::Ingestion { .. } => Some(self.id),
2830                DataSourceDesc::OldSyntaxIngestion {
2831                    progress_subsource, ..
2832                } => Some(*progress_subsource),
2833                DataSourceDesc::IngestionExport { .. }
2834                | DataSourceDesc::Introspection(_)
2835                | DataSourceDesc::Progress
2836                | DataSourceDesc::Webhook { .. }
2837                | DataSourceDesc::Catalog => None,
2838            },
2839            CatalogItem::Table(_)
2840            | CatalogItem::Log(_)
2841            | CatalogItem::View(_)
2842            | CatalogItem::MaterializedView(_)
2843            | CatalogItem::Sink(_)
2844            | CatalogItem::Index(_)
2845            | CatalogItem::Type(_)
2846            | CatalogItem::Func(_)
2847            | CatalogItem::Secret(_)
2848            | CatalogItem::Connection(_) => None,
2849        }
2850    }
2851
2852    /// Reports whether this catalog entry is a sink.
2853    pub fn is_sink(&self) -> bool {
2854        matches!(self.item(), CatalogItem::Sink(_))
2855    }
2856
2857    /// Reports whether this catalog entry is a materialized view.
2858    pub fn is_materialized_view(&self) -> bool {
2859        matches!(self.item(), CatalogItem::MaterializedView(_))
2860    }
2861
2862    /// Reports whether this catalog entry is a view.
2863    pub fn is_view(&self) -> bool {
2864        matches!(self.item(), CatalogItem::View(_))
2865    }
2866
2867    /// Reports whether this catalog entry is a secret.
2868    pub fn is_secret(&self) -> bool {
2869        matches!(self.item(), CatalogItem::Secret(_))
2870    }
2871
2872    /// Reports whether this catalog entry is an introspection source.
2873    pub fn is_introspection_source(&self) -> bool {
2874        matches!(self.item(), CatalogItem::Log(_))
2875    }
2876
2877    /// Reports whether this catalog entry is an index.
2878    pub fn is_index(&self) -> bool {
2879        matches!(self.item(), CatalogItem::Index(_))
2880    }
2881
2882    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
2883    pub fn is_relation(&self) -> bool {
2884        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2885    }
2886
2887    /// Collects the identifiers of the objects that were encountered when
2888    /// resolving names in the item's DDL statement.
2889    pub fn references(&self) -> &ResolvedIds {
2890        self.item.references()
2891    }
2892
2893    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2894    ///
2895    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2896    /// referenced. For example this will include any catalog objects used to implement functions
2897    /// and casts in the item.
2898    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2899        self.item.uses()
2900    }
2901
2902    /// Returns the `CatalogItem` associated with this catalog entry.
2903    pub fn item(&self) -> &CatalogItem {
2904        &self.item
2905    }
2906
2907    /// Returns a mutable reference to the `CatalogItem` associated with this
2908    /// catalog entry.
2909    pub fn item_mut(&mut self) -> &mut CatalogItem {
2910        &mut self.item
2911    }
2912
2913    /// Returns the [`CatalogItemId`] of this catalog entry.
2914    pub fn id(&self) -> CatalogItemId {
2915        self.id
2916    }
2917
2918    /// Returns all of the [`GlobalId`]s associated with this item.
2919    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2920        self.item().global_ids()
2921    }
2922
2923    pub fn latest_global_id(&self) -> GlobalId {
2924        self.item().latest_global_id()
2925    }
2926
2927    /// Returns the OID of this catalog entry.
2928    pub fn oid(&self) -> u32 {
2929        self.oid
2930    }
2931
2932    /// Returns the fully qualified name of this catalog entry.
2933    pub fn name(&self) -> &QualifiedItemName {
2934        &self.name
2935    }
2936
2937    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
2938    pub fn referenced_by(&self) -> &[CatalogItemId] {
2939        &self.referenced_by
2940    }
2941
2942    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2943    pub fn used_by(&self) -> &[CatalogItemId] {
2944        &self.used_by
2945    }
2946
2947    /// Returns the connection ID that this item belongs to, if this item is
2948    /// temporary.
2949    pub fn conn_id(&self) -> Option<&ConnectionId> {
2950        self.item.conn_id()
2951    }
2952
2953    /// Returns the role ID of the entry owner.
2954    pub fn owner_id(&self) -> &RoleId {
2955        &self.owner_id
2956    }
2957
2958    /// Returns the privileges of the entry.
2959    pub fn privileges(&self) -> &PrivilegeMap {
2960        &self.privileges
2961    }
2962
2963    /// Returns the comment object ID for this entry.
2964    pub fn comment_object_id(&self) -> CommentObjectId {
2965        use CatalogItemType::*;
2966        match self.item_type() {
2967            Table => CommentObjectId::Table(self.id),
2968            Source => CommentObjectId::Source(self.id),
2969            Sink => CommentObjectId::Sink(self.id),
2970            View => CommentObjectId::View(self.id),
2971            MaterializedView => CommentObjectId::MaterializedView(self.id),
2972            Index => CommentObjectId::Index(self.id),
2973            Func => CommentObjectId::Func(self.id),
2974            Connection => CommentObjectId::Connection(self.id),
2975            Type => CommentObjectId::Type(self.id),
2976            Secret => CommentObjectId::Secret(self.id),
2977        }
2978    }
2979}
2980
/// In-memory store of comments, grouped by the object they are attached to.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    /// For each commented object, its comments keyed by sub-component. The `None` key holds the
    /// comment on the object itself.
    ///
    /// NOTE(review): a `Some(_)` key presumably identifies a column position of a relation;
    /// confirm against callers.
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2985
2986impl CommentsMap {
2987    pub fn update_comment(
2988        &mut self,
2989        object_id: CommentObjectId,
2990        sub_component: Option<usize>,
2991        comment: Option<String>,
2992    ) -> Option<String> {
2993        let object_comments = self.map.entry(object_id).or_default();
2994
2995        // Either replace the existing comment, or remove it if comment is None/NULL.
2996        let (empty, prev) = if let Some(comment) = comment {
2997            let prev = object_comments.insert(sub_component, comment);
2998            (false, prev)
2999        } else {
3000            let prev = object_comments.remove(&sub_component);
3001            (object_comments.is_empty(), prev)
3002        };
3003
3004        // Cleanup entries that are now empty.
3005        if empty {
3006            self.map.remove(&object_id);
3007        }
3008
3009        // Return the previous comment, if there was one, for easy removal.
3010        prev
3011    }
3012
3013    /// Remove all comments for `object_id` from the map.
3014    ///
3015    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
3016    /// relations you can also have comments on the individual columns. Dropping the comments for a
3017    /// relation will also drop all of the comments on any columns.
3018    pub fn drop_comments(
3019        &mut self,
3020        object_ids: &BTreeSet<CommentObjectId>,
3021    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3022        let mut removed_comments = Vec::new();
3023
3024        for object_id in object_ids {
3025            if let Some(comments) = self.map.remove(object_id) {
3026                let removed = comments
3027                    .into_iter()
3028                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3029                removed_comments.extend(removed);
3030            }
3031        }
3032
3033        removed_comments
3034    }
3035
3036    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3037        self.map
3038            .iter()
3039            .map(|(id, comments)| {
3040                comments
3041                    .iter()
3042                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3043            })
3044            .flatten()
3045    }
3046
3047    pub fn get_object_comments(
3048        &self,
3049        object_id: CommentObjectId,
3050    ) -> Option<&BTreeMap<Option<usize>, String>> {
3051        self.map.get(&object_id)
3052    }
3053}
3054
3055impl Serialize for CommentsMap {
3056    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3057    where
3058        S: serde::Serializer,
3059    {
3060        let comment_count = self
3061            .map
3062            .iter()
3063            .map(|(_object_id, comments)| comments.len())
3064            .sum();
3065
3066        let mut seq = serializer.serialize_seq(Some(comment_count))?;
3067        for (object_id, sub) in &self.map {
3068            for (sub_component, comment) in sub {
3069                seq.serialize_element(&(
3070                    format!("{object_id:?}"),
3071                    format!("{sub_component:?}"),
3072                    comment,
3073                ))?;
3074            }
3075        }
3076        seq.end()
3077    }
3078}
3079
/// The set of all default privileges, grouped by the `DefaultPrivilegeObject` they apply to.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    /// For each object description, the default privileges recorded per grantee role.
    /// Serialized with the map keys rendered as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
3085
// Use a new type here because otherwise we have two levels of BTreeMap, both needing
// map_key_to_string.
/// The default privileges recorded for a single `DefaultPrivilegeObject`, one
/// `DefaultPrivilegeAclItem` per grantee.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized, the key is the grantee Role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
3094
3095impl Deref for RoleDefaultPrivileges {
3096    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
3097
3098    fn deref(&self) -> &Self::Target {
3099        &self.0
3100    }
3101}
3102
3103impl DerefMut for RoleDefaultPrivileges {
3104    fn deref_mut(&mut self) -> &mut Self::Target {
3105        &mut self.0
3106    }
3107}
3108
3109impl DefaultPrivileges {
3110    /// Add a new default privilege into the set of all default privileges.
3111    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3112        if privilege.acl_mode.is_empty() {
3113            return;
3114        }
3115
3116        let privileges = self.privileges.entry(object).or_default();
3117        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3118            default_privilege.acl_mode |= privilege.acl_mode;
3119        } else {
3120            privileges.insert(privilege.grantee, privilege);
3121        }
3122    }
3123
3124    /// Revoke a default privilege from the set of all default privileges.
3125    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3126        if let Some(privileges) = self.privileges.get_mut(object) {
3127            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3128                default_privilege.acl_mode =
3129                    default_privilege.acl_mode.difference(privilege.acl_mode);
3130                if default_privilege.acl_mode.is_empty() {
3131                    privileges.remove(&privilege.grantee);
3132                }
3133            }
3134            if privileges.is_empty() {
3135                self.privileges.remove(object);
3136            }
3137        }
3138    }
3139
3140    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
3141    /// any exist.
3142    pub fn get_privileges_for_grantee(
3143        &self,
3144        object: &DefaultPrivilegeObject,
3145        grantee: &RoleId,
3146    ) -> Option<&AclMode> {
3147        self.privileges
3148            .get(object)
3149            .and_then(|privileges| privileges.get(grantee))
3150            .map(|privilege| &privilege.acl_mode)
3151    }
3152
3153    /// Get all default privileges that apply to the provided object details.
3154    pub fn get_applicable_privileges(
3155        &self,
3156        role_id: RoleId,
3157        database_id: Option<DatabaseId>,
3158        schema_id: Option<SchemaId>,
3159        object_type: mz_sql::catalog::ObjectType,
3160    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3161        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
3162        // don't require the caller to worry about that and we will map their `object_type` to the
3163        // correct type for privileges.
3164        let privilege_object_type = if object_type.is_relation() {
3165            mz_sql::catalog::ObjectType::Table
3166        } else {
3167            object_type
3168        };
3169        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3170
3171        // Collect all entries that apply to the provided object details.
3172        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
3173        // entries in the vec below. That's OK because we consolidate the results after.
3174        [
3175            DefaultPrivilegeObject {
3176                role_id,
3177                database_id,
3178                schema_id,
3179                object_type: privilege_object_type,
3180            },
3181            DefaultPrivilegeObject {
3182                role_id,
3183                database_id,
3184                schema_id: None,
3185                object_type: privilege_object_type,
3186            },
3187            DefaultPrivilegeObject {
3188                role_id,
3189                database_id: None,
3190                schema_id: None,
3191                object_type: privilege_object_type,
3192            },
3193            DefaultPrivilegeObject {
3194                role_id: RoleId::Public,
3195                database_id,
3196                schema_id,
3197                object_type: privilege_object_type,
3198            },
3199            DefaultPrivilegeObject {
3200                role_id: RoleId::Public,
3201                database_id,
3202                schema_id: None,
3203                object_type: privilege_object_type,
3204            },
3205            DefaultPrivilegeObject {
3206                role_id: RoleId::Public,
3207                database_id: None,
3208                schema_id: None,
3209                object_type: privilege_object_type,
3210            },
3211        ]
3212        .into_iter()
3213        .filter_map(|object| self.privileges.get(&object))
3214        .flat_map(|acl_map| acl_map.values())
3215        // Consolidate privileges with a common grantee.
3216        .fold(
3217            BTreeMap::new(),
3218            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3219                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3220                *accum_acl_mode |= *acl_mode;
3221                accum
3222            },
3223        )
3224        .into_iter()
3225        // Restrict the acl_mode to only privileges valid for the provided object type. If the
3226        // default privilege has an object type of Table, then it may contain privileges valid for
3227        // tables but not other relations. If the passed in object type is another relation, then
3228        // we need to remove any privilege that is not valid for the specified relation.
3229        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3230        // Filter out empty privileges.
3231        .filter(|(_, acl_mode)| !acl_mode.is_empty())
3232        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3233            grantee: *grantee,
3234            acl_mode,
3235        })
3236    }
3237
3238    pub fn iter(
3239        &self,
3240    ) -> impl Iterator<
3241        Item = (
3242            &DefaultPrivilegeObject,
3243            impl Iterator<Item = &DefaultPrivilegeAclItem>,
3244        ),
3245    > {
3246        self.privileges
3247            .iter()
3248            .map(|(object, acl_map)| (object, acl_map.values()))
3249    }
3250}
3251
/// In-memory configuration of a cluster.
///
/// Converts to and from [`durable::ClusterConfig`] via the `From` impls in this module.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed and, if so, its managed settings.
    pub variant: ClusterVariant,
    /// Optional workload class of the cluster.
    // NOTE(review): the meaning of this label is defined outside this module section.
    pub workload_class: Option<String>,
}
3257
3258impl ClusterConfig {
3259    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3260        match &self.variant {
3261            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3262            ClusterVariant::Unmanaged => None,
3263        }
3264    }
3265}
3266
3267impl From<ClusterConfig> for durable::ClusterConfig {
3268    fn from(config: ClusterConfig) -> Self {
3269        Self {
3270            variant: config.variant.into(),
3271            workload_class: config.workload_class,
3272        }
3273    }
3274}
3275
3276impl From<durable::ClusterConfig> for ClusterConfig {
3277    fn from(config: durable::ClusterConfig) -> Self {
3278        Self {
3279            variant: config.variant.into(),
3280            workload_class: config.workload_class,
3281        }
3282    }
3283}
3284
/// Settings of a managed cluster, i.e. the payload of [`ClusterVariant::Managed`].
///
/// Converts to and from [`durable::ClusterVariantManaged`] via the `From` impls in this module.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    // NOTE(review): presumably the name of the replica size — confirm against callers.
    pub size: String,
    pub availability_zones: Vec<String>,
    pub logging: ReplicaLogging,
    // NOTE(review): presumably the number of replicas to maintain — confirm against callers.
    pub replication_factor: u32,
    /// Optimizer feature overrides; exposed through [`ClusterConfig::features`].
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    pub schedule: ClusterSchedule,
}
3294
3295impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3296    fn from(managed: ClusterVariantManaged) -> Self {
3297        Self {
3298            size: managed.size,
3299            availability_zones: managed.availability_zones,
3300            logging: managed.logging,
3301            replication_factor: managed.replication_factor,
3302            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3303            schedule: managed.schedule,
3304        }
3305    }
3306}
3307
3308impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3309    fn from(managed: durable::ClusterVariantManaged) -> Self {
3310        Self {
3311            size: managed.size,
3312            availability_zones: managed.availability_zones,
3313            logging: managed.logging,
3314            replication_factor: managed.replication_factor,
3315            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3316            schedule: managed.schedule,
3317        }
3318    }
3319}
3320
/// Distinguishes managed clusters from unmanaged ones.
///
/// Converts to and from [`durable::ClusterVariant`] via the `From` impls in this module.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster together with its managed settings.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; it carries no additional settings.
    Unmanaged,
}
3326
3327impl From<ClusterVariant> for durable::ClusterVariant {
3328    fn from(variant: ClusterVariant) -> Self {
3329        match variant {
3330            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3331            ClusterVariant::Unmanaged => Self::Unmanaged,
3332        }
3333    }
3334}
3335
3336impl From<durable::ClusterVariant> for ClusterVariant {
3337    fn from(variant: durable::ClusterVariant) -> Self {
3338        match variant {
3339            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3340            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3341        }
3342    }
3343}
3344
3345impl mz_sql::catalog::CatalogDatabase for Database {
3346    fn name(&self) -> &str {
3347        &self.name
3348    }
3349
3350    fn id(&self) -> DatabaseId {
3351        self.id
3352    }
3353
3354    fn has_schemas(&self) -> bool {
3355        !self.schemas_by_name.is_empty()
3356    }
3357
3358    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3359        &self.schemas_by_name
3360    }
3361
3362    // `as` is ok to use to cast to a trait object.
3363    #[allow(clippy::as_conversions)]
3364    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3365        self.schemas_by_id
3366            .values()
3367            .map(|schema| schema as &dyn CatalogSchema)
3368            .collect()
3369    }
3370
3371    fn owner_id(&self) -> RoleId {
3372        self.owner_id
3373    }
3374
3375    fn privileges(&self) -> &PrivilegeMap {
3376        &self.privileges
3377    }
3378}
3379
3380impl mz_sql::catalog::CatalogSchema for Schema {
3381    fn database(&self) -> &ResolvedDatabaseSpecifier {
3382        &self.name.database
3383    }
3384
3385    fn name(&self) -> &QualifiedSchemaName {
3386        &self.name
3387    }
3388
3389    fn id(&self) -> &SchemaSpecifier {
3390        &self.id
3391    }
3392
3393    fn has_items(&self) -> bool {
3394        !self.items.is_empty()
3395    }
3396
3397    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3398        Box::new(
3399            self.items
3400                .values()
3401                .chain(self.functions.values())
3402                .chain(self.types.values())
3403                .copied(),
3404        )
3405    }
3406
3407    fn owner_id(&self) -> RoleId {
3408        self.owner_id
3409    }
3410
3411    fn privileges(&self) -> &PrivilegeMap {
3412        &self.privileges
3413    }
3414}
3415
3416impl mz_sql::catalog::CatalogRole for Role {
3417    fn name(&self) -> &str {
3418        &self.name
3419    }
3420
3421    fn id(&self) -> RoleId {
3422        self.id
3423    }
3424
3425    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3426        &self.membership.map
3427    }
3428
3429    fn attributes(&self) -> &RoleAttributes {
3430        &self.attributes
3431    }
3432
3433    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3434        &self.vars.map
3435    }
3436}
3437
3438impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3439    fn name(&self) -> &str {
3440        &self.name
3441    }
3442
3443    fn id(&self) -> NetworkPolicyId {
3444        self.id
3445    }
3446
3447    fn owner_id(&self) -> RoleId {
3448        self.owner_id
3449    }
3450
3451    fn privileges(&self) -> &PrivilegeMap {
3452        &self.privileges
3453    }
3454}
3455
3456impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3457    fn name(&self) -> &str {
3458        &self.name
3459    }
3460
3461    fn id(&self) -> ClusterId {
3462        self.id
3463    }
3464
3465    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3466        &self.bound_objects
3467    }
3468
3469    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3470        &self.replica_id_by_name_
3471    }
3472
3473    // `as` is ok to use to cast to a trait object.
3474    #[allow(clippy::as_conversions)]
3475    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3476        self.replicas()
3477            .map(|replica| replica as &dyn CatalogClusterReplica)
3478            .collect()
3479    }
3480
3481    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3482        self.replica(id).expect("catalog out of sync")
3483    }
3484
3485    fn owner_id(&self) -> RoleId {
3486        self.owner_id
3487    }
3488
3489    fn privileges(&self) -> &PrivilegeMap {
3490        &self.privileges
3491    }
3492
3493    fn is_managed(&self) -> bool {
3494        self.is_managed()
3495    }
3496
3497    fn managed_size(&self) -> Option<&str> {
3498        match &self.config.variant {
3499            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3500            ClusterVariant::Unmanaged => None,
3501        }
3502    }
3503
3504    fn schedule(&self) -> Option<&ClusterSchedule> {
3505        match &self.config.variant {
3506            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3507            ClusterVariant::Unmanaged => None,
3508        }
3509    }
3510
3511    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3512        self.try_to_plan()
3513    }
3514}
3515
3516impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3517    fn name(&self) -> &str {
3518        &self.name
3519    }
3520
3521    fn cluster_id(&self) -> ClusterId {
3522        self.cluster_id
3523    }
3524
3525    fn replica_id(&self) -> ReplicaId {
3526        self.replica_id
3527    }
3528
3529    fn owner_id(&self) -> RoleId {
3530        self.owner_id
3531    }
3532
3533    fn internal(&self) -> bool {
3534        self.config.location.internal()
3535    }
3536}
3537
3538impl mz_sql::catalog::CatalogItem for CatalogEntry {
3539    fn name(&self) -> &QualifiedItemName {
3540        self.name()
3541    }
3542
3543    fn id(&self) -> CatalogItemId {
3544        self.id()
3545    }
3546
3547    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3548        Box::new(self.global_ids())
3549    }
3550
3551    fn oid(&self) -> u32 {
3552        self.oid()
3553    }
3554
3555    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3556        self.func()
3557    }
3558
3559    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3560        self.source_desc()
3561    }
3562
3563    fn connection(
3564        &self,
3565    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3566    {
3567        Ok(self.connection()?.details.to_connection())
3568    }
3569
3570    fn create_sql(&self) -> &str {
3571        match self.item() {
3572            CatalogItem::Table(Table { create_sql, .. }) => {
3573                create_sql.as_deref().unwrap_or("<builtin>")
3574            }
3575            CatalogItem::Source(Source { create_sql, .. }) => {
3576                create_sql.as_deref().unwrap_or("<builtin>")
3577            }
3578            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3579            CatalogItem::View(View { create_sql, .. }) => create_sql,
3580            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3581            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3582            CatalogItem::Type(Type { create_sql, .. }) => {
3583                create_sql.as_deref().unwrap_or("<builtin>")
3584            }
3585            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3586            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3587            CatalogItem::Func(_) => "<builtin>",
3588            CatalogItem::Log(_) => "<builtin>",
3589        }
3590    }
3591
3592    fn item_type(&self) -> SqlCatalogItemType {
3593        self.item().typ()
3594    }
3595
3596    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3597        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3598            Some((keys, *on))
3599        } else {
3600            None
3601        }
3602    }
3603
3604    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3605        if let CatalogItem::Table(Table {
3606            data_source: TableDataSource::TableWrites { defaults },
3607            ..
3608        }) = self.item()
3609        {
3610            Some(defaults.as_slice())
3611        } else {
3612            None
3613        }
3614    }
3615
3616    fn replacement_target(&self) -> Option<CatalogItemId> {
3617        if let CatalogItem::MaterializedView(mv) = self.item() {
3618            mv.replacement_target
3619        } else {
3620            None
3621        }
3622    }
3623
3624    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3625        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3626            Some(details)
3627        } else {
3628            None
3629        }
3630    }
3631
3632    fn references(&self) -> &ResolvedIds {
3633        self.references()
3634    }
3635
3636    fn uses(&self) -> BTreeSet<CatalogItemId> {
3637        self.uses()
3638    }
3639
3640    fn referenced_by(&self) -> &[CatalogItemId] {
3641        self.referenced_by()
3642    }
3643
3644    fn used_by(&self) -> &[CatalogItemId] {
3645        self.used_by()
3646    }
3647
3648    fn subsource_details(
3649        &self,
3650    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3651        self.subsource_details()
3652    }
3653
3654    fn source_export_details(
3655        &self,
3656    ) -> Option<(
3657        CatalogItemId,
3658        &UnresolvedItemName,
3659        &SourceExportDetails,
3660        &SourceExportDataConfig<ReferencedConnection>,
3661    )> {
3662        self.source_export_details()
3663    }
3664
3665    fn is_progress_source(&self) -> bool {
3666        self.is_progress_source()
3667    }
3668
3669    fn progress_id(&self) -> Option<CatalogItemId> {
3670        self.progress_id()
3671    }
3672
3673    fn owner_id(&self) -> RoleId {
3674        self.owner_id
3675    }
3676
3677    fn privileges(&self) -> &PrivilegeMap {
3678        &self.privileges
3679    }
3680
3681    fn cluster_id(&self) -> Option<ClusterId> {
3682        self.item().cluster_id()
3683    }
3684
3685    fn at_version(
3686        &self,
3687        version: RelationVersionSelector,
3688    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3689        Box::new(CatalogCollectionEntry {
3690            entry: self.clone(),
3691            version,
3692        })
3693    }
3694
3695    fn latest_version(&self) -> Option<RelationVersion> {
3696        self.table().map(|t| t.desc.latest_version())
3697    }
3698}
3699
/// A single update to the catalog state.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    /// What is being added or retracted.
    pub kind: StateUpdateKind,
    /// The timestamp of the update.
    pub ts: Timestamp,
    /// Whether `kind` is being added or retracted.
    pub diff: StateDiff,
}
3707
/// The contents of a single state update.
///
/// Variants are listed in dependency order.
///
/// Because [`Ord`] is derived, the declaration order of the variants is also the order in which
/// values of different variants sort, so reordering variants changes sorting behavior.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // Temporary items are not actually updated via the durable catalog, but
    // this allows us to model them the same way as all other items in parts of
    // the pipeline.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3737
/// Valid diffs for catalog state updates.
///
/// Converts into [`Diff`] infallibly; the reverse conversion fails for any [`Diff`] other than
/// `Diff::MINUS_ONE` and `Diff::ONE`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The update is removed; corresponds to `Diff::MINUS_ONE`.
    Retraction,
    /// The update is added; corresponds to `Diff::ONE`.
    Addition,
}
3744
3745impl From<StateDiff> for Diff {
3746    fn from(diff: StateDiff) -> Self {
3747        match diff {
3748            StateDiff::Retraction => Diff::MINUS_ONE,
3749            StateDiff::Addition => Diff::ONE,
3750        }
3751    }
3752}
3753impl TryFrom<Diff> for StateDiff {
3754    type Error = String;
3755
3756    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3757        match diff {
3758            Diff::MINUS_ONE => Ok(Self::Retraction),
3759            Diff::ONE => Ok(Self::Addition),
3760            diff => Err(format!("invalid diff {diff}")),
3761        }
3762    }
3763}
3764
/// Information needed to process an update to a temporary item.
///
/// Built from a [`CatalogEntry`] via the `From` impl in this module.
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    /// The entry's catalog item ID.
    pub id: CatalogItemId,
    /// The entry's OID.
    pub oid: u32,
    /// The global ID returned by `to_serialized` on the entry's item.
    pub global_id: GlobalId,
    /// The schema taken from the entry's qualified name.
    pub schema_id: SchemaId,
    /// The unqualified item name.
    pub name: String,
    /// The result of the entry's `conn_id()`, cloned.
    pub conn_id: Option<ConnectionId>,
    /// The SQL returned by `to_serialized` on the entry's item; also used to derive the item
    /// type.
    pub create_sql: String,
    /// The role that owns the item.
    pub owner_id: RoleId,
    /// All privileges of the entry, flattened into a list.
    pub privileges: Vec<MzAclItem>,
    /// The additional versions returned by `to_serialized` on the entry's item.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3779
3780impl From<CatalogEntry> for TemporaryItem {
3781    fn from(entry: CatalogEntry) -> Self {
3782        let conn_id = entry.conn_id().cloned();
3783        let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3784
3785        TemporaryItem {
3786            id: entry.id,
3787            oid: entry.oid,
3788            global_id,
3789            schema_id: entry.name.qualifiers.schema_spec.into(),
3790            name: entry.name.item,
3791            conn_id,
3792            create_sql,
3793            owner_id: entry.owner_id,
3794            privileges: entry.privileges.into_all_values().collect(),
3795            extra_versions,
3796        }
3797    }
3798}
3799
3800impl TemporaryItem {
3801    pub fn item_type(&self) -> CatalogItemType {
3802        item_type(&self.create_sql)
3803    }
3804}
3805
/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
///
/// Converts into [`StateUpdateKind`] infallibly; the reverse conversion fails only for
/// `StateUpdateKind::TemporaryItem`, handing the [`TemporaryItem`] back as the error.
// NOTE(review): `StateUpdateKind` and `TemporaryItem` both derive `Ord` in this file, so the
// rationale above may be out of date — confirm before relying on it.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3829
3830impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3831    fn from(value: BootstrapStateUpdateKind) -> Self {
3832        match value {
3833            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3834            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3835            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3836            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3837            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3838                StateUpdateKind::DefaultPrivilege(kind)
3839            }
3840            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3841                StateUpdateKind::SystemPrivilege(kind)
3842            }
3843            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3844                StateUpdateKind::SystemConfiguration(kind)
3845            }
3846            BootstrapStateUpdateKind::SourceReferences(kind) => {
3847                StateUpdateKind::SourceReferences(kind)
3848            }
3849            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3850            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3851            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3852                StateUpdateKind::IntrospectionSourceIndex(kind)
3853            }
3854            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3855            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3856                StateUpdateKind::SystemObjectMapping(kind)
3857            }
3858            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3859            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3860            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3861            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3862                StateUpdateKind::StorageCollectionMetadata(kind)
3863            }
3864            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3865                StateUpdateKind::UnfinalizedShard(kind)
3866            }
3867        }
3868    }
3869}
3870
3871impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3872    type Error = TemporaryItem;
3873
3874    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3875        match value {
3876            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3877            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3878            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3879            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3880            StateUpdateKind::DefaultPrivilege(kind) => {
3881                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3882            }
3883            StateUpdateKind::SystemPrivilege(kind) => {
3884                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3885            }
3886            StateUpdateKind::SystemConfiguration(kind) => {
3887                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3888            }
3889            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3890            StateUpdateKind::NetworkPolicy(kind) => {
3891                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3892            }
3893            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3894                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3895            }
3896            StateUpdateKind::ClusterReplica(kind) => {
3897                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3898            }
3899            StateUpdateKind::SourceReferences(kind) => {
3900                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3901            }
3902            StateUpdateKind::SystemObjectMapping(kind) => {
3903                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3904            }
3905            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3906            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3907            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3908            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3909            StateUpdateKind::StorageCollectionMetadata(kind) => {
3910                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3911            }
3912            StateUpdateKind::UnfinalizedShard(kind) => {
3913                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3914            }
3915        }
3916    }
3917}